fix: device_status_monitor 增加 dnt_info 全表扫描,修正状态不一致

- 新增 get_all_device_serials() 查询 dnt_info 全表
- device_status_monitor 改为三阶段:
  Phase 1: 遍历 _registry 活跃设备
  Phase 2: 扫描 dnt_info 全表,修正 DB 状态与交互实际不符的情况:
    - state=1(在线) 但 >60s 无交互 → 更新为离线
    - state=0(离线) 但有交互 → 根据交互模式更新
  Phase 3: 预留清理位
- 提取 _apply_state_change() 避免重复代码
- Count_Off 登录时主动设 _device_status[serial]=1,
  防止刚登录只有 1 条交互记录时被误判为 通信不良
This commit is contained in:
wangfq
2026-06-10 09:36:01 +08:00
parent ef890fafc6
commit 11f1c4f55b
2 changed files with 80 additions and 32 deletions

View File

@@ -32,6 +32,7 @@ from src.models import (
upsert_fixture_param, upsert_fixture_param,
insert_device_log, insert_device_log,
update_device_status, update_device_status,
get_all_device_serials,
) )
from src.dg430 import ( from src.dg430 import (
parse_b2_status, parse_b2_status,
@@ -147,6 +148,7 @@ async def handle_count_off(data: dict, addr: tuple):
_registry[serial] = dnt_id _registry[serial] = dnt_id
_heartbeat[serial] = time.time() _heartbeat[serial] = time.time()
record_interaction(serial) record_interaction(serial)
_device_status[serial] = 1 # 登录即视为在线状态
await ensure_collect_table(serial) await ensure_collect_table(serial)
@@ -717,13 +719,14 @@ MONITOR_INTERVAL = 5 # 状态检查间隔 (秒)
async def device_status_monitor(): async def device_status_monitor():
"""后台轮询:监控所有已注册设备在线/离线状态 """后台轮询:监控所有设备在线/离线状态
判定规则 两个阶段
1. > OFFLINE_IDLE_SEC 无交互 → 离线 Phase 1 — 遍历 _registry 中活跃设备,根据交互记录判定状态
2. 最近 OFFLINE_IDLE_SEC 内交互次数 < POOR_MIN_INTERACTIONS → 通信不良 Phase 2 — 扫描 dnt_info 全表,修正与实际交互不符的状态:
3. 最近 3 次交互间隔均 < INTERACTION_TIMEOUT → 在线 - state=1(在线) 但 >60s 无交互 → 更新为离线
4. 状态变化时写入 tb_device_log + 更新 dnt_info.state - state=0(离线) 但有交互记录 → 根据交互模式更新为在线/通信不良
- state 与计算值不一致 → 同步修正
""" """
logger.info("设备状态监控服务启动") logger.info("设备状态监控服务启动")
await asyncio.sleep(5) # 等其它服务就绪 await asyncio.sleep(5) # 等其它服务就绪
@@ -731,39 +734,39 @@ async def device_status_monitor():
while True: while True:
try: try:
now = time.time() now = time.time()
# ── Phase 1: 活跃设备(在 _registry 中)────
for device_id, dnt_id in list(_registry.items()): for device_id, dnt_id in list(_registry.items()):
interactions = _interactions.get(device_id) interactions = _interactions.get(device_id)
if not interactions: actual_state = _calc_device_state(interactions, now) if interactions else 0
old_state = _device_status.get(device_id, 1) # 注册时默认在线
if actual_state != old_state:
await _apply_state_change(device_id, actual_state, old_state)
# ── Phase 2: 扫描 dnt_info 全表,修正不一致 ──
try:
rows = await get_all_device_serials()
except Exception as e:
logger.error(f"查询 dnt_info 失败: {e}")
rows = []
for serial, db_state, ip in rows:
if not serial:
continue continue
# 计算新状态 interactions = _interactions.get(serial)
new_state = _calc_device_state(interactions, now) actual_state = _calc_device_state(interactions, now) if interactions else 0
old_state = _device_status.get(device_id, 1) # 默认在线 memory_state = _device_status.get(serial)
if new_state != old_state: # 优先用内存状态做基准(更实时);无内存状态则用 DB 状态
_device_status[device_id] = new_state tracked_state = memory_state if memory_state is not None else db_state
logger.info(
f"设备 {device_id} 状态变更: "
f"{_state_name(old_state)}{_state_name(new_state)}"
)
# 更新 dnt_info if actual_state != tracked_state:
try: await _apply_state_change(serial, actual_state, tracked_state, ip=ip)
await update_device_status(device_id, new_state)
except Exception as e:
logger.error(f"更新设备状态失败: {e}")
# 写入事件日志 # ── Phase 3: 清理 _registry 中已经不存在的设备 ──
dnt = await get_dnt_by_serial(device_id) # (已注销 / 长期无交互的设备不清除,继续监控)
dev_ip = dnt.get("ip", "") if dnt else ""
event_type, content = _state_event(new_state, old_state)
try:
await insert_device_log(
serial=device_id, ip=dev_ip,
event_type=event_type, content=content,
)
except Exception as e:
logger.error(f"写入设备事件日志失败: {e}")
except Exception as e: except Exception as e:
logger.error(f"设备状态监控异常: {e}") logger.error(f"设备状态监控异常: {e}")
@@ -771,6 +774,40 @@ async def device_status_monitor():
await asyncio.sleep(MONITOR_INTERVAL) await asyncio.sleep(MONITOR_INTERVAL)
async def _apply_state_change(device_id: str, new_state: int, old_state: int,
ip: str = ""):
"""应用状态变更更新内存、dnt_info、写入日志"""
_device_status[device_id] = new_state
logger.info(
f"设备 {device_id} 状态变更: "
f"{_state_name(old_state)}{_state_name(new_state)}"
)
# 更新 dnt_info
try:
await update_device_status(device_id, new_state)
except Exception as e:
logger.error(f"更新设备状态失败: {e}")
# 获取设备 IP未传入时从 DB 查)
if not ip:
try:
dnt = await get_dnt_by_serial(device_id)
ip = dnt.get("ip", "") if dnt else ""
except Exception:
pass
# 写入事件日志
event_type, content = _state_event(new_state, old_state)
try:
await insert_device_log(
serial=device_id, ip=ip,
event_type=event_type, content=content,
)
except Exception as e:
logger.error(f"写入设备事件日志失败: {e}")
def _calc_device_state(interactions: deque, now: float) -> int: def _calc_device_state(interactions: deque, now: float) -> int:
"""根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)""" """根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)"""
# 清理过期记录(仅保留 60s 内) # 清理过期记录(仅保留 60s 内)

View File

@@ -604,6 +604,17 @@ async def update_device_status(serial: str, state: int):
) )
async def get_all_device_serials() -> list[tuple[str, int, str]]:
"""获取所有设备 (serial, state, ip),用于状态扫描"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT serial, state, ip FROM dnt_info ORDER BY id"
)
return await cur.fetchall()
# ─── tb_serialnet CRUD ───────────────────────────────────────────── # ─── tb_serialnet CRUD ─────────────────────────────────────────────
async def get_pending_serialnet(dnt_id: int) -> dict | None: async def get_pending_serialnet(dnt_id: int) -> dict | None: