From ef890fafc6a9daac4a334f9ed8014d8b4195c0f6 Mon Sep 17 00:00:00 2001 From: wangfq Date: Wed, 10 Jun 2026 09:14:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=AE=BE=E5=A4=87=E4=BA=8B=E4=BB=B6?= =?UTF-8?q?=E6=97=A5=E5=BF=97=20+=20=E5=9C=A8=E7=BA=BF/=E7=A6=BB=E7=BA=BF?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 tb_device_log 表 (device_serial, device_ip, event_type, event_content, create_time) - dnt_info.state 扩展为 0=离线 1=在线 2=通信不良 - handle_count_off 收到 Count_Off 后写入 login 事件日志 - 新增 device_status_monitor 后台任务,每 5s 检测设备状态: - 3次交互间隔均<10s → 在线 - 1分钟内<4次交互 → 通信不良 - >1分钟无交互 → 离线 - 状态变化时同步写入 tb_device_log + dnt_info - 所有设备交互点 (心跳/TSReport/SerialNet/解析成功) 均记录 interaction 时间戳 --- src/handlers.py | 147 ++++++++++++++++++++++++++++++++++++++++++++++++ src/models.py | 65 ++++++++++++++++++++- src/server.py | 2 + 3 files changed, 213 insertions(+), 1 deletion(-) diff --git a/src/handlers.py b/src/handlers.py index 66533bf..e1dc92c 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -12,6 +12,7 @@ import asyncio import json import logging import time +from collections import deque from datetime import datetime from src.models import ( @@ -29,6 +30,8 @@ from src.models import ( mark_serialnet_done, mark_serialnet_timeout, upsert_fixture_param, + insert_device_log, + update_device_status, ) from src.dg430 import ( parse_b2_status, @@ -58,6 +61,12 @@ _registry: dict[str, int] = {} # 设备心跳时间: {device_id: last_heartbeat} _heartbeat: dict[str, float] = {} +# 设备交互时间记录: {device_id: deque[float]} (最近 60s 内) +_interactions: dict[str, deque] = {} + +# 设备当前状态: {device_id: int} (0=离线 1=在线 2=通信不良) +_device_status: dict[str, int] = {} + # UDP transport 引用,由 server.py 注入 _udp_sender: object | None = None @@ -68,6 +77,18 @@ def set_udp_sender(sender): _udp_sender = sender +def record_interaction(device_id: str): + """记录一次设备交互(心跳/上报/解析成功)""" + now = time.time() + if device_id not in _interactions: + _interactions[device_id] = deque() + _interactions[device_id].append(now) + # 清理 120s 前的旧记录 + cutoff = now - 120 + while _interactions[device_id] and _interactions[device_id][0] < cutoff: + _interactions[device_id].popleft() + + async def handle_count_off(data: dict, addr: tuple): """处理设备登录/身份上报 (Count_Off 返回格式) @@ -125,9 +146,19 @@ async def handle_count_off(data: dict, addr: tuple): ) _registry[serial] = dnt_id _heartbeat[serial] = time.time() + record_interaction(serial) await ensure_collect_table(serial) + # 登录事件日志 + try: + await insert_device_log( + serial=serial, ip=dev_ip, event_type="login", + content=f"设备上线 type={dev_type} ver={dev_version}", + ) + except Exception: + pass + logger.info( f"设备登录: {serial} dnt_id={dnt_id} ip={dev_ip}:{dev_port} " f"type={dev_type} ver={dev_version}" @@ -143,6 +174,7 @@ async def handle_heartbeat(data: dict) -> str | None: return None _heartbeat[device_id] = time.time() + record_interaction(device_id) try: await insert_collect_data(device_id, 0, str(data)) @@ -186,6 +218,8 @@ async def handle_tsreport(data: dict) -> str | None: if not device_id or not sub_dat: return None + record_interaction(device_id) + try: await insert_collect_data(device_id, 8, sub_dat) except Exception as e: @@ -209,6 +243,8 @@ async def handle_serial_net(data: dict) -> str | None: if not device_id or not serial_dat: return None + record_interaction(device_id) + try: await insert_collect_data(device_id, 9, serial_dat) except Exception as e: @@ -409,6 +445,7 @@ async def parse_loop(): logger.debug(f"跳过未知/不支持指令: 0x{cmd:02X}") if has_valid: + record_interaction(device_id) await mark_record_state(device_id, rec["id"], state=1) elif all_failed and packets: await mark_record_state(device_id, rec["id"], state=3) @@ -667,3 +704,113 @@ async def _match_serial_cmd(dnt_id: int, cmd: int, raw_hex: str): ) except Exception as e: logger.warning(f"匹配 serialnet cmd 0x{cmd:02X} 失败: {e}") + + +# ─── 设备状态监控服务 ────────────────────────────────────────────────── + +# 在线判定参数 +INTERACTION_TIMEOUT = 10 # 单次交互超时判定 (秒) +ONLINE_MIN_INTERACTIONS = 3 # 连续几次交互在超时内即表示在线 +OFFLINE_IDLE_SEC = 60 # 超过此时间无交互 → 离线 +POOR_MIN_INTERACTIONS = 4 # 1 分钟内少于此次数 → 通信不良 +MONITOR_INTERVAL = 5 # 状态检查间隔 (秒) + + +async def device_status_monitor(): + """后台轮询:监控所有已注册设备在线/离线状态 + + 判定规则: + 1. > OFFLINE_IDLE_SEC 无交互 → 离线 + 2. 最近 OFFLINE_IDLE_SEC 内交互次数 < POOR_MIN_INTERACTIONS → 通信不良 + 3. 最近 3 次交互间隔均 < INTERACTION_TIMEOUT → 在线 + 4. 状态变化时写入 tb_device_log + 更新 dnt_info.state + """ + logger.info("设备状态监控服务启动") + await asyncio.sleep(5) # 等其它服务就绪 + + while True: + try: + now = time.time() + for device_id, dnt_id in list(_registry.items()): + interactions = _interactions.get(device_id) + if not interactions: + continue + + # 计算新状态 + new_state = _calc_device_state(interactions, now) + old_state = _device_status.get(device_id, 1) # 默认在线 + + if new_state != old_state: + _device_status[device_id] = new_state + logger.info( + f"设备 {device_id} 状态变更: " + f"{_state_name(old_state)} → {_state_name(new_state)}" + ) + + # 更新 dnt_info + try: + await update_device_status(device_id, new_state) + except Exception as e: + logger.error(f"更新设备状态失败: {e}") + + # 写入事件日志 + dnt = await get_dnt_by_serial(device_id) + dev_ip = dnt.get("ip", "") if dnt else "" + event_type, content = _state_event(new_state, old_state) + try: + await insert_device_log( + serial=device_id, ip=dev_ip, + event_type=event_type, content=content, + ) + except Exception as e: + logger.error(f"写入设备事件日志失败: {e}") + + except Exception as e: + logger.error(f"设备状态监控异常: {e}") + + await asyncio.sleep(MONITOR_INTERVAL) + + +def _calc_device_state(interactions: deque, now: float) -> int: + """根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)""" + # 清理过期记录(仅保留 60s 内) + cutoff = now - OFFLINE_IDLE_SEC + recent = [t for t in interactions if t >= cutoff] + + if not recent: + return 0 # 离线 + + # 最近一次交互距今 + last_interaction = recent[-1] + if now - last_interaction > OFFLINE_IDLE_SEC: + return 0 # 离线 + + # 1 分钟内交互次数 + if len(recent) < POOR_MIN_INTERACTIONS: + return 2 # 通信不良 + + # 最近 3 次交互间隔是否都在超时内 + if len(recent) >= ONLINE_MIN_INTERACTIONS: + last_n = recent[-ONLINE_MIN_INTERACTIONS:] + gaps = [last_n[i] - last_n[i - 1] for i in range(1, len(last_n))] + if gaps and all(g <= INTERACTION_TIMEOUT for g in gaps): + return 1 # 在线 + + # 默认:有交互但不够密集 → 通信不良 + return 2 + + +def _state_name(state: int) -> str: + return {0: "离线", 1: "在线", 2: "通信不良"}.get(state, f"未知({state})") + + +def _state_event(new_state: int, old_state: int) -> tuple[str, str]: + """根据状态变化生成事件类型和内容""" + name_new = _state_name(new_state) + name_old = _state_name(old_state) + if new_state == 0: + return "offline", f"设备离线(上次状态: {name_old})" + elif new_state == 1: + return "online", f"设备已上线(上次状态: {name_old})" + else: + return "poor", f"设备通信不良(上次状态: {name_old})" diff --git a/src/models.py b/src/models.py index 810272d..877f915 100644 --- a/src/models.py +++ b/src/models.py @@ -320,6 +320,30 @@ async def _create_tables(pool: aiomysql.Pool): except Exception: pass + # 11. 设备事件日志表 + await cur.execute(""" + CREATE TABLE IF NOT EXISTS `tb_device_log` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `device_serial` VARCHAR(45) NOT NULL COMMENT '设备序列号', + `device_ip` VARCHAR(45) DEFAULT '' COMMENT '设备IP', + `event_type` VARCHAR(30) NOT NULL COMMENT 'login/online/offline/poor', + `event_content` VARCHAR(500) DEFAULT '' COMMENT '事件详情', + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, + INDEX `idx_serial` (`device_serial`), + INDEX `idx_event_type` (`event_type`), + INDEX `idx_create_time` (`create_time`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + + # V2.2.0 迁移:扩展 dnt_info 状态值(0=离线 1=在线 2=通信不良) + try: + await cur.execute( + "ALTER TABLE dnt_info MODIFY COLUMN `state` TINYINT DEFAULT 0 " + "COMMENT '0 offline, 1 online, 2 poor'" + ) + except Exception: + pass + logger.info("数据库表初始化完成") @@ -531,7 +555,7 @@ async def insert_wave_data(dnt_id: int, dpg430_addr: int, async def set_device_offline(serial: str): - """标记设备离线""" + """标记设备离线(保持向后兼容)""" pool = await get_pool() async with pool.acquire() as conn: async with conn.cursor() as cur: @@ -541,6 +565,45 @@ async def set_device_offline(serial: str): ) +# ─── tb_device_log + 设备状态 ─────────────────────────────────────── + +async def insert_device_log(serial: str, ip: str, event_type: str, + content: str = ""): + """插入设备事件日志""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "INSERT INTO tb_device_log (device_serial, device_ip, " + "event_type, event_content) VALUES (%s,%s,%s,%s)", + (serial, ip, event_type, content), + ) + + +async def update_device_status(serial: str, state: int): + """更新 dnt_info 设备状态(0=离线 1=在线 2=通信不良)""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + if state == 0: + await cur.execute( + "UPDATE dnt_info SET state=0, last_off=NOW() " + "WHERE serial=%s AND state != 0", + (serial,), + ) + elif state == 1: + await cur.execute( + "UPDATE dnt_info SET state=1, last_login=NOW() " + "WHERE serial=%s AND state != 1", + (serial,), + ) + else: + await cur.execute( + "UPDATE dnt_info SET state=%s WHERE serial=%s", + (state, serial), + ) + + # ─── tb_serialnet CRUD ───────────────────────────────────────────── async def get_pending_serialnet(dnt_id: int) -> dict | None: diff --git a/src/server.py b/src/server.py index 73d07af..c5c4711 100644 --- a/src/server.py +++ b/src/server.py @@ -34,6 +34,7 @@ from src.handlers import ( parse_loop, serialnet_loop, serialnet_response_loop, + device_status_monitor, set_udp_sender, ) @@ -187,6 +188,7 @@ async def main(): asyncio.create_task(parse_loop()) asyncio.create_task(serialnet_loop()) asyncio.create_task(serialnet_response_loop()) + asyncio.create_task(device_status_monitor()) loop = asyncio.get_running_loop()