feat: 新增 tb_serialnet 透传发送表 + serialnet_loop 轮询下发 + B2响应匹配 + 超时检测

This commit is contained in:
wangfq
2026-05-28 09:40:28 +08:00
parent 97302caf6b
commit 7a6f56339c
3 changed files with 248 additions and 1 deletions

View File

@@ -5,9 +5,11 @@
2. 设备 UDP 上报 Count_Off (设备信息) → EDC 处理注册,不回复
3. 设备通过 TCP/UDP 上报 TSReport/SerialNet → 存入采集表
4. 后台解析服务轮询采集表 → 解析 DG430 协议 → 写入 tb_state_tst
5. 透传轮询服务轮询 tb_serialnet → 下发 SerialNet 指令
"""
import asyncio
import json
import logging
import time
from datetime import datetime
@@ -19,6 +21,11 @@ from src.models import (
fetch_unparsed,
mark_record_state,
insert_test_result,
get_dnt_by_serial,
get_pending_serialnet,
mark_serialnet_sent,
mark_serialnet_done,
mark_serialnet_timeout,
)
from src.dg430 import (
parse_b2_status,
@@ -43,6 +50,15 @@ _registry: dict[str, int] = {}
# 设备心跳时间: {device_id: last_heartbeat}
_heartbeat: dict[str, float] = {}
# UDP transport 引用,由 server.py 注入
_udp_sender: callable | None = None
def set_udp_sender(sender):
"""设置 UDP 发送函数(由 server.py 调用)"""
global _udp_sender
_udp_sender = sender
async def handle_count_off(data: dict, addr: tuple):
"""处理设备登录/身份上报 (Count_Off 返回格式)
@@ -201,6 +217,7 @@ async def parse_loop():
每条 raw_data 可能包含多条拼接的 DG430 指令。
从中找出 B2 (状态上报) 指令解析,其余指令忽略。
校验失败的记录标记为 state=3。
解析成功的 B2 会匹配 tb_serialnet 中的待确认记录。
"""
logger.info("业务解析服务启动")
@@ -266,6 +283,9 @@ async def parse_loop():
exit_speed=status.exit_speed,
)
# 匹配 tb_serialnet 中的待确认记录 (state=1)
await _match_serialnet_response(dnt_id, raw)
has_valid_b2 = True
all_failed = False
logger.info(
@@ -294,3 +314,105 @@ async def parse_loop():
logger.error(f"解析循环异常: {e}")
await asyncio.sleep(0.5)
async def _match_serialnet_response(dnt_id: int, raw_hex: str):
"""收到 B2 响应后,匹配 tb_serialnet 中 state=1 的第一条记录"""
try:
from src.models import get_pool
import aiomysql
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 "
"ORDER BY id ASC LIMIT 1",
(dnt_id,),
)
row = await cur.fetchone()
if row:
await mark_serialnet_done(row["id"], raw_hex)
logger.info(f"tb_serialnet #{row['id']} 已确认完成 (dnt_id={dnt_id})")
except Exception as e:
logger.warning(f"匹配 serialnet 响应失败: {e}")
# ─── 透传指令轮询服务 ───────────────────────────────────────────────
SERIALNET_TIMEOUT = 10 # 秒
async def serialnet_loop():
"""后台轮询:检查 tb_serialnet 待发送指令,通过 UDP 下发
1. state=0 → 发送 SerialNet JSON → state=1
2. state=1 且超过 10 秒 → state=3 (超时失败)
"""
logger.info("透传轮询服务启动")
await asyncio.sleep(2) # 等 UDP transport 就绪
while True:
try:
for device_id, dnt_id in list(_registry.items()):
# 1. 下发待发送指令
pending = await get_pending_serialnet(dnt_id)
if pending:
await _send_serialnet_cmd(device_id, dnt_id, pending)
# 2. 超时检测
await _check_serialnet_timeout(dnt_id)
except Exception as e:
logger.error(f"透传轮询异常: {e}")
await asyncio.sleep(0.2)
async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
"""构造 SerialNet JSON 并通过 UDP 发送给设备"""
if _udp_sender is None:
logger.warning("UDP sender 未就绪,跳过发送")
return
# 获取设备 IP 和 msgport
dnt = await get_dnt_by_serial(device_id)
if not dnt or not dnt.get("ip") or not dnt.get("msgport"):
logger.warning(f"设备 {device_id} 无 IP/msgport 信息,跳过")
return
send_pkg = record["send_pkg"]
addr = (dnt["ip"], dnt["msgport"])
# 构造 SerialNet JSON
msg = {
"Method": "SerialNet",
"Params": {
"Device_id": device_id,
"Extra_id": 0,
"Bus_Num": 0,
"SerialDat": send_pkg,
},
}
msg_bytes = json.dumps(msg, ensure_ascii=False).encode("utf-8")
_udp_sender(msg_bytes, addr)
await mark_serialnet_sent(record["id"])
logger.info(f"已发送 SerialNet → {device_id} ({addr}): {send_pkg}")
async def _check_serialnet_timeout(dnt_id: int):
"""检查 state=1 超时记录"""
from src.models import get_pool
import aiomysql
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 "
"AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) "
"LIMIT 5",
(dnt_id, SERIALNET_TIMEOUT),
)
rows = await cur.fetchall()
for row in rows:
await mark_serialnet_timeout(row["id"])
logger.warning(f"tb_serialnet #{row['id']} 超时 (>{SERIALNET_TIMEOUT}s)")

View File

@@ -121,7 +121,20 @@ async def _create_tables(pool: aiomysql.Pool):
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# 4. 采集表模板(不直接创建表,设备注册时按此结构动态建表)
# 4. 透传发送表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_serialnet` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id',
`send_pkg` VARCHAR(380) DEFAULT '' COMMENT '发送指令包(hex)',
`rcv_pkg` VARCHAR(380) DEFAULT '' COMMENT '接收指令包(hex)',
`state` TINYINT DEFAULT 0 COMMENT '0未发送, 1已发送, 2已完成, 3超时失败',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX `idx_dnt_state` (`dnt_id`, `state`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
logger.info("数据库表初始化完成")
@@ -270,3 +283,100 @@ async def set_device_offline(serial: str):
"UPDATE dnt_info SET state=0, last_off=NOW() WHERE serial=%s",
(serial,),
)
# ─── tb_serialnet CRUD ─────────────────────────────────────────────
async def get_pending_serialnet(dnt_id: int) -> dict | None:
"""获取该设备 state=0 的第一条待发送记录"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT * FROM tb_serialnet WHERE dnt_id=%s AND state=0 "
"ORDER BY id ASC LIMIT 1",
(dnt_id,),
)
return await cur.fetchone()
async def mark_serialnet_sent(record_id: int):
"""标记为 state=1 (已发送)"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"UPDATE tb_serialnet SET state=1 WHERE id=%s", (record_id,),
)
async def mark_serialnet_done(record_id: int, rcv_pkg: str):
"""标记为 state=2 (已完成,收到回复)"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"UPDATE tb_serialnet SET state=2, rcv_pkg=%s WHERE id=%s",
(rcv_pkg, record_id),
)
async def mark_serialnet_timeout(record_id: int):
"""标记为 state=3 (超时失败)"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"UPDATE tb_serialnet SET state=3 WHERE id=%s", (record_id,),
)
async def get_serialnet_stats(dnt_id: int) -> dict:
"""返回 {total, sent, done, failed} 计数"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT state, COUNT(*) as cnt FROM tb_serialnet "
"WHERE dnt_id=%s GROUP BY state",
(dnt_id,),
)
rows = await cur.fetchall()
stats = {"total": 0, "pending": 0, "sent": 0, "done": 0, "failed": 0}
for r in rows:
s = r["state"]
stats["total"] += r["cnt"]
if s == 0:
stats["pending"] = r["cnt"]
elif s == 1:
stats["sent"] = r["cnt"]
elif s == 2:
stats["done"] = r["cnt"]
elif s == 3:
stats["failed"] = r["cnt"]
return stats
async def insert_serialnet(dnt_id: int, send_pkg: str) -> int:
"""插入新透传指令,返回 record_id"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"INSERT INTO tb_serialnet (dnt_id, send_pkg) VALUES (%s, %s)",
(dnt_id, send_pkg),
)
return cur.lastrowid
async def get_pending_by_device(dnt_id: int) -> list[dict]:
"""获取设备所有未完成的记录 (state < 2)"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT * FROM tb_serialnet WHERE dnt_id=%s AND state IN (0,1) "
"ORDER BY id ASC",
(dnt_id,),
)
return await cur.fetchall()

