Compare commits
16 Commits
2d6c9f03dd
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a2f31b3bfe | ||
|
|
ff9482780d | ||
|
|
6e13990386 | ||
|
|
3580f89552 | ||
|
|
25aafd57c8 | ||
|
|
cdddfac609 | ||
|
|
944870496a | ||
|
|
c875cf383b | ||
|
|
11f1c4f55b | ||
|
|
ef890fafc6 | ||
|
|
3a74759066 | ||
|
|
68c6d0bcfe | ||
|
|
844de70017 | ||
|
|
dc1d2b8871 | ||
|
|
7e5fe2cccd | ||
|
|
6724af7951 |
16
src/dg430.py
16
src/dg430.py
@@ -280,12 +280,16 @@ def decode_fault_info(fault: int) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def decode_relay_info(relay: int) -> str:
|
def decode_relay_info(relay: int) -> str:
|
||||||
"""解码继电器 bitmask"""
|
"""解码继电器输出状态为可读字符串
|
||||||
items = []
|
|
||||||
for bit, desc in RELAY_BITS.items():
|
0xB2 继电器输出状态原始值 x 的解析规则:
|
||||||
if relay & (1 << bit):
|
- x & 0x01 为真 → "存在继电器有输出",否则 "存在继电器无输出"
|
||||||
items.append(desc)
|
- x & 0x02 为真 → "脉冲继电器有输出",否则 "脉冲继电器无输出"
|
||||||
return "; ".join(items) if items else "无输出"
|
汇总格式: "存在继电器有输出,脉冲继电器有输出"
|
||||||
|
"""
|
||||||
|
exist = "存在继电器有输出" if (relay & 0x01) else "存在继电器无输出"
|
||||||
|
pulse = "脉冲继电器有输出" if (relay & 0x02) else "脉冲继电器无输出"
|
||||||
|
return f"{exist},{pulse}"
|
||||||
|
|
||||||
|
|
||||||
# ─── 0x4A 获取设备版本号响应 ──────────────────────────────────────
|
# ─── 0x4A 获取设备版本号响应 ──────────────────────────────────────
|
||||||
|
|||||||
221
src/handlers.py
221
src/handlers.py
@@ -12,6 +12,7 @@ import asyncio
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
from collections import deque
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
from src.models import (
|
from src.models import (
|
||||||
@@ -22,12 +23,17 @@ from src.models import (
|
|||||||
fetch_unparsed_serial,
|
fetch_unparsed_serial,
|
||||||
mark_record_state,
|
mark_record_state,
|
||||||
insert_test_result,
|
insert_test_result,
|
||||||
|
insert_wave_data,
|
||||||
get_dnt_by_serial,
|
get_dnt_by_serial,
|
||||||
get_pending_serialnet,
|
get_pending_serialnet,
|
||||||
mark_serialnet_sent,
|
mark_serialnet_sent,
|
||||||
mark_serialnet_done,
|
mark_serialnet_done,
|
||||||
mark_serialnet_timeout,
|
mark_serialnet_timeout,
|
||||||
upsert_fixture_param,
|
upsert_fixture_param,
|
||||||
|
insert_device_log,
|
||||||
|
update_device_status,
|
||||||
|
get_all_device_serials,
|
||||||
|
get_dev_type_name,
|
||||||
)
|
)
|
||||||
from src.dg430 import (
|
from src.dg430 import (
|
||||||
parse_b2_status,
|
parse_b2_status,
|
||||||
@@ -57,6 +63,12 @@ _registry: dict[str, int] = {}
|
|||||||
# 设备心跳时间: {device_id: last_heartbeat}
|
# 设备心跳时间: {device_id: last_heartbeat}
|
||||||
_heartbeat: dict[str, float] = {}
|
_heartbeat: dict[str, float] = {}
|
||||||
|
|
||||||
|
# 设备交互时间记录: {device_id: deque[float]} (最近 60s 内)
|
||||||
|
_interactions: dict[str, deque] = {}
|
||||||
|
|
||||||
|
# 设备当前状态: {device_id: int} (0=离线 1=在线 2=通信不良)
|
||||||
|
_device_status: dict[str, int] = {}
|
||||||
|
|
||||||
# UDP transport 引用,由 server.py 注入
|
# UDP transport 引用,由 server.py 注入
|
||||||
_udp_sender: object | None = None
|
_udp_sender: object | None = None
|
||||||
|
|
||||||
@@ -67,6 +79,18 @@ def set_udp_sender(sender):
|
|||||||
_udp_sender = sender
|
_udp_sender = sender
|
||||||
|
|
||||||
|
|
||||||
|
def record_interaction(device_id: str):
|
||||||
|
"""记录一次设备交互(心跳/上报/解析成功)"""
|
||||||
|
now = time.time()
|
||||||
|
if device_id not in _interactions:
|
||||||
|
_interactions[device_id] = deque()
|
||||||
|
_interactions[device_id].append(now)
|
||||||
|
# 清理 120s 前的旧记录
|
||||||
|
cutoff = now - 120
|
||||||
|
while _interactions[device_id] and _interactions[device_id][0] < cutoff:
|
||||||
|
_interactions[device_id].popleft()
|
||||||
|
|
||||||
|
|
||||||
async def handle_count_off(data: dict, addr: tuple):
|
async def handle_count_off(data: dict, addr: tuple):
|
||||||
"""处理设备登录/身份上报 (Count_Off 返回格式)
|
"""处理设备登录/身份上报 (Count_Off 返回格式)
|
||||||
|
|
||||||
@@ -124,9 +148,20 @@ async def handle_count_off(data: dict, addr: tuple):
|
|||||||
)
|
)
|
||||||
_registry[serial] = dnt_id
|
_registry[serial] = dnt_id
|
||||||
_heartbeat[serial] = time.time()
|
_heartbeat[serial] = time.time()
|
||||||
|
record_interaction(serial)
|
||||||
|
_device_status[serial] = 1 # 登录即视为在线状态
|
||||||
|
|
||||||
await ensure_collect_table(serial)
|
await ensure_collect_table(serial)
|
||||||
|
|
||||||
|
# 登录事件日志
|
||||||
|
try:
|
||||||
|
await insert_device_log(
|
||||||
|
serial=serial, ip=dev_ip, event_type="login",
|
||||||
|
content=f"设备上线 type={dev_type} ver={dev_version}",
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"设备登录: {serial} dnt_id={dnt_id} ip={dev_ip}:{dev_port} "
|
f"设备登录: {serial} dnt_id={dnt_id} ip={dev_ip}:{dev_port} "
|
||||||
f"type={dev_type} ver={dev_version}"
|
f"type={dev_type} ver={dev_version}"
|
||||||
@@ -142,6 +177,7 @@ async def handle_heartbeat(data: dict) -> str | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
_heartbeat[device_id] = time.time()
|
_heartbeat[device_id] = time.time()
|
||||||
|
record_interaction(device_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await insert_collect_data(device_id, 0, str(data))
|
await insert_collect_data(device_id, 0, str(data))
|
||||||
@@ -152,21 +188,11 @@ async def handle_heartbeat(data: dict) -> str | None:
|
|||||||
|
|
||||||
|
|
||||||
def handle_timestamp(data: dict) -> str:
|
def handle_timestamp(data: dict) -> str:
|
||||||
"""处理时间同步请求
|
"""处理时间同步请求(也是设备交互)"""
|
||||||
|
|
||||||
TimeStamp 请求格式:
|
|
||||||
{
|
|
||||||
"Method": "TimeStamp",
|
|
||||||
"Params": {
|
|
||||||
"Device_id": "2345",
|
|
||||||
"TimeZone": "Asia/Shanghai"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
返回格式参考 PGLC 协议 3.5 节。
|
|
||||||
"""
|
|
||||||
params = data.get("Params", {})
|
params = data.get("Params", {})
|
||||||
device_id = params.get("Device_id", "")
|
device_id = params.get("Device_id", "")
|
||||||
|
if device_id:
|
||||||
|
record_interaction(device_id)
|
||||||
return make_timestamp_response(device_id, int(time.time()))
|
return make_timestamp_response(device_id, int(time.time()))
|
||||||
|
|
||||||
|
|
||||||
@@ -185,6 +211,8 @@ async def handle_tsreport(data: dict) -> str | None:
|
|||||||
if not device_id or not sub_dat:
|
if not device_id or not sub_dat:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
record_interaction(device_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await insert_collect_data(device_id, 8, sub_dat)
|
await insert_collect_data(device_id, 8, sub_dat)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -208,6 +236,8 @@ async def handle_serial_net(data: dict) -> str | None:
|
|||||||
if not device_id or not serial_dat:
|
if not device_id or not serial_dat:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
record_interaction(device_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await insert_collect_data(device_id, 9, serial_dat)
|
await insert_collect_data(device_id, 9, serial_dat)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -264,8 +294,7 @@ async def parse_loop():
|
|||||||
fault_info = decode_fault_info(status.fault)
|
fault_info = decode_fault_info(status.fault)
|
||||||
relay_info = decode_relay_info(status.relay_out)
|
relay_info = decode_relay_info(status.relay_out)
|
||||||
|
|
||||||
dev_model_map = {1: "PD132", 2: "DLD110"}
|
str_type = await get_dev_type_name(status.dev_model)
|
||||||
str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})")
|
|
||||||
|
|
||||||
await insert_test_result(
|
await insert_test_result(
|
||||||
dnt_id=dnt_id,
|
dnt_id=dnt_id,
|
||||||
@@ -285,6 +314,8 @@ async def parse_loop():
|
|||||||
exit_dist=status.exit_dist,
|
exit_dist=status.exit_dist,
|
||||||
enter_speed=status.enter_speed,
|
enter_speed=status.enter_speed,
|
||||||
exit_speed=status.exit_speed,
|
exit_speed=status.exit_speed,
|
||||||
|
test_mode=status.test_mode,
|
||||||
|
relay_code=status.relay_out,
|
||||||
)
|
)
|
||||||
|
|
||||||
# 匹配 tb_serialnet 中的待确认记录
|
# 匹配 tb_serialnet 中的待确认记录
|
||||||
@@ -351,6 +382,20 @@ async def parse_loop():
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
relay_info = decode_relay_info(wave.relay_out)
|
relay_info = decode_relay_info(wave.relay_out)
|
||||||
|
await insert_wave_data(
|
||||||
|
dnt_id=dnt_id,
|
||||||
|
dpg430_addr=wave.addr,
|
||||||
|
remain_count=wave.remain_count,
|
||||||
|
relay_code=wave.relay_out,
|
||||||
|
relay_out=relay_info,
|
||||||
|
work_freq=wave.work_freq,
|
||||||
|
curr_dist=wave.curr_dist,
|
||||||
|
speed=wave.speed,
|
||||||
|
near_dist=wave.near_dist,
|
||||||
|
far_dist=wave.far_dist,
|
||||||
|
enter_dist=wave.enter_dist,
|
||||||
|
leave_dist=wave.leave_dist,
|
||||||
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"B4波动上报: {device_id} 剩余={wave.remain_count} "
|
f"B4波动上报: {device_id} 剩余={wave.remain_count} "
|
||||||
f"当前距离={wave.curr_dist}mm 速度={wave.speed}dm/s "
|
f"当前距离={wave.curr_dist}mm 速度={wave.speed}dm/s "
|
||||||
@@ -393,6 +438,7 @@ async def parse_loop():
|
|||||||
logger.debug(f"跳过未知/不支持指令: 0x{cmd:02X}")
|
logger.debug(f"跳过未知/不支持指令: 0x{cmd:02X}")
|
||||||
|
|
||||||
if has_valid:
|
if has_valid:
|
||||||
|
record_interaction(device_id)
|
||||||
await mark_record_state(device_id, rec["id"], state=1)
|
await mark_record_state(device_id, rec["id"], state=1)
|
||||||
elif all_failed and packets:
|
elif all_failed and packets:
|
||||||
await mark_record_state(device_id, rec["id"], state=3)
|
await mark_record_state(device_id, rec["id"], state=3)
|
||||||
@@ -651,3 +697,148 @@ async def _match_serial_cmd(dnt_id: int, cmd: int, raw_hex: str):
|
|||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"匹配 serialnet cmd 0x{cmd:02X} 失败: {e}")
|
logger.warning(f"匹配 serialnet cmd 0x{cmd:02X} 失败: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 设备状态监控服务 ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# 在线判定参数
|
||||||
|
INTERACTION_TIMEOUT = 10 # 单次交互超时判定 (秒)
|
||||||
|
ONLINE_MIN_INTERACTIONS = 3 # 连续几次交互在超时内即表示在线
|
||||||
|
OFFLINE_IDLE_SEC = 60 # 超过此时间无交互 → 离线
|
||||||
|
POOR_MIN_INTERACTIONS = 4 # 1 分钟内少于此次数 → 通信不良
|
||||||
|
MONITOR_INTERVAL = 5 # 状态检查间隔 (秒)
|
||||||
|
|
||||||
|
|
||||||
|
async def device_status_monitor():
|
||||||
|
"""后台轮询:监控所有设备在线/离线状态
|
||||||
|
|
||||||
|
两个阶段:
|
||||||
|
Phase 1 — 遍历 _registry 中活跃设备,根据交互记录判定状态
|
||||||
|
Phase 2 — 扫描 dnt_info 全表,修正与实际交互不符的状态:
|
||||||
|
- state=1(在线) 但 >60s 无交互 → 更新为离线
|
||||||
|
- state=0(离线) 但有交互记录 → 根据交互模式更新为在线/通信不良
|
||||||
|
- state 与计算值不一致 → 同步修正
|
||||||
|
"""
|
||||||
|
logger.info("设备状态监控服务启动")
|
||||||
|
await asyncio.sleep(5) # 等其它服务就绪
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
|
# ── Phase 1: 活跃设备(在 _registry 中)────
|
||||||
|
for device_id, dnt_id in list(_registry.items()):
|
||||||
|
interactions = _interactions.get(device_id)
|
||||||
|
actual_state = _calc_device_state(interactions, now) if interactions else 0
|
||||||
|
old_state = _device_status.get(device_id, 1) # 注册时默认在线
|
||||||
|
|
||||||
|
if actual_state != old_state:
|
||||||
|
await _apply_state_change(device_id, actual_state, old_state)
|
||||||
|
|
||||||
|
# ── Phase 2: 扫描 dnt_info 全表,修正不一致 ──
|
||||||
|
try:
|
||||||
|
rows = await get_all_device_serials()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"查询 dnt_info 失败: {e}")
|
||||||
|
rows = []
|
||||||
|
|
||||||
|
for serial, db_state, ip in rows:
|
||||||
|
if not serial:
|
||||||
|
continue
|
||||||
|
|
||||||
|
interactions = _interactions.get(serial)
|
||||||
|
actual_state = _calc_device_state(interactions, now) if interactions else 0
|
||||||
|
memory_state = _device_status.get(serial)
|
||||||
|
|
||||||
|
# 优先用内存状态做基准(更实时);无内存状态则用 DB 状态
|
||||||
|
tracked_state = memory_state if memory_state is not None else db_state
|
||||||
|
|
||||||
|
if actual_state != tracked_state:
|
||||||
|
await _apply_state_change(serial, actual_state, tracked_state, ip=ip)
|
||||||
|
|
||||||
|
# ── Phase 3: 清理 _registry 中已经不存在的设备 ──
|
||||||
|
# (已注销 / 长期无交互的设备不清除,继续监控)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"设备状态监控异常: {e}")
|
||||||
|
|
||||||
|
await asyncio.sleep(MONITOR_INTERVAL)
|
||||||
|
|
||||||
|
|
||||||
|
async def _apply_state_change(device_id: str, new_state: int, old_state: int,
|
||||||
|
ip: str = ""):
|
||||||
|
"""应用状态变更:更新内存、dnt_info、写入日志"""
|
||||||
|
_device_status[device_id] = new_state
|
||||||
|
logger.info(
|
||||||
|
f"设备 {device_id} 状态变更: "
|
||||||
|
f"{_state_name(old_state)} → {_state_name(new_state)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# 更新 dnt_info
|
||||||
|
try:
|
||||||
|
await update_device_status(device_id, new_state)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"更新设备状态失败: {e}")
|
||||||
|
|
||||||
|
# 获取设备 IP(未传入时从 DB 查)
|
||||||
|
if not ip:
|
||||||
|
try:
|
||||||
|
dnt = await get_dnt_by_serial(device_id)
|
||||||
|
ip = dnt.get("ip", "") if dnt else ""
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 写入事件日志
|
||||||
|
event_type, content = _state_event(new_state, old_state)
|
||||||
|
try:
|
||||||
|
await insert_device_log(
|
||||||
|
serial=device_id, ip=ip,
|
||||||
|
event_type=event_type, content=content,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"写入设备事件日志失败: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def _calc_device_state(interactions: deque, now: float) -> int:
|
||||||
|
"""根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)"""
|
||||||
|
# 清理过期记录(仅保留 60s 内)
|
||||||
|
cutoff = now - OFFLINE_IDLE_SEC
|
||||||
|
recent = [t for t in interactions if t >= cutoff]
|
||||||
|
|
||||||
|
if not recent:
|
||||||
|
return 0 # 离线
|
||||||
|
|
||||||
|
# 最近一次交互距今
|
||||||
|
last_interaction = recent[-1]
|
||||||
|
if now - last_interaction > OFFLINE_IDLE_SEC:
|
||||||
|
return 0 # 离线
|
||||||
|
|
||||||
|
# 1 分钟内交互次数
|
||||||
|
if len(recent) < POOR_MIN_INTERACTIONS:
|
||||||
|
return 2 # 通信不良
|
||||||
|
|
||||||
|
# 最近 3 次交互间隔是否都在超时内
|
||||||
|
if len(recent) >= ONLINE_MIN_INTERACTIONS:
|
||||||
|
last_n = recent[-ONLINE_MIN_INTERACTIONS:]
|
||||||
|
gaps = [last_n[i] - last_n[i - 1] for i in range(1, len(last_n))]
|
||||||
|
if gaps and all(g <= INTERACTION_TIMEOUT for g in gaps):
|
||||||
|
return 1 # 在线
|
||||||
|
|
||||||
|
# 默认:有交互但不够密集 → 通信不良
|
||||||
|
return 2
|
||||||
|
|
||||||
|
|
||||||
|
def _state_name(state: int) -> str:
|
||||||
|
return {0: "离线", 1: "在线", 2: "通信不良"}.get(state, f"未知({state})")
|
||||||
|
|
||||||
|
|
||||||
|
def _state_event(new_state: int, old_state: int) -> tuple[str, str]:
|
||||||
|
"""根据状态变化生成事件类型和内容"""
|
||||||
|
name_new = _state_name(new_state)
|
||||||
|
name_old = _state_name(old_state)
|
||||||
|
if new_state == 0:
|
||||||
|
return "offline", f"设备离线(上次状态: {name_old})"
|
||||||
|
elif new_state == 1:
|
||||||
|
return "online", f"设备已上线(上次状态: {name_old})"
|
||||||
|
else:
|
||||||
|
return "poor", f"设备通信不良(上次状态: {name_old})"
|
||||||
|
|||||||
355
src/models.py
355
src/models.py
@@ -100,11 +100,14 @@ async def _create_tables(pool: aiomysql.Pool):
|
|||||||
CREATE TABLE IF NOT EXISTS `tb_state_tst` (
|
CREATE TABLE IF NOT EXISTS `tb_state_tst` (
|
||||||
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
||||||
`dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id',
|
`dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id',
|
||||||
|
`detector_serial` VARCHAR(45) DEFAULT '' COMMENT '车检器序列号',
|
||||||
`dpg430_addr` TINYINT DEFAULT 0,
|
`dpg430_addr` TINYINT DEFAULT 0,
|
||||||
`pcnum` VARCHAR(10) DEFAULT '' COMMENT '批次号',
|
`pcnum` VARCHAR(10) DEFAULT '' COMMENT '批次号',
|
||||||
`serialnum` INT DEFAULT 0 COMMENT '流水号',
|
`serialnum` INT DEFAULT 0 COMMENT '流水号',
|
||||||
`sub_type` TINYINT DEFAULT 0 COMMENT '1 DLD110, 2 PD132',
|
`sub_type` TINYINT DEFAULT 0 COMMENT '1 DLD110, 2 PD132',
|
||||||
`str_type` VARCHAR(30) DEFAULT '',
|
`str_type` VARCHAR(30) DEFAULT '',
|
||||||
|
`test_mode` TINYINT DEFAULT 0 COMMENT '0 灵敏度测试, 1 波动测试',
|
||||||
|
`data_source` CHAR(2) DEFAULT 'B2' COMMENT '数据来源 B2/B4',
|
||||||
`iffinish` VARCHAR(5) DEFAULT '' COMMENT '是否完成',
|
`iffinish` VARCHAR(5) DEFAULT '' COMMENT '是否完成',
|
||||||
`fault_info` VARCHAR(100) DEFAULT '',
|
`fault_info` VARCHAR(100) DEFAULT '',
|
||||||
`relay_out` VARCHAR(24) DEFAULT '',
|
`relay_out` VARCHAR(24) DEFAULT '',
|
||||||
@@ -116,6 +119,15 @@ async def _create_tables(pool: aiomysql.Pool):
|
|||||||
`exit_dist` INT DEFAULT 0,
|
`exit_dist` INT DEFAULT 0,
|
||||||
`enter_speed` INT DEFAULT 0,
|
`enter_speed` INT DEFAULT 0,
|
||||||
`exit_speed` INT DEFAULT 0,
|
`exit_speed` INT DEFAULT 0,
|
||||||
|
`remain_count` INT DEFAULT 0 COMMENT '剩余波动次数 (B4)',
|
||||||
|
`work_freq` FLOAT DEFAULT 0 COMMENT '工作频率 Hz (B4)',
|
||||||
|
`curr_dist` INT DEFAULT 0 COMMENT '当前距离 mm (B4)',
|
||||||
|
`speed` INT DEFAULT 0 COMMENT '当前速度 dm/s (B4)',
|
||||||
|
`near_dist` INT DEFAULT 0 COMMENT '波动最近距离 mm (B4)',
|
||||||
|
`far_dist` INT DEFAULT 0 COMMENT '波动最远距离 mm (B4)',
|
||||||
|
`b4_enter_dist` INT DEFAULT 0 COMMENT 'B4 进入高度 mm',
|
||||||
|
`b4_leave_dist` INT DEFAULT 0 COMMENT 'B4 离开高度 mm',
|
||||||
|
`relay_code` TINYINT DEFAULT 0 COMMENT '继电器原始值 (0x00-0x03)',
|
||||||
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
|
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
INDEX `idx_dnt_id` (`dnt_id`)
|
INDEX `idx_dnt_id` (`dnt_id`)
|
||||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
@@ -141,7 +153,7 @@ async def _create_tables(pool: aiomysql.Pool):
|
|||||||
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
||||||
`username` VARCHAR(45) UNIQUE NOT NULL,
|
`username` VARCHAR(45) UNIQUE NOT NULL,
|
||||||
`password_hash` VARCHAR(256) NOT NULL,
|
`password_hash` VARCHAR(256) NOT NULL,
|
||||||
`role` VARCHAR(20) DEFAULT 'operator' COMMENT 'admin/operator',
|
`role` VARCHAR(20) DEFAULT 'operator' COMMENT 'admin/manager/operator/analyst',
|
||||||
`is_active` TINYINT DEFAULT 1,
|
`is_active` TINYINT DEFAULT 1,
|
||||||
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP
|
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
@@ -175,6 +187,43 @@ async def _create_tables(pool: aiomysql.Pool):
|
|||||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
# V2.0.3 迁移:为旧表补充波动测试参数字段
|
||||||
|
for col, col_def in [
|
||||||
|
("FarTol", "INT DEFAULT 0 COMMENT '最远容差 cm'"),
|
||||||
|
("NearTol", "INT DEFAULT 0 COMMENT '最近容差 cm'"),
|
||||||
|
("StepTol", "INT DEFAULT 0 COMMENT '步进容差 cm'"),
|
||||||
|
("BackForth", "INT DEFAULT 0 COMMENT '来回次数'"),
|
||||||
|
("NearStay", "INT DEFAULT 0 COMMENT '最近停留时间 ms'"),
|
||||||
|
("FarStay", "INT DEFAULT 0 COMMENT '最远停留时间 ms'"),
|
||||||
|
]:
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
f"ALTER TABLE `tb_fixture_param` ADD COLUMN `{col}` {col_def}"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass # 列已存在,忽略
|
||||||
|
|
||||||
|
# V2.0.4 迁移:tb_state_tst 增加波动测试字段
|
||||||
|
for col, col_def in [
|
||||||
|
("test_mode", "TINYINT DEFAULT 0 COMMENT '0 灵敏度, 1 波动测试'"),
|
||||||
|
("data_source", "CHAR(2) DEFAULT 'B2' COMMENT 'B2/B4'"),
|
||||||
|
("remain_count", "INT DEFAULT 0 COMMENT '剩余波动次数'"),
|
||||||
|
("work_freq", "FLOAT DEFAULT 0 COMMENT '工作频率 Hz'"),
|
||||||
|
("curr_dist", "INT DEFAULT 0 COMMENT '当前距离 mm'"),
|
||||||
|
("speed", "INT DEFAULT 0 COMMENT '当前速度 dm/s'"),
|
||||||
|
("near_dist", "INT DEFAULT 0 COMMENT '波动最近距离 mm'"),
|
||||||
|
("far_dist", "INT DEFAULT 0 COMMENT '波动最远距离 mm'"),
|
||||||
|
("b4_enter_dist", "INT DEFAULT 0 COMMENT 'B4 进入高度 mm'"),
|
||||||
|
("b4_leave_dist", "INT DEFAULT 0 COMMENT 'B4 离开高度 mm'"),
|
||||||
|
("relay_code", "TINYINT DEFAULT 0 COMMENT '继电器原始值 0x00-0x03'"),
|
||||||
|
]:
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
f"ALTER TABLE `tb_state_tst` ADD COLUMN `{col}` {col_def}"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
# 7. 车检器测试基准参数表
|
# 7. 车检器测试基准参数表
|
||||||
await cur.execute("""
|
await cur.execute("""
|
||||||
CREATE TABLE IF NOT EXISTS `tb_vechicle_base_test` (
|
CREATE TABLE IF NOT EXISTS `tb_vechicle_base_test` (
|
||||||
@@ -211,6 +260,120 @@ async def _create_tables(pool: aiomysql.Pool):
|
|||||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
# 9. 线圈参数表
|
||||||
|
await cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS `tb_coil_info` (
|
||||||
|
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
||||||
|
`coil_num` VARCHAR(45) DEFAULT '' COMMENT '线圈编号',
|
||||||
|
`name` VARCHAR(100) DEFAULT '' COMMENT '名称',
|
||||||
|
`induct` FLOAT DEFAULT 0 COMMENT '电感量',
|
||||||
|
`shape` VARCHAR(20) DEFAULT '' COMMENT '形状(矩形、圆形等)',
|
||||||
|
`length` FLOAT DEFAULT 0 COMMENT '长 cm(矩形有效)',
|
||||||
|
`width` FLOAT DEFAULT 0 COMMENT '宽 cm(矩形有效)',
|
||||||
|
`radius` FLOAT DEFAULT 0 COMMENT '半径 cm(圆形有效)',
|
||||||
|
`turns` INT DEFAULT 0 COMMENT '圈数',
|
||||||
|
`resistance` FLOAT DEFAULT 0 COMMENT '电阻 欧姆',
|
||||||
|
`material` VARCHAR(50) DEFAULT '' COMMENT '材质',
|
||||||
|
`remark` VARCHAR(500) DEFAULT '' COMMENT '备注',
|
||||||
|
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
|
""")
|
||||||
|
|
||||||
|
# 10. 模拟车辆参数表
|
||||||
|
await cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS `tb_simulate_car` (
|
||||||
|
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
||||||
|
`simulate_num` VARCHAR(45) DEFAULT '' COMMENT '模拟编号',
|
||||||
|
`name` VARCHAR(100) DEFAULT '' COMMENT '名称',
|
||||||
|
`shape` VARCHAR(20) DEFAULT '' COMMENT '形状(矩形、圆形等)',
|
||||||
|
`length` FLOAT DEFAULT 0 COMMENT '长 cm(矩形有效)',
|
||||||
|
`width` FLOAT DEFAULT 0 COMMENT '宽 cm(矩形有效)',
|
||||||
|
`radius` FLOAT DEFAULT 0 COMMENT '半径 cm(圆形有效)',
|
||||||
|
`material` VARCHAR(50) DEFAULT '' COMMENT '材质(铁板、合金等)',
|
||||||
|
`remark` VARCHAR(500) DEFAULT '' COMMENT '备注',
|
||||||
|
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
|
""")
|
||||||
|
|
||||||
|
# V2.1.0 迁移:tb_fixture_param 增加线圈/模拟车辆关联
|
||||||
|
for col, col_def in [
|
||||||
|
("coil_id", "INT DEFAULT NULL COMMENT 'FK → tb_coil_info.id'"),
|
||||||
|
("simulate_car_id", "INT DEFAULT NULL COMMENT 'FK → tb_simulate_car.id'"),
|
||||||
|
]:
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
f"ALTER TABLE `tb_fixture_param` ADD COLUMN `{col}` {col_def}"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# V2.1.0 迁移:tb_state_tst 增加线圈/模拟车辆关联
|
||||||
|
for col, col_def in [
|
||||||
|
("coil_id", "INT DEFAULT NULL COMMENT 'FK → tb_coil_info.id'"),
|
||||||
|
("simulate_car_id", "INT DEFAULT NULL COMMENT 'FK → tb_simulate_car.id'"),
|
||||||
|
]:
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
f"ALTER TABLE `tb_state_tst` ADD COLUMN `{col}` {col_def}"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# V2.4.0 迁移:tb_state_tst 增加车检器序列号
|
||||||
|
for col, col_def in [
|
||||||
|
("detector_serial", "VARCHAR(45) DEFAULT '' COMMENT '车检器序列号'"),
|
||||||
|
]:
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
f"ALTER TABLE `tb_state_tst` ADD COLUMN `{col}` {col_def}"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 13. 待插入的车检器序列号表 (V2.4.0)
|
||||||
|
await cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS `tb_pending_detector` (
|
||||||
|
`dnt_id` INT PRIMARY KEY COMMENT 'FK → dnt_info.id',
|
||||||
|
`detector_serial` VARCHAR(45) DEFAULT '' COMMENT '待插入的车检器序列号',
|
||||||
|
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
|
""")
|
||||||
|
|
||||||
|
# 11. 设备事件日志表
|
||||||
|
await cur.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS `tb_device_log` (
|
||||||
|
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
||||||
|
`device_serial` VARCHAR(45) NOT NULL COMMENT '设备序列号',
|
||||||
|
`device_ip` VARCHAR(45) DEFAULT '' COMMENT '设备IP',
|
||||||
|
`event_type` VARCHAR(30) NOT NULL COMMENT 'login/online/offline/poor',
|
||||||
|
`event_content` VARCHAR(500) DEFAULT '' COMMENT '事件详情',
|
||||||
|
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
INDEX `idx_serial` (`device_serial`),
|
||||||
|
INDEX `idx_event_type` (`event_type`),
|
||||||
|
INDEX `idx_create_time` (`create_time`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
|
""")
|
||||||
|
|
||||||
|
# V2.2.0 迁移:扩展 dnt_info 状态值(0=离线 1=在线 2=通信不良)
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
"ALTER TABLE dnt_info MODIFY COLUMN `state` TINYINT DEFAULT 0 "
|
||||||
|
"COMMENT '0 offline, 1 online, 2 poor'"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# V2.3.0 迁移:tb_user 角色增加 manager
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
"ALTER TABLE tb_user MODIFY COLUMN `role` VARCHAR(20) DEFAULT 'operator' "
|
||||||
|
"COMMENT 'admin/manager/operator/analyst'"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.info("数据库表初始化完成")
|
logger.info("数据库表初始化完成")
|
||||||
|
|
||||||
|
|
||||||
@@ -328,30 +491,139 @@ async def upsert_dnt(serial: str, ip: str, port: int, mac: str,
|
|||||||
return cur.lastrowid
|
return cur.lastrowid
|
||||||
|
|
||||||
|
|
||||||
|
async def get_fixture_coil_car_ids(dnt_id: int) -> tuple[int | None, int | None]:
|
||||||
|
"""从 tb_fixture_param 获取当前线圈和模拟车辆关联 ID"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"SELECT coil_id, simulate_car_id FROM tb_fixture_param WHERE dnt_id=%s",
|
||||||
|
(dnt_id,),
|
||||||
|
)
|
||||||
|
row = await cur.fetchone()
|
||||||
|
if row:
|
||||||
|
return row.get("coil_id"), row.get("simulate_car_id")
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
|
||||||
|
async def get_fixture_dev_type(dnt_id: int) -> int:
|
||||||
|
"""从 tb_fixture_param 获取被检设备型号类型编码 (DevType)"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"SELECT DevType FROM tb_fixture_param WHERE dnt_id=%s",
|
||||||
|
(dnt_id,),
|
||||||
|
)
|
||||||
|
row = await cur.fetchone()
|
||||||
|
return row[0] if row else 0
|
||||||
|
|
||||||
|
|
||||||
|
# ─── 设备型号名称缓存 ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
_dev_type_name_cache: dict[int, str] = {}
|
||||||
|
_cache_loaded = False
|
||||||
|
|
||||||
|
|
||||||
|
async def _load_dev_type_names():
|
||||||
|
"""从 tb_vechicle_base_test 加载 type_num → dev_name 映射"""
|
||||||
|
global _dev_type_name_cache, _cache_loaded
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"SELECT type_num, dev_name FROM tb_vechicle_base_test"
|
||||||
|
)
|
||||||
|
rows = await cur.fetchall()
|
||||||
|
_dev_type_name_cache = {row[0]: row[1] for row in rows if row[1]}
|
||||||
|
_cache_loaded = True
|
||||||
|
logger.debug(f"设备型号名称缓存已加载: {_dev_type_name_cache}")
|
||||||
|
|
||||||
|
|
||||||
|
async def get_dev_type_name(dev_type: int) -> str:
|
||||||
|
"""根据设备型号编码获取名称(从 tb_vechicle_base_test 查询,带内存缓存)"""
|
||||||
|
global _cache_loaded
|
||||||
|
if not _cache_loaded:
|
||||||
|
await _load_dev_type_names()
|
||||||
|
return _dev_type_name_cache.get(dev_type, f"Unknown({dev_type})")
|
||||||
|
|
||||||
|
|
||||||
|
async def refresh_dev_type_names():
|
||||||
|
"""刷新型号名称缓存(工装配置页新增型号后调用)"""
|
||||||
|
global _cache_loaded
|
||||||
|
_cache_loaded = False
|
||||||
|
await _load_dev_type_names()
|
||||||
|
|
||||||
|
|
||||||
async def insert_test_result(dnt_id: int, dpg430_addr: int, pcnum: str,
|
async def insert_test_result(dnt_id: int, dpg430_addr: int, pcnum: str,
|
||||||
serialnum: int, sub_type: int, str_type: str,
|
serialnum: int, sub_type: int, str_type: str,
|
||||||
iffinish: str, fault_info: str, relay_out: str,
|
iffinish: str, fault_info: str, relay_out: str,
|
||||||
ppvalue: float, idle_freq: float, enter_freq: float,
|
ppvalue: float, idle_freq: float, enter_freq: float,
|
||||||
exit_freq: float, enter_dist: int, exit_dist: int,
|
exit_freq: float, enter_dist: int, exit_dist: int,
|
||||||
enter_speed: int, exit_speed: int):
|
enter_speed: int, exit_speed: int,
|
||||||
|
test_mode: int = 0, data_source: str = "B2",
|
||||||
|
relay_code: int = 0):
|
||||||
"""插入测试结果到 tb_state_tst"""
|
"""插入测试结果到 tb_state_tst"""
|
||||||
|
coil_id, simulate_car_id = await get_fixture_coil_car_ids(dnt_id)
|
||||||
|
detector_serial = await get_pending_detector_serial(dnt_id)
|
||||||
pool = await get_pool()
|
pool = await get_pool()
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
await cur.execute(
|
await cur.execute(
|
||||||
"""INSERT INTO tb_state_tst
|
"""INSERT INTO tb_state_tst
|
||||||
(dnt_id, dpg430_addr, pcnum, serialnum, sub_type, str_type,
|
(dnt_id, detector_serial, dpg430_addr, pcnum, serialnum, sub_type, str_type,
|
||||||
|
test_mode, data_source,
|
||||||
iffinish, fault_info, relay_out, ppvalue, idle_freq,
|
iffinish, fault_info, relay_out, ppvalue, idle_freq,
|
||||||
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed)
|
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed,
|
||||||
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
|
relay_code, coil_id, simulate_car_id)
|
||||||
(dnt_id, dpg430_addr, pcnum, serialnum, sub_type, str_type,
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
|
||||||
|
(dnt_id, detector_serial, dpg430_addr, pcnum, serialnum, sub_type, str_type,
|
||||||
|
test_mode, data_source,
|
||||||
iffinish, fault_info, relay_out, ppvalue, idle_freq,
|
iffinish, fault_info, relay_out, ppvalue, idle_freq,
|
||||||
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed),
|
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed,
|
||||||
|
relay_code, coil_id, simulate_car_id),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def insert_wave_data(dnt_id: int, dpg430_addr: int,
|
||||||
|
remain_count: int, relay_code: int,
|
||||||
|
work_freq: float, curr_dist: int, speed: int,
|
||||||
|
near_dist: int, far_dist: int,
|
||||||
|
enter_dist: int, leave_dist: int,
|
||||||
|
relay_out: str = ""):
|
||||||
|
"""插入 0xB4 波动测试上报数据到 tb_state_tst"""
|
||||||
|
coil_id, simulate_car_id = await get_fixture_coil_car_ids(dnt_id)
|
||||||
|
dev_type = await get_fixture_dev_type(dnt_id)
|
||||||
|
str_type = await get_dev_type_name(dev_type) if dev_type else ""
|
||||||
|
detector_serial = await get_pending_detector_serial(dnt_id)
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"""INSERT INTO tb_state_tst
|
||||||
|
(dnt_id, detector_serial, dpg430_addr, sub_type, str_type,
|
||||||
|
test_mode, data_source,
|
||||||
|
relay_out, relay_code,
|
||||||
|
remain_count, work_freq, curr_dist, speed,
|
||||||
|
near_dist, far_dist, b4_enter_dist, b4_leave_dist,
|
||||||
|
coil_id, simulate_car_id)
|
||||||
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
|
||||||
|
(dnt_id, detector_serial, dpg430_addr, dev_type, str_type,
|
||||||
|
1, "B4",
|
||||||
|
relay_out, relay_code,
|
||||||
|
remain_count, work_freq, curr_dist, speed,
|
||||||
|
near_dist, far_dist, enter_dist, leave_dist,
|
||||||
|
coil_id, simulate_car_id),
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
f"B4波动数据已存储 dnt_id={dnt_id} relay=0x{relay_code:02X} "
|
||||||
|
f"剩余={remain_count} 当前距离={curr_dist}mm 速度={speed}dm/s"
|
||||||
|
f" 最近={near_dist}mm 最远={far_dist}mm"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def set_device_offline(serial: str):
|
async def set_device_offline(serial: str):
|
||||||
"""标记设备离线"""
|
"""标记设备离线(保持向后兼容)"""
|
||||||
pool = await get_pool()
|
pool = await get_pool()
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
@@ -361,6 +633,56 @@ async def set_device_offline(serial: str):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ─── tb_device_log + 设备状态 ───────────────────────────────────────
|
||||||
|
|
||||||
|
async def insert_device_log(serial: str, ip: str, event_type: str,
|
||||||
|
content: str = ""):
|
||||||
|
"""插入设备事件日志"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"INSERT INTO tb_device_log (device_serial, device_ip, "
|
||||||
|
"event_type, event_content) VALUES (%s,%s,%s,%s)",
|
||||||
|
(serial, ip, event_type, content),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def update_device_status(serial: str, state: int):
|
||||||
|
"""更新 dnt_info 设备状态(0=离线 1=在线 2=通信不良)"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
if state == 0:
|
||||||
|
await cur.execute(
|
||||||
|
"UPDATE dnt_info SET state=0, last_off=NOW() "
|
||||||
|
"WHERE serial=%s AND state != 0",
|
||||||
|
(serial,),
|
||||||
|
)
|
||||||
|
elif state == 1:
|
||||||
|
await cur.execute(
|
||||||
|
"UPDATE dnt_info SET state=1, last_login=NOW() "
|
||||||
|
"WHERE serial=%s AND state != 1",
|
||||||
|
(serial,),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await cur.execute(
|
||||||
|
"UPDATE dnt_info SET state=%s WHERE serial=%s",
|
||||||
|
(state, serial),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_all_device_serials() -> list[tuple[str, int, str]]:
|
||||||
|
"""获取所有设备 (serial, state, ip),用于状态扫描"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"SELECT serial, state, ip FROM dnt_info ORDER BY id"
|
||||||
|
)
|
||||||
|
return await cur.fetchall()
|
||||||
|
|
||||||
|
|
||||||
# ─── tb_serialnet CRUD ─────────────────────────────────────────────
|
# ─── tb_serialnet CRUD ─────────────────────────────────────────────
|
||||||
|
|
||||||
async def get_pending_serialnet(dnt_id: int) -> dict | None:
|
async def get_pending_serialnet(dnt_id: int) -> dict | None:
|
||||||
@@ -539,3 +861,20 @@ async def get_vehicle_base_test_by_type(type_num: int) -> dict | None:
|
|||||||
(type_num,),
|
(type_num,),
|
||||||
)
|
)
|
||||||
return await cur.fetchone()
|
return await cur.fetchone()
|
||||||
|
|
||||||
|
|
||||||
|
# ─── tb_pending_detector ───────────────────────────────────────────
|
||||||
|
|
||||||
|
async def get_pending_detector_serial(dnt_id: int) -> str:
|
||||||
|
"""获取待插入的车检器序列号"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor(aiomysql.DictCursor) as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"SELECT detector_serial FROM tb_pending_detector WHERE dnt_id=%s",
|
||||||
|
(dnt_id,),
|
||||||
|
)
|
||||||
|
row = await cur.fetchone()
|
||||||
|
if row:
|
||||||
|
return row["detector_serial"] or ""
|
||||||
|
return ""
|
||||||
|
|||||||
@@ -34,8 +34,10 @@ from src.handlers import (
|
|||||||
parse_loop,
|
parse_loop,
|
||||||
serialnet_loop,
|
serialnet_loop,
|
||||||
serialnet_response_loop,
|
serialnet_response_loop,
|
||||||
|
device_status_monitor,
|
||||||
set_udp_sender,
|
set_udp_sender,
|
||||||
)
|
)
|
||||||
|
from src.models import insert_device_log
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=getattr(logging, LOG_LEVEL),
|
level=getattr(logging, LOG_LEVEL),
|
||||||
@@ -74,6 +76,7 @@ class EDCProtocol:
|
|||||||
logger.info(f"UDP {msg} from {addr}")
|
logger.info(f"UDP {msg} from {addr}")
|
||||||
|
|
||||||
method = msg.get("Method", "")
|
method = msg.get("Method", "")
|
||||||
|
method_lower = method.lower()
|
||||||
logger.debug(f"UDP {method} from {addr}")
|
logger.debug(f"UDP {method} from {addr}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -81,11 +84,11 @@ class EDCProtocol:
|
|||||||
if method == "Count_Off":
|
if method == "Count_Off":
|
||||||
# 设备登录上报,只处理不回复
|
# 设备登录上报,只处理不回复
|
||||||
await handle_count_off(msg, addr)
|
await handle_count_off(msg, addr)
|
||||||
elif method == "Heartbeat":
|
elif method_lower == "heartbeat":
|
||||||
response = await handle_heartbeat(msg)
|
response = await handle_heartbeat(msg)
|
||||||
elif method == "TSReport":
|
elif method_lower == "tsreport":
|
||||||
response = await handle_tsreport(msg)
|
response = await handle_tsreport(msg)
|
||||||
elif method == "SerialNet":
|
elif method_lower == "serialnet":
|
||||||
response = await handle_serial_net(msg)
|
response = await handle_serial_net(msg)
|
||||||
|
|
||||||
if response and self.transport:
|
if response and self.transport:
|
||||||
@@ -104,24 +107,36 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
|
|||||||
- 紧凑 JSON (无换行)
|
- 紧凑 JSON (无换行)
|
||||||
"""
|
"""
|
||||||
addr = writer.get_extra_info("peername")
|
addr = writer.get_extra_info("peername")
|
||||||
|
addr_ip = addr[0] if addr else ""
|
||||||
logger.info(f"TCP 连接: {addr}")
|
logger.info(f"TCP 连接: {addr}")
|
||||||
|
|
||||||
|
# TCP 连接事件日志
|
||||||
|
try:
|
||||||
|
asyncio.ensure_future(insert_device_log(
|
||||||
|
serial="", ip=addr_ip,
|
||||||
|
event_type="tcp_connect",
|
||||||
|
content=f"TCP 连接: {addr}",
|
||||||
|
))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
buffer = b""
|
buffer = b""
|
||||||
|
|
||||||
async def process_message(msg: dict):
|
async def process_message(msg: dict):
|
||||||
"""处理单条消息并返回响应文本"""
|
"""处理单条消息并返回响应文本"""
|
||||||
logger.info(f"TCP get_rcv {msg} from {addr}")
|
logger.info(f"TCP get_rcv {msg} from {addr}")
|
||||||
method = msg.get("Method", "")
|
method = msg.get("Method", "")
|
||||||
|
method_lower = method.lower()
|
||||||
logger.debug(f"TCP {method} from {addr}")
|
logger.debug(f"TCP {method} from {addr}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if method == "TimeStamp":
|
if method_lower == "timestamp":
|
||||||
return handle_timestamp(msg)
|
return handle_timestamp(msg)
|
||||||
elif method == "TSReport":
|
elif method_lower == "tsreport":
|
||||||
return await handle_tsreport(msg)
|
return await handle_tsreport(msg)
|
||||||
elif method == "SerialNet":
|
elif method_lower == "serialnet":
|
||||||
return await handle_serial_net(msg)
|
return await handle_serial_net(msg)
|
||||||
elif method == "Heartbeat":
|
elif method_lower == "heartbeat":
|
||||||
return await handle_heartbeat(msg)
|
return await handle_heartbeat(msg)
|
||||||
else:
|
else:
|
||||||
logger.debug(f"TCP 未知方法: {method}")
|
logger.debug(f"TCP 未知方法: {method}")
|
||||||
@@ -175,6 +190,15 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
|
|||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
logger.info(f"TCP 断开: {addr}")
|
logger.info(f"TCP 断开: {addr}")
|
||||||
|
# TCP 断开事件日志
|
||||||
|
try:
|
||||||
|
asyncio.ensure_future(insert_device_log(
|
||||||
|
serial="", ip=addr_ip,
|
||||||
|
event_type="tcp_disconnect",
|
||||||
|
content=f"TCP 断开: {addr}",
|
||||||
|
))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
writer.close()
|
writer.close()
|
||||||
await writer.wait_closed()
|
await writer.wait_closed()
|
||||||
|
|
||||||
@@ -187,6 +211,7 @@ async def main():
|
|||||||
asyncio.create_task(parse_loop())
|
asyncio.create_task(parse_loop())
|
||||||
asyncio.create_task(serialnet_loop())
|
asyncio.create_task(serialnet_loop())
|
||||||
asyncio.create_task(serialnet_response_loop())
|
asyncio.create_task(serialnet_response_loop())
|
||||||
|
asyncio.create_task(device_status_monitor())
|
||||||
|
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user