fix: serialnet_loop 改为直接查 tb_serialnet,不依赖 _registry
This commit is contained in:
@@ -345,6 +345,7 @@ SERIALNET_TIMEOUT = 10 # 秒
|
|||||||
async def serialnet_loop():
|
async def serialnet_loop():
|
||||||
"""后台轮询:检查 tb_serialnet 待发送指令,通过 UDP 下发
|
"""后台轮询:检查 tb_serialnet 待发送指令,通过 UDP 下发
|
||||||
|
|
||||||
|
直接查询 tb_serialnet 表(不依赖 _registry),
|
||||||
1. state=0 → 发送 SerialNet JSON → state=1
|
1. state=0 → 发送 SerialNet JSON → state=1
|
||||||
2. state=1 且超过 10 秒 → state=3 (超时失败)
|
2. state=1 且超过 10 秒 → state=3 (超时失败)
|
||||||
"""
|
"""
|
||||||
@@ -353,14 +354,11 @@ async def serialnet_loop():
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
for device_id, dnt_id in list(_registry.items()):
|
# 1. 查询所有 state=0 的记录
|
||||||
# 1. 下发待发送指令
|
await _process_pending_all()
|
||||||
pending = await get_pending_serialnet(dnt_id)
|
|
||||||
if pending:
|
|
||||||
await _send_serialnet_cmd(device_id, dnt_id, pending)
|
|
||||||
|
|
||||||
# 2. 超时检测
|
# 2. 查询所有 state=1 超时的记录
|
||||||
await _check_serialnet_timeout(dnt_id)
|
await _check_timeout_all()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"透传轮询异常: {e}")
|
logger.error(f"透传轮询异常: {e}")
|
||||||
@@ -368,6 +366,49 @@ async def serialnet_loop():
|
|||||||
await asyncio.sleep(0.2)
|
await asyncio.sleep(0.2)
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_pending_all():
|
||||||
|
"""查询所有 state=0 的记录并发送"""
|
||||||
|
from src.models import get_pool
|
||||||
|
import aiomysql
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"SELECT sn.*, d.serial, d.ip FROM tb_serialnet sn "
|
||||||
|
"JOIN dnt_info d ON sn.dnt_id = d.id "
|
||||||
|
"WHERE sn.state = 0 ORDER BY sn.id ASC LIMIT 10"
|
||||||
|
)
|
||||||
|
rows = await cur.fetchall()
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
device_id = row["serial"]
|
||||||
|
dnt_id = row["dnt_id"]
|
||||||
|
if not row.get("ip"):
|
||||||
|
logger.warning(f"设备 {device_id} 无 IP,跳过")
|
||||||
|
await mark_serialnet_timeout(row["id"])
|
||||||
|
continue
|
||||||
|
await _send_serialnet_cmd(device_id, dnt_id, row)
|
||||||
|
|
||||||
|
|
||||||
|
async def _check_timeout_all():
|
||||||
|
"""检查所有 state=1 超时记录"""
|
||||||
|
from src.models import get_pool
|
||||||
|
import aiomysql
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"SELECT id, dnt_id FROM tb_serialnet WHERE state=1 "
|
||||||
|
"AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) "
|
||||||
|
"LIMIT 20",
|
||||||
|
(SERIALNET_TIMEOUT,),
|
||||||
|
)
|
||||||
|
rows = await cur.fetchall()
|
||||||
|
for row in rows:
|
||||||
|
await mark_serialnet_timeout(row["id"])
|
||||||
|
logger.warning(f"tb_serialnet #{row['id']} 超时 dnt_id={row['dnt_id']}")
|
||||||
|
|
||||||
|
|
||||||
async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
|
async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
|
||||||
"""构造 SerialNet JSON 并通过 UDP 发送给设备"""
|
"""构造 SerialNet JSON 并通过 UDP 发送给设备"""
|
||||||
if _udp_sender is None:
|
if _udp_sender is None:
|
||||||
@@ -398,22 +439,3 @@ async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
|
|||||||
_udp_sender(msg_bytes, addr)
|
_udp_sender(msg_bytes, addr)
|
||||||
await mark_serialnet_sent(record["id"])
|
await mark_serialnet_sent(record["id"])
|
||||||
logger.info(f"已发送 SerialNet → {device_id} ({addr}): {send_pkg}")
|
logger.info(f"已发送 SerialNet → {device_id} ({addr}): {send_pkg}")
|
||||||
|
|
||||||
|
|
||||||
async def _check_serialnet_timeout(dnt_id: int):
|
|
||||||
"""检查 state=1 超时记录"""
|
|
||||||
from src.models import get_pool
|
|
||||||
import aiomysql
|
|
||||||
pool = await get_pool()
|
|
||||||
async with pool.acquire() as conn:
|
|
||||||
async with conn.cursor(aiomysql.DictCursor) as cur:
|
|
||||||
await cur.execute(
|
|
||||||
"SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 "
|
|
||||||
"AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) "
|
|
||||||
"LIMIT 5",
|
|
||||||
(dnt_id, SERIALNET_TIMEOUT),
|
|
||||||
)
|
|
||||||
rows = await cur.fetchall()
|
|
||||||
for row in rows:
|
|
||||||
await mark_serialnet_timeout(row["id"])
|
|
||||||
logger.warning(f"tb_serialnet #{row['id']} 超时 (>{SERIALNET_TIMEOUT}s)")
|
|
||||||
|
|||||||
@@ -70,6 +70,7 @@ class EDCProtocol:
|
|||||||
msg = parse_message(data)
|
msg = parse_message(data)
|
||||||
if msg is None:
|
if msg is None:
|
||||||
return
|
return
|
||||||
|
logger.info(f"UDP {msg} from {addr}")
|
||||||
|
|
||||||
method = msg.get("Method", "")
|
method = msg.get("Method", "")
|
||||||
logger.debug(f"UDP {method} from {addr}")
|
logger.debug(f"UDP {method} from {addr}")
|
||||||
@@ -108,6 +109,7 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
|
|||||||
|
|
||||||
async def process_message(msg: dict):
|
async def process_message(msg: dict):
|
||||||
"""处理单条消息并返回响应文本"""
|
"""处理单条消息并返回响应文本"""
|
||||||
|
logger.info(f"TCP get_rcv {msg} from {addr}")
|
||||||
method = msg.get("Method", "")
|
method = msg.get("Method", "")
|
||||||
logger.debug(f"TCP {method} from {addr}")
|
logger.debug(f"TCP {method} from {addr}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user