View File

@@ -32,6 +32,8 @@ from src.handlers import (
handle_tsreport,
handle_serial_net,
parse_loop,
serialnet_loop,
set_udp_sender,
)
logging.basicConfig(
@@ -40,6 +42,15 @@ logging.basicConfig(
)
logger = logging.getLogger("edc")
# 全局 UDP transport供 serialnet_loop 发送指令
_udp_transport: asyncio.DatagramTransport | None = None
def send_udp(data: bytes, addr: tuple[str, int]):
"""通过全局 UDP transport 发送数据"""
if _udp_transport:
_udp_transport.sendto(data, addr)
class EDCProtocol:
"""asyncio UDP 协议处理器"""
@@ -49,6 +60,8 @@ class EDCProtocol:
def connection_made(self, transport):
self.transport = transport
global _udp_transport
_udp_transport = transport
def datagram_received(self, data, addr):
asyncio.ensure_future(self._handle(data, addr))
@@ -169,6 +182,7 @@ async def main():
await init_pool()
asyncio.create_task(parse_loop())
asyncio.create_task(serialnet_loop())
loop = asyncio.get_running_loop()
@@ -177,6 +191,7 @@ async def main():
lambda: EDCProtocol(), # type: ignore[arg-type]
local_addr=(BIND_HOST, UDP_PORT),
)
set_udp_sender(send_udp) # 注入到 handlers 供 serialnet_loop 使用
logger.info(f"UDP 服务监听 {BIND_HOST}:{UDP_PORT}")
# UDP :5505