diff --git a/src/handlers.py b/src/handlers.py index 5710b82..c451044 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -345,6 +345,7 @@ SERIALNET_TIMEOUT = 10 # 秒 async def serialnet_loop(): """后台轮询:检查 tb_serialnet 待发送指令,通过 UDP 下发 + 直接查询 tb_serialnet 表(不依赖 _registry), 1. state=0 → 发送 SerialNet JSON → state=1 2. state=1 且超过 10 秒 → state=3 (超时失败) """ @@ -353,14 +354,11 @@ async def serialnet_loop(): while True: try: - for device_id, dnt_id in list(_registry.items()): - # 1. 下发待发送指令 - pending = await get_pending_serialnet(dnt_id) - if pending: - await _send_serialnet_cmd(device_id, dnt_id, pending) + # 1. 查询所有 state=0 的记录 + await _process_pending_all() - # 2. 超时检测 - await _check_serialnet_timeout(dnt_id) + # 2. 查询所有 state=1 超时的记录 + await _check_timeout_all() except Exception as e: logger.error(f"透传轮询异常: {e}") @@ -368,6 +366,49 @@ async def serialnet_loop(): await asyncio.sleep(0.2) +async def _process_pending_all(): + """查询所有 state=0 的记录并发送""" + from src.models import get_pool + import aiomysql + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT sn.*, d.serial, d.ip FROM tb_serialnet sn " + "JOIN dnt_info d ON sn.dnt_id = d.id " + "WHERE sn.state = 0 ORDER BY sn.id ASC LIMIT 10" + ) + rows = await cur.fetchall() + + for row in rows: + device_id = row["serial"] + dnt_id = row["dnt_id"] + if not row.get("ip"): + logger.warning(f"设备 {device_id} 无 IP,跳过") + await mark_serialnet_timeout(row["id"]) + continue + await _send_serialnet_cmd(device_id, dnt_id, row) + + +async def _check_timeout_all(): + """检查所有 state=1 超时记录""" + from src.models import get_pool + import aiomysql + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT id, dnt_id FROM tb_serialnet WHERE state=1 " + "AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) " + "LIMIT 20", + (SERIALNET_TIMEOUT,), + ) + rows = await cur.fetchall() + for row in rows: + await mark_serialnet_timeout(row["id"]) + logger.warning(f"tb_serialnet #{row['id']} 超时 dnt_id={row['dnt_id']}") + + async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict): """构造 SerialNet JSON 并通过 UDP 发送给设备""" if _udp_sender is None: @@ -398,22 +439,3 @@ async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict): _udp_sender(msg_bytes, addr) await mark_serialnet_sent(record["id"]) logger.info(f"已发送 SerialNet → {device_id} ({addr}): {send_pkg}") - - -async def _check_serialnet_timeout(dnt_id: int): - """检查 state=1 超时记录""" - from src.models import get_pool - import aiomysql - pool = await get_pool() - async with pool.acquire() as conn: - async with conn.cursor(aiomysql.DictCursor) as cur: - await cur.execute( - "SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 " - "AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) " - "LIMIT 5", - (dnt_id, SERIALNET_TIMEOUT), - ) - rows = await cur.fetchall() - for row in rows: - await mark_serialnet_timeout(row["id"]) - logger.warning(f"tb_serialnet #{row['id']} 超时 (>{SERIALNET_TIMEOUT}s)") diff --git a/src/server.py b/src/server.py index 3b5953a..7ad6779 100644 --- a/src/server.py +++ b/src/server.py @@ -70,6 +70,7 @@ class EDCProtocol: msg = parse_message(data) if msg is None: return + logger.info(f"UDP {msg} from {addr}") method = msg.get("Method", "") logger.debug(f"UDP {method} from {addr}") @@ -108,6 +109,7 @@ async def handle_tcp_client(reader: asyncio.StreamReader, async def process_message(msg: dict): """处理单条消息并返回响应文本""" + logger.info(f"TCP get_rcv {msg} from {addr}") method = msg.get("Method", "") logger.debug(f"TCP {method} from {addr}")