"""DG430 串口协议二进制解析 解析测试工装上报的 B2 状态包(dat_type=8 对应的 raw_data 为 DG430 上报数据)。 """ import logging from dataclasses import dataclass logger = logging.getLogger(__name__) # ─── 协议常量 ─────────────────────────────────────────────────────── STX = 0x7F PKT_MIN_LEN = 6 # STX + ADDR + LEN + CMD + XOR + SUM @dataclass class DG430Status: """解析后的 DG430 状态数据""" addr: int # 地址 dev_model: int # 1 PD132, 2 DLD110 test_mode: int # 0 灵敏度, 1 模拟过车 is_finished: bool # 是否正常完成 finish_code: int # 0 正常, 1 未完成, 2 地感死机 fault: int # bitmask relay_out: int # bitmask ppvalue: float # 峰峰值 V idle_freq: float # 开始工作频率 Hz enter_freq: float # 进入工作频率 Hz exit_freq: float # 离开工作频率 Hz enter_dist: int # 进入高度 mm exit_dist: int # 离开高度 mm enter_speed: int # 进入速度 dm/s exit_speed: int # 离开速度 dm/s # ─── 工具函数 ─────────────────────────────────────────────────────── def _xor_checksum(data: bytes, start: int, end: int) -> int: """计算异或校验,区间 [start, end)""" result = 0 for i in range(start, end): result ^= data[i] return result & 0xFF def _sum_checksum(data: bytes, start: int, end: int) -> int: """计算和校验""" result = 0 for i in range(start, end): result += data[i] return result & 0xFF def _le16(data: bytes, offset: int) -> int: """小端 2 字节 → int""" return data[offset] | (data[offset + 1] << 8) def _be16(data: bytes, offset: int) -> int: """大端 2 字节 → int""" return (data[offset] << 8) | data[offset + 1] def verify_packet(data: bytes) -> bool: """校验数据包完整性""" if len(data) < PKT_MIN_LEN: return False if data[0] != STX: return False length = data[2] expected_len = 1 + 1 + 1 + length + 1 + 1 # STX + ADDR + LEN + DATA(LEN) + XOR + SUM if len(data) != expected_len: return False payload_end = 3 + length # 校验范围 [ADDR .. DATA末尾] actual_xor = _xor_checksum(data, 1, payload_end) actual_sum = _sum_checksum(data, 1, payload_end) if data[payload_end] != actual_xor: return False if data[payload_end + 1] != actual_sum: return False return True def hex_str_to_bytes(hex_str: str) -> bytes: """将十六进制字符串转为 bytes,如 '7F8118B2...' → bytes""" return bytes.fromhex(hex_str) def split_packets(data: bytes) -> list[bytes]: """从拼接的字节流中拆分出各个 DG430 数据包 每条记录可能包含多条串口指令拼在一起,如: B1 复位回复 + B2 状态上报 + ... 根据 STX + LEN 字段确定包边界,逐个拆分。 Returns: 独立的数据包列表,每个元素为一个完整包 (含 STX .. SUM) """ packets = [] i = 0 while i < len(data): # 找 STX stx_pos = data.find(STX, i) if stx_pos < 0: break # 需要至少读到 LEN 字段 (STX + ADDR + LEN = 3 bytes) if stx_pos + 3 > len(data): break pkt_len = 5 + data[stx_pos + 2] # STX + ADDR + LEN + DATA(LEN) + XOR + SUM end = stx_pos + pkt_len if end > len(data): logger.warning(f"数据包不完整: stx_pos={stx_pos}, pkt_len={pkt_len}, data_len={len(data)}") break packets.append(data[stx_pos:end]) i = end return packets # ─── 解析指令 ─────────────────────────────────────────────────────── def parse_b2_status(data: bytes) -> DG430Status | None: """解析 0xB2 状态上报包 格式: STX | ADDR | LEN | CMD | DATA(LEN-1 bytes) | XOR | SUM """ if not verify_packet(data): logger.warning("DG430 数据包校验失败") return None addr = data[1] & 0x7F cmd = data[3] if cmd != 0xB2: logger.debug(f"非 B2 指令: 0x{cmd:02X}") return None payload = data[4:3 + data[2]] # DATA = LEN - 1 bytes (CMD 占位置 3) if len(payload) < 20: logger.warning(f"B2 数据长度不足: {len(payload)} < 20") return None # 设备型号 dev_model = payload[0] # 1=PD132, 2=DLD110 test_mode = payload[1] # 0=灵敏度, 1=模拟过车 finish_code = payload[2] # 0=正常, 1=未完成, 2=死机 fault = payload[3] # bitmask relay_out = payload[4] # bitmask # 峰峰值: 小端 2 字节,公式: (X * 3.3 / 4095) * 4 pp_raw = _le16(payload, 5) ppvalue = (pp_raw * 3.3 / 4095) * 4 # 频率: 小端 2 字节,公式: 10 * X idle_freq = _le16(payload, 7) * 10.0 enter_freq = _le16(payload, 9) * 10.0 exit_freq = _le16(payload, 11) * 10.0 # 高度 mm enter_dist = _le16(payload, 13) exit_dist = _le16(payload, 15) # 速度 dm/s (V2.0.2 新增) enter_speed = _le16(payload, 17) if len(payload) >= 19 else 0 exit_speed = _le16(payload, 19) if len(payload) >= 21 else 0 return DG430Status( addr=addr, dev_model=dev_model, test_mode=test_mode, is_finished=(finish_code == 0), finish_code=finish_code, fault=fault, relay_out=relay_out, ppvalue=round(ppvalue, 4), idle_freq=round(idle_freq, 1), enter_freq=round(enter_freq, 1), exit_freq=round(exit_freq, 1), enter_dist=enter_dist, exit_dist=exit_dist, enter_speed=enter_speed, exit_speed=exit_speed, ) # ─── 故障解码 ─────────────────────────────────────────────────────── FAULT_BITS = { 0: "工作频率不是最低频", 1: "灵敏度不是最低灵敏度", 2: "灵敏度提升拨码不在OFF", 3: "脉冲输出不是离开脉冲", } RELAY_BITS = { 0: "存在继电器信号有输出", 1: "脉冲继电器信号有输出", } def decode_fault_info(fault: int) -> str: """解码故障 bitmask 为可读字符串""" items = [] for bit, desc in FAULT_BITS.items(): if fault & (1 << bit): items.append(desc) return "; ".join(items) if items else "无故障" def decode_relay_info(relay: int) -> str: """解码继电器 bitmask""" items = [] for bit, desc in RELAY_BITS.items(): if relay & (1 << bit): items.append(desc) return "; ".join(items) if items else "无输出" # ─── 0x4A 获取设备版本号响应 ────────────────────────────────────── @dataclass class DG430Version: addr: int hw_major: int hw_minor: int hw_patch: int sw_major: int sw_minor: int sw_patch: int def parse_4a_version(data: bytes) -> DG430Version | None: """解析 0x4A 版本号响应 格式: 7F | ADDR | 08 | 4A | 00 | HW(3B) | SW(3B) | XOR | SUM """ if not verify_packet(data): return None cmd = data[3] if cmd != 0x4A: return None payload = data[4:3 + data[2]] if len(payload) < 7: return None addr = data[1] & 0x7F return DG430Version( addr=addr, hw_major=payload[1], hw_minor=payload[2], hw_patch=payload[3], sw_major=payload[4], sw_minor=payload[5], sw_patch=payload[6], ) # ─── 通用 Flag 响应 (0x4B/0x4D/0x4E) ───────────────────────────── def parse_flag_response(data: bytes, expected_cmd: int) -> int | None: """解析 Flag 响应格式: STX | ADDR | 02 | CMD | Flag | XOR | SUM Returns: Flag 值 (0=正常, 1=故障), 或 None 表示解析失败 """ if not verify_packet(data): return None cmd = data[3] if cmd != expected_cmd: return None if data[2] < 2: return None return data[4] # Flag # ─── 0x4C 查询设备测试参数响应 ──────────────────────────────────── @dataclass class DG430FixtureParams: addr: int flag: int # 0=正常, 1=故障 dev_addr: int # 设备地址 dev_type: int # 设备型号 test_mode: int # 0 灵敏度, 1 模拟过车 reset_dis: int # 复位距离 cm minus_dis: int # 皮距 cm sens_min: int # 灵敏度最小值 sens_max: int # 灵敏度最大值 fre_min: int # 频率最小值 Hz fre_max: int # 频率最大值 Hz peak_min: int # 峰峰值最小值 peak_max: int # 峰峰值最大值 def parse_4c_params(data: bytes) -> DG430FixtureParams | None: """解析 0x4C 查询测试参数响应 格式: 7F | ADDR | 13 | 4C | Flag | Addr | DevType | TestMode | ResetDis | MinusDis | SensMin(2) | SensMax(2) | FreMin(2) | FreMax(2) | PeakMin(2) | PeakMax(2) | XOR | SUM """ if not verify_packet(data): return None cmd = data[3] if cmd != 0x4C: return None payload = data[4:3 + data[2]] if len(payload) < 18: return None addr = data[1] & 0x7F # 0x4B/0x4C 多字节字段为小端序 return DG430FixtureParams( addr=addr, flag=payload[0], dev_addr=payload[1], dev_type=payload[2], test_mode=payload[3], reset_dis=payload[4], minus_dis=payload[5], sens_min=_le16(payload, 6), sens_max=_le16(payload, 8), fre_min=_le16(payload, 10), fre_max=_le16(payload, 12), peak_min=_le16(payload, 14), peak_max=_le16(payload, 16), ) # ─── 获取数据包 CMD(用于匹配)──────────────────────────────────── def get_packet_cmd(data: bytes) -> int | None: """从数据包中提取 CMD 字节""" if len(data) < 4: return None return data[3]