@@ -345,6 +345,7 @@ SERIALNET_TIMEOUT = 10 # 秒
async def serialnet_loop ( ) :
""" 后台轮询:检查 tb_serialnet 待发送指令,通过 UDP 下发
直接查询 tb_serialnet 表(不依赖 _registry) ,
1. state=0 → 发送 SerialNet JSON → state=1
2. state=1 且超过 10 秒 → state=3 (超时失败)
"""
@@ -353,14 +354,11 @@ async def serialnet_loop():
while True :
try :
for device_id , dnt_id in list ( _registry . items ( ) ) :
# 1. 下发待发送指令
pending = await get_pending_serialnet ( dnt_id )
if pending :
await _send_serialnet_cmd ( device_id , dnt_id , pending )
# 1. 查询所有 state=0 的记录
await _process_pending_all ( )
# 2. 超时检测
await _check_serialnet_timeout ( dnt_id )
# 2. 查询所有 state=1 超时的记录
await _check_timeout_all ( )
except Exception as e :
logger . error ( f " 透传轮询异常: { e } " )
@@ -368,6 +366,49 @@ async def serialnet_loop():
await asyncio . sleep ( 0.2 )
async def _process_pending_all ( ) :
""" 查询所有 state=0 的记录并发送 """
from src . models import get_pool
import aiomysql
pool = await get_pool ( )
async with pool . acquire ( ) as conn :
async with conn . cursor ( aiomysql . DictCursor ) as cur :
await cur . execute (
" SELECT sn.*, d.serial, d.ip FROM tb_serialnet sn "
" JOIN dnt_info d ON sn.dnt_id = d.id "
" WHERE sn.state = 0 ORDER BY sn.id ASC LIMIT 10 "
)
rows = await cur . fetchall ( )
for row in rows :
device_id = row [ " serial " ]
dnt_id = row [ " dnt_id " ]
if not row . get ( " ip " ) :
logger . warning ( f " 设备 { device_id } 无 IP, 跳过 " )
await mark_serialnet_timeout ( row [ " id " ] )
continue
await _send_serialnet_cmd ( device_id , dnt_id , row )
async def _check_timeout_all ( ) :
""" 检查所有 state=1 超时记录 """
from src . models import get_pool
import aiomysql
pool = await get_pool ( )
async with pool . acquire ( ) as conn :
async with conn . cursor ( aiomysql . DictCursor ) as cur :
await cur . execute (
" SELECT id, dnt_id FROM tb_serialnet WHERE state=1 "
" AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) "
" LIMIT 20 " ,
( SERIALNET_TIMEOUT , ) ,
)
rows = await cur . fetchall ( )
for row in rows :
await mark_serialnet_timeout ( row [ " id " ] )
logger . warning ( f " tb_serialnet # { row [ ' id ' ] } 超时 dnt_id= { row [ ' dnt_id ' ] } " )
async def _send_serialnet_cmd ( device_id : str , dnt_id : int , record : dict ) :
""" 构造 SerialNet JSON 并通过 UDP 发送给设备 """
if _udp_sender is None :
@@ -398,22 +439,3 @@ async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
_udp_sender ( msg_bytes , addr )
await mark_serialnet_sent ( record [ " id " ] )
logger . info ( f " 已发送 SerialNet → { device_id } ( { addr } ): { send_pkg } " )
async def _check_serialnet_timeout ( dnt_id : int ) :
""" 检查 state=1 超时记录 """
from src . models import get_pool
import aiomysql
pool = await get_pool ( )
async with pool . acquire ( ) as conn :
async with conn . cursor ( aiomysql . DictCursor ) as cur :
await cur . execute (
" SELECT id FROM tb_serialnet WHERE dnt_id= %s AND state=1 "
" AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) "
" LIMIT 5 " ,
( dnt_id , SERIALNET_TIMEOUT ) ,
)
rows = await cur . fetchall ( )
for row in rows :
await mark_serialnet_timeout ( row [ " id " ] )
logger . warning ( f " tb_serialnet # { row [ ' id ' ] } 超时 (> { SERIALNET_TIMEOUT } s) " )