From 97302caf6bb389a09dcafbc67c1fbc54d8bf060a Mon Sep 17 00:00:00 2001 From: wangfq Date: Wed, 27 May 2026 16:39:39 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=94=AF=E6=8C=81=E6=8B=BC=E6=8E=A5?= =?UTF-8?q?=E5=A4=9A=E5=8C=85=E8=A7=A3=E6=9E=90=20+=20=E6=A0=A1=E9=AA=8C?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E6=A0=87=E8=AE=B0=20state=3D3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - dg430.py: 新增 split_packets() 按 STX+LEN 拆分拼接的 DG430 数据包 - handlers.py: parse_loop 拆分后只解析 B2 状态上报包,非 B2 跳过 - models.py: mark_parsed 改为 mark_record_state(state) 支持自定义状态 - 校验失败 → state=3; 解析成功 → state=1 --- src/dg430.py | 35 ++++++++++++++ src/handlers.py | 120 +++++++++++++++++++++++++++++++++--------------- src/models.py | 10 ++-- 3 files changed, 125 insertions(+), 40 deletions(-) diff --git a/src/dg430.py b/src/dg430.py index f8d2c60..64637ea 100644 --- a/src/dg430.py +++ b/src/dg430.py @@ -86,6 +86,41 @@ def hex_str_to_bytes(hex_str: str) -> bytes: return bytes.fromhex(hex_str) +def split_packets(data: bytes) -> list[bytes]: + """从拼接的字节流中拆分出各个 DG430 数据包 + + 每条记录可能包含多条串口指令拼在一起,如: + B1 复位回复 + B2 状态上报 + ... + 根据 STX + LEN 字段确定包边界,逐个拆分。 + + Returns: + 独立的数据包列表,每个元素为一个完整包 (含 STX .. SUM) + """ + packets = [] + i = 0 + while i < len(data): + # 找 STX + stx_pos = data.find(STX, i) + if stx_pos < 0: + break + + # 需要至少读到 LEN 字段 (STX + ADDR + LEN = 3 bytes) + if stx_pos + 3 > len(data): + break + + pkt_len = 5 + data[stx_pos + 2] # STX + ADDR + LEN + DATA(LEN) + XOR + SUM + end = stx_pos + pkt_len + + if end > len(data): + logger.warning(f"数据包不完整: stx_pos={stx_pos}, pkt_len={pkt_len}, data_len={len(data)}") + break + + packets.append(data[stx_pos:end]) + i = end + + return packets + + # ─── 解析指令 ─────────────────────────────────────────────────────── def parse_b2_status(data: bytes) -> DG430Status | None: diff --git a/src/handlers.py b/src/handlers.py index 27057d6..ce58733 100644 --- a/src/handlers.py +++ b/src/handlers.py @@ -17,10 +17,17 @@ from src.models import ( ensure_collect_table, insert_collect_data, fetch_unparsed, - mark_parsed, + mark_record_state, insert_test_result, ) -from src.dg430 import parse_b2_status, hex_str_to_bytes, decode_fault_info, decode_relay_info +from src.dg430 import ( + parse_b2_status, + hex_str_to_bytes, + split_packets, + verify_packet, + decode_fault_info, + decode_relay_info, +) from src.protocol import ( make_timestamp_response, make_heartbeat_response, @@ -189,7 +196,12 @@ async def handle_serial_net(data: dict) -> str | None: # ─── 业务解析服务(后台轮询)───────────────────────────────────────── async def parse_loop(): - """后台轮询:解析未处理的 DG430 上报数据""" + """后台轮询:解析未处理的 DG430 上报数据 + + 每条 raw_data 可能包含多条拼接的 DG430 指令。 + 从中找出 B2 (状态上报) 指令解析,其余指令忽略。 + 校验失败的记录标记为 state=3。 + """ logger.info("业务解析服务启动") while True: @@ -199,48 +211,82 @@ async def parse_loop(): for rec in records: try: raw = rec["raw_data"] - pkt = hex_str_to_bytes(raw) - status = parse_b2_status(pkt) - if status is None: + pkt_bytes = hex_str_to_bytes(raw) + + # 拆分拼接的多个数据包 + packets = split_packets(pkt_bytes) + if not packets: + logger.warning(f"无法拆分数据包: {device_id} rec={rec['id']}") + await mark_record_state(device_id, rec["id"], state=3) continue - fault_info = decode_fault_info(status.fault) - relay_info = decode_relay_info(status.relay_out) + has_valid_b2 = False + all_failed = True - dev_model_map = {1: "PD132", 2: "DLD110"} - str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})") + for pkt in packets: + cmd = pkt[3] if len(pkt) > 3 else 0 - await insert_test_result( - dnt_id=dnt_id, - dpg430_addr=status.addr, - pcnum=datetime.now().strftime("%Y%m"), - serialnum=0, - sub_type=status.dev_model, - str_type=str_type, - iffinish="1" if status.is_finished else "0", - fault_info=fault_info, - relay_out=relay_info, - ppvalue=status.ppvalue, - idle_freq=status.idle_freq, - enter_freq=status.enter_freq, - exit_freq=status.exit_freq, - enter_dist=status.enter_dist, - exit_dist=status.exit_dist, - enter_speed=status.enter_speed, - exit_speed=status.exit_speed, - ) + # 只处理 B2 状态上报 + if cmd != 0xB2: + logger.debug(f"跳过非 B2 指令: 0x{cmd:02X}") + continue - await mark_parsed(device_id, rec["id"]) - logger.info( - f"解析完成: {device_id} dg430={status.addr} " - f"型号={str_type} 峰峰值={status.ppvalue:.2f}V " - f"进入高度={status.enter_dist}mm 故障={fault_info}" - ) + if not verify_packet(pkt): + logger.warning(f"B2 数据包校验失败: {device_id} rec={rec['id']}") + continue + + status = parse_b2_status(pkt) + if status is None: + logger.warning(f"B2 解析失败: {device_id} rec={rec['id']}") + continue + + fault_info = decode_fault_info(status.fault) + relay_info = decode_relay_info(status.relay_out) + + dev_model_map = {1: "PD132", 2: "DLD110"} + str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})") + + await insert_test_result( + dnt_id=dnt_id, + dpg430_addr=status.addr, + pcnum=datetime.now().strftime("%Y%m"), + serialnum=0, + sub_type=status.dev_model, + str_type=str_type, + iffinish="1" if status.is_finished else "0", + fault_info=fault_info, + relay_out=relay_info, + ppvalue=status.ppvalue, + idle_freq=status.idle_freq, + enter_freq=status.enter_freq, + exit_freq=status.exit_freq, + enter_dist=status.enter_dist, + exit_dist=status.exit_dist, + enter_speed=status.enter_speed, + exit_speed=status.exit_speed, + ) + + has_valid_b2 = True + all_failed = False + logger.info( + f"解析完成: {device_id} dg430={status.addr} " + f"型号={str_type} 峰峰值={status.ppvalue:.2f}V " + f"进入高度={status.enter_dist}mm 故障={fault_info}" + ) + + if has_valid_b2: + await mark_record_state(device_id, rec["id"], state=1) + elif all_failed and packets: + await mark_record_state(device_id, rec["id"], state=3) + logger.warning(f"记录 {rec['id']} 所有包校验失败, state=3") + else: + # 有非 B2 包但没有 B2,也算已处理(无解析目标) + await mark_record_state(device_id, rec["id"], state=1) except Exception as e: - logger.error(f"解析记录 {rec['id']} 失败: {e}") + logger.error(f"解析记录 {rec['id']} 异常: {e}", exc_info=True) try: - await mark_parsed(device_id, rec["id"]) + await mark_record_state(device_id, rec["id"], state=3) except Exception: pass diff --git a/src/models.py b/src/models.py index 202eccd..3625bce 100644 --- a/src/models.py +++ b/src/models.py @@ -178,14 +178,18 @@ async def fetch_unparsed(device_id: str) -> list[dict]: return await cur.fetchall() -async def mark_parsed(device_id: str, record_id: int): - """标记记录为已处理""" +async def mark_record_state(device_id: str, record_id: int, state: int = 1): + """更新记录状态 + + Args: + state: 0=未处理, 1=已处理, 3=校验失败 + """ table = collect_table_name(device_id) pool = await get_pool() async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( - f"UPDATE `{table}` SET state = 1 WHERE id = %s", (record_id,) + f"UPDATE `{table}` SET state = %s WHERE id = %s", (state, record_id), )