feat: DG430 V2.0.3 — 波动测试模式支持

- dg430.py: 新增 DG430WaveStatus + parse_b4_wave_status() 0xB4解析
- dg430.py: 0x4C 扩展6字段(向后兼容旧版长度)
- models.py: tb_fixture_param DDL + upsert 新增6个波动参数
- handlers.py: parse_loop 添加0xB4处理; 0x4C传参扩展
- TestMode=1 模拟过车→波动测试 (注释)
This commit is contained in:
wangfq
2026-06-02 18:06:07 +08:00
parent 6ecc653133
commit 2d6c9f03dd
3 changed files with 129 additions and 7 deletions

View File

@@ -19,7 +19,7 @@ class DG430Status:
"""解析后的 DG430 状态数据"""
addr: int # 地址
dev_model: int # 1 PD132, 2 DLD110
test_mode: int # 0 灵敏度, 1 模拟过车
test_mode: int # 0 灵敏度, 1 波动测试
is_finished: bool # 是否正常完成
finish_code: int # 0 正常, 1 未完成, 2 地感死机
fault: int # bitmask
@@ -150,7 +150,7 @@ def parse_b2_status(data: bytes) -> DG430Status | None:
# 设备型号
dev_model = payload[0] # 1=PD132, 2=DLD110
test_mode = payload[1] # 0=灵敏度, 1=模拟过车
test_mode = payload[1] # 0=灵敏度, 1=波动测试
finish_code = payload[2] # 0=正常, 1=未完成, 2=死机
fault = payload[3] # bitmask
relay_out = payload[4] # bitmask
@@ -191,6 +191,70 @@ def parse_b2_status(data: bytes) -> DG430Status | None:
)
# ─── 0xB4 波动测试上报 ──────────────────────────────────────────
@dataclass
class DG430WaveStatus:
"""波动测试状态上报数据"""
addr: int # 地址
remain_count: int # 剩余波动次数
relay_out: int # 继电器输出 bitmask
work_freq: float # 工作频率 Hz
curr_dist: int # 当前距离 mm
speed: int # 当前速度 dm/s
near_dist: int # 波动最近距离 mm
far_dist: int # 波动最远距离 mm
enter_dist: int # 进入高度 mm
leave_dist: int # 离开高度 mm
def parse_b4_wave_status(data: bytes) -> DG430WaveStatus | None:
"""解析 0xB4 波动测试状态上报包
格式: STX | ADDR | 11 | B4 | DATA(16B) | XOR | SUM
DATA: RemainCount(1) | Relay(1) | WorkFreq(2 LE) | CurrDist(2 LE) |
Speed(2 LE) | NearDist(2 LE) | FarDist(2 LE) |
EnterDist(2 LE) | LeaveDist(2 LE)
"""
if not verify_packet(data):
logger.warning("DG430 B4 数据包校验失败")
return None
cmd = data[3]
if cmd != 0xB4:
logger.debug(f"非 B4 指令: 0x{cmd:02X}")
return None
payload = data[4:3 + data[2]]
if len(payload) < 16:
logger.warning(f"B4 数据长度不足: {len(payload)} < 16")
return None
addr = data[1] & 0x7F
remain_count = payload[0]
relay_out = payload[1]
work_freq = _le16(payload, 2) * 10.0
curr_dist = _le16(payload, 4)
speed = _le16(payload, 6)
near_dist = _le16(payload, 8)
far_dist = _le16(payload, 10)
enter_dist = _le16(payload, 12)
leave_dist = _le16(payload, 14)
return DG430WaveStatus(
addr=addr,
remain_count=remain_count,
relay_out=relay_out,
work_freq=round(work_freq, 1),
curr_dist=curr_dist,
speed=speed,
near_dist=near_dist,
far_dist=far_dist,
enter_dist=enter_dist,
leave_dist=leave_dist,
)
# ─── 故障解码 ───────────────────────────────────────────────────────
FAULT_BITS = {
@@ -290,7 +354,7 @@ class DG430FixtureParams:
flag: int # 0=正常, 1=故障
dev_addr: int # 设备地址
dev_type: int # 设备型号
test_mode: int # 0 灵敏度, 1 模拟过车
test_mode: int # 0 灵敏度, 1 波动测试
reset_dis: int # 复位距离 cm
minus_dis: int # 皮距 cm
sens_min: int # 灵敏度最小值
@@ -299,14 +363,22 @@ class DG430FixtureParams:
fre_max: int # 频率最大值 Hz
peak_min: int # 峰峰值最小值
peak_max: int # 峰峰值最大值
far_tol: int # 最远容差 cm
near_tol: int # 最近容差 cm
step_tol: int # 步进容差 cm
back_forth: int # 来回次数
near_stay: int # 最近停留时间 ms
far_stay: int # 最远停留时间 ms
def parse_4c_params(data: bytes) -> DG430FixtureParams | None:
"""解析 0x4C 查询测试参数响应
"""解析 0x4C 查询测试参数响应 (V2.0.3 扩展)
格式: 7F | ADDR | 13 | 4C | Flag | Addr | DevType | TestMode |
格式: 7F | ADDR | 1B | 4C | Flag | Addr | DevType | TestMode |
ResetDis | MinusDis | SensMin(2) | SensMax(2) |
FreMin(2) | FreMax(2) | PeakMin(2) | PeakMax(2) | XOR | SUM
FreMin(2) | FreMax(2) | PeakMin(2) | PeakMax(2) |
FarTol(1) | NearTol(1) | StepTol(1) | BackForth(1) |
NearStay(2) | FarStay(2) | XOR | SUM
"""
if not verify_packet(data):
return None
@@ -320,6 +392,14 @@ def parse_4c_params(data: bytes) -> DG430FixtureParams | None:
addr = data[1] & 0x7F
# 0x4B/0x4C 多字节字段为小端序
# V2.0.3 新增6个波动参数字段兼容旧版长度不足时默认为0
far_tol = payload[18] if len(payload) >= 19 else 0
near_tol = payload[19] if len(payload) >= 20 else 0
step_tol = payload[20] if len(payload) >= 21 else 0
back_forth = payload[21] if len(payload) >= 22 else 0
near_stay = _le16(payload, 22) if len(payload) >= 24 else 0
far_stay = _le16(payload, 24) if len(payload) >= 26 else 0
return DG430FixtureParams(
addr=addr,
flag=payload[0],
@@ -334,6 +414,12 @@ def parse_4c_params(data: bytes) -> DG430FixtureParams | None:
fre_max=_le16(payload, 12),
peak_min=_le16(payload, 14),
peak_max=_le16(payload, 16),
far_tol=far_tol,
near_tol=near_tol,
step_tol=step_tol,
back_forth=back_forth,
near_stay=near_stay,
far_stay=far_stay,
)