feat: EDC 服务 — Python/uvloop 实现,UDP/TCP 异步网络服务

This commit is contained in:
wangfq
2026-05-27 10:23:15 +08:00
commit a10d176f68
11 changed files with 1076 additions and 0 deletions

181
src/dg430.py Normal file
View File

@@ -0,0 +1,181 @@
"""DG430 串口协议二进制解析
解析测试工装上报的 B2 状态包dat_type=8 对应的 raw_data 为 DG430 上报数据)。
"""
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# ─── 协议常量 ───────────────────────────────────────────────────────
STX = 0x7F
PKT_MIN_LEN = 6 # STX + ADDR + LEN + CMD + XOR + SUM
@dataclass
class DG430Status:
"""解析后的 DG430 状态数据"""
addr: int # 地址
dev_model: int # 1 PD132, 2 DLD110
test_mode: int # 0 灵敏度, 1 模拟过车
is_finished: bool # 是否正常完成
finish_code: int # 0 正常, 1 未完成, 2 地感死机
fault: int # bitmask
relay_out: int # bitmask
ppvalue: float # 峰峰值 V
idle_freq: float # 开始工作频率 Hz
enter_freq: float # 进入工作频率 Hz
exit_freq: float # 离开工作频率 Hz
enter_dist: int # 进入高度 mm
exit_dist: int # 离开高度 mm
enter_speed: int # 进入速度 dm/s
exit_speed: int # 离开速度 dm/s
# ─── 工具函数 ───────────────────────────────────────────────────────
def _xor_checksum(data: bytes, start: int, end: int) -> int:
"""计算异或校验,区间 [start, end)"""
result = 0
for i in range(start, end):
result ^= data[i]
return result & 0xFF
def _sum_checksum(data: bytes, start: int, end: int) -> int:
"""计算和校验"""
result = 0
for i in range(start, end):
result += data[i]
return result & 0xFF
def _le16(data: bytes, offset: int) -> int:
"""小端 2 字节 → int"""
return data[offset] | (data[offset + 1] << 8)
def verify_packet(data: bytes) -> bool:
"""校验数据包完整性"""
if len(data) < PKT_MIN_LEN:
return False
if data[0] != STX:
return False
length = data[2]
expected_len = 1 + 1 + 1 + length + 1 + 1 # STX + ADDR + LEN + DATA + XOR + SUM
if len(data) != expected_len:
return False
payload_end = 4 + length # STX(1) + ADDR(1) + LEN(1) + CMD(1) + DATA(LEN-1)
actual_xor = _xor_checksum(data, 1, payload_end)
actual_sum = _sum_checksum(data, 1, payload_end)
if data[payload_end] != actual_xor:
return False
if data[payload_end + 1] != actual_sum:
return False
return True
def hex_str_to_bytes(hex_str: str) -> bytes:
"""将十六进制字符串转为 bytes'7F8118B2...' → bytes"""
return bytes.fromhex(hex_str)
# ─── 解析指令 ───────────────────────────────────────────────────────
def parse_b2_status(data: bytes) -> DG430Status | None:
"""解析 0xB2 状态上报包
格式: STX | ADDR | LEN(0x18=24) | 0xB2 | 20字节状态内容 | XOR | SUM
"""
if not verify_packet(data):
logger.warning("DG430 数据包校验失败")
return None
addr = data[1] & 0x7F
cmd = data[3]
if cmd != 0xB2:
logger.debug(f"非 B2 指令: 0x{cmd:02X}")
return None
payload = data[4:24] # 20 字节状态内容
# 设备型号
dev_model = payload[0] # 1=PD132, 2=DLD110
test_mode = payload[1] # 0=灵敏度, 1=模拟过车
finish_code = payload[2] # 0=正常, 1=未完成, 2=死机
fault = payload[3] # bitmask
relay_out = payload[4] # bitmask
# 峰峰值: 小端 2 字节,公式: (X * 3.3 / 4095) * 4
pp_raw = _le16(payload, 5)
ppvalue = (pp_raw * 3.3 / 4095) * 4
# 频率: 小端 2 字节,公式: 10 * X
idle_freq = _le16(payload, 7) * 10.0
enter_freq = _le16(payload, 9) * 10.0
exit_freq = _le16(payload, 11) * 10.0
# 高度 mm
enter_dist = _le16(payload, 13)
exit_dist = _le16(payload, 15)
# 速度 dm/s
enter_speed = _le16(payload, 17)
exit_speed = _le16(payload, 19)
return DG430Status(
addr=addr,
dev_model=dev_model,
test_mode=test_mode,
is_finished=(finish_code == 0),
finish_code=finish_code,
fault=fault,
relay_out=relay_out,
ppvalue=round(ppvalue, 4),
idle_freq=round(idle_freq, 1),
enter_freq=round(enter_freq, 1),
exit_freq=round(exit_freq, 1),
enter_dist=enter_dist,
exit_dist=exit_dist,
enter_speed=enter_speed,
exit_speed=exit_speed,
)
# ─── 故障解码 ───────────────────────────────────────────────────────
FAULT_BITS = {
0: "工作频率不是最低频",
1: "灵敏度不是最低灵敏度",
2: "灵敏度提升拨码不在OFF",
3: "脉冲输出不是离开脉冲",
}
RELAY_BITS = {
0: "存在继电器信号有输出",
1: "脉冲继电器信号有输出",
}
def decode_fault_info(fault: int) -> str:
"""解码故障 bitmask 为可读字符串"""
items = []
for bit, desc in FAULT_BITS.items():
if fault & (1 << bit):
items.append(desc)
return "; ".join(items) if items else "无故障"
def decode_relay_info(relay: int) -> str:
"""解码继电器 bitmask"""
items = []
for bit, desc in RELAY_BITS.items():
if relay & (1 << bit):
items.append(desc)
return "; ".join(items) if items else "无输出"