fix: 支持拼接多包解析 + 校验失败标记 state=3

- dg430.py: 新增 split_packets() 按 STX+LEN 拆分拼接的 DG430 数据包
- handlers.py: parse_loop 拆分后只解析 B2 状态上报包,非 B2 跳过
- models.py: mark_parsed 改为 mark_record_state(state) 支持自定义状态
- 校验失败 → state=3; 解析成功 → state=1
This commit is contained in:
wangfq
2026-05-27 16:39:39 +08:00
parent 8c5389670d
commit 97302caf6b
3 changed files with 125 additions and 40 deletions

View File

@@ -86,6 +86,41 @@ def hex_str_to_bytes(hex_str: str) -> bytes:
return bytes.fromhex(hex_str)
def split_packets(data: bytes) -> list[bytes]:
"""从拼接的字节流中拆分出各个 DG430 数据包
每条记录可能包含多条串口指令拼在一起,如:
B1 复位回复 + B2 状态上报 + ...
根据 STX + LEN 字段确定包边界,逐个拆分。
Returns:
独立的数据包列表,每个元素为一个完整包 (含 STX .. SUM)
"""
packets = []
i = 0
while i < len(data):
# 找 STX
stx_pos = data.find(STX, i)
if stx_pos < 0:
break
# 需要至少读到 LEN 字段 (STX + ADDR + LEN = 3 bytes)
if stx_pos + 3 > len(data):
break
pkt_len = 5 + data[stx_pos + 2] # STX + ADDR + LEN + DATA(LEN) + XOR + SUM
end = stx_pos + pkt_len
if end > len(data):
logger.warning(f"数据包不完整: stx_pos={stx_pos}, pkt_len={pkt_len}, data_len={len(data)}")
break
packets.append(data[stx_pos:end])
i = end
return packets
# ─── 解析指令 ───────────────────────────────────────────────────────
def parse_b2_status(data: bytes) -> DG430Status | None:

View File

@@ -17,10 +17,17 @@ from src.models import (
ensure_collect_table,
insert_collect_data,
fetch_unparsed,
mark_parsed,
mark_record_state,
insert_test_result,
)
from src.dg430 import parse_b2_status, hex_str_to_bytes, decode_fault_info, decode_relay_info
from src.dg430 import (
parse_b2_status,
hex_str_to_bytes,
split_packets,
verify_packet,
decode_fault_info,
decode_relay_info,
)
from src.protocol import (
make_timestamp_response,
make_heartbeat_response,
@@ -189,7 +196,12 @@ async def handle_serial_net(data: dict) -> str | None:
# ─── 业务解析服务(后台轮询)─────────────────────────────────────────
async def parse_loop():
"""后台轮询:解析未处理的 DG430 上报数据"""
"""后台轮询:解析未处理的 DG430 上报数据
每条 raw_data 可能包含多条拼接的 DG430 指令。
从中找出 B2 (状态上报) 指令解析,其余指令忽略。
校验失败的记录标记为 state=3。
"""
logger.info("业务解析服务启动")
while True:
@@ -199,9 +211,33 @@ async def parse_loop():
for rec in records:
try:
raw = rec["raw_data"]
pkt = hex_str_to_bytes(raw)
pkt_bytes = hex_str_to_bytes(raw)
# 拆分拼接的多个数据包
packets = split_packets(pkt_bytes)
if not packets:
logger.warning(f"无法拆分数据包: {device_id} rec={rec['id']}")
await mark_record_state(device_id, rec["id"], state=3)
continue
has_valid_b2 = False
all_failed = True
for pkt in packets:
cmd = pkt[3] if len(pkt) > 3 else 0
# 只处理 B2 状态上报
if cmd != 0xB2:
logger.debug(f"跳过非 B2 指令: 0x{cmd:02X}")
continue
if not verify_packet(pkt):
logger.warning(f"B2 数据包校验失败: {device_id} rec={rec['id']}")
continue
status = parse_b2_status(pkt)
if status is None:
logger.warning(f"B2 解析失败: {device_id} rec={rec['id']}")
continue
fault_info = decode_fault_info(status.fault)
@@ -230,17 +266,27 @@ async def parse_loop():
exit_speed=status.exit_speed,
)
await mark_parsed(device_id, rec["id"])
has_valid_b2 = True
all_failed = False
logger.info(
f"解析完成: {device_id} dg430={status.addr} "
f"型号={str_type} 峰峰值={status.ppvalue:.2f}V "
f"进入高度={status.enter_dist}mm 故障={fault_info}"
)
if has_valid_b2:
await mark_record_state(device_id, rec["id"], state=1)
elif all_failed and packets:
await mark_record_state(device_id, rec["id"], state=3)
logger.warning(f"记录 {rec['id']} 所有包校验失败, state=3")
else:
# 有非 B2 包但没有 B2也算已处理无解析目标
await mark_record_state(device_id, rec["id"], state=1)
except Exception as e:
logger.error(f"解析记录 {rec['id']} 失败: {e}")
logger.error(f"解析记录 {rec['id']} 异常: {e}", exc_info=True)
try:
await mark_parsed(device_id, rec["id"])
await mark_record_state(device_id, rec["id"], state=3)
except Exception:
pass

View File

@@ -178,14 +178,18 @@ async def fetch_unparsed(device_id: str) -> list[dict]:
return await cur.fetchall()
async def mark_parsed(device_id: str, record_id: int):
"""标记记录为已处理"""
async def mark_record_state(device_id: str, record_id: int, state: int = 1):
"""更新记录状态
Args:
state: 0=未处理, 1=已处理, 3=校验失败
"""
table = collect_table_name(device_id)
pool = await get_pool()
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
f"UPDATE `{table}` SET state = 1 WHERE id = %s", (record_id,)
f"UPDATE `{table}` SET state = %s WHERE id = %s", (state, record_id),
)