Compare commits

..

24 Commits

Author SHA1 Message Date
wangfq
a2f31b3bfe fix: get_pending_detector_serial 不再删除记录,避免 B2 消费后 B4 拿不到序列号 2026-06-15 16:45:49 +08:00
wangfq
ff9482780d feat: tb_state_tst 增加 detector_serial 字段 + tb_pending_detector 辅助表
- tb_state_tst DDL 增加 detector_serial VARCHAR(45) 列 + V2.4.0 迁移
- 新建 tb_pending_detector 表,用于 web 端暂存待插入的序列号
- insert_test_result / insert_wave_data 插入前从辅助表读取序列号(消费后清除)
- 新增 get_pending_detector_serial() 查询函数
2026-06-15 10:02:19 +08:00
wangfq
6e13990386 fix: 设备型号名称改为从 db tb_vechicle_base_test 动态查询,修复新增型号显示 Unknown 的问题
- handlers.py: B2 数据硬编码 dev_model_map {1:PD132,2:DLD110} → await get_dev_type_name()
- models.py: B4 波动数据硬编码 map → 同上
- models.py: 新增 get_dev_type_name() 带内存缓存,首次加载后缓存 type_num→dev_name
- models.py: 新增 refresh_dev_type_names() 供工装配置页新增型号后刷新缓存
2026-06-12 10:00:25 +08:00
wangfq
3580f89552 feat: role COMMENT 增加 analyst 角色 2026-06-11 17:21:42 +08:00
wangfq
25aafd57c8 feat: V2.3.0 role COMMENT 增加 manager 角色,DDL + ALTER TABLE 迁移 2026-06-11 09:00:27 +08:00
wangfq
cdddfac609 fix: 0xB4 继电器 relay_out 字段与 0xB2 使用相同格式
之前 insert_wave_data 的 relay_out 被硬编码为空字符串,
decode_relay_info 的计算结果未写入。现在增加参数 relay_out
并传入格式化后的继电器状态字符串。
2026-06-10 17:27:38 +08:00
wangfq
944870496a fix: 继电器输出状态解析改为完整的 有/无 描述格式
旧格式: '存在信号; 脉冲信号' (仅显示置位的 bit)
新格式: '存在继电器有输出,脉冲继电器有输出' (始终显示两个 bit 的状态)

bit 0 (x & 0x01): 存在继电器有/无输出
bit 1 (x & 0x02): 脉冲继电器有/无输出
2026-06-10 16:25:31 +08:00
wangfq
c875cf383b fix: 修复 HeartBeat 大小写不匹配导致交互未被记录的问题
根本原因: 设备发送 Method='HeartBeat'(大写B), 代码匹配'Heartbeat'(小写b),
        心跳包被静默忽略, record_interaction 未调用, 导致监控误判为离线。

修复:
- UDP/TCP 方法匹配改为 case-insensitive (method_lower)
- handle_timestamp 增加 record_interaction 调用 (TimeStamp 也是设备交互)
- TCP 连接/断开时写入 tb_device_log 事件日志 (tcp_connect/tcp_disconnect)
2026-06-10 10:01:07 +08:00
wangfq
11f1c4f55b fix: device_status_monitor 增加 dnt_info 全表扫描,修正状态不一致
- 新增 get_all_device_serials() 查询 dnt_info 全表
- device_status_monitor 改为三阶段:
  Phase 1: 遍历 _registry 活跃设备
  Phase 2: 扫描 dnt_info 全表,修正 DB 状态与交互实际不符的情况:
    - state=1(在线) 但 >60s 无交互 → 更新为离线
    - state=0(离线) 但有交互 → 根据交互模式更新
  Phase 3: 预留清理位
- 提取 _apply_state_change() 避免重复代码
- Count_Off 登录时主动设 _device_status[serial]=1,
  防止刚登录只有 1 条交互记录时被误判为 通信不良
2026-06-10 09:36:01 +08:00
wangfq
ef890fafc6 feat: 设备事件日志 + 在线/离线状态监控
- 新增 tb_device_log 表 (device_serial, device_ip, event_type, event_content, create_time)
- dnt_info.state 扩展为 0=离线 1=在线 2=通信不良
- handle_count_off 收到 Count_Off 后写入 login 事件日志
- 新增 device_status_monitor 后台任务,每 5s 检测设备状态:
  - 3次交互间隔均<10s → 在线
  - 1分钟内<4次交互 → 通信不良
  - >1分钟无交互 → 离线
- 状态变化时同步写入 tb_device_log + dnt_info
- 所有设备交互点 (心跳/TSReport/SerialNet/解析成功) 均记录 interaction 时间戳
2026-06-10 09:14:24 +08:00
wangfq
3a74759066 fix(B4): 波动测试记录写入 sub_type/str_type,从工装配置 DevType 获取
- 新增 get_fixture_dev_type() 查询工装配的 DevType
- insert_wave_data 自动补充 sub_type(型号编码) 和 str_type(PD132/DLD110)
2026-06-08 11:31:42 +08:00
wangfq
68c6d0bcfe feat(db): 新增线圈参数表/模拟车辆参数表 + tb_fixture_param/tb_state_tst 关联字段
- CREATE TABLE tb_coil_info (线圈编号/名称/电感量/形状/尺寸/圈数/电阻/材质/备注)
- CREATE TABLE tb_simulate_car (模拟编号/名称/形状/尺寸/材质/备注)
- ALTER TABLE tb_fixture_param ADD coil_id, simulate_car_id
- ALTER TABLE tb_state_tst ADD coil_id, simulate_car_id
- 新增 get_fixture_coil_car_ids() 查询当前关联
- insert_test_result/insert_wave_data 自动从 fixture 获取线圈/车辆并记录
2026-06-08 10:42:04 +08:00
wangfq
844de70017 feat: relay_code 存储+解析 — B2/B4 以原始 int 值写入 tb_state_tst
- insert_test_result 增加 relay_code 参数
- insert_wave_data: relay_out 改为 relay_code (int)
- handlers.py: B2/B4 解析传递 status.relay_out 原始值
- relay_out VARCHAR 列保留兼容
2026-06-05 17:56:52 +08:00
wangfq
dc1d2b8871 fix: 恢复 FarStay 为 2 字节 — 与 NearStay 一致 2026-06-03 13:40:18 +08:00
wangfq
7e5fe2cccd fix: FarStay 2B→1B 匹配设备固件,修正 parse_4c_params 偏移 2026-06-03 11:52:37 +08:00
wangfq
6724af7951 fix: ALTER TABLE 迁移补充 tb_fixture_param 缺失的 V2.0.3 波动参数字段
CREATE TABLE IF NOT EXISTS 不会更新已存在的旧表,
新增 ALTER TABLE ADD COLUMN 自动迁移逻辑(列已存在时忽略)
2026-06-02 18:33:22 +08:00
wangfq
2d6c9f03dd feat: DG430 V2.0.3 — 波动测试模式支持
- dg430.py: 新增 DG430WaveStatus + parse_b4_wave_status() 0xB4解析
- dg430.py: 0x4C 扩展6字段(向后兼容旧版长度)
- models.py: tb_fixture_param DDL + upsert 新增6个波动参数
- handlers.py: parse_loop 添加0xB4处理; 0x4C传参扩展
- TestMode=1 模拟过车→波动测试 (注释)
2026-06-02 18:06:07 +08:00
wangfq
6ecc653133 chore: 精简 requirements.txt — 仅保留直接依赖 aiomysql+uvloop,移除未使用的传递依赖 2026-06-01 08:34:39 +08:00
wangfq
e7c20c69d2 feat: 工装配置功能 — 新增 0x4A~0x4E 协议解析、tb_fixture_param/tb_vechicle_base_test 表、SerialNet 响应匹配
- dg430.py: 新增 parse_4a_version, parse_flag_response, parse_4c_params, get_packet_cmd
- handlers.py: parse_loop 增加 0x4C/非B2 指令处理,0x4C 响应自动更新 tb_fixture_param
- handlers.py: 测试指令(B0/B1/BA/BB/BC)Flag 不匹配 serialnet,仅 B2 标记完成
- handlers.py: 新增 serialnet_response_loop 处理 dat_type=9
- models.py: 新增 tb_fixture_param + tb_vechicle_base_test 表 DDL + CRUD
- server.py: 注册 serialnet_response_loop
2026-05-29 17:25:54 +08:00
wangfq
43fd3e7be9 feat: 新增 tb_user、tb_log 表 2026-05-28 13:58:08 +08:00
wangfq
df461362f5 fix: serialnet_loop 改为直接查 tb_serialnet,不依赖 _registry 2026-05-28 12:08:29 +08:00
wangfq
85e7f66b19 fix: SerialNet 下发目标端口改为设备UDP 4900 2026-05-28 10:51:12 +08:00
wangfq
d6e169ce12 chore: 默认 MySQL 用户 dg / 密码 123456 2026-05-28 10:31:21 +08:00
wangfq
a1f4dcd4bf fix: callable 不是类型,改为 object 注解 2026-05-28 10:00:08 +08:00
6 changed files with 1287 additions and 113 deletions

View File

@@ -1,2 +1,2 @@
uvloop>=0.19.0
aiomysql>=0.2.0
aiomysql>=0.3.2
uvloop>=0.22.0

View File

@@ -8,11 +8,15 @@ UDP_MSG_PORT = int(os.getenv("EDC_UDP_MSG_PORT", "5505"))
TCP_PORT = int(os.getenv("EDC_TCP_PORT", "5550"))
BIND_HOST = os.getenv("EDC_BIND_HOST", "0.0.0.0")
# 设备端接收端口
DEVICE_UDP_PORT = int(os.getenv("EDC_DEVICE_UDP_PORT", "4900"))
DEVICE_TCP_PORT = int(os.getenv("EDC_DEVICE_TCP_PORT", "5550"))
# MySQL
MYSQL_HOST = os.getenv("EDC_MYSQL_HOST", "127.0.0.1")
MYSQL_PORT = int(os.getenv("EDC_MYSQL_PORT", "3306"))
MYSQL_USER = os.getenv("EDC_MYSQL_USER", "root")
MYSQL_PASSWORD = os.getenv("EDC_MYSQL_PASSWORD", "")
MYSQL_USER = os.getenv("EDC_MYSQL_USER", "dg")
MYSQL_PASSWORD = os.getenv("EDC_MYSQL_PASSWORD", "123456")
MYSQL_DB = os.getenv("EDC_MYSQL_DB", "edc")
# 连接池

View File

@@ -19,7 +19,7 @@ class DG430Status:
"""解析后的 DG430 状态数据"""
addr: int # 地址
dev_model: int # 1 PD132, 2 DLD110
test_mode: int # 0 灵敏度, 1 模拟过车
test_mode: int # 0 灵敏度, 1 波动测试
is_finished: bool # 是否正常完成
finish_code: int # 0 正常, 1 未完成, 2 地感死机
fault: int # bitmask
@@ -57,6 +57,11 @@ def _le16(data: bytes, offset: int) -> int:
return data[offset] | (data[offset + 1] << 8)
def _be16(data: bytes, offset: int) -> int:
"""大端 2 字节 → int"""
return (data[offset] << 8) | data[offset + 1]
def verify_packet(data: bytes) -> bool:
"""校验数据包完整性"""
if len(data) < PKT_MIN_LEN:
@@ -145,7 +150,7 @@ def parse_b2_status(data: bytes) -> DG430Status | None:
# 设备型号
dev_model = payload[0] # 1=PD132, 2=DLD110
test_mode = payload[1] # 0=灵敏度, 1=模拟过车
test_mode = payload[1] # 0=灵敏度, 1=波动测试
finish_code = payload[2] # 0=正常, 1=未完成, 2=死机
fault = payload[3] # bitmask
relay_out = payload[4] # bitmask
@@ -186,6 +191,70 @@ def parse_b2_status(data: bytes) -> DG430Status | None:
)
# ─── 0xB4 波动测试上报 ──────────────────────────────────────────
@dataclass
class DG430WaveStatus:
"""波动测试状态上报数据"""
addr: int # 地址
remain_count: int # 剩余波动次数
relay_out: int # 继电器输出 bitmask
work_freq: float # 工作频率 Hz
curr_dist: int # 当前距离 mm
speed: int # 当前速度 dm/s
near_dist: int # 波动最近距离 mm
far_dist: int # 波动最远距离 mm
enter_dist: int # 进入高度 mm
leave_dist: int # 离开高度 mm
def parse_b4_wave_status(data: bytes) -> DG430WaveStatus | None:
"""解析 0xB4 波动测试状态上报包
格式: STX | ADDR | 11 | B4 | DATA(16B) | XOR | SUM
DATA: RemainCount(1) | Relay(1) | WorkFreq(2 LE) | CurrDist(2 LE) |
Speed(2 LE) | NearDist(2 LE) | FarDist(2 LE) |
EnterDist(2 LE) | LeaveDist(2 LE)
"""
if not verify_packet(data):
logger.warning("DG430 B4 数据包校验失败")
return None
cmd = data[3]
if cmd != 0xB4:
logger.debug(f"非 B4 指令: 0x{cmd:02X}")
return None
payload = data[4:3 + data[2]]
if len(payload) < 16:
logger.warning(f"B4 数据长度不足: {len(payload)} < 16")
return None
addr = data[1] & 0x7F
remain_count = payload[0]
relay_out = payload[1]
work_freq = _le16(payload, 2) * 10.0
curr_dist = _le16(payload, 4)
speed = _le16(payload, 6)
near_dist = _le16(payload, 8)
far_dist = _le16(payload, 10)
enter_dist = _le16(payload, 12)
leave_dist = _le16(payload, 14)
return DG430WaveStatus(
addr=addr,
remain_count=remain_count,
relay_out=relay_out,
work_freq=round(work_freq, 1),
curr_dist=curr_dist,
speed=speed,
near_dist=near_dist,
far_dist=far_dist,
enter_dist=enter_dist,
leave_dist=leave_dist,
)
# ─── 故障解码 ───────────────────────────────────────────────────────
FAULT_BITS = {
@@ -211,9 +280,157 @@ def decode_fault_info(fault: int) -> str:
def decode_relay_info(relay: int) -> str:
"""解码继电器 bitmask"""
items = []
for bit, desc in RELAY_BITS.items():
if relay & (1 << bit):
items.append(desc)
return "; ".join(items) if items else "输出"
"""解码继电器输出状态为可读字符串
0xB2 继电器输出状态原始值 x 的解析规则:
- x & 0x01 为真 → "存在继电器有输出",否则 "存在继电器无输出"
- x & 0x02 为真 → "脉冲继电器有输出",否则 "脉冲继电器无输出"
汇总格式: "存在继电器有输出,脉冲继电器有输出"
"""
exist = "存在继电器有输出" if (relay & 0x01) else "存在继电器无输出"
pulse = "脉冲继电器有输出" if (relay & 0x02) else "脉冲继电器无输出"
return f"{exist}{pulse}"
# ─── 0x4A 获取设备版本号响应 ──────────────────────────────────────
@dataclass
class DG430Version:
addr: int
hw_major: int
hw_minor: int
hw_patch: int
sw_major: int
sw_minor: int
sw_patch: int
def parse_4a_version(data: bytes) -> DG430Version | None:
"""解析 0x4A 版本号响应
格式: 7F | ADDR | 08 | 4A | 00 | HW(3B) | SW(3B) | XOR | SUM
"""
if not verify_packet(data):
return None
cmd = data[3]
if cmd != 0x4A:
return None
payload = data[4:3 + data[2]]
if len(payload) < 7:
return None
addr = data[1] & 0x7F
return DG430Version(
addr=addr,
hw_major=payload[1],
hw_minor=payload[2],
hw_patch=payload[3],
sw_major=payload[4],
sw_minor=payload[5],
sw_patch=payload[6],
)
# ─── 通用 Flag 响应 (0x4B/0x4D/0x4E) ─────────────────────────────
def parse_flag_response(data: bytes, expected_cmd: int) -> int | None:
"""解析 Flag 响应格式: STX | ADDR | 02 | CMD | Flag | XOR | SUM
Returns:
Flag 值 (0=正常, 1=故障), 或 None 表示解析失败
"""
if not verify_packet(data):
return None
cmd = data[3]
if cmd != expected_cmd:
return None
if data[2] < 2:
return None
return data[4] # Flag
# ─── 0x4C 查询设备测试参数响应 ────────────────────────────────────
@dataclass
class DG430FixtureParams:
addr: int
flag: int # 0=正常, 1=故障
dev_addr: int # 设备地址
dev_type: int # 设备型号
test_mode: int # 0 灵敏度, 1 波动测试
reset_dis: int # 复位距离 cm
minus_dis: int # 皮距 cm
sens_min: int # 灵敏度最小值
sens_max: int # 灵敏度最大值
fre_min: int # 频率最小值 Hz
fre_max: int # 频率最大值 Hz
peak_min: int # 峰峰值最小值
peak_max: int # 峰峰值最大值
far_tol: int # 最远容差 cm
near_tol: int # 最近容差 cm
step_tol: int # 步进容差 cm
back_forth: int # 来回次数
near_stay: int # 最近停留时间 ms
far_stay: int # 最远停留时间 ms
def parse_4c_params(data: bytes) -> DG430FixtureParams | None:
"""解析 0x4C 查询测试参数响应 (V2.0.3 扩展)
格式: 7F | ADDR | 1B | 4C | Flag | Addr | DevType | TestMode |
ResetDis | MinusDis | SensMin(2) | SensMax(2) |
FreMin(2) | FreMax(2) | PeakMin(2) | PeakMax(2) |
FarTol(1) | NearTol(1) | StepTol(1) | BackForth(1) |
NearStay(2) | FarStay(2) | XOR | SUM
"""
if not verify_packet(data):
return None
cmd = data[3]
if cmd != 0x4C:
return None
payload = data[4:3 + data[2]]
if len(payload) < 18:
return None
addr = data[1] & 0x7F
# 0x4B/0x4C 多字节字段为小端序
# V2.0.3 新增6个波动参数字段兼容旧版长度不足时默认为0
far_tol = payload[18] if len(payload) >= 19 else 0
near_tol = payload[19] if len(payload) >= 20 else 0
step_tol = payload[20] if len(payload) >= 21 else 0
back_forth = payload[21] if len(payload) >= 22 else 0
near_stay = _le16(payload, 22) if len(payload) >= 24 else 0
far_stay = _le16(payload, 24) if len(payload) >= 26 else 0
return DG430FixtureParams(
addr=addr,
flag=payload[0],
dev_addr=payload[1],
dev_type=payload[2],
test_mode=payload[3],
reset_dis=payload[4],
minus_dis=payload[5],
sens_min=_le16(payload, 6),
sens_max=_le16(payload, 8),
fre_min=_le16(payload, 10),
fre_max=_le16(payload, 12),
peak_min=_le16(payload, 14),
peak_max=_le16(payload, 16),
far_tol=far_tol,
near_tol=near_tol,
step_tol=step_tol,
back_forth=back_forth,
near_stay=near_stay,
far_stay=far_stay,
)
# ─── 获取数据包 CMD用于匹配────────────────────────────────────
def get_packet_cmd(data: bytes) -> int | None:
"""从数据包中提取 CMD 字节"""
if len(data) < 4:
return None
return data[3]

View File

@@ -12,6 +12,7 @@ import asyncio
import json
import logging
import time
from collections import deque
from datetime import datetime
from src.models import (
@@ -19,16 +20,28 @@ from src.models import (
ensure_collect_table,
insert_collect_data,
fetch_unparsed,
fetch_unparsed_serial,
mark_record_state,
insert_test_result,
insert_wave_data,
get_dnt_by_serial,
get_pending_serialnet,
mark_serialnet_sent,
mark_serialnet_done,
mark_serialnet_timeout,
upsert_fixture_param,
insert_device_log,
update_device_status,
get_all_device_serials,
get_dev_type_name,
)
from src.dg430 import (
parse_b2_status,
parse_4a_version,
parse_flag_response,
parse_4c_params,
parse_b4_wave_status,
get_packet_cmd,
hex_str_to_bytes,
split_packets,
verify_packet,
@@ -50,8 +63,14 @@ _registry: dict[str, int] = {}
# 设备心跳时间: {device_id: last_heartbeat}
_heartbeat: dict[str, float] = {}
# 设备交互时间记录: {device_id: deque[float]} (最近 60s 内)
_interactions: dict[str, deque] = {}
# 设备当前状态: {device_id: int} (0=离线 1=在线 2=通信不良)
_device_status: dict[str, int] = {}
# UDP transport 引用,由 server.py 注入
_udp_sender: callable | None = None
_udp_sender: object | None = None
def set_udp_sender(sender):
@@ -60,6 +79,18 @@ def set_udp_sender(sender):
_udp_sender = sender
def record_interaction(device_id: str):
"""记录一次设备交互(心跳/上报/解析成功)"""
now = time.time()
if device_id not in _interactions:
_interactions[device_id] = deque()
_interactions[device_id].append(now)
# 清理 120s 前的旧记录
cutoff = now - 120
while _interactions[device_id] and _interactions[device_id][0] < cutoff:
_interactions[device_id].popleft()
async def handle_count_off(data: dict, addr: tuple):
"""处理设备登录/身份上报 (Count_Off 返回格式)
@@ -117,9 +148,20 @@ async def handle_count_off(data: dict, addr: tuple):
)
_registry[serial] = dnt_id
_heartbeat[serial] = time.time()
record_interaction(serial)
_device_status[serial] = 1 # 登录即视为在线状态
await ensure_collect_table(serial)
# 登录事件日志
try:
await insert_device_log(
serial=serial, ip=dev_ip, event_type="login",
content=f"设备上线 type={dev_type} ver={dev_version}",
)
except Exception:
pass
logger.info(
f"设备登录: {serial} dnt_id={dnt_id} ip={dev_ip}:{dev_port} "
f"type={dev_type} ver={dev_version}"
@@ -135,6 +177,7 @@ async def handle_heartbeat(data: dict) -> str | None:
return None
_heartbeat[device_id] = time.time()
record_interaction(device_id)
try:
await insert_collect_data(device_id, 0, str(data))
@@ -145,21 +188,11 @@ async def handle_heartbeat(data: dict) -> str | None:
def handle_timestamp(data: dict) -> str:
"""处理时间同步请求
TimeStamp 请求格式:
{
"Method": "TimeStamp",
"Params": {
"Device_id": "2345",
"TimeZone": "Asia/Shanghai"
}
}
返回格式参考 PGLC 协议 3.5 节。
"""
"""处理时间同步请求(也是设备交互)"""
params = data.get("Params", {})
device_id = params.get("Device_id", "")
if device_id:
record_interaction(device_id)
return make_timestamp_response(device_id, int(time.time()))
@@ -178,6 +211,8 @@ async def handle_tsreport(data: dict) -> str | None:
if not device_id or not sub_dat:
return None
record_interaction(device_id)
try:
await insert_collect_data(device_id, 8, sub_dat)
except Exception as e:
@@ -201,6 +236,8 @@ async def handle_serial_net(data: dict) -> str | None:
if not device_id or not serial_dat:
return None
record_interaction(device_id)
try:
await insert_collect_data(device_id, 9, serial_dat)
except Exception as e:
@@ -237,17 +274,14 @@ async def parse_loop():
await mark_record_state(device_id, rec["id"], state=3)
continue
has_valid_b2 = False
has_valid = False
all_failed = True
for pkt in packets:
cmd = pkt[3] if len(pkt) > 3 else 0
# 只处理 B2 状态上报
if cmd != 0xB2:
logger.debug(f"跳过非 B2 指令: 0x{cmd:02X}")
continue
# ── B2 状态上报 ──
if cmd == 0xB2:
if not verify_packet(pkt):
logger.warning(f"B2 数据包校验失败: {device_id} rec={rec['id']}")
continue
@@ -260,8 +294,7 @@ async def parse_loop():
fault_info = decode_fault_info(status.fault)
relay_info = decode_relay_info(status.relay_out)
dev_model_map = {1: "PD132", 2: "DLD110"}
str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})")
str_type = await get_dev_type_name(status.dev_model)
await insert_test_result(
dnt_id=dnt_id,
@@ -281,12 +314,14 @@ async def parse_loop():
exit_dist=status.exit_dist,
enter_speed=status.enter_speed,
exit_speed=status.exit_speed,
test_mode=status.test_mode,
relay_code=status.relay_out,
)
# 匹配 tb_serialnet 中的待确认记录 (state=1)
# 匹配 tb_serialnet 中的待确认记录
await _match_serialnet_response(dnt_id, raw)
has_valid_b2 = True
has_valid = True
all_failed = False
logger.info(
f"解析完成: {device_id} dg430={status.addr} "
@@ -294,13 +329,121 @@ async def parse_loop():
f"进入高度={status.enter_dist}mm 故障={fault_info}"
)
if has_valid_b2:
# ── 0x4C 查询工装参数响应 ──
elif cmd == 0x4C:
if not verify_packet(pkt):
logger.warning(f"0x4C 数据包校验失败: {device_id}")
continue
params = parse_4c_params(pkt)
if params is None:
logger.warning(f"0x4C 解析失败: {device_id}")
continue
if params.flag == 0:
await upsert_fixture_param(
dnt_id,
Addr=params.dev_addr,
DevType=params.dev_type,
TestMode=params.test_mode,
RestDis=params.reset_dis,
MinusDis=params.minus_dis,
SensMin=params.sens_min,
SensMax=params.sens_max,
FreMin=params.fre_min,
FreMax=params.fre_max,
PeakMin=params.peak_min,
PeakMax=params.peak_max,
FarTol=params.far_tol,
NearTol=params.near_tol,
StepTol=params.step_tol,
BackForth=params.back_forth,
NearStay=params.near_stay,
FarStay=params.far_stay,
)
logger.info(
f"0x4C 工装参数已更新 dnt_id={dnt_id} "
f"DevType={params.dev_type} TestMode={params.test_mode}"
)
await _match_serialnet_response(dnt_id, raw)
has_valid = True
all_failed = False
# ── 0xB4 波动测试上报 ──
elif cmd == 0xB4:
if not verify_packet(pkt):
logger.debug(f"0xB4 数据包校验失败: {device_id}")
continue
wave = parse_b4_wave_status(pkt)
if wave is None:
logger.warning(f"0xB4 解析失败: {device_id}")
continue
relay_info = decode_relay_info(wave.relay_out)
await insert_wave_data(
dnt_id=dnt_id,
dpg430_addr=wave.addr,
remain_count=wave.remain_count,
relay_code=wave.relay_out,
relay_out=relay_info,
work_freq=wave.work_freq,
curr_dist=wave.curr_dist,
speed=wave.speed,
near_dist=wave.near_dist,
far_dist=wave.far_dist,
enter_dist=wave.enter_dist,
leave_dist=wave.leave_dist,
)
logger.info(
f"B4波动上报: {device_id} 剩余={wave.remain_count} "
f"当前距离={wave.curr_dist}mm 速度={wave.speed}dm/s "
f"最近={wave.near_dist}mm 最远={wave.far_dist}mm "
f"进入={wave.enter_dist}mm 离开={wave.leave_dist}mm "
f"继电器={relay_info}"
)
has_valid = True
all_failed = False
# ── 测试指令 Flag 响应(不匹配 serialnet等待 B2 完成)──
elif cmd in (0xB0, 0xB1, 0xBA, 0xBB, 0xBC):
if not verify_packet(pkt):
logger.debug(f"测试指令 0x{cmd:02X} 校验失败, "
f"rec={rec['id']}")
continue
# 不匹配 serialnet — B2 状态上报才标记完成
has_valid = True
all_failed = False
logger.info(
f"测试指令 0x{cmd:02X} Flag 响应 dnt_id={dnt_id}"
)
# ── 配置指令响应 ──
elif cmd in (0x4A, 0x4B, 0x4D, 0x4E):
if not verify_packet(pkt):
logger.debug(f"配置指令 0x{cmd:02X} 校验失败, "
f"跳过 rec={rec['id']}")
continue
await _match_serialnet_response(dnt_id, raw)
has_valid = True
all_failed = False
logger.info(
f"配置指令 0x{cmd:02X} 响应已匹配 "
f"dnt_id={dnt_id}"
)
else:
logger.debug(f"跳过未知/不支持指令: 0x{cmd:02X}")
if has_valid:
record_interaction(device_id)
await mark_record_state(device_id, rec["id"], state=1)
elif all_failed and packets:
await mark_record_state(device_id, rec["id"], state=3)
logger.warning(f"记录 {rec['id']} 所有包校验失败, state=3")
else:
# 有非 B2 包但没有 B2也算已处理无解析目标
await mark_record_state(device_id, rec["id"], state=1)
except Exception as e:
@@ -345,6 +488,7 @@ SERIALNET_TIMEOUT = 10 # 秒
async def serialnet_loop():
"""后台轮询:检查 tb_serialnet 待发送指令,通过 UDP 下发
直接查询 tb_serialnet 表(不依赖 _registry
1. state=0 → 发送 SerialNet JSON → state=1
2. state=1 且超过 10 秒 → state=3 (超时失败)
"""
@@ -353,14 +497,11 @@ async def serialnet_loop():
while True:
try:
for device_id, dnt_id in list(_registry.items()):
# 1. 下发待发送指令
pending = await get_pending_serialnet(dnt_id)
if pending:
await _send_serialnet_cmd(device_id, dnt_id, pending)
# 1. 查询所有 state=0 的记录
await _process_pending_all()
# 2. 超时检测
await _check_serialnet_timeout(dnt_id)
# 2. 查询所有 state=1 超时的记录
await _check_timeout_all()
except Exception as e:
logger.error(f"透传轮询异常: {e}")
@@ -368,6 +509,49 @@ async def serialnet_loop():
await asyncio.sleep(0.2)
async def _process_pending_all():
"""查询所有 state=0 的记录并发送"""
from src.models import get_pool
import aiomysql
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT sn.*, d.serial, d.ip FROM tb_serialnet sn "
"JOIN dnt_info d ON sn.dnt_id = d.id "
"WHERE sn.state = 0 ORDER BY sn.id ASC LIMIT 10"
)
rows = await cur.fetchall()
for row in rows:
device_id = row["serial"]
dnt_id = row["dnt_id"]
if not row.get("ip"):
logger.warning(f"设备 {device_id} 无 IP跳过")
await mark_serialnet_timeout(row["id"])
continue
await _send_serialnet_cmd(device_id, dnt_id, row)
async def _check_timeout_all():
"""检查所有 state=1 超时记录"""
from src.models import get_pool
import aiomysql
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT id, dnt_id FROM tb_serialnet WHERE state=1 "
"AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) "
"LIMIT 20",
(SERIALNET_TIMEOUT,),
)
rows = await cur.fetchall()
for row in rows:
await mark_serialnet_timeout(row["id"])
logger.warning(f"tb_serialnet #{row['id']} 超时 dnt_id={row['dnt_id']}")
async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
"""构造 SerialNet JSON 并通过 UDP 发送给设备"""
if _udp_sender is None:
@@ -376,12 +560,13 @@ async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
# 获取设备 IP 和 msgport
dnt = await get_dnt_by_serial(device_id)
if not dnt or not dnt.get("ip") or not dnt.get("msgport"):
logger.warning(f"设备 {device_id} 无 IP/msgport 信息,跳过")
if not dnt or not dnt.get("ip"):
logger.warning(f"设备 {device_id} 无 IP 信息,跳过")
return
from src.config import DEVICE_UDP_PORT
send_pkg = record["send_pkg"]
addr = (dnt["ip"], dnt["msgport"])
addr = (dnt["ip"], DEVICE_UDP_PORT)
# 构造 SerialNet JSON
msg = {
@@ -399,20 +584,261 @@ async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict):
logger.info(f"已发送 SerialNet → {device_id} ({addr}): {send_pkg}")
async def _check_serialnet_timeout(dnt_id: int):
"""检查 state=1 超时记录"""
# ─── SerialNet 响应处理轮询 ──────────────────────────────────────────
async def serialnet_response_loop():
"""后台轮询:处理 SerialNet 响应 (dat_type=9)
当设备通过 SerialNet 返回 0x4A-0x4E 等指令的响应时,
匹配 tb_serialnet 中 state=1 的记录并标记完成。
对于 0x4C 响应,同时更新 tb_fixture_param。
"""
logger.info("SerialNet 响应处理服务启动")
await asyncio.sleep(3) # 等前面所有服务就绪
while True:
try:
for device_id, dnt_id in list(_registry.items()):
records = await fetch_unparsed_serial(device_id)
for rec in records:
try:
raw = rec["raw_data"]
pkt_bytes = hex_str_to_bytes(raw)
packets = split_packets(pkt_bytes)
if not packets:
await mark_record_state(device_id, rec["id"], state=3)
continue
for pkt in packets:
cmd = get_packet_cmd(pkt)
if cmd is None:
continue
# 跳过 B2 (已在 parse_loop 处理)
if cmd == 0xB2:
continue
# 匹配 tb_serialnet 中 state=1 且 CMD 匹配的记录
await _match_serial_cmd(dnt_id, cmd, raw)
# 对于 0x4C解析参数并更新数据库
if cmd == 0x4C and verify_packet(pkt):
params = parse_4c_params(pkt)
if params and params.flag == 0:
await upsert_fixture_param(
dnt_id,
Addr=params.dev_addr,
DevType=params.dev_type,
TestMode=params.test_mode,
RestDis=params.reset_dis,
MinusDis=params.minus_dis,
SensMin=params.sens_min,
SensMax=params.sens_max,
FreMin=params.fre_min,
FreMax=params.fre_max,
PeakMin=params.peak_min,
PeakMax=params.peak_max,
)
logger.info(
f"已更新工装参数 dnt_id={dnt_id} "
f"DevType={params.dev_type} "
f"TestMode={params.test_mode}"
)
# 对于 0x4A记录版本信息
if cmd == 0x4A and verify_packet(pkt):
ver = parse_4a_version(pkt)
if ver:
logger.info(
f"DG430 版本: hw={ver.hw_major}.{ver.hw_minor}.{ver.hw_patch} "
f"sw={ver.sw_major}.{ver.sw_minor}.{ver.sw_patch}"
)
await mark_record_state(device_id, rec["id"], state=1)
except Exception as e:
logger.error(
f"SerialNet 响应处理异常 rec={rec['id']}: {e}",
exc_info=True,
)
try:
await mark_record_state(device_id, rec["id"], state=3)
except Exception:
pass
except Exception as e:
logger.error(f"SerialNet 响应循环异常: {e}")
await asyncio.sleep(0.5)
async def _match_serial_cmd(dnt_id: int, cmd: int, raw_hex: str):
"""匹配 tb_serialnet 中 state=1 且 CMD 匹配的记录"""
try:
from src.models import get_pool
import aiomysql
pool = await get_pool()
cmd_hex = f"{cmd:02X}"
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 "
"AND update_time < DATE_SUB(NOW(), INTERVAL %s SECOND) "
"LIMIT 5",
(dnt_id, SERIALNET_TIMEOUT),
"AND UPPER(SUBSTRING(send_pkg, 7, 2)) = %s "
"ORDER BY id ASC LIMIT 1",
(dnt_id, cmd_hex),
)
rows = await cur.fetchall()
for row in rows:
await mark_serialnet_timeout(row["id"])
logger.warning(f"tb_serialnet #{row['id']} 超时 (>{SERIALNET_TIMEOUT}s)")
row = await cur.fetchone()
if row:
await mark_serialnet_done(row["id"], raw_hex)
logger.info(
f"tb_serialnet #{row['id']} CMD=0x{cmd:02X} 已确认完成 "
f"(dnt_id={dnt_id})"
)
except Exception as e:
logger.warning(f"匹配 serialnet cmd 0x{cmd:02X} 失败: {e}")
# ─── 设备状态监控服务 ──────────────────────────────────────────────────
# 在线判定参数
INTERACTION_TIMEOUT = 10 # 单次交互超时判定 (秒)
ONLINE_MIN_INTERACTIONS = 3 # 连续几次交互在超时内即表示在线
OFFLINE_IDLE_SEC = 60 # 超过此时间无交互 → 离线
POOR_MIN_INTERACTIONS = 4 # 1 分钟内少于此次数 → 通信不良
MONITOR_INTERVAL = 5 # 状态检查间隔 (秒)
async def device_status_monitor():
"""后台轮询:监控所有设备在线/离线状态
两个阶段:
Phase 1 — 遍历 _registry 中活跃设备,根据交互记录判定状态
Phase 2 — 扫描 dnt_info 全表,修正与实际交互不符的状态:
- state=1(在线) 但 >60s 无交互 → 更新为离线
- state=0(离线) 但有交互记录 → 根据交互模式更新为在线/通信不良
- state 与计算值不一致 → 同步修正
"""
logger.info("设备状态监控服务启动")
await asyncio.sleep(5) # 等其它服务就绪
while True:
try:
now = time.time()
# ── Phase 1: 活跃设备(在 _registry 中)────
for device_id, dnt_id in list(_registry.items()):
interactions = _interactions.get(device_id)
actual_state = _calc_device_state(interactions, now) if interactions else 0
old_state = _device_status.get(device_id, 1) # 注册时默认在线
if actual_state != old_state:
await _apply_state_change(device_id, actual_state, old_state)
# ── Phase 2: 扫描 dnt_info 全表,修正不一致 ──
try:
rows = await get_all_device_serials()
except Exception as e:
logger.error(f"查询 dnt_info 失败: {e}")
rows = []
for serial, db_state, ip in rows:
if not serial:
continue
interactions = _interactions.get(serial)
actual_state = _calc_device_state(interactions, now) if interactions else 0
memory_state = _device_status.get(serial)
# 优先用内存状态做基准(更实时);无内存状态则用 DB 状态
tracked_state = memory_state if memory_state is not None else db_state
if actual_state != tracked_state:
await _apply_state_change(serial, actual_state, tracked_state, ip=ip)
# ── Phase 3: 清理 _registry 中已经不存在的设备 ──
# (已注销 / 长期无交互的设备不清除,继续监控)
except Exception as e:
logger.error(f"设备状态监控异常: {e}")
await asyncio.sleep(MONITOR_INTERVAL)
async def _apply_state_change(device_id: str, new_state: int, old_state: int,
ip: str = ""):
"""应用状态变更更新内存、dnt_info、写入日志"""
_device_status[device_id] = new_state
logger.info(
f"设备 {device_id} 状态变更: "
f"{_state_name(old_state)}{_state_name(new_state)}"
)
# 更新 dnt_info
try:
await update_device_status(device_id, new_state)
except Exception as e:
logger.error(f"更新设备状态失败: {e}")
# 获取设备 IP未传入时从 DB 查)
if not ip:
try:
dnt = await get_dnt_by_serial(device_id)
ip = dnt.get("ip", "") if dnt else ""
except Exception:
pass
# 写入事件日志
event_type, content = _state_event(new_state, old_state)
try:
await insert_device_log(
serial=device_id, ip=ip,
event_type=event_type, content=content,
)
except Exception as e:
logger.error(f"写入设备事件日志失败: {e}")
def _calc_device_state(interactions: deque, now: float) -> int:
"""根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)"""
# 清理过期记录(仅保留 60s 内)
cutoff = now - OFFLINE_IDLE_SEC
recent = [t for t in interactions if t >= cutoff]
if not recent:
return 0 # 离线
# 最近一次交互距今
last_interaction = recent[-1]
if now - last_interaction > OFFLINE_IDLE_SEC:
return 0 # 离线
# 1 分钟内交互次数
if len(recent) < POOR_MIN_INTERACTIONS:
return 2 # 通信不良
# 最近 3 次交互间隔是否都在超时内
if len(recent) >= ONLINE_MIN_INTERACTIONS:
last_n = recent[-ONLINE_MIN_INTERACTIONS:]
gaps = [last_n[i] - last_n[i - 1] for i in range(1, len(last_n))]
if gaps and all(g <= INTERACTION_TIMEOUT for g in gaps):
return 1 # 在线
# 默认:有交互但不够密集 → 通信不良
return 2
def _state_name(state: int) -> str:
return {0: "离线", 1: "在线", 2: "通信不良"}.get(state, f"未知({state})")
def _state_event(new_state: int, old_state: int) -> tuple[str, str]:
"""根据状态变化生成事件类型和内容"""
name_new = _state_name(new_state)
name_old = _state_name(old_state)
if new_state == 0:
return "offline", f"设备离线(上次状态: {name_old}"
elif new_state == 1:
return "online", f"设备已上线(上次状态: {name_old}"
else:
return "poor", f"设备通信不良(上次状态: {name_old}"

View File

@@ -100,11 +100,14 @@ async def _create_tables(pool: aiomysql.Pool):
CREATE TABLE IF NOT EXISTS `tb_state_tst` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id',
`detector_serial` VARCHAR(45) DEFAULT '' COMMENT '车检器序列号',
`dpg430_addr` TINYINT DEFAULT 0,
`pcnum` VARCHAR(10) DEFAULT '' COMMENT '批次号',
`serialnum` INT DEFAULT 0 COMMENT '流水号',
`sub_type` TINYINT DEFAULT 0 COMMENT '1 DLD110, 2 PD132',
`str_type` VARCHAR(30) DEFAULT '',
`test_mode` TINYINT DEFAULT 0 COMMENT '0 灵敏度测试, 1 波动测试',
`data_source` CHAR(2) DEFAULT 'B2' COMMENT '数据来源 B2/B4',
`iffinish` VARCHAR(5) DEFAULT '' COMMENT '是否完成',
`fault_info` VARCHAR(100) DEFAULT '',
`relay_out` VARCHAR(24) DEFAULT '',
@@ -116,6 +119,15 @@ async def _create_tables(pool: aiomysql.Pool):
`exit_dist` INT DEFAULT 0,
`enter_speed` INT DEFAULT 0,
`exit_speed` INT DEFAULT 0,
`remain_count` INT DEFAULT 0 COMMENT '剩余波动次数 (B4)',
`work_freq` FLOAT DEFAULT 0 COMMENT '工作频率 Hz (B4)',
`curr_dist` INT DEFAULT 0 COMMENT '当前距离 mm (B4)',
`speed` INT DEFAULT 0 COMMENT '当前速度 dm/s (B4)',
`near_dist` INT DEFAULT 0 COMMENT '波动最近距离 mm (B4)',
`far_dist` INT DEFAULT 0 COMMENT '波动最远距离 mm (B4)',
`b4_enter_dist` INT DEFAULT 0 COMMENT 'B4 进入高度 mm',
`b4_leave_dist` INT DEFAULT 0 COMMENT 'B4 离开高度 mm',
`relay_code` TINYINT DEFAULT 0 COMMENT '继电器原始值 (0x00-0x03)',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX `idx_dnt_id` (`dnt_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
@@ -135,6 +147,233 @@ async def _create_tables(pool: aiomysql.Pool):
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# 5. 用户表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_user` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`username` VARCHAR(45) UNIQUE NOT NULL,
`password_hash` VARCHAR(256) NOT NULL,
`role` VARCHAR(20) DEFAULT 'operator' COMMENT 'admin/manager/operator/analyst',
`is_active` TINYINT DEFAULT 1,
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# 6. 工装测试参数表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_fixture_param` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id',
`Addr` TINYINT DEFAULT 1 COMMENT '工装设备地址',
`DevType` TINYINT DEFAULT 0 COMMENT '被检设备型号类型编码',
`TestMode` TINYINT DEFAULT 0 COMMENT '0 灵敏度测试, 1 波动测试',
`RestDis` INT DEFAULT 0 COMMENT '复位距离 cm',
`MinusDis` INT DEFAULT 0 COMMENT '皮距/开始距离 cm',
`SensMin` INT DEFAULT 0 COMMENT '灵敏度最小值',
`SensMax` INT DEFAULT 0 COMMENT '灵敏度最大值',
`FreMin` INT DEFAULT 0 COMMENT '频率最小值 Hz',
`FreMax` INT DEFAULT 0 COMMENT '频率最大值 Hz',
`PeakMin` INT DEFAULT 0 COMMENT '峰峰值最小值',
`PeakMax` INT DEFAULT 0 COMMENT '峰峰值最大值',
`FarTol` INT DEFAULT 0 COMMENT '最远容差 cm (V2.0.3)',
`NearTol` INT DEFAULT 0 COMMENT '最近容差 cm (V2.0.3)',
`StepTol` INT DEFAULT 0 COMMENT '步进容差 cm (V2.0.3)',
`BackForth` INT DEFAULT 0 COMMENT '来回次数 (V2.0.3)',
`NearStay` INT DEFAULT 0 COMMENT '最近停留时间 ms (V2.0.3)',
`FarStay` INT DEFAULT 0 COMMENT '最远停留时间 ms (V2.0.3)',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE INDEX `idx_dnt_id` (`dnt_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# V2.0.3 迁移:为旧表补充波动测试参数字段
for col, col_def in [
("FarTol", "INT DEFAULT 0 COMMENT '最远容差 cm'"),
("NearTol", "INT DEFAULT 0 COMMENT '最近容差 cm'"),
("StepTol", "INT DEFAULT 0 COMMENT '步进容差 cm'"),
("BackForth", "INT DEFAULT 0 COMMENT '来回次数'"),
("NearStay", "INT DEFAULT 0 COMMENT '最近停留时间 ms'"),
("FarStay", "INT DEFAULT 0 COMMENT '最远停留时间 ms'"),
]:
try:
await cur.execute(
f"ALTER TABLE `tb_fixture_param` ADD COLUMN `{col}` {col_def}"
)
except Exception:
pass # 列已存在,忽略
# V2.0.4 迁移tb_state_tst 增加波动测试字段
for col, col_def in [
("test_mode", "TINYINT DEFAULT 0 COMMENT '0 灵敏度, 1 波动测试'"),
("data_source", "CHAR(2) DEFAULT 'B2' COMMENT 'B2/B4'"),
("remain_count", "INT DEFAULT 0 COMMENT '剩余波动次数'"),
("work_freq", "FLOAT DEFAULT 0 COMMENT '工作频率 Hz'"),
("curr_dist", "INT DEFAULT 0 COMMENT '当前距离 mm'"),
("speed", "INT DEFAULT 0 COMMENT '当前速度 dm/s'"),
("near_dist", "INT DEFAULT 0 COMMENT '波动最近距离 mm'"),
("far_dist", "INT DEFAULT 0 COMMENT '波动最远距离 mm'"),
("b4_enter_dist", "INT DEFAULT 0 COMMENT 'B4 进入高度 mm'"),
("b4_leave_dist", "INT DEFAULT 0 COMMENT 'B4 离开高度 mm'"),
("relay_code", "TINYINT DEFAULT 0 COMMENT '继电器原始值 0x00-0x03'"),
]:
try:
await cur.execute(
f"ALTER TABLE `tb_state_tst` ADD COLUMN `{col}` {col_def}"
)
except Exception:
pass
# 7. 车检器测试基准参数表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_vechicle_base_test` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`dev_name` VARCHAR(100) DEFAULT '' COMMENT '车检器型号/名称',
`type_num` TINYINT DEFAULT 0 COMMENT '类型编码',
`SensMin` INT DEFAULT 0 COMMENT '灵敏度最小值',
`SensMax` INT DEFAULT 0 COMMENT '灵敏度最大值',
`FreMin` INT DEFAULT 0 COMMENT '频率最小值 Hz',
`FreMax` INT DEFAULT 0 COMMENT '频率最大值 Hz',
`PeakMin` INT DEFAULT 0 COMMENT '峰峰值最小值',
`PeakMax` INT DEFAULT 0 COMMENT '峰峰值最大值',
`remark` VARCHAR(500) DEFAULT '' COMMENT '备注',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# 8. 日志表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_log` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`user_id` INT DEFAULT 0,
`username` VARCHAR(45) DEFAULT '',
`action_type` VARCHAR(30) NOT NULL COMMENT 'login/logout/command',
`target` VARCHAR(100) DEFAULT '' COMMENT '操作对象',
`detail` VARCHAR(500) DEFAULT '' COMMENT '详情',
`result` VARCHAR(20) DEFAULT 'ok' COMMENT 'ok/error',
`ip` VARCHAR(45) DEFAULT '',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX `idx_username` (`username`),
INDEX `idx_action_type` (`action_type`),
INDEX `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# 9. 线圈参数表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_coil_info` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`coil_num` VARCHAR(45) DEFAULT '' COMMENT '线圈编号',
`name` VARCHAR(100) DEFAULT '' COMMENT '名称',
`induct` FLOAT DEFAULT 0 COMMENT '电感量',
`shape` VARCHAR(20) DEFAULT '' COMMENT '形状(矩形、圆形等)',
`length` FLOAT DEFAULT 0 COMMENT '长 cm矩形有效',
`width` FLOAT DEFAULT 0 COMMENT '宽 cm矩形有效',
`radius` FLOAT DEFAULT 0 COMMENT '半径 cm圆形有效',
`turns` INT DEFAULT 0 COMMENT '圈数',
`resistance` FLOAT DEFAULT 0 COMMENT '电阻 欧姆',
`material` VARCHAR(50) DEFAULT '' COMMENT '材质',
`remark` VARCHAR(500) DEFAULT '' COMMENT '备注',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# 10. 模拟车辆参数表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_simulate_car` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`simulate_num` VARCHAR(45) DEFAULT '' COMMENT '模拟编号',
`name` VARCHAR(100) DEFAULT '' COMMENT '名称',
`shape` VARCHAR(20) DEFAULT '' COMMENT '形状(矩形、圆形等)',
`length` FLOAT DEFAULT 0 COMMENT '长 cm矩形有效',
`width` FLOAT DEFAULT 0 COMMENT '宽 cm矩形有效',
`radius` FLOAT DEFAULT 0 COMMENT '半径 cm圆形有效',
`material` VARCHAR(50) DEFAULT '' COMMENT '材质(铁板、合金等)',
`remark` VARCHAR(500) DEFAULT '' COMMENT '备注',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# V2.1.0 迁移tb_fixture_param 增加线圈/模拟车辆关联
for col, col_def in [
("coil_id", "INT DEFAULT NULL COMMENT 'FK → tb_coil_info.id'"),
("simulate_car_id", "INT DEFAULT NULL COMMENT 'FK → tb_simulate_car.id'"),
]:
try:
await cur.execute(
f"ALTER TABLE `tb_fixture_param` ADD COLUMN `{col}` {col_def}"
)
except Exception:
pass
# V2.1.0 迁移tb_state_tst 增加线圈/模拟车辆关联
for col, col_def in [
("coil_id", "INT DEFAULT NULL COMMENT 'FK → tb_coil_info.id'"),
("simulate_car_id", "INT DEFAULT NULL COMMENT 'FK → tb_simulate_car.id'"),
]:
try:
await cur.execute(
f"ALTER TABLE `tb_state_tst` ADD COLUMN `{col}` {col_def}"
)
except Exception:
pass
# V2.4.0 迁移tb_state_tst 增加车检器序列号
for col, col_def in [
("detector_serial", "VARCHAR(45) DEFAULT '' COMMENT '车检器序列号'"),
]:
try:
await cur.execute(
f"ALTER TABLE `tb_state_tst` ADD COLUMN `{col}` {col_def}"
)
except Exception:
pass
# 13. 待插入的车检器序列号表 (V2.4.0)
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_pending_detector` (
`dnt_id` INT PRIMARY KEY COMMENT 'FK → dnt_info.id',
`detector_serial` VARCHAR(45) DEFAULT '' COMMENT '待插入的车检器序列号',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# 11. 设备事件日志表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_device_log` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`device_serial` VARCHAR(45) NOT NULL COMMENT '设备序列号',
`device_ip` VARCHAR(45) DEFAULT '' COMMENT '设备IP',
`event_type` VARCHAR(30) NOT NULL COMMENT 'login/online/offline/poor',
`event_content` VARCHAR(500) DEFAULT '' COMMENT '事件详情',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX `idx_serial` (`device_serial`),
INDEX `idx_event_type` (`event_type`),
INDEX `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# V2.2.0 迁移:扩展 dnt_info 状态值0=离线 1=在线 2=通信不良)
try:
await cur.execute(
"ALTER TABLE dnt_info MODIFY COLUMN `state` TINYINT DEFAULT 0 "
"COMMENT '0 offline, 1 online, 2 poor'"
)
except Exception:
pass
# V2.3.0 迁移tb_user 角色增加 manager
try:
await cur.execute(
"ALTER TABLE tb_user MODIFY COLUMN `role` VARCHAR(20) DEFAULT 'operator' "
"COMMENT 'admin/manager/operator/analyst'"
)
except Exception:
pass
logger.info("数据库表初始化完成")
@@ -228,7 +467,7 @@ async def upsert_dnt(serial: str, ip: str, port: int, mac: str,
if existing:
# 已有记录:更新 IP / 网关 / 上线时间
if (existing["ip"] != ip or existing["gateway"] != gateway
or existing["port"] != port or existing["subnet"] != subnet):
or existing["port"] != port or existing["subnet"] != subnet or existing["version"] != version):
await cur.execute(
"""UPDATE dnt_info SET ip=%s, port=%s, subnet=%s, gateway=%s,
mac=%s, msgport=%s, version=%s, last_login=NOW(), state=1
@@ -252,30 +491,139 @@ async def upsert_dnt(serial: str, ip: str, port: int, mac: str,
return cur.lastrowid
async def get_fixture_coil_car_ids(dnt_id: int) -> tuple[int | None, int | None]:
"""从 tb_fixture_param 获取当前线圈和模拟车辆关联 ID"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT coil_id, simulate_car_id FROM tb_fixture_param WHERE dnt_id=%s",
(dnt_id,),
)
row = await cur.fetchone()
if row:
return row.get("coil_id"), row.get("simulate_car_id")
return None, None
async def get_fixture_dev_type(dnt_id: int) -> int:
"""从 tb_fixture_param 获取被检设备型号类型编码 (DevType)"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT DevType FROM tb_fixture_param WHERE dnt_id=%s",
(dnt_id,),
)
row = await cur.fetchone()
return row[0] if row else 0
# ─── 设备型号名称缓存 ──────────────────────────────────────────────────
_dev_type_name_cache: dict[int, str] = {}
_cache_loaded = False
async def _load_dev_type_names():
"""从 tb_vechicle_base_test 加载 type_num → dev_name 映射"""
global _dev_type_name_cache, _cache_loaded
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT type_num, dev_name FROM tb_vechicle_base_test"
)
rows = await cur.fetchall()
_dev_type_name_cache = {row[0]: row[1] for row in rows if row[1]}
_cache_loaded = True
logger.debug(f"设备型号名称缓存已加载: {_dev_type_name_cache}")
async def get_dev_type_name(dev_type: int) -> str:
"""根据设备型号编码获取名称(从 tb_vechicle_base_test 查询,带内存缓存)"""
global _cache_loaded
if not _cache_loaded:
await _load_dev_type_names()
return _dev_type_name_cache.get(dev_type, f"Unknown({dev_type})")
async def refresh_dev_type_names():
"""刷新型号名称缓存(工装配置页新增型号后调用)"""
global _cache_loaded
_cache_loaded = False
await _load_dev_type_names()
async def insert_test_result(dnt_id: int, dpg430_addr: int, pcnum: str,
serialnum: int, sub_type: int, str_type: str,
iffinish: str, fault_info: str, relay_out: str,
ppvalue: float, idle_freq: float, enter_freq: float,
exit_freq: float, enter_dist: int, exit_dist: int,
enter_speed: int, exit_speed: int):
enter_speed: int, exit_speed: int,
test_mode: int = 0, data_source: str = "B2",
relay_code: int = 0):
"""插入测试结果到 tb_state_tst"""
coil_id, simulate_car_id = await get_fixture_coil_car_ids(dnt_id)
detector_serial = await get_pending_detector_serial(dnt_id)
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"""INSERT INTO tb_state_tst
(dnt_id, dpg430_addr, pcnum, serialnum, sub_type, str_type,
(dnt_id, detector_serial, dpg430_addr, pcnum, serialnum, sub_type, str_type,
test_mode, data_source,
iffinish, fault_info, relay_out, ppvalue, idle_freq,
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(dnt_id, dpg430_addr, pcnum, serialnum, sub_type, str_type,
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed,
relay_code, coil_id, simulate_car_id)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(dnt_id, detector_serial, dpg430_addr, pcnum, serialnum, sub_type, str_type,
test_mode, data_source,
iffinish, fault_info, relay_out, ppvalue, idle_freq,
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed),
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed,
relay_code, coil_id, simulate_car_id),
)
async def insert_wave_data(dnt_id: int, dpg430_addr: int,
remain_count: int, relay_code: int,
work_freq: float, curr_dist: int, speed: int,
near_dist: int, far_dist: int,
enter_dist: int, leave_dist: int,
relay_out: str = ""):
"""插入 0xB4 波动测试上报数据到 tb_state_tst"""
coil_id, simulate_car_id = await get_fixture_coil_car_ids(dnt_id)
dev_type = await get_fixture_dev_type(dnt_id)
str_type = await get_dev_type_name(dev_type) if dev_type else ""
detector_serial = await get_pending_detector_serial(dnt_id)
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"""INSERT INTO tb_state_tst
(dnt_id, detector_serial, dpg430_addr, sub_type, str_type,
test_mode, data_source,
relay_out, relay_code,
remain_count, work_freq, curr_dist, speed,
near_dist, far_dist, b4_enter_dist, b4_leave_dist,
coil_id, simulate_car_id)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(dnt_id, detector_serial, dpg430_addr, dev_type, str_type,
1, "B4",
relay_out, relay_code,
remain_count, work_freq, curr_dist, speed,
near_dist, far_dist, enter_dist, leave_dist,
coil_id, simulate_car_id),
)
logger.info(
f"B4波动数据已存储 dnt_id={dnt_id} relay=0x{relay_code:02X} "
f"剩余={remain_count} 当前距离={curr_dist}mm 速度={speed}dm/s"
f" 最近={near_dist}mm 最远={far_dist}mm"
)
async def set_device_offline(serial: str):
"""标记设备离线"""
"""标记设备离线(保持向后兼容)"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
@@ -285,6 +633,56 @@ async def set_device_offline(serial: str):
)
# ─── tb_device_log + 设备状态 ───────────────────────────────────────
async def insert_device_log(serial: str, ip: str, event_type: str,
content: str = ""):
"""插入设备事件日志"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"INSERT INTO tb_device_log (device_serial, device_ip, "
"event_type, event_content) VALUES (%s,%s,%s,%s)",
(serial, ip, event_type, content),
)
async def update_device_status(serial: str, state: int):
"""更新 dnt_info 设备状态0=离线 1=在线 2=通信不良)"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
if state == 0:
await cur.execute(
"UPDATE dnt_info SET state=0, last_off=NOW() "
"WHERE serial=%s AND state != 0",
(serial,),
)
elif state == 1:
await cur.execute(
"UPDATE dnt_info SET state=1, last_login=NOW() "
"WHERE serial=%s AND state != 1",
(serial,),
)
else:
await cur.execute(
"UPDATE dnt_info SET state=%s WHERE serial=%s",
(state, serial),
)
async def get_all_device_serials() -> list[tuple[str, int, str]]:
"""获取所有设备 (serial, state, ip),用于状态扫描"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT serial, state, ip FROM dnt_info ORDER BY id"
)
return await cur.fetchall()
# ─── tb_serialnet CRUD ─────────────────────────────────────────────
async def get_pending_serialnet(dnt_id: int) -> dict | None:
@@ -380,3 +778,103 @@ async def get_pending_by_device(dnt_id: int) -> list[dict]:
(dnt_id,),
)
return await cur.fetchall()
async def fetch_unparsed_serial(device_id: str) -> list[dict]:
"""获取设备未处理的 SerialNet 响应记录 (dat_type=9, state=0)"""
table = collect_table_name(device_id)
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
f"SELECT id, raw_data FROM `{table}` "
f"WHERE state=0 AND dat_type=9 LIMIT 50"
)
return await cur.fetchall()
# ─── tb_fixture_param CRUD ─────────────────────────────────────────
async def upsert_fixture_param(dnt_id: int, **kwargs):
"""插入或更新工装测试参数"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT id FROM tb_fixture_param WHERE dnt_id=%s", (dnt_id,),
)
existing = await cur.fetchone()
fields = [
"Addr", "DevType", "TestMode", "RestDis", "MinusDis",
"SensMin", "SensMax", "FreMin", "FreMax", "PeakMin", "PeakMax",
"FarTol", "NearTol", "StepTol", "BackForth", "NearStay", "FarStay",
]
if existing:
sets = ", ".join(f"`{f}`=%s" for f in fields)
values = [kwargs.get(f, 0) for f in fields] + [dnt_id]
await cur.execute(
f"UPDATE tb_fixture_param SET {sets} WHERE dnt_id=%s",
values,
)
else:
placeholders = ", ".join(["%s"] * len(fields))
col_names = ", ".join(f"`{f}`" for f in fields)
values = [kwargs.get(f, 0) for f in fields]
await cur.execute(
f"INSERT INTO tb_fixture_param (dnt_id, {col_names}) "
f"VALUES (%s, {placeholders})",
[dnt_id] + values,
)
async def get_fixture_param(dnt_id: int) -> dict | None:
"""获取设备的工装测试参数"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT * FROM tb_fixture_param WHERE dnt_id=%s", (dnt_id,),
)
return await cur.fetchone()
# ─── tb_vechicle_base_test CRUD ────────────────────────────────────
async def get_vehicle_base_tests() -> list[dict]:
"""获取所有车检器测试基准参数"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT * FROM tb_vechicle_base_test ORDER BY type_num ASC",
)
return await cur.fetchall()
async def get_vehicle_base_test_by_type(type_num: int) -> dict | None:
"""根据类型编码获取车检器测试基准"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT * FROM tb_vechicle_base_test WHERE type_num=%s",
(type_num,),
)
return await cur.fetchone()
# ─── tb_pending_detector ───────────────────────────────────────────
async def get_pending_detector_serial(dnt_id: int) -> str:
"""获取待插入的车检器序列号"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT detector_serial FROM tb_pending_detector WHERE dnt_id=%s",
(dnt_id,),
)
row = await cur.fetchone()
if row:
return row["detector_serial"] or ""
return ""

View File

@@ -33,8 +33,11 @@ from src.handlers import (
handle_serial_net,
parse_loop,
serialnet_loop,
serialnet_response_loop,
device_status_monitor,
set_udp_sender,
)
from src.models import insert_device_log
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
@@ -70,8 +73,10 @@ class EDCProtocol:
msg = parse_message(data)
if msg is None:
return
logger.info(f"UDP {msg} from {addr}")
method = msg.get("Method", "")
method_lower = method.lower()
logger.debug(f"UDP {method} from {addr}")
try:
@@ -79,11 +84,11 @@ class EDCProtocol:
if method == "Count_Off":
# 设备登录上报,只处理不回复
await handle_count_off(msg, addr)
elif method == "Heartbeat":
elif method_lower == "heartbeat":
response = await handle_heartbeat(msg)
elif method == "TSReport":
elif method_lower == "tsreport":
response = await handle_tsreport(msg)
elif method == "SerialNet":
elif method_lower == "serialnet":
response = await handle_serial_net(msg)
if response and self.transport:
@@ -102,23 +107,36 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
- 紧凑 JSON (无换行)
"""
addr = writer.get_extra_info("peername")
addr_ip = addr[0] if addr else ""
logger.info(f"TCP 连接: {addr}")
# TCP 连接事件日志
try:
asyncio.ensure_future(insert_device_log(
serial="", ip=addr_ip,
event_type="tcp_connect",
content=f"TCP 连接: {addr}",
))
except Exception:
pass
buffer = b""
async def process_message(msg: dict):
"""处理单条消息并返回响应文本"""
logger.info(f"TCP get_rcv {msg} from {addr}")
method = msg.get("Method", "")
method_lower = method.lower()
logger.debug(f"TCP {method} from {addr}")
try:
if method == "TimeStamp":
if method_lower == "timestamp":
return handle_timestamp(msg)
elif method == "TSReport":
elif method_lower == "tsreport":
return await handle_tsreport(msg)
elif method == "SerialNet":
elif method_lower == "serialnet":
return await handle_serial_net(msg)
elif method == "Heartbeat":
elif method_lower == "heartbeat":
return await handle_heartbeat(msg)
else:
logger.debug(f"TCP 未知方法: {method}")
@@ -172,6 +190,15 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
pass
finally:
logger.info(f"TCP 断开: {addr}")
# TCP 断开事件日志
try:
asyncio.ensure_future(insert_device_log(
serial="", ip=addr_ip,
event_type="tcp_disconnect",
content=f"TCP 断开: {addr}",
))
except Exception:
pass
writer.close()
await writer.wait_closed()
@@ -183,6 +210,8 @@ async def main():
asyncio.create_task(parse_loop())
asyncio.create_task(serialnet_loop())
asyncio.create_task(serialnet_response_loop())
asyncio.create_task(device_status_monitor())
loop = asyncio.get_running_loop()