Files
edc_server/src/handlers.py
wangfq 6e13990386 fix: 设备型号名称改为从 db tb_vechicle_base_test 动态查询,修复新增型号显示 Unknown 的问题
- handlers.py: B2 数据硬编码 dev_model_map {1:PD132,2:DLD110} → await get_dev_type_name()
- models.py: B4 波动数据硬编码 map → 同上
- models.py: 新增 get_dev_type_name() 带内存缓存,首次加载后缓存 type_num→dev_name
- models.py: 新增 refresh_dev_type_names() 供工装配置页新增型号后刷新缓存
2026-06-12 10:00:25 +08:00

845 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""业务逻辑处理 — 设备登录、数据采集、解析调度
流程 (参考 EDC服务.md):
1. 设备 TCP 连接 → 上报 TimeStamp 请求 → EDC 返回时间同步
2. 设备 UDP 上报 Count_Off (设备信息) → EDC 处理注册,不回复
3. 设备通过 TCP/UDP 上报 TSReport/SerialNet → 存入采集表
4. 后台解析服务轮询采集表 → 解析 DG430 协议 → 写入 tb_state_tst
5. 透传轮询服务轮询 tb_serialnet → 下发 SerialNet 指令
"""
import asyncio
import json
import logging
import time
from collections import deque
from datetime import datetime
from src.models import (
upsert_dnt,
ensure_collect_table,
insert_collect_data,
fetch_unparsed,
fetch_unparsed_serial,
mark_record_state,
insert_test_result,
insert_wave_data,
get_dnt_by_serial,
get_pending_serialnet,
mark_serialnet_sent,
mark_serialnet_done,
mark_serialnet_timeout,
upsert_fixture_param,
insert_device_log,
update_device_status,
get_all_device_serials,
get_dev_type_name,
)
from src.dg430 import (
parse_b2_status,
parse_4a_version,
parse_flag_response,
parse_4c_params,
parse_b4_wave_status,
get_packet_cmd,
hex_str_to_bytes,
split_packets,
verify_packet,
decode_fault_info,
decode_relay_info,
)
from src.protocol import (
make_timestamp_response,
make_heartbeat_response,
make_error_response,
make_response,
)
logger = logging.getLogger(__name__)
# 已注册设备: {device_id: dnt_id}
_registry: dict[str, int] = {}
# 设备心跳时间: {device_id: last_heartbeat}
_heartbeat: dict[str, float] = {}
# 设备交互时间记录: {device_id: deque[float]} (最近 60s 内)
_interactions: dict[str, deque] = {}
# 设备当前状态: {device_id: int} (0=离线 1=在线 2=通信不良)
_device_status: dict[str, int] = {}
# UDP transport 引用,由 server.py 注入
_udp_sender: object | None = None
def set_udp_sender(sender):
"""设置 UDP 发送函数(由 server.py 调用)"""
global _udp_sender
_udp_sender = sender
def record_interaction(device_id: str):
"""记录一次设备交互(心跳/上报/解析成功)"""
now = time.time()
if device_id not in _interactions:
_interactions[device_id] = deque()
_interactions[device_id].append(now)
# 清理 120s 前的旧记录
cutoff = now - 120
while _interactions[device_id] and _interactions[device_id][0] < cutoff:
_interactions[device_id].popleft()
async def handle_count_off(data: dict, addr: tuple):
"""处理设备登录/身份上报 (Count_Off 返回格式)
设备 TCP 同步时间后,通过 UDP 上报设备详细信息。
主机收到后处理注册流程,**不回复设备**。
Count_Off 设备返回格式 (来自 PGLC 协议):
{
"Method": "Count_Off",
"Code": 0,
"Message": "success",
"Data": {
"Ip": "192.168.1.188",
"Port": 5500,
"PortMsg": 5505,
"Mac": "",
"SubnetMask": "...",
"Gateway": "...",
"Server_Ip": "...",
"Iot_Host": "...",
"Iot_Port": 1883,
"Device_id": "1234",
"Device_num": "1234",
"Device_Type": "30",
"Version": "0.1.0",
...
}
}
"""
dev = data.get("Data", {})
serial = dev.get("Device_id") or dev.get("Device_num", "")
if not serial:
logger.warning(f"Count_Off 缺少 Device_id, 来自 {addr}")
return
dev_ip = dev.get("Ip", addr[0])
dev_port = dev.get("Port", 0) or addr[1]
dev_mac = dev.get("Mac", "")
dev_subnet = dev.get("SubnetMask", "")
dev_gateway = dev.get("Gateway", "")
dev_msgport = dev.get("PortMsg", 0)
dev_version = dev.get("Version", "")
dev_type = dev.get("Device_Type", "30")
dnt_id = await upsert_dnt(
serial=serial,
ip=dev_ip,
port=dev_port,
mac=dev_mac,
subnet=dev_subnet,
gateway=dev_gateway,
msgport=dev_msgport,
version=dev_version,
dtype=dev_type,
)
_registry[serial] = dnt_id
_heartbeat[serial] = time.time()
record_interaction(serial)
_device_status[serial] = 1 # 登录即视为在线状态
await ensure_collect_table(serial)
# 登录事件日志
try:
await insert_device_log(
serial=serial, ip=dev_ip, event_type="login",
content=f"设备上线 type={dev_type} ver={dev_version}",
)
except Exception:
pass
logger.info(
f"设备登录: {serial} dnt_id={dnt_id} ip={dev_ip}:{dev_port} "
f"type={dev_type} ver={dev_version}"
)
# 不回复设备
async def handle_heartbeat(data: dict) -> str | None:
"""处理心跳包"""
params = data.get("Params", {})
device_id = params.get("Device_id", "")
if not device_id:
return None
_heartbeat[device_id] = time.time()
record_interaction(device_id)
try:
await insert_collect_data(device_id, 0, str(data))
except Exception:
pass
return make_heartbeat_response(device_id, int(time.time()))
def handle_timestamp(data: dict) -> str:
"""处理时间同步请求(也是设备交互)"""
params = data.get("Params", {})
device_id = params.get("Device_id", "")
if device_id:
record_interaction(device_id)
return make_timestamp_response(device_id, int(time.time()))
async def handle_tsreport(data: dict) -> str | None:
"""处理设备主动上报子设备传感数据
TSReport 中的 Sub_Dat 是 DG430 二进制上报数据(hex 字符串)
存入采集表 (dat_type=8),由解析服务异步处理。
返回确认消息。
"""
device_id = data.get("Device_id", "")
sensor_dat = data.get("Sensor_Dat", {})
sub_dat = sensor_dat.get("Sub_Dat", "")
if not device_id or not sub_dat:
return None
record_interaction(device_id)
try:
await insert_collect_data(device_id, 8, sub_dat)
except Exception as e:
logger.error(f"存储 TSReport 失败: {e}")
return make_response("TSReport", 0, device_id)
async def handle_serial_net(data: dict) -> str | None:
"""处理串口透传返回数据
SerialNet 中 SerialDat 是串口设备应答数据(hex 字符串)
存入采集表 (dat_type=9)。
返回确认消息。
"""
params = data.get("Params") or data
device_id = params.get("Device_id", "")
serial_dat = params.get("SerialDat", "")
if not device_id or not serial_dat:
return None
record_interaction(device_id)
try:
await insert_collect_data(device_id, 9, serial_dat)
except Exception as e:
logger.error(f"存储 SerialNet 失败: {e}")
return make_response("SerialNet", 0, device_id)
# ─── 业务解析服务(后台轮询)─────────────────────────────────────────
async def parse_loop():
"""后台轮询:解析未处理的 DG430 上报数据
每条 raw_data 可能包含多条拼接的 DG430 指令。
从中找出 B2 (状态上报) 指令解析,其余指令忽略。
校验失败的记录标记为 state=3。
解析成功的 B2 会匹配 tb_serialnet 中的待确认记录。
"""
logger.info("业务解析服务启动")
while True:
try:
for device_id, dnt_id in list(_registry.items()):
records = await fetch_unparsed(device_id)
for rec in records:
try:
raw = rec["raw_data"]
pkt_bytes = hex_str_to_bytes(raw)
# 拆分拼接的多个数据包
packets = split_packets(pkt_bytes)
if not packets:
logger.warning(f"无法拆分数据包: {device_id} rec={rec['id']}")
await mark_record_state(device_id, rec["id"], state=3)
continue
has_valid = False
all_failed = True
for pkt in packets:
cmd = pkt[3] if len(pkt) > 3 else 0
# ── B2 状态上报 ──
if cmd == 0xB2:
if not verify_packet(pkt):
logger.warning(f"B2 数据包校验失败: {device_id} rec={rec['id']}")
continue
status = parse_b2_status(pkt)
if status is None:
logger.warning(f"B2 解析失败: {device_id} rec={rec['id']}")
continue
fault_info = decode_fault_info(status.fault)
relay_info = decode_relay_info(status.relay_out)
str_type = await get_dev_type_name(status.dev_model)
await insert_test_result(
dnt_id=dnt_id,
dpg430_addr=status.addr,
pcnum=datetime.now().strftime("%Y%m"),
serialnum=0,
sub_type=status.dev_model,
str_type=str_type,
iffinish="1" if status.is_finished else "0",
fault_info=fault_info,
relay_out=relay_info,
ppvalue=status.ppvalue,
idle_freq=status.idle_freq,
enter_freq=status.enter_freq,
exit_freq=status.exit_freq,
enter_dist=status.enter_dist,
exit_dist=status.exit_dist,
enter_speed=status.enter_speed,
exit_speed=status.exit_speed,
test_mode=status.test_mode,
relay_code=status.relay_out,
)
# 匹配 tb_serialnet 中的待确认记录
await _match_serialnet_response(dnt_id, raw)
has_valid = True
all_failed = False
logger.info(
f"解析完成: {device_id} dg430={status.addr} "
f"型号={str_type} 峰峰值={status.ppvalue:.2f}V "
f"进入高度={status.enter_dist}mm 故障={fault_info}"
)
# ── 0x4C 查询工装参数响应 ──
elif cmd == 0x4C:
if not verify_packet(pkt):
logger.warning(f"0x4C 数据包校验失败: {device_id}")
continue
params = parse_4c_params(pkt)
if params is None:
logger.warning(f"0x4C 解析失败: {device_id}")
continue
if params.flag == 0:
await upsert_fixture_param(
dnt_id,
Addr=params.dev_addr,
DevType=params.dev_type,
TestMode=params.test_mode,
RestDis=params.reset_dis,
MinusDis=params.minus_dis,
SensMin=params.sens_min,
SensMax=params.sens_max,
FreMin=params.fre_min,
FreMax=params.fre_max,
PeakMin=params.peak_min,
PeakMax=params.peak_max,
FarTol=params.far_tol,
NearTol=params.near_tol,
StepTol=params.step_tol,
BackForth=params.back_forth,
NearStay=params.near_stay,
FarStay=params.far_stay,
)
logger.info(
f"0x4C 工装参数已更新 dnt_id={dnt_id} "
f"DevType={params.dev_type} TestMode={params.test_mode}"
)
await _match_serialnet_response(dnt_id, raw)
has_valid = True
all_failed = False
# ── 0xB4 波动测试上报 ──
elif cmd == 0xB4:
if not verify_packet(pkt):
logger.debug(f"0xB4 数据包校验失败: {device_id}")
continue
wave = parse_b4_wave_status(pkt)
if wave is None:
logger.warning(f"0xB4 解析失败: {device_id}")
continue
relay_info = decode_relay_info(wave.relay_out)
await insert_wave_data(
dnt_id=dnt_id,
dpg430_addr=wave.addr,
remain_count=wave.remain_count,
relay_code=wave.relay_out,
relay_out=relay_info,
work_freq=wave.work_freq,
curr_dist=wave.curr_dist,
speed=wave.speed,
near_dist=wave.near_dist,
far_dist=wave.far_dist,
enter_dist=wave.enter_dist,
leave_dist=wave.leave_dist,
)
logger.info(
f"B4波动上报: {device_id} 剩余={wave.remain_count} "
f"当前距离={wave.curr_dist}mm 速度={wave.speed}dm/s "
f"最近={wave.near_dist}mm 最远={wave.far_dist}mm "
f"进入={wave.enter_dist}mm 离开={wave.leave_dist}mm "
f"继电器={relay_info}"
)
has_valid = True
all_failed = False
# ── 测试指令 Flag 响应(不匹配 serialnet等待 B2 完成)──
elif cmd in (0xB0, 0xB1, 0xBA, 0xBB, 0xBC):
if not verify_packet(pkt):
logger.debug(f"测试指令 0x{cmd:02X} 校验失败, "
f"rec={rec['id']}")
continue
# 不匹配 serialnet — B2 状态上报才标记完成
has_valid = True
all_failed = False
logger.info(
f"测试指令 0x{cmd:02X} Flag 响应 dnt_id={dnt_id}"
)
# ── 配置指令响应 ──
elif cmd in (0x4A, 0x4B, 0x4D, 0x4E):
if not verify_packet(pkt):
logger.debug(f"配置指令 0x{cmd:02X} 校验失败, "
f"跳过 rec={rec['id']}")
continue
await _match_serialnet_response(dnt_id, raw)
has_valid = True
all_failed = False
logger.info(
f"配置指令 0x{cmd:02X} 响应已匹配 "
f"dnt_id={dnt_id}"
)
else:
logger.debug(f"跳过未知/不支持指令: 0x{cmd:02X}")
if has_valid:
record_interaction(device_id)
await mark_record_state(device_id, rec["id"], state=1)
elif all_failed and packets:
await mark_record_state(device_id, rec["id"], state=3)
logger.warning(f"记录 {rec['id']} 所有包校验失败, state=3")
else:
await mark_record_state(device_id, rec["id"], state=1)
except Exception as e:
logger.error(f"解析记录 {rec['id']} 异常: {e}", exc_info=True)
try:
await mark_record_state(device_id, rec["id"], state=3)
except Exception:
pass
except Exception as e:
logger.error(f"解析循环异常: {e}")
await asyncio.sleep(0.5)
async def _match_serialnet_response(dnt_id: int, raw_hex: str):
"""收到 B2 响应后,匹配 tb_serialnet 中 state=1 的第一条记录"""
try:
from src.models import get_pool
import aiomysql
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 "
"ORDER BY id ASC LIMIT 1",
(dnt_id,),
)
row = await cur.fetchone()
if row:
await mark_serialnet_done(row["id"], raw_hex)
logger.info(f"tb_serialnet #{row['id']} 已确认完成 (dnt_id={dnt_id})")
except Exception as e:
logger.warning(f"匹配 serialnet 响应失败: {e}")
# ─── 透传指令轮询服务 ───────────────────────────────────────────────
SERIALNET_TIMEOUT = 10 # 秒
async def serialnet_loop():
"""后台轮询:检查 tb_serialnet 待发送指令,通过 UDP 下发
直接查询 tb_serialnet 表(不依赖 _registry
1. state=0 → 发送 SerialNet JSON → state=1
2. state=1 且超过 10 秒 → state=3 (超时失败)
"""
logger.info("透传轮询服务启动")
await asyncio.sleep(2) # 等 UDP transport 就绪
while True:
try:
# 1. 查询所有 state=0 的记录
await _process_pending_all()
# 2. 查询所有 state=1 超时的记录
await _check_timeout_all()
except Exception as e:
logger.error(f"透传轮询异常: {e}")
await asyncio.sleep(0.2)
async def _process_pending_all():
"""查询所有 state=0 的记录并发送"""
from src.models import get_pool
import aiomysql
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT sn.*, d.serial, d.ip FROM tb_serialnet sn "
"JOIN dnt_info d ON sn.dnt_id = d.id "
"WHERE sn.state = 0 ORDER BY sn.id ASC LIMIT 10"
)
rows = await cur.fetchall()
for row in rows:
device_id = row["serial"]
dnt_id = row["dnt_id"]
if not row.get("ip"):
logger.warning(f"设备 {device_id} 无 IP跳过")
await mark_serialnet_timeout(row["id"])
continue
await _send_serialnet_cmd(device_id, dnt_id, row)
async def _check_timeout_all():
"""检查所有 state=1 超时记录"""
from src.models import get_pool
import aiomysql
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT id, dnt_id FROM tb_serialnet WHERE state=1 "
"AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) "
"LIMIT 20",
(SERIALNET_TIMEOUT,),
)
rows = await cur.fetchall()
for row in rows:
await mark_serialnet_timeout(row["id"])
logger.warning(f"tb_serialnet #{row['id']} 超时 dnt_id={row['dnt_id']}")
async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
"""构造 SerialNet JSON 并通过 UDP 发送给设备"""
if _udp_sender is None:
logger.warning("UDP sender 未就绪,跳过发送")
return
# 获取设备 IP 和 msgport
dnt = await get_dnt_by_serial(device_id)
if not dnt or not dnt.get("ip"):
logger.warning(f"设备 {device_id} 无 IP 信息,跳过")
return
from src.config import DEVICE_UDP_PORT
send_pkg = record["send_pkg"]
addr = (dnt["ip"], DEVICE_UDP_PORT)
# 构造 SerialNet JSON
msg = {
"Method": "SerialNet",
"Params": {
"Device_id": device_id,
"Extra_id": 0,
"Bus_Num": 0,
"SerialDat": send_pkg,
},
}
msg_bytes = json.dumps(msg, ensure_ascii=False).encode("utf-8")
_udp_sender(msg_bytes, addr)
await mark_serialnet_sent(record["id"])
logger.info(f"已发送 SerialNet → {device_id} ({addr}): {send_pkg}")
# ─── SerialNet 响应处理轮询 ──────────────────────────────────────────
async def serialnet_response_loop():
"""后台轮询:处理 SerialNet 响应 (dat_type=9)
当设备通过 SerialNet 返回 0x4A-0x4E 等指令的响应时,
匹配 tb_serialnet 中 state=1 的记录并标记完成。
对于 0x4C 响应,同时更新 tb_fixture_param。
"""
logger.info("SerialNet 响应处理服务启动")
await asyncio.sleep(3) # 等前面所有服务就绪
while True:
try:
for device_id, dnt_id in list(_registry.items()):
records = await fetch_unparsed_serial(device_id)
for rec in records:
try:
raw = rec["raw_data"]
pkt_bytes = hex_str_to_bytes(raw)
packets = split_packets(pkt_bytes)
if not packets:
await mark_record_state(device_id, rec["id"], state=3)
continue
for pkt in packets:
cmd = get_packet_cmd(pkt)
if cmd is None:
continue
# 跳过 B2 (已在 parse_loop 处理)
if cmd == 0xB2:
continue
# 匹配 tb_serialnet 中 state=1 且 CMD 匹配的记录
await _match_serial_cmd(dnt_id, cmd, raw)
# 对于 0x4C解析参数并更新数据库
if cmd == 0x4C and verify_packet(pkt):
params = parse_4c_params(pkt)
if params and params.flag == 0:
await upsert_fixture_param(
dnt_id,
Addr=params.dev_addr,
DevType=params.dev_type,
TestMode=params.test_mode,
RestDis=params.reset_dis,
MinusDis=params.minus_dis,
SensMin=params.sens_min,
SensMax=params.sens_max,
FreMin=params.fre_min,
FreMax=params.fre_max,
PeakMin=params.peak_min,
PeakMax=params.peak_max,
)
logger.info(
f"已更新工装参数 dnt_id={dnt_id} "
f"DevType={params.dev_type} "
f"TestMode={params.test_mode}"
)
# 对于 0x4A记录版本信息
if cmd == 0x4A and verify_packet(pkt):
ver = parse_4a_version(pkt)
if ver:
logger.info(
f"DG430 版本: hw={ver.hw_major}.{ver.hw_minor}.{ver.hw_patch} "
f"sw={ver.sw_major}.{ver.sw_minor}.{ver.sw_patch}"
)
await mark_record_state(device_id, rec["id"], state=1)
except Exception as e:
logger.error(
f"SerialNet 响应处理异常 rec={rec['id']}: {e}",
exc_info=True,
)
try:
await mark_record_state(device_id, rec["id"], state=3)
except Exception:
pass
except Exception as e:
logger.error(f"SerialNet 响应循环异常: {e}")
await asyncio.sleep(0.5)
async def _match_serial_cmd(dnt_id: int, cmd: int, raw_hex: str):
"""匹配 tb_serialnet 中 state=1 且 CMD 匹配的记录"""
try:
from src.models import get_pool
import aiomysql
pool = await get_pool()
cmd_hex = f"{cmd:02X}"
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 "
"AND UPPER(SUBSTRING(send_pkg, 7, 2)) = %s "
"ORDER BY id ASC LIMIT 1",
(dnt_id, cmd_hex),
)
row = await cur.fetchone()
if row:
await mark_serialnet_done(row["id"], raw_hex)
logger.info(
f"tb_serialnet #{row['id']} CMD=0x{cmd:02X} 已确认完成 "
f"(dnt_id={dnt_id})"
)
except Exception as e:
logger.warning(f"匹配 serialnet cmd 0x{cmd:02X} 失败: {e}")
# ─── 设备状态监控服务 ──────────────────────────────────────────────────
# 在线判定参数
INTERACTION_TIMEOUT = 10 # 单次交互超时判定 (秒)
ONLINE_MIN_INTERACTIONS = 3 # 连续几次交互在超时内即表示在线
OFFLINE_IDLE_SEC = 60 # 超过此时间无交互 → 离线
POOR_MIN_INTERACTIONS = 4 # 1 分钟内少于此次数 → 通信不良
MONITOR_INTERVAL = 5 # 状态检查间隔 (秒)
async def device_status_monitor():
"""后台轮询:监控所有设备在线/离线状态
两个阶段:
Phase 1 — 遍历 _registry 中活跃设备,根据交互记录判定状态
Phase 2 — 扫描 dnt_info 全表,修正与实际交互不符的状态:
- state=1(在线) 但 >60s 无交互 → 更新为离线
- state=0(离线) 但有交互记录 → 根据交互模式更新为在线/通信不良
- state 与计算值不一致 → 同步修正
"""
logger.info("设备状态监控服务启动")
await asyncio.sleep(5) # 等其它服务就绪
while True:
try:
now = time.time()
# ── Phase 1: 活跃设备(在 _registry 中)────
for device_id, dnt_id in list(_registry.items()):
interactions = _interactions.get(device_id)
actual_state = _calc_device_state(interactions, now) if interactions else 0
old_state = _device_status.get(device_id, 1) # 注册时默认在线
if actual_state != old_state:
await _apply_state_change(device_id, actual_state, old_state)
# ── Phase 2: 扫描 dnt_info 全表,修正不一致 ──
try:
rows = await get_all_device_serials()
except Exception as e:
logger.error(f"查询 dnt_info 失败: {e}")
rows = []
for serial, db_state, ip in rows:
if not serial:
continue
interactions = _interactions.get(serial)
actual_state = _calc_device_state(interactions, now) if interactions else 0
memory_state = _device_status.get(serial)
# 优先用内存状态做基准(更实时);无内存状态则用 DB 状态
tracked_state = memory_state if memory_state is not None else db_state
if actual_state != tracked_state:
await _apply_state_change(serial, actual_state, tracked_state, ip=ip)
# ── Phase 3: 清理 _registry 中已经不存在的设备 ──
# (已注销 / 长期无交互的设备不清除,继续监控)
except Exception as e:
logger.error(f"设备状态监控异常: {e}")
await asyncio.sleep(MONITOR_INTERVAL)
async def _apply_state_change(device_id: str, new_state: int, old_state: int,
ip: str = ""):
"""应用状态变更更新内存、dnt_info、写入日志"""
_device_status[device_id] = new_state
logger.info(
f"设备 {device_id} 状态变更: "
f"{_state_name(old_state)}{_state_name(new_state)}"
)
# 更新 dnt_info
try:
await update_device_status(device_id, new_state)
except Exception as e:
logger.error(f"更新设备状态失败: {e}")
# 获取设备 IP未传入时从 DB 查)
if not ip:
try:
dnt = await get_dnt_by_serial(device_id)
ip = dnt.get("ip", "") if dnt else ""
except Exception:
pass
# 写入事件日志
event_type, content = _state_event(new_state, old_state)
try:
await insert_device_log(
serial=device_id, ip=ip,
event_type=event_type, content=content,
)
except Exception as e:
logger.error(f"写入设备事件日志失败: {e}")
def _calc_device_state(interactions: deque, now: float) -> int:
"""根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)"""
# 清理过期记录(仅保留 60s 内)
cutoff = now - OFFLINE_IDLE_SEC
recent = [t for t in interactions if t >= cutoff]
if not recent:
return 0 # 离线
# 最近一次交互距今
last_interaction = recent[-1]
if now - last_interaction > OFFLINE_IDLE_SEC:
return 0 # 离线
# 1 分钟内交互次数
if len(recent) < POOR_MIN_INTERACTIONS:
return 2 # 通信不良
# 最近 3 次交互间隔是否都在超时内
if len(recent) >= ONLINE_MIN_INTERACTIONS:
last_n = recent[-ONLINE_MIN_INTERACTIONS:]
gaps = [last_n[i] - last_n[i - 1] for i in range(1, len(last_n))]
if gaps and all(g <= INTERACTION_TIMEOUT for g in gaps):
return 1 # 在线
# 默认:有交互但不够密集 → 通信不良
return 2
def _state_name(state: int) -> str:
return {0: "离线", 1: "在线", 2: "通信不良"}.get(state, f"未知({state})")
def _state_event(new_state: int, old_state: int) -> tuple[str, str]:
"""根据状态变化生成事件类型和内容"""
name_new = _state_name(new_state)
name_old = _state_name(old_state)
if new_state == 0:
return "offline", f"设备离线(上次状态: {name_old}"
elif new_state == 1:
return "online", f"设备已上线(上次状态: {name_old}"
else:
return "poor", f"设备通信不良(上次状态: {name_old}"