From 8c5389670dd9ebe3af8d4bdc6ffeaf24c5b22385 Mon Sep 17 00:00:00 2001 From: wangfq Date: Wed, 27 May 2026 14:17:36 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E6=AD=A3=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E6=8E=A5=E5=85=A5=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Count_Off 改为读取 Data.Device_id (设备返回格式), 不再回复设备 - TimeStamp 改为同步函数 (无异步 IO) - TCP 改用 JSON 流解析 (raw_decode), 支持紧凑 JSON 和 NDJSON - TSReport/SerialNet 返回确认消息 --- src/handlers.py | 121 +++++++++++++++++++++++++++---------------- src/server.py | 133 +++++++++++++++++++++++++++++------------------- 2 files changed, 158 insertions(+), 96 deletions(-) diff --git a/src/handlers.py b/src/handlers.py index 19b17b5..27057d6 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -1,4 +1,11 @@ -"""业务逻辑处理 — 设备发现、数据采集、解析调度""" +"""业务逻辑处理 — 设备登录、数据采集、解析调度 + +流程 (参考 EDC服务.md): + 1. 设备 TCP 连接 → 上报 TimeStamp 请求 → EDC 返回时间同步 + 2. 设备 UDP 上报 Count_Off (设备信息) → EDC 处理注册,不回复 + 3. 设备通过 TCP/UDP 上报 TSReport/SerialNet → 存入采集表 + 4. 后台解析服务轮询采集表 → 解析 DG430 协议 → 写入 tb_state_tst +""" import asyncio import logging @@ -12,14 +19,13 @@ from src.models import ( fetch_unparsed, mark_parsed, insert_test_result, - get_dnt_by_serial, ) from src.dg430 import parse_b2_status, hex_str_to_bytes, decode_fault_info, decode_relay_info from src.protocol import ( make_timestamp_response, make_heartbeat_response, - make_count_off_success, make_error_response, + make_response, ) logger = logging.getLogger(__name__) @@ -31,29 +37,49 @@ _registry: dict[str, int] = {} _heartbeat: dict[str, float] = {} -async def handle_count_off(data: dict, addr: tuple) -> str | None: - """处理设备发现广播 +async def handle_count_off(data: dict, addr: tuple): + """处理设备登录/身份上报 (Count_Off 返回格式) - 设备上电后主动上报设备信息。EDC 据此注册/更新 dnt_info。 + 设备 TCP 同步时间后,通过 UDP 上报设备详细信息。 + 主机收到后处理注册流程,**不回复设备**。 + + Count_Off 设备返回格式 (来自 PGLC 协议): + { + "Method": "Count_Off", + "Code": 0, + "Message": "success", + "Data": { + "Ip": "192.168.1.188", + "Port": 5500, + "PortMsg": 5505, + "Mac": "", + "SubnetMask": "...", + "Gateway": "...", + "Server_Ip": "...", + "Iot_Host": "...", + "Iot_Port": 1883, + "Device_id": "1234", + "Device_num": "1234", + "Device_Type": "30", + "Version": "0.1.0", + ... + } + } """ - params = data.get("Params", {}) - device_id = params.get("Device_code") - if not device_id: - logger.warning("Count_Off 缺少 Device_code") - return None + dev = data.get("Data", {}) + serial = dev.get("Device_id") or dev.get("Device_num", "") + if not serial: + logger.warning(f"Count_Off 缺少 Device_id, 来自 {addr}") + return - device_data = data.get("Data", {}) - dev_ip = device_data.get("Ip", addr[0]) - dev_port = device_data.get("Port", 0) or addr[1] - dev_mac = device_data.get("Mac", "") - dev_subnet = device_data.get("SubnetMask", "") - dev_gateway = device_data.get("Gateway", "") - dev_msgport = device_data.get("PortMsg", 0) - dev_version = device_data.get("Version", "") - dev_type = device_data.get("Device_Type", "30") # 默认测试工装 - - # 设备唯一标识用 Device_id(在 Data 内),如果没有则用 Device_code - serial = device_data.get("Device_id") or device_id + dev_ip = dev.get("Ip", addr[0]) + dev_port = dev.get("Port", 0) or addr[1] + dev_mac = dev.get("Mac", "") + dev_subnet = dev.get("SubnetMask", "") + dev_gateway = dev.get("Gateway", "") + dev_msgport = dev.get("PortMsg", 0) + dev_version = dev.get("Version", "") + dev_type = dev.get("Device_Type", "30") dnt_id = await upsert_dnt( serial=serial, @@ -69,11 +95,13 @@ async def handle_count_off(data: dict, addr: tuple) -> str | None: _registry[serial] = dnt_id _heartbeat[serial] = time.time() - # 确保采集表存在 await ensure_collect_table(serial) - logger.info(f"设备注册: {serial} (dnt_id={dnt_id}, ip={dev_ip})") - return make_count_off_success() + logger.info( + f"设备登录: {serial} dnt_id={dnt_id} ip={dev_ip}:{dev_port} " + f"type={dev_type} ver={dev_version}" + ) + # 不回复设备 async def handle_heartbeat(data: dict) -> str | None: @@ -85,7 +113,6 @@ async def handle_heartbeat(data: dict) -> str | None: _heartbeat[device_id] = time.time() - # 心跳也插入采集表 (dat_type=0) try: await insert_collect_data(device_id, 0, str(data)) except Exception: @@ -94,8 +121,20 @@ async def handle_heartbeat(data: dict) -> str | None: return make_heartbeat_response(device_id, int(time.time())) -async def handle_timestamp(data: dict) -> str: - """处理时间同步请求""" +def handle_timestamp(data: dict) -> str: + """处理时间同步请求 + + TimeStamp 请求格式: + { + "Method": "TimeStamp", + "Params": { + "Device_id": "2345", + "TimeZone": "Asia/Shanghai" + } + } + + 返回格式参考 PGLC 协议 3.5 节。 + """ params = data.get("Params", {}) device_id = params.get("Device_id", "") return make_timestamp_response(device_id, int(time.time())) @@ -104,8 +143,10 @@ async def handle_timestamp(data: dict) -> str: async def handle_tsreport(data: dict) -> str | None: """处理设备主动上报子设备传感数据 - TSReport 中的 Sub_Dat 是 DG430 的二进制上报数据, + TSReport 中的 Sub_Dat 是 DG430 二进制上报数据(hex 字符串), 存入采集表 (dat_type=8),由解析服务异步处理。 + + 返回确认消息。 """ device_id = data.get("Device_id", "") sensor_dat = data.get("Sensor_Dat", {}) @@ -119,14 +160,16 @@ async def handle_tsreport(data: dict) -> str | None: except Exception as e: logger.error(f"存储 TSReport 失败: {e}") - return None # 协议规定不需要回复(看文档似乎有回复,保守起见返回简单确认) + return make_response("TSReport", 0, device_id) async def handle_serial_net(data: dict) -> str | None: """处理串口透传返回数据 - SerialNet 返回中 SerialDat 是串口设备的应答数据, + SerialNet 中 SerialDat 是串口设备应答数据(hex 字符串), 存入采集表 (dat_type=9)。 + + 返回确认消息。 """ params = data.get("Params") or data device_id = params.get("Device_id", "") @@ -140,15 +183,7 @@ async def handle_serial_net(data: dict) -> str | None: except Exception as e: logger.error(f"存储 SerialNet 失败: {e}") - return None - - -async def handle_device_info(data: dict) -> str | None: - """处理设备信息查询""" - # 当前不做特殊处理,仅记录日志 - device_id = data.get("Device", "") - logger.debug(f"Device_Info 请求: {device_id}") - return None + return make_response("SerialNet", 0, device_id) # ─── 业务解析服务(后台轮询)───────────────────────────────────────── @@ -164,17 +199,14 @@ async def parse_loop(): for rec in records: try: raw = rec["raw_data"] - # raw_data 是十六进制字符串 pkt = hex_str_to_bytes(raw) status = parse_b2_status(pkt) if status is None: continue - # 解码故障和继电器信息 fault_info = decode_fault_info(status.fault) relay_info = decode_relay_info(status.relay_out) - # 设备型号 dev_model_map = {1: "PD132", 2: "DLD110"} str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})") @@ -207,7 +239,6 @@ async def parse_loop(): except Exception as e: logger.error(f"解析记录 {rec['id']} 失败: {e}") - # 仍然标记为已处理,避免死循环 try: await mark_parsed(device_id, rec["id"]) except Exception: diff --git a/src/server.py b/src/server.py index 724a8a2..ea7fee5 100644 --- a/src/server.py +++ b/src/server.py @@ -1,14 +1,19 @@ """EDC 服务器主入口 — UDP + TCP 异步服务 -UDP 端口 5500: 发现设备 (Count_Off)、心跳 (Heartbeat)、信息查询 -UDP 端口 5505: 消息监听 -TCP 端口 5550: 时间同步 (TimeStamp)、设备数据上报 (TSReport, SerialNet) +UDP :5500 设备登录 (Count_Off)、心跳 (Heartbeat)、传感数据上报 (TSReport) +UDP :5505 消息监听 (同上) +TCP :5550 时间同步 (TimeStamp)、数据上报 (TSReport, SerialNet) + +设备接入流程: + 1. 设备 TCP 连接 → 发送 TimeStamp 请求 → EDC 返回时间 + 2. 设备 UDP 上报 Count_Off (设备详细信息) → EDC 注册设备,不回复 + 3. 设备持续通过 TCP/UDP 上报数据 """ import asyncio +import json import logging import signal -import sys try: import uvloop # type: ignore @@ -26,7 +31,6 @@ from src.handlers import ( handle_timestamp, handle_tsreport, handle_serial_net, - handle_device_info, parse_loop, ) @@ -60,67 +64,98 @@ class EDCProtocol: try: response = None if method == "Count_Off": - response = await handle_count_off(msg, addr) + # 设备登录上报,只处理不回复 + await handle_count_off(msg, addr) elif method == "Heartbeat": response = await handle_heartbeat(msg) - elif method == "Device_Info": - response = await handle_device_info(msg) elif method == "TSReport": - await handle_tsreport(msg) + response = await handle_tsreport(msg) elif method == "SerialNet": - await handle_serial_net(msg) + response = await handle_serial_net(msg) if response and self.transport: self.transport.sendto(response.encode("utf-8"), addr) except Exception as e: - logger.error(f"处理 {method} 异常: {e}", exc_info=True) + logger.error(f"UDP 处理 {method} 异常: {e}", exc_info=True) async def handle_tcp_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - """TCP 客户端连接处理""" + """TCP 客户端连接处理 + + 使用缓冲 + JSON 解析方式处理 TCP 流,支持: + - 换行分隔的 JSON (NDJSON) + - 紧凑 JSON (无换行) + """ addr = writer.get_extra_info("peername") logger.info(f"TCP 连接: {addr}") + buffer = b"" + + async def process_message(msg: dict): + """处理单条消息并返回响应文本""" + method = msg.get("Method", "") + logger.debug(f"TCP {method} from {addr}") + + try: + if method == "TimeStamp": + return handle_timestamp(msg) + elif method == "TSReport": + return await handle_tsreport(msg) + elif method == "SerialNet": + return await handle_serial_net(msg) + elif method == "Heartbeat": + return await handle_heartbeat(msg) + else: + logger.debug(f"TCP 未知方法: {method}") + return None + except Exception as e: + logger.error(f"TCP 处理 {method} 异常: {e}") + return make_error_response(method, msg.get("Device_id", ""), str(e)) + try: while True: - line = await reader.readline() - if not line: + chunk = await reader.read(4096) + if not chunk: break - msg = parse_message(line) - if msg is None: - continue + buffer += chunk - method = msg.get("Method", "") - logger.debug(f"TCP {method} from {addr}") + # 尝试从缓冲区提取完整的 JSON 消息 + while buffer: + # 策略 1: 先尝试换行分隔 + nl_pos = buffer.find(b"\n") + if nl_pos >= 0: + line = buffer[:nl_pos].strip() + buffer = buffer[nl_pos + 1:] + if not line: + continue + msg = parse_message(line) + if msg: + resp = await process_message(msg) + if resp: + writer.write((resp + "\n").encode("utf-8")) + await writer.drain() + continue - try: - response = None - if method == "TimeStamp": - response = await handle_timestamp(msg) - elif method == "TSReport": - await handle_tsreport(msg) - elif method == "SerialNet": - await handle_serial_net(msg) - elif method == "Heartbeat": - response = await handle_heartbeat(msg) - elif method == "Device_Info": - response = await handle_device_info(msg) + # 策略 2: 尝试直接解析整个缓冲区 + try: + decoded = buffer.decode("utf-8") + msg, end = json.JSONDecoder().raw_decode(decoded) + buffer = decoded[end:].encode("utf-8") + resp = await process_message(msg) + if resp: + writer.write((resp + "\n").encode("utf-8")) + await writer.drain() + except json.JSONDecodeError: + # 数据不完整,等待更多数据 + if len(buffer) > 65536: + logger.warning(f"TCP {addr} 缓冲区过大 ({len(buffer)}B),丢弃") + buffer = b"" + break - if response: - writer.write((response + "\n").encode("utf-8")) - await writer.drain() - - except Exception as e: - logger.error(f"TCP 处理 {method} 异常: {e}") - device_id = msg.get("Device_id", "") - err = make_error_response(method, device_id, str(e)) - writer.write((err + "\n").encode("utf-8")) - await writer.drain() - - except (ConnectionResetError, BrokenPipeError): + except (ConnectionResetError, BrokenPipeError, asyncio.IncompleteReadError): pass finally: logger.info(f"TCP 断开: {addr}") @@ -131,28 +166,27 @@ async def handle_tcp_client(reader: asyncio.StreamReader, async def main(): logger.info("EDC 服务启动中...") - # 初始化数据库 await init_pool() - # 启动业务解析后台任务 asyncio.create_task(parse_loop()) - # 启动 UDP 服务 (端口 5500) loop = asyncio.get_running_loop() + + # UDP :5500 udp_transport, _ = await loop.create_datagram_endpoint( lambda: EDCProtocol(), # type: ignore[arg-type] local_addr=(BIND_HOST, UDP_PORT), ) logger.info(f"UDP 服务监听 {BIND_HOST}:{UDP_PORT}") - # 启动 UDP 消息端口 (5505) + # UDP :5505 udp_msg_transport, _ = await loop.create_datagram_endpoint( lambda: EDCProtocol(), # type: ignore[arg-type] local_addr=(BIND_HOST, UDP_MSG_PORT), ) logger.info(f"UDP 消息监听 {BIND_HOST}:{UDP_MSG_PORT}") - # 启动 TCP 服务 (端口 5550) + # TCP :5550 tcp_server = await asyncio.start_server( handle_tcp_client, BIND_HOST, @@ -160,7 +194,6 @@ async def main(): ) logger.info(f"TCP 服务监听 {BIND_HOST}:{TCP_PORT}") - # 优雅退出 stop_event = asyncio.Event() def _shutdown(): @@ -172,7 +205,6 @@ async def main(): await stop_event.wait() - # 清理 tcp_server.close() await tcp_server.wait_closed() udp_transport.close() @@ -182,7 +214,6 @@ async def main(): def run(): - """入口函数""" if uvloop: uvloop.install() logger.info("uvloop 已启用")