"""EDC 服务器主入口 — UDP + TCP 异步服务 UDP :5500 设备登录 (Count_Off)、心跳 (Heartbeat)、传感数据上报 (TSReport) UDP :5505 消息监听 (同上) TCP :5550 时间同步 (TimeStamp)、数据上报 (TSReport, SerialNet) 设备接入流程: 1. 设备 TCP 连接 → 发送 TimeStamp 请求 → EDC 返回时间 2. 设备 UDP 上报 Count_Off (设备详细信息) → EDC 注册设备,不回复 3. 设备持续通过 TCP/UDP 上报数据 """ import asyncio import json import logging import signal try: import uvloop # type: ignore except ImportError: uvloop = None from src.config import ( UDP_PORT, UDP_MSG_PORT, TCP_PORT, BIND_HOST, LOG_LEVEL, ) from src.models import init_pool, close_pool from src.protocol import parse_message, make_error_response from src.handlers import ( handle_count_off, handle_heartbeat, handle_timestamp, handle_tsreport, handle_serial_net, parse_loop, serialnet_loop, serialnet_response_loop, set_udp_sender, ) logging.basicConfig( level=getattr(logging, LOG_LEVEL), format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("edc") # 全局 UDP transport,供 serialnet_loop 发送指令 _udp_transport: asyncio.DatagramTransport | None = None def send_udp(data: bytes, addr: tuple[str, int]): """通过全局 UDP transport 发送数据""" if _udp_transport: _udp_transport.sendto(data, addr) class EDCProtocol: """asyncio UDP 协议处理器""" def __init__(self): self.transport = None def connection_made(self, transport): self.transport = transport global _udp_transport _udp_transport = transport def datagram_received(self, data, addr): asyncio.ensure_future(self._handle(data, addr)) async def _handle(self, data: bytes, addr: tuple): msg = parse_message(data) if msg is None: return logger.info(f"UDP {msg} from {addr}") method = msg.get("Method", "") logger.debug(f"UDP {method} from {addr}") try: response = None if method == "Count_Off": # 设备登录上报,只处理不回复 await handle_count_off(msg, addr) elif method == "Heartbeat": response = await handle_heartbeat(msg) elif method == "TSReport": response = await handle_tsreport(msg) elif method == "SerialNet": response = await handle_serial_net(msg) if response and self.transport: self.transport.sendto(response.encode("utf-8"), addr) except Exception as e: logger.error(f"UDP 处理 {method} 异常: {e}", exc_info=True) async def handle_tcp_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): """TCP 客户端连接处理 使用缓冲 + JSON 解析方式处理 TCP 流,支持: - 换行分隔的 JSON (NDJSON) - 紧凑 JSON (无换行) """ addr = writer.get_extra_info("peername") logger.info(f"TCP 连接: {addr}") buffer = b"" async def process_message(msg: dict): """处理单条消息并返回响应文本""" logger.info(f"TCP get_rcv {msg} from {addr}") method = msg.get("Method", "") logger.debug(f"TCP {method} from {addr}") try: if method == "TimeStamp": return handle_timestamp(msg) elif method == "TSReport": return await handle_tsreport(msg) elif method == "SerialNet": return await handle_serial_net(msg) elif method == "Heartbeat": return await handle_heartbeat(msg) else: logger.debug(f"TCP 未知方法: {method}") return None except Exception as e: logger.error(f"TCP 处理 {method} 异常: {e}") return make_error_response(method, msg.get("Device_id", ""), str(e)) try: while True: chunk = await reader.read(4096) if not chunk: break buffer += chunk # 尝试从缓冲区提取完整的 JSON 消息 while buffer: # 策略 1: 先尝试换行分隔 nl_pos = buffer.find(b"\n") if nl_pos >= 0: line = buffer[:nl_pos].strip() buffer = buffer[nl_pos + 1:] if not line: continue msg = parse_message(line) if msg: resp = await process_message(msg) if resp: writer.write((resp + "\n").encode("utf-8")) await writer.drain() continue # 策略 2: 尝试直接解析整个缓冲区 try: decoded = buffer.decode("utf-8") msg, end = json.JSONDecoder().raw_decode(decoded) buffer = decoded[end:].encode("utf-8") resp = await process_message(msg) if resp: writer.write((resp + "\n").encode("utf-8")) await writer.drain() except json.JSONDecodeError: # 数据不完整,等待更多数据 if len(buffer) > 65536: logger.warning(f"TCP {addr} 缓冲区过大 ({len(buffer)}B),丢弃") buffer = b"" break except (ConnectionResetError, BrokenPipeError, asyncio.IncompleteReadError): pass finally: logger.info(f"TCP 断开: {addr}") writer.close() await writer.wait_closed() async def main(): logger.info("EDC 服务启动中...") await init_pool() asyncio.create_task(parse_loop()) asyncio.create_task(serialnet_loop()) asyncio.create_task(serialnet_response_loop()) loop = asyncio.get_running_loop() # UDP :5500 udp_transport, _ = await loop.create_datagram_endpoint( lambda: EDCProtocol(), # type: ignore[arg-type] local_addr=(BIND_HOST, UDP_PORT), ) set_udp_sender(send_udp) # 注入到 handlers 供 serialnet_loop 使用 logger.info(f"UDP 服务监听 {BIND_HOST}:{UDP_PORT}") # UDP :5505 udp_msg_transport, _ = await loop.create_datagram_endpoint( lambda: EDCProtocol(), # type: ignore[arg-type] local_addr=(BIND_HOST, UDP_MSG_PORT), ) logger.info(f"UDP 消息监听 {BIND_HOST}:{UDP_MSG_PORT}") # TCP :5550 tcp_server = await asyncio.start_server( handle_tcp_client, BIND_HOST, TCP_PORT, ) logger.info(f"TCP 服务监听 {BIND_HOST}:{TCP_PORT}") stop_event = asyncio.Event() def _shutdown(): logger.info("收到关闭信号,正在退出...") stop_event.set() loop.add_signal_handler(signal.SIGINT, _shutdown) loop.add_signal_handler(signal.SIGTERM, _shutdown) await stop_event.wait() tcp_server.close() await tcp_server.wait_closed() udp_transport.close() udp_msg_transport.close() await close_pool() logger.info("EDC 服务已停止") def run(): if uvloop: uvloop.install() logger.info("uvloop 已启用") asyncio.run(main()) if __name__ == "__main__": run()