From 7a6f56339ca934d747783ad764f79dfde2a9397b Mon Sep 17 00:00:00 2001 From: wangfq Date: Thu, 28 May 2026 09:40:28 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=20tb=5Fserialnet=20?= =?UTF-8?q?=E9=80=8F=E4=BC=A0=E5=8F=91=E9=80=81=E8=A1=A8=20+=20serialnet?= =?UTF-8?q?=5Floop=20=E8=BD=AE=E8=AF=A2=E4=B8=8B=E5=8F=91=20+=20B2?= =?UTF-8?q?=E5=93=8D=E5=BA=94=E5=8C=B9=E9=85=8D=20+=20=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E6=A3=80=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/handlers.py | 122 ++++++++++++++++++++++++++++++++++++++++++++++++ src/models.py | 112 +++++++++++++++++++++++++++++++++++++++++++- src/server.py | 15 ++++++ 3 files changed, 248 insertions(+), 1 deletion(-) diff --git a/src/handlers.py b/src/handlers.py index ce58733..d7f11b4 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -5,9 +5,11 @@ 2. 设备 UDP 上报 Count_Off (设备信息) → EDC 处理注册,不回复 3. 设备通过 TCP/UDP 上报 TSReport/SerialNet → 存入采集表 4. 后台解析服务轮询采集表 → 解析 DG430 协议 → 写入 tb_state_tst + 5. 透传轮询服务轮询 tb_serialnet → 下发 SerialNet 指令 """ import asyncio +import json import logging import time from datetime import datetime @@ -19,6 +21,11 @@ from src.models import ( fetch_unparsed, mark_record_state, insert_test_result, + get_dnt_by_serial, + get_pending_serialnet, + mark_serialnet_sent, + mark_serialnet_done, + mark_serialnet_timeout, ) from src.dg430 import ( parse_b2_status, @@ -43,6 +50,15 @@ _registry: dict[str, int] = {} # 设备心跳时间: {device_id: last_heartbeat} _heartbeat: dict[str, float] = {} +# UDP transport 引用,由 server.py 注入 +_udp_sender: callable | None = None + + +def set_udp_sender(sender): + """设置 UDP 发送函数(由 server.py 调用)""" + global _udp_sender + _udp_sender = sender + async def handle_count_off(data: dict, addr: tuple): """处理设备登录/身份上报 (Count_Off 返回格式) @@ -201,6 +217,7 @@ async def parse_loop(): 每条 raw_data 可能包含多条拼接的 DG430 指令。 从中找出 B2 (状态上报) 指令解析,其余指令忽略。 校验失败的记录标记为 state=3。 + 解析成功的 B2 会匹配 tb_serialnet 中的待确认记录。 """ logger.info("业务解析服务启动") @@ -266,6 +283,9 @@ async def parse_loop(): exit_speed=status.exit_speed, ) + # 匹配 tb_serialnet 中的待确认记录 (state=1) + await _match_serialnet_response(dnt_id, raw) + has_valid_b2 = True all_failed = False logger.info( @@ -294,3 +314,105 @@ async def parse_loop(): logger.error(f"解析循环异常: {e}") await asyncio.sleep(0.5) + + +async def _match_serialnet_response(dnt_id: int, raw_hex: str): + """收到 B2 响应后,匹配 tb_serialnet 中 state=1 的第一条记录""" + try: + from src.models import get_pool + import aiomysql + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 " + "ORDER BY id ASC LIMIT 1", + (dnt_id,), + ) + row = await cur.fetchone() + if row: + await mark_serialnet_done(row["id"], raw_hex) + logger.info(f"tb_serialnet #{row['id']} 已确认完成 (dnt_id={dnt_id})") + except Exception as e: + logger.warning(f"匹配 serialnet 响应失败: {e}") + + +# ─── 透传指令轮询服务 ─────────────────────────────────────────────── + +SERIALNET_TIMEOUT = 10 # 秒 + + +async def serialnet_loop(): + """后台轮询:检查 tb_serialnet 待发送指令,通过 UDP 下发 + + 1. state=0 → 发送 SerialNet JSON → state=1 + 2. state=1 且超过 10 秒 → state=3 (超时失败) + """ + logger.info("透传轮询服务启动") + await asyncio.sleep(2) # 等 UDP transport 就绪 + + while True: + try: + for device_id, dnt_id in list(_registry.items()): + # 1. 下发待发送指令 + pending = await get_pending_serialnet(dnt_id) + if pending: + await _send_serialnet_cmd(device_id, dnt_id, pending) + + # 2. 超时检测 + await _check_serialnet_timeout(dnt_id) + + except Exception as e: + logger.error(f"透传轮询异常: {e}") + + await asyncio.sleep(0.2) + + +async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict): + """构造 SerialNet JSON 并通过 UDP 发送给设备""" + if _udp_sender is None: + logger.warning("UDP sender 未就绪,跳过发送") + return + + # 获取设备 IP 和 msgport + dnt = await get_dnt_by_serial(device_id) + if not dnt or not dnt.get("ip") or not dnt.get("msgport"): + logger.warning(f"设备 {device_id} 无 IP/msgport 信息,跳过") + return + + send_pkg = record["send_pkg"] + addr = (dnt["ip"], dnt["msgport"]) + + # 构造 SerialNet JSON + msg = { + "Method": "SerialNet", + "Params": { + "Device_id": device_id, + "Extra_id": 0, + "Bus_Num": 0, + "SerialDat": send_pkg, + }, + } + msg_bytes = json.dumps(msg, ensure_ascii=False).encode("utf-8") + _udp_sender(msg_bytes, addr) + await mark_serialnet_sent(record["id"]) + logger.info(f"已发送 SerialNet → {device_id} ({addr}): {send_pkg}") + + +async def _check_serialnet_timeout(dnt_id: int): + """检查 state=1 超时记录""" + from src.models import get_pool + import aiomysql + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 " + "AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) " + "LIMIT 5", + (dnt_id, SERIALNET_TIMEOUT), + ) + rows = await cur.fetchall() + for row in rows: + await mark_serialnet_timeout(row["id"]) + logger.warning(f"tb_serialnet #{row['id']} 超时 (>{SERIALNET_TIMEOUT}s)") diff --git a/src/models.py b/src/models.py index 3625bce..239cd0c 100644 --- a/src/models.py +++ b/src/models.py @@ -121,7 +121,20 @@ async def _create_tables(pool: aiomysql.Pool): ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) - # 4. 采集表模板(不直接创建表,设备注册时按此结构动态建表) + # 4. 透传发送表 + await cur.execute(""" + CREATE TABLE IF NOT EXISTS `tb_serialnet` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id', + `send_pkg` VARCHAR(380) DEFAULT '' COMMENT '发送指令包(hex)', + `rcv_pkg` VARCHAR(380) DEFAULT '' COMMENT '接收指令包(hex)', + `state` TINYINT DEFAULT 0 COMMENT '0未发送, 1已发送, 2已完成, 3超时失败', + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, + `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX `idx_dnt_state` (`dnt_id`, `state`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + logger.info("数据库表初始化完成") @@ -270,3 +283,100 @@ async def set_device_offline(serial: str): "UPDATE dnt_info SET state=0, last_off=NOW() WHERE serial=%s", (serial,), ) + + +# ─── tb_serialnet CRUD ───────────────────────────────────────────── + +async def get_pending_serialnet(dnt_id: int) -> dict | None: + """获取该设备 state=0 的第一条待发送记录""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT * FROM tb_serialnet WHERE dnt_id=%s AND state=0 " + "ORDER BY id ASC LIMIT 1", + (dnt_id,), + ) + return await cur.fetchone() + + +async def mark_serialnet_sent(record_id: int): + """标记为 state=1 (已发送)""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "UPDATE tb_serialnet SET state=1 WHERE id=%s", (record_id,), + ) + + +async def mark_serialnet_done(record_id: int, rcv_pkg: str): + """标记为 state=2 (已完成,收到回复)""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "UPDATE tb_serialnet SET state=2, rcv_pkg=%s WHERE id=%s", + (rcv_pkg, record_id), + ) + + +async def mark_serialnet_timeout(record_id: int): + """标记为 state=3 (超时失败)""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "UPDATE tb_serialnet SET state=3 WHERE id=%s", (record_id,), + ) + + +async def get_serialnet_stats(dnt_id: int) -> dict: + """返回 {total, sent, done, failed} 计数""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT state, COUNT(*) as cnt FROM tb_serialnet " + "WHERE dnt_id=%s GROUP BY state", + (dnt_id,), + ) + rows = await cur.fetchall() + stats = {"total": 0, "pending": 0, "sent": 0, "done": 0, "failed": 0} + for r in rows: + s = r["state"] + stats["total"] += r["cnt"] + if s == 0: + stats["pending"] = r["cnt"] + elif s == 1: + stats["sent"] = r["cnt"] + elif s == 2: + stats["done"] = r["cnt"] + elif s == 3: + stats["failed"] = r["cnt"] + return stats + + +async def insert_serialnet(dnt_id: int, send_pkg: str) -> int: + """插入新透传指令,返回 record_id""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO tb_serialnet (dnt_id, send_pkg) VALUES (%s, %s)", + (dnt_id, send_pkg), + ) + return cur.lastrowid + + +async def get_pending_by_device(dnt_id: int) -> list[dict]: + """获取设备所有未完成的记录 (state < 2)""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT * FROM tb_serialnet WHERE dnt_id=%s AND state IN (0,1) " + "ORDER BY id ASC", + (dnt_id,), + ) + return await cur.fetchall() diff --git a/src/server.py b/src/server.py index ea7fee5..3b5953a 100644 --- a/src/server.py +++ b/src/server.py @@ -32,6 +32,8 @@ from src.handlers import ( handle_tsreport, handle_serial_net, parse_loop, + serialnet_loop, + set_udp_sender, ) logging.basicConfig( @@ -40,6 +42,15 @@ logging.basicConfig( ) logger = logging.getLogger("edc") +# 全局 UDP transport,供 serialnet_loop 发送指令 +_udp_transport: asyncio.DatagramTransport | None = None + + +def send_udp(data: bytes, addr: tuple[str, int]): + """通过全局 UDP transport 发送数据""" + if _udp_transport: + _udp_transport.sendto(data, addr) + class EDCProtocol: """asyncio UDP 协议处理器""" @@ -49,6 +60,8 @@ class EDCProtocol: def connection_made(self, transport): self.transport = transport + global _udp_transport + _udp_transport = transport def datagram_received(self, data, addr): asyncio.ensure_future(self._handle(data, addr)) @@ -169,6 +182,7 @@ async def main(): await init_pool() asyncio.create_task(parse_loop()) + asyncio.create_task(serialnet_loop()) loop = asyncio.get_running_loop() @@ -177,6 +191,7 @@ async def main(): lambda: EDCProtocol(), # type: ignore[arg-type] local_addr=(BIND_HOST, UDP_PORT), ) + set_udp_sender(send_udp) # 注入到 handlers 供 serialnet_loop 使用 logger.info(f"UDP 服务监听 {BIND_HOST}:{UDP_PORT}") # UDP :5505