"""DG430 串口协议二进制解析 解析测试工装上报的 B2 状态包(dat_type=8 对应的 raw_data 为 DG430 上报数据)。 """ import logging from dataclasses import dataclass logger = logging.getLogger(__name__) # ─── 协议常量 ─────────────────────────────────────────────────────── STX = 0x7F PKT_MIN_LEN = 6 # STX + ADDR + LEN + CMD + XOR + SUM @dataclass class DG430Status: """解析后的 DG430 状态数据""" addr: int # 地址 dev_model: int # 1 PD132, 2 DLD110 test_mode: int # 0 灵敏度, 1 波动测试 is_finished: bool # 是否正常完成 finish_code: int # 0 正常, 1 未完成, 2 地感死机 fault: int # bitmask relay_out: int # bitmask ppvalue: float # 峰峰值 V idle_freq: float # 开始工作频率 Hz enter_freq: float # 进入工作频率 Hz exit_freq: float # 离开工作频率 Hz enter_dist: int # 进入高度 mm exit_dist: int # 离开高度 mm enter_speed: int # 进入速度 dm/s exit_speed: int # 离开速度 dm/s # ─── 工具函数 ─────────────────────────────────────────────────────── def _xor_checksum(data: bytes, start: int, end: int) -> int: """计算异或校验,区间 [start, end)""" result = 0 for i in range(start, end): result ^= data[i] return result & 0xFF def _sum_checksum(data: bytes, start: int, end: int) -> int: """计算和校验""" result = 0 for i in range(start, end): result += data[i] return result & 0xFF def _le16(data: bytes, offset: int) -> int: """小端 2 字节 → int""" return data[offset] | (data[offset + 1] << 8) def _be16(data: bytes, offset: int) -> int: """大端 2 字节 → int""" return (data[offset] << 8) | data[offset + 1] def verify_packet(data: bytes) -> bool: """校验数据包完整性""" if len(data) < PKT_MIN_LEN: return False if data[0] != STX: return False length = data[2] expected_len = 1 + 1 + 1 + length + 1 + 1 # STX + ADDR + LEN + DATA(LEN) + XOR + SUM if len(data) != expected_len: return False payload_end = 3 + length # 校验范围 [ADDR .. DATA末尾] actual_xor = _xor_checksum(data, 1, payload_end) actual_sum = _sum_checksum(data, 1, payload_end) if data[payload_end] != actual_xor: return False if data[payload_end + 1] != actual_sum: return False return True def hex_str_to_bytes(hex_str: str) -> bytes: """将十六进制字符串转为 bytes,如 '7F8118B2...' → bytes""" return bytes.fromhex(hex_str) def split_packets(data: bytes) -> list[bytes]: """从拼接的字节流中拆分出各个 DG430 数据包 每条记录可能包含多条串口指令拼在一起,如: B1 复位回复 + B2 状态上报 + ... 根据 STX + LEN 字段确定包边界,逐个拆分。 Returns: 独立的数据包列表,每个元素为一个完整包 (含 STX .. SUM) """ packets = [] i = 0 while i < len(data): # 找 STX stx_pos = data.find(STX, i) if stx_pos < 0: break # 需要至少读到 LEN 字段 (STX + ADDR + LEN = 3 bytes) if stx_pos + 3 > len(data): break pkt_len = 5 + data[stx_pos + 2] # STX + ADDR + LEN + DATA(LEN) + XOR + SUM end = stx_pos + pkt_len if end > len(data): logger.warning(f"数据包不完整: stx_pos={stx_pos}, pkt_len={pkt_len}, data_len={len(data)}") break packets.append(data[stx_pos:end]) i = end return packets # ─── 解析指令 ─────────────────────────────────────────────────────── def parse_b2_status(data: bytes) -> DG430Status | None: """解析 0xB2 状态上报包 格式: STX | ADDR | LEN | CMD | DATA(LEN-1 bytes) | XOR | SUM """ if not verify_packet(data): logger.warning("DG430 数据包校验失败") return None addr = data[1] & 0x7F cmd = data[3] if cmd != 0xB2: logger.debug(f"非 B2 指令: 0x{cmd:02X}") return None payload = data[4:3 + data[2]] # DATA = LEN - 1 bytes (CMD 占位置 3) if len(payload) < 20: logger.warning(f"B2 数据长度不足: {len(payload)} < 20") return None # 设备型号 dev_model = payload[0] # 1=PD132, 2=DLD110 test_mode = payload[1] # 0=灵敏度, 1=波动测试 finish_code = payload[2] # 0=正常, 1=未完成, 2=死机 fault = payload[3] # bitmask relay_out = payload[4] # bitmask # 峰峰值: 小端 2 字节,公式: (X * 3.3 / 4095) * 4 pp_raw = _le16(payload, 5) ppvalue = (pp_raw * 3.3 / 4095) * 4 # 频率: 小端 2 字节,公式: 10 * X idle_freq = _le16(payload, 7) * 10.0 enter_freq = _le16(payload, 9) * 10.0 exit_freq = _le16(payload, 11) * 10.0 # 高度 mm enter_dist = _le16(payload, 13) exit_dist = _le16(payload, 15) # 速度 dm/s (V2.0.2 新增) enter_speed = _le16(payload, 17) if len(payload) >= 19 else 0 exit_speed = _le16(payload, 19) if len(payload) >= 21 else 0 return DG430Status( addr=addr, dev_model=dev_model, test_mode=test_mode, is_finished=(finish_code == 0), finish_code=finish_code, fault=fault, relay_out=relay_out, ppvalue=round(ppvalue, 4), idle_freq=round(idle_freq, 1), enter_freq=round(enter_freq, 1), exit_freq=round(exit_freq, 1), enter_dist=enter_dist, exit_dist=exit_dist, enter_speed=enter_speed, exit_speed=exit_speed, ) # ─── 0xB4 波动测试上报 ────────────────────────────────────────── @dataclass class DG430WaveStatus: """波动测试状态上报数据""" addr: int # 地址 remain_count: int # 剩余波动次数 relay_out: int # 继电器输出 bitmask work_freq: float # 工作频率 Hz curr_dist: int # 当前距离 mm speed: int # 当前速度 dm/s near_dist: int # 波动最近距离 mm far_dist: int # 波动最远距离 mm enter_dist: int # 进入高度 mm leave_dist: int # 离开高度 mm def parse_b4_wave_status(data: bytes) -> DG430WaveStatus | None: """解析 0xB4 波动测试状态上报包 格式: STX | ADDR | 11 | B4 | DATA(16B) | XOR | SUM DATA: RemainCount(1) | Relay(1) | WorkFreq(2 LE) | CurrDist(2 LE) | Speed(2 LE) | NearDist(2 LE) | FarDist(2 LE) | EnterDist(2 LE) | LeaveDist(2 LE) """ if not verify_packet(data): logger.warning("DG430 B4 数据包校验失败") return None cmd = data[3] if cmd != 0xB4: logger.debug(f"非 B4 指令: 0x{cmd:02X}") return None payload = data[4:3 + data[2]] if len(payload) < 16: logger.warning(f"B4 数据长度不足: {len(payload)} < 16") return None addr = data[1] & 0x7F remain_count = payload[0] relay_out = payload[1] work_freq = _le16(payload, 2) * 10.0 curr_dist = _le16(payload, 4) speed = _le16(payload, 6) near_dist = _le16(payload, 8) far_dist = _le16(payload, 10) enter_dist = _le16(payload, 12) leave_dist = _le16(payload, 14) return DG430WaveStatus( addr=addr, remain_count=remain_count, relay_out=relay_out, work_freq=round(work_freq, 1), curr_dist=curr_dist, speed=speed, near_dist=near_dist, far_dist=far_dist, enter_dist=enter_dist, leave_dist=leave_dist, ) # ─── 故障解码 ─────────────────────────────────────────────────────── FAULT_BITS = { 0: "工作频率不是最低频", 1: "灵敏度不是最低灵敏度", 2: "灵敏度提升拨码不在OFF", 3: "脉冲输出不是离开脉冲", } RELAY_BITS = { 0: "存在继电器信号有输出", 1: "脉冲继电器信号有输出", } def decode_fault_info(fault: int) -> str: """解码故障 bitmask 为可读字符串""" items = [] for bit, desc in FAULT_BITS.items(): if fault & (1 << bit): items.append(desc) return "; ".join(items) if items else "无故障" def decode_relay_info(relay: int) -> str: """解码继电器 bitmask""" items = [] for bit, desc in RELAY_BITS.items(): if relay & (1 << bit): items.append(desc) return "; ".join(items) if items else "无输出" # ─── 0x4A 获取设备版本号响应 ────────────────────────────────────── @dataclass class DG430Version: addr: int hw_major: int hw_minor: int hw_patch: int sw_major: int sw_minor: int sw_patch: int def parse_4a_version(data: bytes) -> DG430Version | None: """解析 0x4A 版本号响应 格式: 7F | ADDR | 08 | 4A | 00 | HW(3B) | SW(3B) | XOR | SUM """ if not verify_packet(data): return None cmd = data[3] if cmd != 0x4A: return None payload = data[4:3 + data[2]] if len(payload) < 7: return None addr = data[1] & 0x7F return DG430Version( addr=addr, hw_major=payload[1], hw_minor=payload[2], hw_patch=payload[3], sw_major=payload[4], sw_minor=payload[5], sw_patch=payload[6], ) # ─── 通用 Flag 响应 (0x4B/0x4D/0x4E) ───────────────────────────── def parse_flag_response(data: bytes, expected_cmd: int) -> int | None: """解析 Flag 响应格式: STX | ADDR | 02 | CMD | Flag | XOR | SUM Returns: Flag 值 (0=正常, 1=故障), 或 None 表示解析失败 """ if not verify_packet(data): return None cmd = data[3] if cmd != expected_cmd: return None if data[2] < 2: return None return data[4] # Flag # ─── 0x4C 查询设备测试参数响应 ──────────────────────────────────── @dataclass class DG430FixtureParams: addr: int flag: int # 0=正常, 1=故障 dev_addr: int # 设备地址 dev_type: int # 设备型号 test_mode: int # 0 灵敏度, 1 波动测试 reset_dis: int # 复位距离 cm minus_dis: int # 皮距 cm sens_min: int # 灵敏度最小值 sens_max: int # 灵敏度最大值 fre_min: int # 频率最小值 Hz fre_max: int # 频率最大值 Hz peak_min: int # 峰峰值最小值 peak_max: int # 峰峰值最大值 far_tol: int # 最远容差 cm near_tol: int # 最近容差 cm step_tol: int # 步进容差 cm back_forth: int # 来回次数 near_stay: int # 最近停留时间 ms far_stay: int # 最远停留时间 ms def parse_4c_params(data: bytes) -> DG430FixtureParams | None: """解析 0x4C 查询测试参数响应 (V2.0.3 扩展) 格式: 7F | ADDR | 1B | 4C | Flag | Addr | DevType | TestMode | ResetDis | MinusDis | SensMin(2) | SensMax(2) | FreMin(2) | FreMax(2) | PeakMin(2) | PeakMax(2) | FarTol(1) | NearTol(1) | StepTol(1) | BackForth(1) | NearStay(2) | FarStay(2) | XOR | SUM """ if not verify_packet(data): return None cmd = data[3] if cmd != 0x4C: return None payload = data[4:3 + data[2]] if len(payload) < 18: return None addr = data[1] & 0x7F # 0x4B/0x4C 多字节字段为小端序 # V2.0.3 新增6个波动参数字段,兼容旧版(长度不足时默认为0) far_tol = payload[18] if len(payload) >= 19 else 0 near_tol = payload[19] if len(payload) >= 20 else 0 step_tol = payload[20] if len(payload) >= 21 else 0 back_forth = payload[21] if len(payload) >= 22 else 0 near_stay = _le16(payload, 22) if len(payload) >= 24 else 0 far_stay = _le16(payload, 24) if len(payload) >= 26 else 0 return DG430FixtureParams( addr=addr, flag=payload[0], dev_addr=payload[1], dev_type=payload[2], test_mode=payload[3], reset_dis=payload[4], minus_dis=payload[5], sens_min=_le16(payload, 6), sens_max=_le16(payload, 8), fre_min=_le16(payload, 10), fre_max=_le16(payload, 12), peak_min=_le16(payload, 14), peak_max=_le16(payload, 16), far_tol=far_tol, near_tol=near_tol, step_tol=step_tol, back_forth=back_forth, near_stay=near_stay, far_stay=far_stay, ) # ─── 获取数据包 CMD(用于匹配)──────────────────────────────────── def get_packet_cmd(data: bytes) -> int | None: """从数据包中提取 CMD 字节""" if len(data) < 4: return None return data[3]