diff --git a/src/handlers.py b/src/handlers.py index e1dc92c..c768e0d 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -32,6 +32,7 @@ from src.models import ( upsert_fixture_param, insert_device_log, update_device_status, + get_all_device_serials, ) from src.dg430 import ( parse_b2_status, @@ -147,6 +148,7 @@ async def handle_count_off(data: dict, addr: tuple): _registry[serial] = dnt_id _heartbeat[serial] = time.time() record_interaction(serial) + _device_status[serial] = 1 # 登录即视为在线状态 await ensure_collect_table(serial) @@ -717,13 +719,14 @@ MONITOR_INTERVAL = 5 # 状态检查间隔 (秒) async def device_status_monitor(): - """后台轮询:监控所有已注册设备在线/离线状态 + """后台轮询:监控所有设备在线/离线状态 - 判定规则: - 1. > OFFLINE_IDLE_SEC 无交互 → 离线 - 2. 最近 OFFLINE_IDLE_SEC 内交互次数 < POOR_MIN_INTERACTIONS → 通信不良 - 3. 最近 3 次交互间隔均 < INTERACTION_TIMEOUT → 在线 - 4. 状态变化时写入 tb_device_log + 更新 dnt_info.state + 两个阶段: + Phase 1 — 遍历 _registry 中活跃设备,根据交互记录判定状态 + Phase 2 — 扫描 dnt_info 全表,修正与实际交互不符的状态: + - state=1(在线) 但 >60s 无交互 → 更新为离线 + - state=0(离线) 但有交互记录 → 根据交互模式更新为在线/通信不良 + - state 与计算值不一致 → 同步修正 """ logger.info("设备状态监控服务启动") await asyncio.sleep(5) # 等其它服务就绪 @@ -731,39 +734,39 @@ async def device_status_monitor(): while True: try: now = time.time() + + # ── Phase 1: 活跃设备(在 _registry 中)──── for device_id, dnt_id in list(_registry.items()): interactions = _interactions.get(device_id) - if not interactions: + actual_state = _calc_device_state(interactions, now) if interactions else 0 + old_state = _device_status.get(device_id, 1) # 注册时默认在线 + + if actual_state != old_state: + await _apply_state_change(device_id, actual_state, old_state) + + # ── Phase 2: 扫描 dnt_info 全表,修正不一致 ── + try: + rows = await get_all_device_serials() + except Exception as e: + logger.error(f"查询 dnt_info 失败: {e}") + rows = [] + + for serial, db_state, ip in rows: + if not serial: continue - # 计算新状态 - new_state = _calc_device_state(interactions, now) - old_state = _device_status.get(device_id, 1) # 默认在线 + interactions = _interactions.get(serial) + actual_state = _calc_device_state(interactions, now) if interactions else 0 + memory_state = _device_status.get(serial) - if new_state != old_state: - _device_status[device_id] = new_state - logger.info( - f"设备 {device_id} 状态变更: " - f"{_state_name(old_state)} → {_state_name(new_state)}" - ) + # 优先用内存状态做基准(更实时);无内存状态则用 DB 状态 + tracked_state = memory_state if memory_state is not None else db_state - # 更新 dnt_info - try: - await update_device_status(device_id, new_state) - except Exception as e: - logger.error(f"更新设备状态失败: {e}") + if actual_state != tracked_state: + await _apply_state_change(serial, actual_state, tracked_state, ip=ip) - # 写入事件日志 - dnt = await get_dnt_by_serial(device_id) - dev_ip = dnt.get("ip", "") if dnt else "" - event_type, content = _state_event(new_state, old_state) - try: - await insert_device_log( - serial=device_id, ip=dev_ip, - event_type=event_type, content=content, - ) - except Exception as e: - logger.error(f"写入设备事件日志失败: {e}") + # ── Phase 3: 清理 _registry 中已经不存在的设备 ── + # (已注销 / 长期无交互的设备不清除,继续监控) except Exception as e: logger.error(f"设备状态监控异常: {e}") @@ -771,6 +774,40 @@ async def device_status_monitor(): await asyncio.sleep(MONITOR_INTERVAL) +async def _apply_state_change(device_id: str, new_state: int, old_state: int, + ip: str = ""): + """应用状态变更:更新内存、dnt_info、写入日志""" + _device_status[device_id] = new_state + logger.info( + f"设备 {device_id} 状态变更: " + f"{_state_name(old_state)} → {_state_name(new_state)}" + ) + + # 更新 dnt_info + try: + await update_device_status(device_id, new_state) + except Exception as e: + logger.error(f"更新设备状态失败: {e}") + + # 获取设备 IP(未传入时从 DB 查) + if not ip: + try: + dnt = await get_dnt_by_serial(device_id) + ip = dnt.get("ip", "") if dnt else "" + except Exception: + pass + + # 写入事件日志 + event_type, content = _state_event(new_state, old_state) + try: + await insert_device_log( + serial=device_id, ip=ip, + event_type=event_type, content=content, + ) + except Exception as e: + logger.error(f"写入设备事件日志失败: {e}") + + def _calc_device_state(interactions: deque, now: float) -> int: """根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)""" # 清理过期记录(仅保留 60s 内) diff --git a/src/models.py b/src/models.py index 877f915..b854b55 100644 --- a/src/models.py +++ b/src/models.py @@ -604,6 +604,17 @@ async def update_device_status(serial: str, state: int): ) +async def get_all_device_serials() -> list[tuple[str, int, str]]: + """获取所有设备 (serial, state, ip),用于状态扫描""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "SELECT serial, state, ip FROM dnt_info ORDER BY id" + ) + return await cur.fetchall() + + # ─── tb_serialnet CRUD ───────────────────────────────────────────── async def get_pending_serialnet(dnt_id: int) -> dict | None: