Compare commits

...

8 Commits

Author SHA1 Message Date
wangfq
a2f31b3bfe fix: get_pending_detector_serial 不再删除记录,避免 B2 消费后 B4 拿不到序列号 2026-06-15 16:45:49 +08:00
wangfq
ff9482780d feat: tb_state_tst 增加 detector_serial 字段 + tb_pending_detector 辅助表
- tb_state_tst DDL 增加 detector_serial VARCHAR(45) 列 + V2.4.0 迁移
- 新建 tb_pending_detector 表,用于 web 端暂存待插入的序列号
- insert_test_result / insert_wave_data 插入前从辅助表读取序列号(消费后清除)
- 新增 get_pending_detector_serial() 查询函数
2026-06-15 10:02:19 +08:00
wangfq
6e13990386 fix: 设备型号名称改为从 db tb_vechicle_base_test 动态查询,修复新增型号显示 Unknown 的问题
- handlers.py: B2 数据硬编码 dev_model_map {1:PD132,2:DLD110} → await get_dev_type_name()
- models.py: B4 波动数据硬编码 map → 同上
- models.py: 新增 get_dev_type_name() 带内存缓存,首次加载后缓存 type_num→dev_name
- models.py: 新增 refresh_dev_type_names() 供工装配置页新增型号后刷新缓存
2026-06-12 10:00:25 +08:00
wangfq
3580f89552 feat: role COMMENT 增加 analyst 角色 2026-06-11 17:21:42 +08:00
wangfq
25aafd57c8 feat: V2.3.0 role COMMENT 增加 manager 角色,DDL + ALTER TABLE 迁移 2026-06-11 09:00:27 +08:00
wangfq
cdddfac609 fix: 0xB4 继电器 relay_out 字段与 0xB2 使用相同格式
之前 insert_wave_data 的 relay_out 被硬编码为空字符串,
decode_relay_info 的计算结果未写入。现在增加参数 relay_out
并传入格式化后的继电器状态字符串。
2026-06-10 17:27:38 +08:00
wangfq
944870496a fix: 继电器输出状态解析改为完整的 有/无 描述格式
旧格式: '存在信号; 脉冲信号' (仅显示置位的 bit)
新格式: '存在继电器有输出,脉冲继电器有输出' (始终显示两个 bit 的状态)

bit 0 (x & 0x01): 存在继电器有/无输出
bit 1 (x & 0x02): 脉冲继电器有/无输出
2026-06-10 16:25:31 +08:00
wangfq
c875cf383b fix: 修复 HeartBeat 大小写不匹配导致交互未被记录的问题
根本原因: 设备发送 Method='HeartBeat'(大写B), 代码匹配'Heartbeat'(小写b),
        心跳包被静默忽略, record_interaction 未调用, 导致监控误判为离线。

修复:
- UDP/TCP 方法匹配改为 case-insensitive (method_lower)
- handle_timestamp 增加 record_interaction 调用 (TimeStamp 也是设备交互)
- TCP 连接/断开时写入 tb_device_log 事件日志 (tcp_connect/tcp_disconnect)
2026-06-10 10:01:07 +08:00
4 changed files with 142 additions and 39 deletions

View File

@@ -280,12 +280,16 @@ def decode_fault_info(fault: int) -> str:
def decode_relay_info(relay: int) -> str:
"""解码继电器 bitmask"""
items = []
for bit, desc in RELAY_BITS.items():
if relay & (1 << bit):
items.append(desc)
return "; ".join(items) if items else "输出"
"""解码继电器输出状态为可读字符串
0xB2 继电器输出状态原始值 x 的解析规则:
- x & 0x01 为真 → "存在继电器有输出",否则 "存在继电器无输出"
- x & 0x02 为真 → "脉冲继电器有输出",否则 "脉冲继电器无输出"
汇总格式: "存在继电器有输出,脉冲继电器有输出"
"""
exist = "存在继电器有输出" if (relay & 0x01) else "存在继电器无输出"
pulse = "脉冲继电器有输出" if (relay & 0x02) else "脉冲继电器无输出"
return f"{exist}{pulse}"
# ─── 0x4A 获取设备版本号响应 ──────────────────────────────────────

View File

