feat: 设备事件日志 + 在线/离线状态监控
- 新增 tb_device_log 表 (device_serial, device_ip, event_type, event_content, create_time) - dnt_info.state 扩展为 0=离线 1=在线 2=通信不良 - handle_count_off 收到 Count_Off 后写入 login 事件日志 - 新增 device_status_monitor 后台任务,每 5s 检测设备状态: - 3次交互间隔均<10s → 在线 - 1分钟内<4次交互 → 通信不良 - >1分钟无交互 → 离线 - 状态变化时同步写入 tb_device_log + dnt_info - 所有设备交互点 (心跳/TSReport/SerialNet/解析成功) 均记录 interaction 时间戳
This commit is contained in:
147
src/handlers.py
147
src/handlers.py
@@ -12,6 +12,7 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
from collections import deque
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from src.models import (
|
from src.models import (
|
||||||
@@ -29,6 +30,8 @@ from src.models import (
|
|||||||
mark_serialnet_done,
|
mark_serialnet_done,
|
||||||
mark_serialnet_timeout,
|
mark_serialnet_timeout,
|
||||||
upsert_fixture_param,
|
upsert_fixture_param,
|
||||||
|
insert_device_log,
|
||||||
|
update_device_status,
|
||||||
)
|
)
|
||||||
from src.dg430 import (
|
from src.dg430 import (
|
||||||
parse_b2_status,
|
parse_b2_status,
|
||||||
@@ -58,6 +61,12 @@ _registry: dict[str, int] = {}
|
|||||||
# 设备心跳时间: {device_id: last_heartbeat}
|
# 设备心跳时间: {device_id: last_heartbeat}
|
||||||
_heartbeat: dict[str, float] = {}
|
_heartbeat: dict[str, float] = {}
|
||||||
|
|
||||||
|
# 设备交互时间记录: {device_id: deque[float]} (最近 60s 内)
|
||||||
|
_interactions: dict[str, deque] = {}
|
||||||
|
|
||||||
|
# 设备当前状态: {device_id: int} (0=离线 1=在线 2=通信不良)
|
||||||
|
_device_status: dict[str, int] = {}
|
||||||
|
|
||||||
# UDP transport 引用,由 server.py 注入
|
# UDP transport 引用,由 server.py 注入
|
||||||
_udp_sender: object | None = None
|
_udp_sender: object | None = None
|
||||||
|
|
||||||
@@ -68,6 +77,18 @@ def set_udp_sender(sender):
|
|||||||
_udp_sender = sender
|
_udp_sender = sender
|
||||||
|
|
||||||
|
|
||||||
|
def record_interaction(device_id: str):
|
||||||
|
"""记录一次设备交互(心跳/上报/解析成功)"""
|
||||||
|
now = time.time()
|
||||||
|
if device_id not in _interactions:
|
||||||
|
_interactions[device_id] = deque()
|
||||||
|
_interactions[device_id].append(now)
|
||||||
|
# 清理 120s 前的旧记录
|
||||||
|
cutoff = now - 120
|
||||||
|
while _interactions[device_id] and _interactions[device_id][0] < cutoff:
|
||||||
|
_interactions[device_id].popleft()
|
||||||
|
|
||||||
|
|
||||||
async def handle_count_off(data: dict, addr: tuple):
|
async def handle_count_off(data: dict, addr: tuple):
|
||||||
"""处理设备登录/身份上报 (Count_Off 返回格式)
|
"""处理设备登录/身份上报 (Count_Off 返回格式)
|
||||||
|
|
||||||
@@ -125,9 +146,19 @@ async def handle_count_off(data: dict, addr: tuple):
|
|||||||
)
|
)
|
||||||
_registry[serial] = dnt_id
|
_registry[serial] = dnt_id
|
||||||
_heartbeat[serial] = time.time()
|
_heartbeat[serial] = time.time()
|
||||||
|
record_interaction(serial)
|
||||||
|
|
||||||
await ensure_collect_table(serial)
|
await ensure_collect_table(serial)
|
||||||
|
|
||||||
|
# 登录事件日志
|
||||||
|
try:
|
||||||
|
await insert_device_log(
|
||||||
|
serial=serial, ip=dev_ip, event_type="login",
|
||||||
|
content=f"设备上线 type={dev_type} ver={dev_version}",
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"设备登录: {serial} dnt_id={dnt_id} ip={dev_ip}:{dev_port} "
|
f"设备登录: {serial} dnt_id={dnt_id} ip={dev_ip}:{dev_port} "
|
||||||
f"type={dev_type} ver={dev_version}"
|
f"type={dev_type} ver={dev_version}"
|
||||||
@@ -143,6 +174,7 @@ async def handle_heartbeat(data: dict) -> str | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
_heartbeat[device_id] = time.time()
|
_heartbeat[device_id] = time.time()
|
||||||
|
record_interaction(device_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await insert_collect_data(device_id, 0, str(data))
|
await insert_collect_data(device_id, 0, str(data))
|
||||||
@@ -186,6 +218,8 @@ async def handle_tsreport(data: dict) -> str | None:
|
|||||||
if not device_id or not sub_dat:
|
if not device_id or not sub_dat:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
record_interaction(device_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await insert_collect_data(device_id, 8, sub_dat)
|
await insert_collect_data(device_id, 8, sub_dat)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -209,6 +243,8 @@ async def handle_serial_net(data: dict) -> str | None:
|
|||||||
if not device_id or not serial_dat:
|
if not device_id or not serial_dat:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
record_interaction(device_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await insert_collect_data(device_id, 9, serial_dat)
|
await insert_collect_data(device_id, 9, serial_dat)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -409,6 +445,7 @@ async def parse_loop():
|
|||||||
logger.debug(f"跳过未知/不支持指令: 0x{cmd:02X}")
|
logger.debug(f"跳过未知/不支持指令: 0x{cmd:02X}")
|
||||||
|
|
||||||
if has_valid:
|
if has_valid:
|
||||||
|
record_interaction(device_id)
|
||||||
await mark_record_state(device_id, rec["id"], state=1)
|
await mark_record_state(device_id, rec["id"], state=1)
|
||||||
elif all_failed and packets:
|
elif all_failed and packets:
|
||||||
await mark_record_state(device_id, rec["id"], state=3)
|
await mark_record_state(device_id, rec["id"], state=3)
|
||||||
@@ -667,3 +704,113 @@ async def _match_serial_cmd(dnt_id: int, cmd: int, raw_hex: str):
|
|||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"匹配 serialnet cmd 0x{cmd:02X} 失败: {e}")
|
logger.warning(f"匹配 serialnet cmd 0x{cmd:02X} 失败: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 设备状态监控服务 ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# 在线判定参数
|
||||||
|
INTERACTION_TIMEOUT = 10 # 单次交互超时判定 (秒)
|
||||||
|
ONLINE_MIN_INTERACTIONS = 3 # 连续几次交互在超时内即表示在线
|
||||||
|
OFFLINE_IDLE_SEC = 60 # 超过此时间无交互 → 离线
|
||||||
|
POOR_MIN_INTERACTIONS = 4 # 1 分钟内少于此次数 → 通信不良
|
||||||
|
MONITOR_INTERVAL = 5 # 状态检查间隔 (秒)
|
||||||
|
|
||||||
|
|
||||||
|
async def device_status_monitor():
|
||||||
|
"""后台轮询:监控所有已注册设备在线/离线状态
|
||||||
|
|
||||||
|
判定规则:
|
||||||
|
1. > OFFLINE_IDLE_SEC 无交互 → 离线
|
||||||
|
2. 最近 OFFLINE_IDLE_SEC 内交互次数 < POOR_MIN_INTERACTIONS → 通信不良
|
||||||
|
3. 最近 3 次交互间隔均 < INTERACTION_TIMEOUT → 在线
|
||||||
|
4. 状态变化时写入 tb_device_log + 更新 dnt_info.state
|
||||||
|
"""
|
||||||
|
logger.info("设备状态监控服务启动")
|
||||||
|
await asyncio.sleep(5) # 等其它服务就绪
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
now = time.time()
|
||||||
|
for device_id, dnt_id in list(_registry.items()):
|
||||||
|
interactions = _interactions.get(device_id)
|
||||||
|
if not interactions:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 计算新状态
|
||||||
|
new_state = _calc_device_state(interactions, now)
|
||||||
|
old_state = _device_status.get(device_id, 1) # 默认在线
|
||||||
|
|
||||||
|
if new_state != old_state:
|
||||||
|
_device_status[device_id] = new_state
|
||||||
|
logger.info(
|
||||||
|
f"设备 {device_id} 状态变更: "
|
||||||
|
f"{_state_name(old_state)} → {_state_name(new_state)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 更新 dnt_info
|
||||||
|
try:
|
||||||
|
await update_device_status(device_id, new_state)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"更新设备状态失败: {e}")
|
||||||
|
|
||||||
|
# 写入事件日志
|
||||||
|
dnt = await get_dnt_by_serial(device_id)
|
||||||
|
dev_ip = dnt.get("ip", "") if dnt else ""
|
||||||
|
event_type, content = _state_event(new_state, old_state)
|
||||||
|
try:
|
||||||
|
await insert_device_log(
|
||||||
|
serial=device_id, ip=dev_ip,
|
||||||
|
event_type=event_type, content=content,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"写入设备事件日志失败: {e}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"设备状态监控异常: {e}")
|
||||||
|
|
||||||
|
await asyncio.sleep(MONITOR_INTERVAL)
|
||||||
|
|
||||||
|
|
||||||
|
def _calc_device_state(interactions: deque, now: float) -> int:
|
||||||
|
"""根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)"""
|
||||||
|
# 清理过期记录(仅保留 60s 内)
|
||||||
|
cutoff = now - OFFLINE_IDLE_SEC
|
||||||
|
recent = [t for t in interactions if t >= cutoff]
|
||||||
|
|
||||||
|
if not recent:
|
||||||
|
return 0 # 离线
|
||||||
|
|
||||||
|
# 最近一次交互距今
|
||||||
|
last_interaction = recent[-1]
|
||||||
|
if now - last_interaction > OFFLINE_IDLE_SEC:
|
||||||
|
return 0 # 离线
|
||||||
|
|
||||||
|
# 1 分钟内交互次数
|
||||||
|
if len(recent) < POOR_MIN_INTERACTIONS:
|
||||||
|
return 2 # 通信不良
|
||||||
|
|
||||||
|
# 最近 3 次交互间隔是否都在超时内
|
||||||
|
if len(recent) >= ONLINE_MIN_INTERACTIONS:
|
||||||
|
last_n = recent[-ONLINE_MIN_INTERACTIONS:]
|
||||||
|
gaps = [last_n[i] - last_n[i - 1] for i in range(1, len(last_n))]
|
||||||
|
if gaps and all(g <= INTERACTION_TIMEOUT for g in gaps):
|
||||||
|
return 1 # 在线
|
||||||
|
|
||||||
|
# 默认:有交互但不够密集 → 通信不良
|
||||||
|
return 2
|
||||||
|
|
||||||
|
|
||||||
|
def _state_name(state: int) -> str:
|
||||||
|
return {0: "离线", 1: "在线", 2: "通信不良"}.get(state, f"未知({state})")
|
||||||
|
|
||||||
|
|
||||||
|
def _state_event(new_state: int, old_state: int) -> tuple[str, str]:
|
||||||
|
"""根据状态变化生成事件类型和内容"""
|
||||||
|
name_new = _state_name(new_state)
|
||||||
|
name_old = _state_name(old_state)
|
||||||
|
if new_state == 0:
|
||||||
|
return "offline", f"设备离线(上次状态: {name_old})"
|
||||||
|
elif new_state == 1:
|
||||||
|
return "online", f"设备已上线(上次状态: {name_old})"
|
||||||
|
else:
|
||||||
|
return "poor", f"设备通信不良(上次状态: {name_old})"
|
||||||
|
|||||||
@@ -320,6 +320,30 @@ async def _create_tables(pool: aiomysql.Pool):
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# 11. 设备事件日志表
|
||||||
|
await cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS `tb_device_log` (
|
||||||
|
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
||||||
|
`device_serial` VARCHAR(45) NOT NULL COMMENT '设备序列号',
|
||||||
|
`device_ip` VARCHAR(45) DEFAULT '' COMMENT '设备IP',
|
||||||
|
`event_type` VARCHAR(30) NOT NULL COMMENT 'login/online/offline/poor',
|
||||||
|
`event_content` VARCHAR(500) DEFAULT '' COMMENT '事件详情',
|
||||||
|
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
INDEX `idx_serial` (`device_serial`),
|
||||||
|
INDEX `idx_event_type` (`event_type`),
|
||||||
|
INDEX `idx_create_time` (`create_time`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
|
""")
|
||||||
|
|
||||||
|
# V2.2.0 迁移:扩展 dnt_info 状态值(0=离线 1=在线 2=通信不良)
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
"ALTER TABLE dnt_info MODIFY COLUMN `state` TINYINT DEFAULT 0 "
|
||||||
|
"COMMENT '0 offline, 1 online, 2 poor'"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.info("数据库表初始化完成")
|
logger.info("数据库表初始化完成")
|
||||||
|
|
||||||
|
|
||||||
@@ -531,7 +555,7 @@ async def insert_wave_data(dnt_id: int, dpg430_addr: int,
|
|||||||
|
|
||||||
|
|
||||||
async def set_device_offline(serial: str):
|
async def set_device_offline(serial: str):
|
||||||
"""标记设备离线"""
|
"""标记设备离线(保持向后兼容)"""
|
||||||
pool = await get_pool()
|
pool = await get_pool()
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
@@ -541,6 +565,45 @@ async def set_device_offline(serial: str):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── tb_device_log + 设备状态 ───────────────────────────────────────
|
||||||
|
|
||||||
|
async def insert_device_log(serial: str, ip: str, event_type: str,
|
||||||
|
content: str = ""):
|
||||||
|
"""插入设备事件日志"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"INSERT INTO tb_device_log (device_serial, device_ip, "
|
||||||
|
"event_type, event_content) VALUES (%s,%s,%s,%s)",
|
||||||
|
(serial, ip, event_type, content),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def update_device_status(serial: str, state: int):
|
||||||
|
"""更新 dnt_info 设备状态(0=离线 1=在线 2=通信不良)"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
if state == 0:
|
||||||
|
await cur.execute(
|
||||||
|
"UPDATE dnt_info SET state=0, last_off=NOW() "
|
||||||
|
"WHERE serial=%s AND state != 0",
|
||||||
|
(serial,),
|
||||||
|
)
|
||||||
|
elif state == 1:
|
||||||
|
await cur.execute(
|
||||||
|
"UPDATE dnt_info SET state=1, last_login=NOW() "
|
||||||
|
"WHERE serial=%s AND state != 1",
|
||||||
|
(serial,),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await cur.execute(
|
||||||
|
"UPDATE dnt_info SET state=%s WHERE serial=%s",
|
||||||
|
(state, serial),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# ─── tb_serialnet CRUD ─────────────────────────────────────────────
|
# ─── tb_serialnet CRUD ─────────────────────────────────────────────
|
||||||
|
|
||||||
async def get_pending_serialnet(dnt_id: int) -> dict | None:
|
async def get_pending_serialnet(dnt_id: int) -> dict | None:
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ from src.handlers import (
|
|||||||
parse_loop,
|
parse_loop,
|
||||||
serialnet_loop,
|
serialnet_loop,
|
||||||
serialnet_response_loop,
|
serialnet_response_loop,
|
||||||
|
device_status_monitor,
|
||||||
set_udp_sender,
|
set_udp_sender,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -187,6 +188,7 @@ async def main():
|
|||||||
asyncio.create_task(parse_loop())
|
asyncio.create_task(parse_loop())
|
||||||
asyncio.create_task(serialnet_loop())
|
asyncio.create_task(serialnet_loop())
|
||||||
asyncio.create_task(serialnet_response_loop())
|
asyncio.create_task(serialnet_response_loop())
|
||||||
|
asyncio.create_task(device_status_monitor())
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user