diff --git a/src/dg430.py b/src/dg430.py index 64637ea..b3d7710 100644 --- a/src/dg430.py +++ b/src/dg430.py @@ -57,6 +57,11 @@ def _le16(data: bytes, offset: int) -> int: return data[offset] | (data[offset + 1] << 8) +def _be16(data: bytes, offset: int) -> int: + """大端 2 字节 → int""" + return (data[offset] << 8) | data[offset + 1] + + def verify_packet(data: bytes) -> bool: """校验数据包完整性""" if len(data) < PKT_MIN_LEN: @@ -217,3 +222,125 @@ def decode_relay_info(relay: int) -> str: if relay & (1 << bit): items.append(desc) return "; ".join(items) if items else "无输出" + + +# ─── 0x4A 获取设备版本号响应 ────────────────────────────────────── + +@dataclass +class DG430Version: + addr: int + hw_major: int + hw_minor: int + hw_patch: int + sw_major: int + sw_minor: int + sw_patch: int + + +def parse_4a_version(data: bytes) -> DG430Version | None: + """解析 0x4A 版本号响应 + + 格式: 7F | ADDR | 08 | 4A | 00 | HW(3B) | SW(3B) | XOR | SUM + """ + if not verify_packet(data): + return None + cmd = data[3] + if cmd != 0x4A: + return None + + payload = data[4:3 + data[2]] + if len(payload) < 7: + return None + + addr = data[1] & 0x7F + return DG430Version( + addr=addr, + hw_major=payload[1], + hw_minor=payload[2], + hw_patch=payload[3], + sw_major=payload[4], + sw_minor=payload[5], + sw_patch=payload[6], + ) + + +# ─── 通用 Flag 响应 (0x4B/0x4D/0x4E) ───────────────────────────── + +def parse_flag_response(data: bytes, expected_cmd: int) -> int | None: + """解析 Flag 响应格式: STX | ADDR | 02 | CMD | Flag | XOR | SUM + + Returns: + Flag 值 (0=正常, 1=故障), 或 None 表示解析失败 + """ + if not verify_packet(data): + return None + cmd = data[3] + if cmd != expected_cmd: + return None + if data[2] < 2: + return None + return data[4] # Flag + + +# ─── 0x4C 查询设备测试参数响应 ──────────────────────────────────── + +@dataclass +class DG430FixtureParams: + addr: int + flag: int # 0=正常, 1=故障 + dev_addr: int # 设备地址 + dev_type: int # 设备型号 + test_mode: int # 0 灵敏度, 1 模拟过车 + reset_dis: int # 复位距离 cm + minus_dis: int # 皮距 cm + sens_min: int # 灵敏度最小值 + sens_max: int # 灵敏度最大值 + fre_min: int # 频率最小值 Hz + fre_max: int # 频率最大值 Hz + peak_min: int # 峰峰值最小值 + peak_max: int # 峰峰值最大值 + + +def parse_4c_params(data: bytes) -> DG430FixtureParams | None: + """解析 0x4C 查询测试参数响应 + + 格式: 7F | ADDR | 13 | 4C | Flag | Addr | DevType | TestMode | + ResetDis | MinusDis | SensMin(2) | SensMax(2) | + FreMin(2) | FreMax(2) | PeakMin(2) | PeakMax(2) | XOR | SUM + """ + if not verify_packet(data): + return None + cmd = data[3] + if cmd != 0x4C: + return None + + payload = data[4:3 + data[2]] + if len(payload) < 18: + return None + + addr = data[1] & 0x7F + # 0x4B/0x4C 多字节字段为小端序 + return DG430FixtureParams( + addr=addr, + flag=payload[0], + dev_addr=payload[1], + dev_type=payload[2], + test_mode=payload[3], + reset_dis=payload[4], + minus_dis=payload[5], + sens_min=_le16(payload, 6), + sens_max=_le16(payload, 8), + fre_min=_le16(payload, 10), + fre_max=_le16(payload, 12), + peak_min=_le16(payload, 14), + peak_max=_le16(payload, 16), + ) + + +# ─── 获取数据包 CMD(用于匹配)──────────────────────────────────── + +def get_packet_cmd(data: bytes) -> int | None: + """从数据包中提取 CMD 字节""" + if len(data) < 4: + return None + return data[3] diff --git a/src/handlers.py b/src/handlers.py index c451044..152b028 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -19,6 +19,7 @@ from src.models import ( ensure_collect_table, insert_collect_data, fetch_unparsed, + fetch_unparsed_serial, mark_record_state, insert_test_result, get_dnt_by_serial, @@ -26,9 +27,14 @@ from src.models import ( mark_serialnet_sent, mark_serialnet_done, mark_serialnet_timeout, + upsert_fixture_param, ) from src.dg430 import ( parse_b2_status, + parse_4a_version, + parse_flag_response, + parse_4c_params, + get_packet_cmd, hex_str_to_bytes, split_packets, verify_packet, @@ -237,70 +243,132 @@ async def parse_loop(): await mark_record_state(device_id, rec["id"], state=3) continue - has_valid_b2 = False + has_valid = False all_failed = True for pkt in packets: cmd = pkt[3] if len(pkt) > 3 else 0 - # 只处理 B2 状态上报 - if cmd != 0xB2: - logger.debug(f"跳过非 B2 指令: 0x{cmd:02X}") - continue + # ── B2 状态上报 ── + if cmd == 0xB2: + if not verify_packet(pkt): + logger.warning(f"B2 数据包校验失败: {device_id} rec={rec['id']}") + continue - if not verify_packet(pkt): - logger.warning(f"B2 数据包校验失败: {device_id} rec={rec['id']}") - continue + status = parse_b2_status(pkt) + if status is None: + logger.warning(f"B2 解析失败: {device_id} rec={rec['id']}") + continue - status = parse_b2_status(pkt) - if status is None: - logger.warning(f"B2 解析失败: {device_id} rec={rec['id']}") - continue + fault_info = decode_fault_info(status.fault) + relay_info = decode_relay_info(status.relay_out) - fault_info = decode_fault_info(status.fault) - relay_info = decode_relay_info(status.relay_out) + dev_model_map = {1: "PD132", 2: "DLD110"} + str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})") - dev_model_map = {1: "PD132", 2: "DLD110"} - str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})") + await insert_test_result( + dnt_id=dnt_id, + dpg430_addr=status.addr, + pcnum=datetime.now().strftime("%Y%m"), + serialnum=0, + sub_type=status.dev_model, + str_type=str_type, + iffinish="1" if status.is_finished else "0", + fault_info=fault_info, + relay_out=relay_info, + ppvalue=status.ppvalue, + idle_freq=status.idle_freq, + enter_freq=status.enter_freq, + exit_freq=status.exit_freq, + enter_dist=status.enter_dist, + exit_dist=status.exit_dist, + enter_speed=status.enter_speed, + exit_speed=status.exit_speed, + ) - await insert_test_result( - dnt_id=dnt_id, - dpg430_addr=status.addr, - pcnum=datetime.now().strftime("%Y%m"), - serialnum=0, - sub_type=status.dev_model, - str_type=str_type, - iffinish="1" if status.is_finished else "0", - fault_info=fault_info, - relay_out=relay_info, - ppvalue=status.ppvalue, - idle_freq=status.idle_freq, - enter_freq=status.enter_freq, - exit_freq=status.exit_freq, - enter_dist=status.enter_dist, - exit_dist=status.exit_dist, - enter_speed=status.enter_speed, - exit_speed=status.exit_speed, - ) + # 匹配 tb_serialnet 中的待确认记录 + await _match_serialnet_response(dnt_id, raw) - # 匹配 tb_serialnet 中的待确认记录 (state=1) - await _match_serialnet_response(dnt_id, raw) + has_valid = True + all_failed = False + logger.info( + f"解析完成: {device_id} dg430={status.addr} " + f"型号={str_type} 峰峰值={status.ppvalue:.2f}V " + f"进入高度={status.enter_dist}mm 故障={fault_info}" + ) - has_valid_b2 = True - all_failed = False - logger.info( - f"解析完成: {device_id} dg430={status.addr} " - f"型号={str_type} 峰峰值={status.ppvalue:.2f}V " - f"进入高度={status.enter_dist}mm 故障={fault_info}" - ) + # ── 0x4C 查询工装参数响应 ── + elif cmd == 0x4C: + if not verify_packet(pkt): + logger.warning(f"0x4C 数据包校验失败: {device_id}") + continue - if has_valid_b2: + params = parse_4c_params(pkt) + if params is None: + logger.warning(f"0x4C 解析失败: {device_id}") + continue + + if params.flag == 0: + await upsert_fixture_param( + dnt_id, + Addr=params.dev_addr, + DevType=params.dev_type, + TestMode=params.test_mode, + RestDis=params.reset_dis, + MinusDis=params.minus_dis, + SensMin=params.sens_min, + SensMax=params.sens_max, + FreMin=params.fre_min, + FreMax=params.fre_max, + PeakMin=params.peak_min, + PeakMax=params.peak_max, + ) + logger.info( + f"0x4C 工装参数已更新 dnt_id={dnt_id} " + f"DevType={params.dev_type} TestMode={params.test_mode}" + ) + + await _match_serialnet_response(dnt_id, raw) + has_valid = True + all_failed = False + + # ── 测试指令 Flag 响应(不匹配 serialnet,等待 B2 完成)── + elif cmd in (0xB0, 0xB1, 0xBA, 0xBB, 0xBC): + if not verify_packet(pkt): + logger.debug(f"测试指令 0x{cmd:02X} 校验失败, " + f"rec={rec['id']}") + continue + # 不匹配 serialnet — B2 状态上报才标记完成 + has_valid = True + all_failed = False + logger.info( + f"测试指令 0x{cmd:02X} Flag 响应 dnt_id={dnt_id}" + ) + + # ── 配置指令响应 ── + elif cmd in (0x4A, 0x4B, 0x4D, 0x4E): + if not verify_packet(pkt): + logger.debug(f"配置指令 0x{cmd:02X} 校验失败, " + f"跳过 rec={rec['id']}") + continue + + await _match_serialnet_response(dnt_id, raw) + has_valid = True + all_failed = False + logger.info( + f"配置指令 0x{cmd:02X} 响应已匹配 " + f"dnt_id={dnt_id}" + ) + + else: + logger.debug(f"跳过未知/不支持指令: 0x{cmd:02X}") + + if has_valid: await mark_record_state(device_id, rec["id"], state=1) elif all_failed and packets: await mark_record_state(device_id, rec["id"], state=3) logger.warning(f"记录 {rec['id']} 所有包校验失败, state=3") else: - # 有非 B2 包但没有 B2,也算已处理(无解析目标) await mark_record_state(device_id, rec["id"], state=1) except Exception as e: @@ -439,3 +507,118 @@ async def _send_serialnet_cmd(device_id: str, dnt_id: int, record: dict): _udp_sender(msg_bytes, addr) await mark_serialnet_sent(record["id"]) logger.info(f"已发送 SerialNet → {device_id} ({addr}): {send_pkg}") + + +# ─── SerialNet 响应处理轮询 ────────────────────────────────────────── + +async def serialnet_response_loop(): + """后台轮询:处理 SerialNet 响应 (dat_type=9) + + 当设备通过 SerialNet 返回 0x4A-0x4E 等指令的响应时, + 匹配 tb_serialnet 中 state=1 的记录并标记完成。 + 对于 0x4C 响应,同时更新 tb_fixture_param。 + """ + logger.info("SerialNet 响应处理服务启动") + await asyncio.sleep(3) # 等前面所有服务就绪 + + while True: + try: + for device_id, dnt_id in list(_registry.items()): + records = await fetch_unparsed_serial(device_id) + for rec in records: + try: + raw = rec["raw_data"] + pkt_bytes = hex_str_to_bytes(raw) + + packets = split_packets(pkt_bytes) + if not packets: + await mark_record_state(device_id, rec["id"], state=3) + continue + + for pkt in packets: + cmd = get_packet_cmd(pkt) + if cmd is None: + continue + + # 跳过 B2 (已在 parse_loop 处理) + if cmd == 0xB2: + continue + + # 匹配 tb_serialnet 中 state=1 且 CMD 匹配的记录 + await _match_serial_cmd(dnt_id, cmd, raw) + + # 对于 0x4C,解析参数并更新数据库 + if cmd == 0x4C and verify_packet(pkt): + params = parse_4c_params(pkt) + if params and params.flag == 0: + await upsert_fixture_param( + dnt_id, + Addr=params.dev_addr, + DevType=params.dev_type, + TestMode=params.test_mode, + RestDis=params.reset_dis, + MinusDis=params.minus_dis, + SensMin=params.sens_min, + SensMax=params.sens_max, + FreMin=params.fre_min, + FreMax=params.fre_max, + PeakMin=params.peak_min, + PeakMax=params.peak_max, + ) + logger.info( + f"已更新工装参数 dnt_id={dnt_id} " + f"DevType={params.dev_type} " + f"TestMode={params.test_mode}" + ) + + # 对于 0x4A,记录版本信息 + if cmd == 0x4A and verify_packet(pkt): + ver = parse_4a_version(pkt) + if ver: + logger.info( + f"DG430 版本: hw={ver.hw_major}.{ver.hw_minor}.{ver.hw_patch} " + f"sw={ver.sw_major}.{ver.sw_minor}.{ver.sw_patch}" + ) + + await mark_record_state(device_id, rec["id"], state=1) + + except Exception as e: + logger.error( + f"SerialNet 响应处理异常 rec={rec['id']}: {e}", + exc_info=True, + ) + try: + await mark_record_state(device_id, rec["id"], state=3) + except Exception: + pass + + except Exception as e: + logger.error(f"SerialNet 响应循环异常: {e}") + + await asyncio.sleep(0.5) + + +async def _match_serial_cmd(dnt_id: int, cmd: int, raw_hex: str): + """匹配 tb_serialnet 中 state=1 且 CMD 匹配的记录""" + try: + from src.models import get_pool + import aiomysql + pool = await get_pool() + cmd_hex = f"{cmd:02X}" + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT id FROM tb_serialnet WHERE dnt_id=%s AND state=1 " + "AND UPPER(SUBSTRING(send_pkg, 7, 2)) = %s " + "ORDER BY id ASC LIMIT 1", + (dnt_id, cmd_hex), + ) + row = await cur.fetchone() + if row: + await mark_serialnet_done(row["id"], raw_hex) + logger.info( + f"tb_serialnet #{row['id']} CMD=0x{cmd:02X} 已确认完成 " + f"(dnt_id={dnt_id})" + ) + except Exception as e: + logger.warning(f"匹配 serialnet cmd 0x{cmd:02X} 失败: {e}") diff --git a/src/models.py b/src/models.py index 4db13eb..5e8bae8 100644 --- a/src/models.py +++ b/src/models.py @@ -147,7 +147,47 @@ async def _create_tables(pool: aiomysql.Pool): ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 """) - # 6. 日志表 + # 6. 工装测试参数表 + await cur.execute(""" + CREATE TABLE IF NOT EXISTS `tb_fixture_param` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id', + `Addr` TINYINT DEFAULT 1 COMMENT '工装设备地址', + `DevType` TINYINT DEFAULT 0 COMMENT '被检设备型号类型编码', + `TestMode` TINYINT DEFAULT 0 COMMENT '0 灵敏度测试, 1 模拟过车', + `RestDis` INT DEFAULT 0 COMMENT '复位距离 cm', + `MinusDis` INT DEFAULT 0 COMMENT '皮距/开始距离 cm', + `SensMin` INT DEFAULT 0 COMMENT '灵敏度最小值', + `SensMax` INT DEFAULT 0 COMMENT '灵敏度最大值', + `FreMin` INT DEFAULT 0 COMMENT '频率最小值 Hz', + `FreMax` INT DEFAULT 0 COMMENT '频率最大值 Hz', + `PeakMin` INT DEFAULT 0 COMMENT '峰峰值最小值', + `PeakMax` INT DEFAULT 0 COMMENT '峰峰值最大值', + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, + `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE INDEX `idx_dnt_id` (`dnt_id`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + + # 7. 车检器测试基准参数表 + await cur.execute(""" + CREATE TABLE IF NOT EXISTS `tb_vechicle_base_test` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `dev_name` VARCHAR(100) DEFAULT '' COMMENT '车检器型号/名称', + `type_num` TINYINT DEFAULT 0 COMMENT '类型编码', + `SensMin` INT DEFAULT 0 COMMENT '灵敏度最小值', + `SensMax` INT DEFAULT 0 COMMENT '灵敏度最大值', + `FreMin` INT DEFAULT 0 COMMENT '频率最小值 Hz', + `FreMax` INT DEFAULT 0 COMMENT '频率最大值 Hz', + `PeakMin` INT DEFAULT 0 COMMENT '峰峰值最小值', + `PeakMax` INT DEFAULT 0 COMMENT '峰峰值最大值', + `remark` VARCHAR(500) DEFAULT '' COMMENT '备注', + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, + `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + + # 8. 日志表 await cur.execute(""" CREATE TABLE IF NOT EXISTS `tb_log` ( `id` INT AUTO_INCREMENT PRIMARY KEY, @@ -258,7 +298,7 @@ async def upsert_dnt(serial: str, ip: str, port: int, mac: str, if existing: # 已有记录:更新 IP / 网关 / 上线时间 if (existing["ip"] != ip or existing["gateway"] != gateway - or existing["port"] != port or existing["subnet"] != subnet): + or existing["port"] != port or existing["subnet"] != subnet or existing["version"] != version): await cur.execute( """UPDATE dnt_info SET ip=%s, port=%s, subnet=%s, gateway=%s, mac=%s, msgport=%s, version=%s, last_login=NOW(), state=1 @@ -410,3 +450,85 @@ async def get_pending_by_device(dnt_id: int) -> list[dict]: (dnt_id,), ) return await cur.fetchall() + + +async def fetch_unparsed_serial(device_id: str) -> list[dict]: + """获取设备未处理的 SerialNet 响应记录 (dat_type=9, state=0)""" + table = collect_table_name(device_id) + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + f"SELECT id, raw_data FROM `{table}` " + f"WHERE state=0 AND dat_type=9 LIMIT 50" + ) + return await cur.fetchall() + + +# ─── tb_fixture_param CRUD ───────────────────────────────────────── + +async def upsert_fixture_param(dnt_id: int, **kwargs): + """插入或更新工装测试参数""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "SELECT id FROM tb_fixture_param WHERE dnt_id=%s", (dnt_id,), + ) + existing = await cur.fetchone() + fields = [ + "Addr", "DevType", "TestMode", "RestDis", "MinusDis", + "SensMin", "SensMax", "FreMin", "FreMax", "PeakMin", "PeakMax", + ] + if existing: + sets = ", ".join(f"`{f}`=%s" for f in fields) + values = [kwargs.get(f, 0) for f in fields] + [dnt_id] + await cur.execute( + f"UPDATE tb_fixture_param SET {sets} WHERE dnt_id=%s", + values, + ) + else: + placeholders = ", ".join(["%s"] * len(fields)) + col_names = ", ".join(f"`{f}`" for f in fields) + values = [kwargs.get(f, 0) for f in fields] + await cur.execute( + f"INSERT INTO tb_fixture_param (dnt_id, {col_names}) " + f"VALUES (%s, {placeholders})", + [dnt_id] + values, + ) + + +async def get_fixture_param(dnt_id: int) -> dict | None: + """获取设备的工装测试参数""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT * FROM tb_fixture_param WHERE dnt_id=%s", (dnt_id,), + ) + return await cur.fetchone() + + +# ─── tb_vechicle_base_test CRUD ──────────────────────────────────── + +async def get_vehicle_base_tests() -> list[dict]: + """获取所有车检器测试基准参数""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT * FROM tb_vechicle_base_test ORDER BY type_num ASC", + ) + return await cur.fetchall() + + +async def get_vehicle_base_test_by_type(type_num: int) -> dict | None: + """根据类型编码获取车检器测试基准""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + "SELECT * FROM tb_vechicle_base_test WHERE type_num=%s", + (type_num,), + ) + return await cur.fetchone() diff --git a/src/server.py b/src/server.py index 7ad6779..73d07af 100644 --- a/src/server.py +++ b/src/server.py @@ -33,6 +33,7 @@ from src.handlers import ( handle_serial_net, parse_loop, serialnet_loop, + serialnet_response_loop, set_udp_sender, ) @@ -185,6 +186,7 @@ async def main(): asyncio.create_task(parse_loop()) asyncio.create_task(serialnet_loop()) + asyncio.create_task(serialnet_response_loop()) loop = asyncio.get_running_loop()