@@ -33,6 +33,7 @@ from src.models import (
insert_device_log,
update_device_status,
get_all_device_serials,
get_dev_type_name,
)
from src.dg430 import (
parse_b2_status,
@@ -187,21 +188,11 @@ async def handle_heartbeat(data: dict) -> str | None:
def handle_timestamp(data: dict) -> str:
"""处理时间同步请求
TimeStamp 请求格式:
{
"Method": "TimeStamp",
"Params": {
"Device_id": "2345",
"TimeZone": "Asia/Shanghai"
}
}
返回格式参考 PGLC 协议 3.5 节。
"""
"""处理时间同步请求(也是设备交互)"""
params = data.get("Params", {})
device_id = params.get("Device_id", "")
if device_id:
record_interaction(device_id)
return make_timestamp_response(device_id, int(time.time()))
@@ -303,8 +294,7 @@ async def parse_loop():
fault_info = decode_fault_info(status.fault)
relay_info = decode_relay_info(status.relay_out)
dev_model_map = {1: "PD132", 2: "DLD110"}
str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})")
str_type = await get_dev_type_name(status.dev_model)
await insert_test_result(
dnt_id=dnt_id,
@@ -397,6 +387,7 @@ async def parse_loop():
dpg430_addr=wave.addr,
remain_count=wave.remain_count,
relay_code=wave.relay_out,
relay_out=relay_info,
work_freq=wave.work_freq,
curr_dist=wave.curr_dist,
speed=wave.speed,

View File

@@ -100,6 +100,7 @@ async def _create_tables(pool: aiomysql.Pool):
CREATE TABLE IF NOT EXISTS `tb_state_tst` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id',
`detector_serial` VARCHAR(45) DEFAULT '' COMMENT '车检器序列号',
`dpg430_addr` TINYINT DEFAULT 0,
`pcnum` VARCHAR(10) DEFAULT '' COMMENT '批次号',
`serialnum` INT DEFAULT 0 COMMENT '流水号',
@@ -152,7 +153,7 @@ async def _create_tables(pool: aiomysql.Pool):
`id` INT AUTO_INCREMENT PRIMARY KEY,
`username` VARCHAR(45) UNIQUE NOT NULL,
`password_hash` VARCHAR(256) NOT NULL,
`role` VARCHAR(20) DEFAULT 'operator' COMMENT 'admin/operator',
`role` VARCHAR(20) DEFAULT 'operator' COMMENT 'admin/manager/operator/analyst',
`is_active` TINYINT DEFAULT 1,
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
@@ -320,6 +321,26 @@ async def _create_tables(pool: aiomysql.Pool):
except Exception:
pass
# V2.4.0 迁移tb_state_tst 增加车检器序列号
for col, col_def in [
("detector_serial", "VARCHAR(45) DEFAULT '' COMMENT '车检器序列号'"),
]:
try:
await cur.execute(
f"ALTER TABLE `tb_state_tst` ADD COLUMN `{col}` {col_def}"
)
except Exception:
pass
# 13. 待插入的车检器序列号表 (V2.4.0)
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_pending_detector` (
`dnt_id` INT PRIMARY KEY COMMENT 'FK → dnt_info.id',
`detector_serial` VARCHAR(45) DEFAULT '' COMMENT '待插入的车检器序列号',
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
""")
# 11. 设备事件日志表
await cur.execute("""
CREATE TABLE IF NOT EXISTS `tb_device_log` (
@@ -344,6 +365,15 @@ async def _create_tables(pool: aiomysql.Pool):
except Exception:
pass
# V2.3.0 迁移tb_user 角色增加 manager
try:
await cur.execute(
"ALTER TABLE tb_user MODIFY COLUMN `role` VARCHAR(20) DEFAULT 'operator' "
"COMMENT 'admin/manager/operator/analyst'"
)
except Exception:
pass
logger.info("数据库表初始化完成")
@@ -489,6 +519,42 @@ async def get_fixture_dev_type(dnt_id: int) -> int:
return row[0] if row else 0
# ─── 设备型号名称缓存 ──────────────────────────────────────────────────
_dev_type_name_cache: dict[int, str] = {}
_cache_loaded = False
async def _load_dev_type_names():
"""从 tb_vechicle_base_test 加载 type_num → dev_name 映射"""
global _dev_type_name_cache, _cache_loaded
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT type_num, dev_name FROM tb_vechicle_base_test"
)
rows = await cur.fetchall()
_dev_type_name_cache = {row[0]: row[1] for row in rows if row[1]}
_cache_loaded = True
logger.debug(f"设备型号名称缓存已加载: {_dev_type_name_cache}")
async def get_dev_type_name(dev_type: int) -> str:
"""根据设备型号编码获取名称(从 tb_vechicle_base_test 查询,带内存缓存)"""
global _cache_loaded
if not _cache_loaded:
await _load_dev_type_names()
return _dev_type_name_cache.get(dev_type, f"Unknown({dev_type})")
async def refresh_dev_type_names():
"""刷新型号名称缓存(工装配置页新增型号后调用)"""
global _cache_loaded
_cache_loaded = False
await _load_dev_type_names()
async def insert_test_result(dnt_id: int, dpg430_addr: int, pcnum: str,
serialnum: int, sub_type: int, str_type: str,
iffinish: str, fault_info: str, relay_out: str,
@@ -499,18 +565,19 @@ async def insert_test_result(dnt_id: int, dpg430_addr: int, pcnum: str,
relay_code: int = 0):
"""插入测试结果到 tb_state_tst"""
coil_id, simulate_car_id = await get_fixture_coil_car_ids(dnt_id)
detector_serial = await get_pending_detector_serial(dnt_id)
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"""INSERT INTO tb_state_tst
(dnt_id, dpg430_addr, pcnum, serialnum, sub_type, str_type,
(dnt_id, detector_serial, dpg430_addr, pcnum, serialnum, sub_type, str_type,
test_mode, data_source,
iffinish, fault_info, relay_out, ppvalue, idle_freq,
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed,
relay_code, coil_id, simulate_car_id)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(dnt_id, dpg430_addr, pcnum, serialnum, sub_type, str_type,
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(dnt_id, detector_serial, dpg430_addr, pcnum, serialnum, sub_type, str_type,
test_mode, data_source,
iffinish, fault_info, relay_out, ppvalue, idle_freq,
enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed,
@@ -522,27 +589,28 @@ async def insert_wave_data(dnt_id: int, dpg430_addr: int,
remain_count: int, relay_code: int,
work_freq: float, curr_dist: int, speed: int,
near_dist: int, far_dist: int,
enter_dist: int, leave_dist: int):
enter_dist: int, leave_dist: int,
relay_out: str = ""):
"""插入 0xB4 波动测试上报数据到 tb_state_tst"""
coil_id, simulate_car_id = await get_fixture_coil_car_ids(dnt_id)
dev_type = await get_fixture_dev_type(dnt_id)
dev_model_map = {1: "PD132", 2: "DLD110"}
str_type = dev_model_map.get(dev_type, f"Unknown({dev_type})") if dev_type else ""
str_type = await get_dev_type_name(dev_type) if dev_type else ""
detector_serial = await get_pending_detector_serial(dnt_id)
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"""INSERT INTO tb_state_tst
(dnt_id, dpg430_addr, sub_type, str_type,
(dnt_id, detector_serial, dpg430_addr, sub_type, str_type,
test_mode, data_source,
relay_out, relay_code,
remain_count, work_freq, curr_dist, speed,
near_dist, far_dist, b4_enter_dist, b4_leave_dist,
coil_id, simulate_car_id)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(dnt_id, dpg430_addr, dev_type, str_type,
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
(dnt_id, detector_serial, dpg430_addr, dev_type, str_type,
1, "B4",
"", relay_code,
relay_out, relay_code,
remain_count, work_freq, curr_dist, speed,
near_dist, far_dist, enter_dist, leave_dist,
coil_id, simulate_car_id),
@@ -793,3 +861,20 @@ async def get_vehicle_base_test_by_type(type_num: int) -> dict | None:
(type_num,),
)
return await cur.fetchone()
# ─── tb_pending_detector ───────────────────────────────────────────
async def get_pending_detector_serial(dnt_id: int) -> str:
"""获取待插入的车检器序列号"""
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(
"SELECT detector_serial FROM tb_pending_detector WHERE dnt_id=%s",
(dnt_id,),
)
row = await cur.fetchone()
if row:
return row["detector_serial"] or ""
return ""

View File

@@ -37,6 +37,7 @@ from src.handlers import (
device_status_monitor,
set_udp_sender,
)
from src.models import insert_device_log
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
@@ -75,6 +76,7 @@ class EDCProtocol:
logger.info(f"UDP {msg} from {addr}")
method = msg.get("Method", "")
method_lower = method.lower()
logger.debug(f"UDP {method} from {addr}")
try:
@@ -82,11 +84,11 @@ class EDCProtocol:
if method == "Count_Off":
# 设备登录上报,只处理不回复
await handle_count_off(msg, addr)
elif method == "Heartbeat":
elif method_lower == "heartbeat":
response = await handle_heartbeat(msg)
elif method == "TSReport":
elif method_lower == "tsreport":
response = await handle_tsreport(msg)
elif method == "SerialNet":
elif method_lower == "serialnet":
response = await handle_serial_net(msg)
if response and self.transport:
@@ -105,24 +107,36 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
- 紧凑 JSON (无换行)
"""
addr = writer.get_extra_info("peername")
addr_ip = addr[0] if addr else ""
logger.info(f"TCP 连接: {addr}")
# TCP 连接事件日志
try:
asyncio.ensure_future(insert_device_log(
serial="", ip=addr_ip,
event_type="tcp_connect",
content=f"TCP 连接: {addr}",
))
except Exception:
pass
buffer = b""
async def process_message(msg: dict):
"""处理单条消息并返回响应文本"""
logger.info(f"TCP get_rcv {msg} from {addr}")
method = msg.get("Method", "")
method_lower = method.lower()
logger.debug(f"TCP {method} from {addr}")
try:
if method == "TimeStamp":
if method_lower == "timestamp":
return handle_timestamp(msg)
elif method == "TSReport":
elif method_lower == "tsreport":
return await handle_tsreport(msg)
elif method == "SerialNet":
elif method_lower == "serialnet":
return await handle_serial_net(msg)
elif method == "Heartbeat":
elif method_lower == "heartbeat":
return await handle_heartbeat(msg)
else:
logger.debug(f"TCP 未知方法: {method}")
@@ -176,6 +190,15 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
pass
finally:
logger.info(f"TCP 断开: {addr}")
# TCP 断开事件日志
try:
asyncio.ensure_future(insert_device_log(
serial="", ip=addr_ip,
event_type="tcp_disconnect",
content=f"TCP 断开: {addr}",
))
except Exception:
pass
writer.close()
await writer.wait_closed()