From a10d176f68fc87e9d39ee9b14269bdf28d7bfe43 Mon Sep 17 00:00:00 2001 From: wangfq Date: Wed, 27 May 2026 10:23:15 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20EDC=20=E6=9C=8D=E5=8A=A1=20=E2=80=94=20?= =?UTF-8?q?Python/uvloop=20=E5=AE=9E=E7=8E=B0=EF=BC=8CUDP/TCP=20=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E7=BD=91=E7=BB=9C=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 5 + README.md | 75 +++++++++++++ requirements.txt | 2 + run.py | 7 ++ src/__init__.py | 0 src/config.py | 29 +++++ src/dg430.py | 181 ++++++++++++++++++++++++++++++++ src/handlers.py | 219 ++++++++++++++++++++++++++++++++++++++ src/models.py | 268 +++++++++++++++++++++++++++++++++++++++++++++++ src/protocol.py | 97 +++++++++++++++++ src/server.py | 193 ++++++++++++++++++++++++++++++++++ 11 files changed, 1076 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 requirements.txt create mode 100644 run.py create mode 100644 src/__init__.py create mode 100644 src/config.py create mode 100644 src/dg430.py create mode 100644 src/handlers.py create mode 100644 src/models.py create mode 100644 src/protocol.py create mode 100644 src/server.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..672cb33 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +.env +*.log diff --git a/README.md b/README.md new file mode 100644 index 0000000..6104711 --- /dev/null +++ b/README.md @@ -0,0 +1,75 @@ +# EDC 服务 (Edge Data Center) + +测试工装边缘数据中心 — Python 实现,基于 uvloop 的高性能异步网络服务。 + +## 系统架构 + +``` + TCP/UDP ┌─────────────────────────────────────┐ +◄─────────────► │ EDC 服务 │ + │ │ + UDP :5500 ───► │ 设备发现 / 心跳 / 信息查询 │ + UDP :5505 ───► │ 消息监听 │ + TCP :5550 ───► │ 时间同步 / 数据上报 / 串口透传 │ + │ │ + │ 后台解析服务 ──► DG430 协议 → MySQL │ + └─────────────────────────────────────┘ +``` + +## 快速开始 + +```bash +# 安装依赖 +pip install -r requirements.txt + +# 设置环境变量 +export EDC_MYSQL_HOST=127.0.0.1 +export EDC_MYSQL_USER=root +export EDC_MYSQL_PASSWORD=your_password +export EDC_MYSQL_DB=edc + +# 确保数据库已创建 +mysql -u root -e "CREATE DATABASE IF NOT EXISTS edc CHARACTER SET utf8mb4" + +# 启动 +python run.py +``` + +## 配置 + +所有配置通过环境变量,见 `src/config.py`: + +| 变量 | 默认值 | 说明 | +|------|--------|------| +| `EDC_UDP_PORT` | 5500 | UDP 设备发现端口 | +| `EDC_UDP_MSG_PORT` | 5505 | UDP 消息监听端口 | +| `EDC_TCP_PORT` | 5550 | TCP 数据上报端口 | +| `EDC_BIND_HOST` | 0.0.0.0 | 绑定地址 | +| `EDC_MYSQL_HOST` | 127.0.0.1 | MySQL 地址 | +| `EDC_MYSQL_PORT` | 3306 | MySQL 端口 | +| `EDC_MYSQL_USER` | root | 数据库用户 | +| `EDC_MYSQL_PASSWORD` | — | 数据库密码 | +| `EDC_MYSQL_DB` | edc | 数据库名 | +| `EDC_PARSE_POLL_INTERVAL` | 0.5 | 解析轮询间隔(秒) | +| `EDC_LOG_LEVEL` | INFO | 日志级别 | + +## 协议参考 + +- [DG430 串口协议](../vd_test_fixture/docs/DG430串口协议.md) +- [PGLC 网络接口协议](../vd_test_fixture/docs/PGLC网络接口协议.md) +- [EDC 服务设计](../vd_test_fixture/docs/EDC服务.md) + +## 目录结构 + +``` +edc_server/ +├── run.py # 入口 +├── requirements.txt # uvloop, aiomysql +└── src/ + ├── config.py # 环境变量配置 + ├── models.py # MySQL 连接池 + 表结构 + CRUD + ├── protocol.py # PGLC JSON 协议解析 + ├── dg430.py # DG430 二进制协议解析 + ├── handlers.py # 业务处理 + 后台解析服务 + └── server.py # UDP/TCP 异步服务主程序 +``` diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..226aad4 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +uvloop>=0.19.0 +aiomysql>=0.2.0 diff --git a/run.py b/run.py new file mode 100644 index 0000000..9d08ffe --- /dev/null +++ b/run.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +"""EDC 服务入口""" + +from src.server import run + +if __name__ == "__main__": + run() diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..e6ba72d --- /dev/null +++ b/src/config.py @@ -0,0 +1,29 @@ +"""EDC 服务器配置""" + +import os + +# 网络端口 +UDP_PORT = int(os.getenv("EDC_UDP_PORT", "5500")) +UDP_MSG_PORT = int(os.getenv("EDC_UDP_MSG_PORT", "5505")) +TCP_PORT = int(os.getenv("EDC_TCP_PORT", "5550")) +BIND_HOST = os.getenv("EDC_BIND_HOST", "0.0.0.0") + +# MySQL +MYSQL_HOST = os.getenv("EDC_MYSQL_HOST", "127.0.0.1") +MYSQL_PORT = int(os.getenv("EDC_MYSQL_PORT", "3306")) +MYSQL_USER = os.getenv("EDC_MYSQL_USER", "root") +MYSQL_PASSWORD = os.getenv("EDC_MYSQL_PASSWORD", "") +MYSQL_DB = os.getenv("EDC_MYSQL_DB", "edc") + +# 连接池 +MYSQL_POOL_MIN = int(os.getenv("EDC_MYSQL_POOL_MIN", "2")) +MYSQL_POOL_MAX = int(os.getenv("EDC_MYSQL_POOL_MAX", "10")) + +# 设备超时 (秒): 超过此时间未收到心跳则标记离线 +DEVICE_TIMEOUT = int(os.getenv("EDC_DEVICE_TIMEOUT", "120")) + +# 业务解析轮询间隔 (秒) +PARSE_POLL_INTERVAL = float(os.getenv("EDC_PARSE_POLL_INTERVAL", "0.5")) + +# 日志 +LOG_LEVEL = os.getenv("EDC_LOG_LEVEL", "INFO") diff --git a/src/dg430.py b/src/dg430.py new file mode 100644 index 0000000..401993a --- /dev/null +++ b/src/dg430.py @@ -0,0 +1,181 @@ +"""DG430 串口协议二进制解析 + +解析测试工装上报的 B2 状态包(dat_type=8 对应的 raw_data 为 DG430 上报数据)。 +""" + +import logging +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + +# ─── 协议常量 ─────────────────────────────────────────────────────── + +STX = 0x7F +PKT_MIN_LEN = 6 # STX + ADDR + LEN + CMD + XOR + SUM + + +@dataclass +class DG430Status: + """解析后的 DG430 状态数据""" + addr: int # 地址 + dev_model: int # 1 PD132, 2 DLD110 + test_mode: int # 0 灵敏度, 1 模拟过车 + is_finished: bool # 是否正常完成 + finish_code: int # 0 正常, 1 未完成, 2 地感死机 + fault: int # bitmask + relay_out: int # bitmask + ppvalue: float # 峰峰值 V + idle_freq: float # 开始工作频率 Hz + enter_freq: float # 进入工作频率 Hz + exit_freq: float # 离开工作频率 Hz + enter_dist: int # 进入高度 mm + exit_dist: int # 离开高度 mm + enter_speed: int # 进入速度 dm/s + exit_speed: int # 离开速度 dm/s + + +# ─── 工具函数 ─────────────────────────────────────────────────────── + +def _xor_checksum(data: bytes, start: int, end: int) -> int: + """计算异或校验,区间 [start, end)""" + result = 0 + for i in range(start, end): + result ^= data[i] + return result & 0xFF + + +def _sum_checksum(data: bytes, start: int, end: int) -> int: + """计算和校验""" + result = 0 + for i in range(start, end): + result += data[i] + return result & 0xFF + + +def _le16(data: bytes, offset: int) -> int: + """小端 2 字节 → int""" + return data[offset] | (data[offset + 1] << 8) + + +def verify_packet(data: bytes) -> bool: + """校验数据包完整性""" + if len(data) < PKT_MIN_LEN: + return False + if data[0] != STX: + return False + + length = data[2] + expected_len = 1 + 1 + 1 + length + 1 + 1 # STX + ADDR + LEN + DATA + XOR + SUM + if len(data) != expected_len: + return False + + payload_end = 4 + length # STX(1) + ADDR(1) + LEN(1) + CMD(1) + DATA(LEN-1) + actual_xor = _xor_checksum(data, 1, payload_end) + actual_sum = _sum_checksum(data, 1, payload_end) + + if data[payload_end] != actual_xor: + return False + if data[payload_end + 1] != actual_sum: + return False + + return True + + +def hex_str_to_bytes(hex_str: str) -> bytes: + """将十六进制字符串转为 bytes,如 '7F8118B2...' → bytes""" + return bytes.fromhex(hex_str) + + +# ─── 解析指令 ─────────────────────────────────────────────────────── + +def parse_b2_status(data: bytes) -> DG430Status | None: + """解析 0xB2 状态上报包 + + 格式: STX | ADDR | LEN(0x18=24) | 0xB2 | 20字节状态内容 | XOR | SUM + """ + if not verify_packet(data): + logger.warning("DG430 数据包校验失败") + return None + + addr = data[1] & 0x7F + cmd = data[3] + if cmd != 0xB2: + logger.debug(f"非 B2 指令: 0x{cmd:02X}") + return None + + payload = data[4:24] # 20 字节状态内容 + + # 设备型号 + dev_model = payload[0] # 1=PD132, 2=DLD110 + test_mode = payload[1] # 0=灵敏度, 1=模拟过车 + finish_code = payload[2] # 0=正常, 1=未完成, 2=死机 + fault = payload[3] # bitmask + relay_out = payload[4] # bitmask + + # 峰峰值: 小端 2 字节,公式: (X * 3.3 / 4095) * 4 + pp_raw = _le16(payload, 5) + ppvalue = (pp_raw * 3.3 / 4095) * 4 + + # 频率: 小端 2 字节,公式: 10 * X + idle_freq = _le16(payload, 7) * 10.0 + enter_freq = _le16(payload, 9) * 10.0 + exit_freq = _le16(payload, 11) * 10.0 + + # 高度 mm + enter_dist = _le16(payload, 13) + exit_dist = _le16(payload, 15) + + # 速度 dm/s + enter_speed = _le16(payload, 17) + exit_speed = _le16(payload, 19) + + return DG430Status( + addr=addr, + dev_model=dev_model, + test_mode=test_mode, + is_finished=(finish_code == 0), + finish_code=finish_code, + fault=fault, + relay_out=relay_out, + ppvalue=round(ppvalue, 4), + idle_freq=round(idle_freq, 1), + enter_freq=round(enter_freq, 1), + exit_freq=round(exit_freq, 1), + enter_dist=enter_dist, + exit_dist=exit_dist, + enter_speed=enter_speed, + exit_speed=exit_speed, + ) + + +# ─── 故障解码 ─────────────────────────────────────────────────────── + +FAULT_BITS = { + 0: "工作频率不是最低频", + 1: "灵敏度不是最低灵敏度", + 2: "灵敏度提升拨码不在OFF", + 3: "脉冲输出不是离开脉冲", +} + +RELAY_BITS = { + 0: "存在继电器信号有输出", + 1: "脉冲继电器信号有输出", +} + + +def decode_fault_info(fault: int) -> str: + """解码故障 bitmask 为可读字符串""" + items = [] + for bit, desc in FAULT_BITS.items(): + if fault & (1 << bit): + items.append(desc) + return "; ".join(items) if items else "无故障" + + +def decode_relay_info(relay: int) -> str: + """解码继电器 bitmask""" + items = [] + for bit, desc in RELAY_BITS.items(): + if relay & (1 << bit): + items.append(desc) + return "; ".join(items) if items else "无输出" diff --git a/src/handlers.py b/src/handlers.py new file mode 100644 index 0000000..19b17b5 --- /dev/null +++ b/src/handlers.py @@ -0,0 +1,219 @@ +"""业务逻辑处理 — 设备发现、数据采集、解析调度""" + +import asyncio +import logging +import time +from datetime import datetime + +from src.models import ( + upsert_dnt, + ensure_collect_table, + insert_collect_data, + fetch_unparsed, + mark_parsed, + insert_test_result, + get_dnt_by_serial, +) +from src.dg430 import parse_b2_status, hex_str_to_bytes, decode_fault_info, decode_relay_info +from src.protocol import ( + make_timestamp_response, + make_heartbeat_response, + make_count_off_success, + make_error_response, +) + +logger = logging.getLogger(__name__) + +# 已注册设备: {device_id: dnt_id} +_registry: dict[str, int] = {} + +# 设备心跳时间: {device_id: last_heartbeat} +_heartbeat: dict[str, float] = {} + + +async def handle_count_off(data: dict, addr: tuple) -> str | None: + """处理设备发现广播 + + 设备上电后主动上报设备信息。EDC 据此注册/更新 dnt_info。 + """ + params = data.get("Params", {}) + device_id = params.get("Device_code") + if not device_id: + logger.warning("Count_Off 缺少 Device_code") + return None + + device_data = data.get("Data", {}) + dev_ip = device_data.get("Ip", addr[0]) + dev_port = device_data.get("Port", 0) or addr[1] + dev_mac = device_data.get("Mac", "") + dev_subnet = device_data.get("SubnetMask", "") + dev_gateway = device_data.get("Gateway", "") + dev_msgport = device_data.get("PortMsg", 0) + dev_version = device_data.get("Version", "") + dev_type = device_data.get("Device_Type", "30") # 默认测试工装 + + # 设备唯一标识用 Device_id(在 Data 内),如果没有则用 Device_code + serial = device_data.get("Device_id") or device_id + + dnt_id = await upsert_dnt( + serial=serial, + ip=dev_ip, + port=dev_port, + mac=dev_mac, + subnet=dev_subnet, + gateway=dev_gateway, + msgport=dev_msgport, + version=dev_version, + dtype=dev_type, + ) + _registry[serial] = dnt_id + _heartbeat[serial] = time.time() + + # 确保采集表存在 + await ensure_collect_table(serial) + + logger.info(f"设备注册: {serial} (dnt_id={dnt_id}, ip={dev_ip})") + return make_count_off_success() + + +async def handle_heartbeat(data: dict) -> str | None: + """处理心跳包""" + params = data.get("Params", {}) + device_id = params.get("Device_id", "") + if not device_id: + return None + + _heartbeat[device_id] = time.time() + + # 心跳也插入采集表 (dat_type=0) + try: + await insert_collect_data(device_id, 0, str(data)) + except Exception: + pass + + return make_heartbeat_response(device_id, int(time.time())) + + +async def handle_timestamp(data: dict) -> str: + """处理时间同步请求""" + params = data.get("Params", {}) + device_id = params.get("Device_id", "") + return make_timestamp_response(device_id, int(time.time())) + + +async def handle_tsreport(data: dict) -> str | None: + """处理设备主动上报子设备传感数据 + + TSReport 中的 Sub_Dat 是 DG430 的二进制上报数据, + 存入采集表 (dat_type=8),由解析服务异步处理。 + """ + device_id = data.get("Device_id", "") + sensor_dat = data.get("Sensor_Dat", {}) + sub_dat = sensor_dat.get("Sub_Dat", "") + + if not device_id or not sub_dat: + return None + + try: + await insert_collect_data(device_id, 8, sub_dat) + except Exception as e: + logger.error(f"存储 TSReport 失败: {e}") + + return None # 协议规定不需要回复(看文档似乎有回复,保守起见返回简单确认) + + +async def handle_serial_net(data: dict) -> str | None: + """处理串口透传返回数据 + + SerialNet 返回中 SerialDat 是串口设备的应答数据, + 存入采集表 (dat_type=9)。 + """ + params = data.get("Params") or data + device_id = params.get("Device_id", "") + serial_dat = params.get("SerialDat", "") + + if not device_id or not serial_dat: + return None + + try: + await insert_collect_data(device_id, 9, serial_dat) + except Exception as e: + logger.error(f"存储 SerialNet 失败: {e}") + + return None + + +async def handle_device_info(data: dict) -> str | None: + """处理设备信息查询""" + # 当前不做特殊处理,仅记录日志 + device_id = data.get("Device", "") + logger.debug(f"Device_Info 请求: {device_id}") + return None + + +# ─── 业务解析服务(后台轮询)───────────────────────────────────────── + +async def parse_loop(): + """后台轮询:解析未处理的 DG430 上报数据""" + logger.info("业务解析服务启动") + + while True: + try: + for device_id, dnt_id in list(_registry.items()): + records = await fetch_unparsed(device_id) + for rec in records: + try: + raw = rec["raw_data"] + # raw_data 是十六进制字符串 + pkt = hex_str_to_bytes(raw) + status = parse_b2_status(pkt) + if status is None: + continue + + # 解码故障和继电器信息 + fault_info = decode_fault_info(status.fault) + relay_info = decode_relay_info(status.relay_out) + + # 设备型号 + dev_model_map = {1: "PD132", 2: "DLD110"} + str_type = dev_model_map.get(status.dev_model, f"Unknown({status.dev_model})") + + await insert_test_result( + dnt_id=dnt_id, + dpg430_addr=status.addr, + pcnum=datetime.now().strftime("%Y%m"), + serialnum=0, + sub_type=status.dev_model, + str_type=str_type, + iffinish="1" if status.is_finished else "0", + fault_info=fault_info, + relay_out=relay_info, + ppvalue=status.ppvalue, + idle_freq=status.idle_freq, + enter_freq=status.enter_freq, + exit_freq=status.exit_freq, + enter_dist=status.enter_dist, + exit_dist=status.exit_dist, + enter_speed=status.enter_speed, + exit_speed=status.exit_speed, + ) + + await mark_parsed(device_id, rec["id"]) + logger.info( + f"解析完成: {device_id} dg430={status.addr} " + f"型号={str_type} 峰峰值={status.ppvalue:.2f}V " + f"进入高度={status.enter_dist}mm 故障={fault_info}" + ) + + except Exception as e: + logger.error(f"解析记录 {rec['id']} 失败: {e}") + # 仍然标记为已处理,避免死循环 + try: + await mark_parsed(device_id, rec["id"]) + except Exception: + pass + + except Exception as e: + logger.error(f"解析循环异常: {e}") + + await asyncio.sleep(0.5) diff --git a/src/models.py b/src/models.py new file mode 100644 index 0000000..202eccd --- /dev/null +++ b/src/models.py @@ -0,0 +1,268 @@ +"""数据库模型 — 连接池管理与表结构初始化""" + +import logging +import aiomysql +from src.config import ( + MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DB, + MYSQL_POOL_MIN, MYSQL_POOL_MAX, +) + +logger = logging.getLogger(__name__) + +_pool: aiomysql.Pool | None = None + + +async def init_pool() -> aiomysql.Pool: + """初始化 MySQL 连接池并建表""" + global _pool + _pool = await aiomysql.create_pool( + host=MYSQL_HOST, + port=MYSQL_PORT, + user=MYSQL_USER, + password=MYSQL_PASSWORD, + db=MYSQL_DB, + minsize=MYSQL_POOL_MIN, + maxsize=MYSQL_POOL_MAX, + autocommit=True, + ) + await _create_tables(_pool) + logger.info("MySQL 连接池已初始化") + return _pool + + +async def get_pool() -> aiomysql.Pool: + """获取连接池""" + assert _pool is not None, "数据库连接池未初始化" + return _pool + + +async def close_pool(): + """关闭连接池""" + global _pool + if _pool: + _pool.close() + await _pool.wait_closed() + _pool = None + + +# ─── DDL ─────────────────────────────────────────────────────────── + +async def _create_tables(pool: aiomysql.Pool): + async with pool.acquire() as conn: + async with conn.cursor() as cur: + # 1. 联网终端信息表 + await cur.execute(""" + CREATE TABLE IF NOT EXISTS `dnt_info` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `serial` VARCHAR(45) UNIQUE NOT NULL COMMENT '设备唯一编码 Device_id', + `name` VARCHAR(45) DEFAULT '', + `ip` VARCHAR(45) DEFAULT '', + `port` INT DEFAULT 0, + `mac` VARCHAR(45) DEFAULT '', + `subnet` VARCHAR(45) DEFAULT '', + `gateway` VARCHAR(45) DEFAULT '', + `msgport` INT DEFAULT 0, + `version` VARCHAR(45) DEFAULT '', + `dtype` VARCHAR(5) DEFAULT '30' COMMENT '设备类型', + `poll_duration` INT DEFAULT 0, + `reset_duration` INT DEFAULT 0, + `state` TINYINT DEFAULT 0 COMMENT '0 offline, 1 online', + `last_login` DATETIME NULL, + `last_off` DATETIME NULL, + `online_total` INT DEFAULT 0 + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + + # 2. 车检器测试参数信息表 + await cur.execute(""" + CREATE TABLE IF NOT EXISTS `tb_loop_test_info` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `name` VARCHAR(45) DEFAULT '', + `dev_model` VARCHAR(24) DEFAULT '', + `model_code` TINYINT DEFAULT 0, + `hard_ver` VARCHAR(10) DEFAULT '', + `soft_ver` VARCHAR(10) DEFAULT '', + `relay_exist` TINYINT DEFAULT 0, + `relay_pluse` TINYINT DEFAULT 0, + `sens_min` INT DEFAULT 0, + `sens_max` INT DEFAULT 0, + `freq_min` INT DEFAULT 0, + `freq_max` INT DEFAULT 0, + `peak_min` INT DEFAULT 0, + `peak_max` INT DEFAULT 0, + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, + `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + + # 3. 设备测试状态表 + await cur.execute(""" + CREATE TABLE IF NOT EXISTS `tb_state_tst` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `dnt_id` INT NOT NULL COMMENT 'FK → dnt_info.id', + `dpg430_addr` TINYINT DEFAULT 0, + `pcnum` VARCHAR(10) DEFAULT '' COMMENT '批次号', + `serialnum` INT DEFAULT 0 COMMENT '流水号', + `sub_type` TINYINT DEFAULT 0 COMMENT '1 DLD110, 2 PD132', + `str_type` VARCHAR(30) DEFAULT '', + `iffinish` VARCHAR(5) DEFAULT '' COMMENT '是否完成', + `fault_info` VARCHAR(100) DEFAULT '', + `relay_out` VARCHAR(24) DEFAULT '', + `ppvalue` FLOAT DEFAULT 0 COMMENT '峰峰值', + `idle_freq` FLOAT DEFAULT 0 COMMENT '开始工作频率', + `enter_freq` FLOAT DEFAULT 0, + `exit_freq` FLOAT DEFAULT 0, + `enter_dist` INT DEFAULT 0, + `exit_dist` INT DEFAULT 0, + `enter_speed` INT DEFAULT 0, + `exit_speed` INT DEFAULT 0, + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, + INDEX `idx_dnt_id` (`dnt_id`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + + # 4. 采集表模板(不直接创建表,设备注册时按此结构动态建表) + logger.info("数据库表初始化完成") + + +# ─── 设备采集表 CRUD ──────────────────────────────────────────────── + +COLLECT_TABLE_PREFIX = "tb_collect_" + + +def collect_table_name(device_id: str) -> str: + """根据 Device_id 生成采集表名""" + return f"{COLLECT_TABLE_PREFIX}{device_id}" + + +async def ensure_collect_table(device_id: str): + """确保设备采集表存在""" + table = collect_table_name(device_id) + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute(f""" + CREATE TABLE IF NOT EXISTS `{table}` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `dat_type` TINYINT DEFAULT 0 COMMENT '0心跳 1流量 2探头 3其他 4时间戳 7 RS485 8串口上报 9配置返回 11异常', + `raw_data` VARCHAR(380) DEFAULT '', + `state` TINYINT DEFAULT 0 COMMENT '0 未处理, 1 已处理', + `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP, + `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """) + logger.info(f"设备采集表 {table} 已就绪") + + +async def insert_collect_data(device_id: str, dat_type: int, raw_data: str): + """向设备采集表插入原始数据""" + table = collect_table_name(device_id) + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + f"INSERT INTO `{table}` (dat_type, raw_data) VALUES (%s, %s)", + (dat_type, raw_data), + ) + + +async def fetch_unparsed(device_id: str) -> list[dict]: + """获取未处理的记录 (state=0, dat_type=8)""" + table = collect_table_name(device_id) + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute( + f"SELECT id, raw_data FROM `{table}` WHERE state = 0 AND dat_type = 8 LIMIT 100" + ) + return await cur.fetchall() + + +async def mark_parsed(device_id: str, record_id: int): + """标记记录为已处理""" + table = collect_table_name(device_id) + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + f"UPDATE `{table}` SET state = 1 WHERE id = %s", (record_id,) + ) + + +# ─── dnt_info CRUD ───────────────────────────────────────────────── + +async def get_dnt_by_serial(serial: str) -> dict | None: + """根据 Device_id (serial) 查询终端""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor(aiomysql.DictCursor) as cur: + await cur.execute("SELECT * FROM dnt_info WHERE serial = %s", (serial,)) + return await cur.fetchone() + + +async def upsert_dnt(serial: str, ip: str, port: int, mac: str, + subnet: str, gateway: str, msgport: int, + version: str, dtype: str = "30") -> int: + """插入或更新终端信息,返回 dnt_info.id""" + existing = await get_dnt_by_serial(serial) + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + if existing: + # 已有记录:更新 IP / 网关 / 上线时间 + if (existing["ip"] != ip or existing["gateway"] != gateway + or existing["port"] != port or existing["subnet"] != subnet): + await cur.execute( + """UPDATE dnt_info SET ip=%s, port=%s, subnet=%s, gateway=%s, + mac=%s, msgport=%s, version=%s, last_login=NOW(), state=1 + WHERE serial=%s""", + (ip, port, subnet, gateway, mac, msgport, version, serial), + ) + else: + await cur.execute( + "UPDATE dnt_info SET last_login=NOW(), state=1 WHERE serial=%s", + (serial,), + ) + return existing["id"] + else: + await cur.execute( + """INSERT INTO dnt_info + (serial, ip, port, mac, subnet, gateway, msgport, version, dtype, + last_login, state) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s, NOW(), 1)""", + (serial, ip, port, mac, subnet, gateway, msgport, version, dtype), + ) + return cur.lastrowid + + +async def insert_test_result(dnt_id: int, dpg430_addr: int, pcnum: str, + serialnum: int, sub_type: int, str_type: str, + iffinish: str, fault_info: str, relay_out: str, + ppvalue: float, idle_freq: float, enter_freq: float, + exit_freq: float, enter_dist: int, exit_dist: int, + enter_speed: int, exit_speed: int): + """插入测试结果到 tb_state_tst""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + """INSERT INTO tb_state_tst + (dnt_id, dpg430_addr, pcnum, serialnum, sub_type, str_type, + iffinish, fault_info, relay_out, ppvalue, idle_freq, + enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""", + (dnt_id, dpg430_addr, pcnum, serialnum, sub_type, str_type, + iffinish, fault_info, relay_out, ppvalue, idle_freq, + enter_freq, exit_freq, enter_dist, exit_dist, enter_speed, exit_speed), + ) + + +async def set_device_offline(serial: str): + """标记设备离线""" + pool = await get_pool() + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute( + "UPDATE dnt_info SET state=0, last_off=NOW() WHERE serial=%s", + (serial,), + ) diff --git a/src/protocol.py b/src/protocol.py new file mode 100644 index 0000000..cc0cc36 --- /dev/null +++ b/src/protocol.py @@ -0,0 +1,97 @@ +"""PGLC JSON 协议解析""" + +import json +import logging + +logger = logging.getLogger(__name__) + + +def parse_message(data: bytes) -> dict | None: + """将收到的 JSON 字节解析为 dict,失败返回 None""" + try: + return json.loads(data.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + logger.warning(f"JSON 解析失败: {e}") + return None + + +def is_count_off(msg: dict) -> bool: + return msg.get("Method") == "Count_Off" + + +def is_device_info(msg: dict) -> bool: + return msg.get("Method") == "Device_Info" + + +def is_heartbeat(msg: dict) -> bool: + return msg.get("Method") == "Heartbeat" + + +def is_timestamp(msg: dict) -> bool: + return msg.get("Method") == "TimeStamp" + + +def is_serial_net(msg: dict) -> bool: + return msg.get("Method") == "SerialNet" + + +def is_tsreport(msg: dict) -> bool: + return msg.get("Method") == "TSReport" + + +def is_dev_reset(msg: dict) -> bool: + return msg.get("Method") == "Dev_Reset" + + +# ─── 请求构造 ────────────────────────────────────────────────────── + +def make_response(method: str, code: int, device_id: str, + message: str = "success", data: dict | None = None) -> str: + resp = { + "Method": method, + "Code": code, + "Message": message, + "Data": data or {"Device_id": device_id}, + } + return json.dumps(resp, ensure_ascii=False) + + +def make_timestamp_response(device_id: str, unix_time: int) -> str: + return json.dumps({ + "Method": "TimeStamp", + "Code": 0, + "Message": "success", + "Data": { + "Device_id": device_id, + "Time_Counter": unix_time, + }, + }, ensure_ascii=False) + + +def make_heartbeat_response(device_id: str, ssc_time: int) -> str: + return json.dumps({ + "Method": "Heartbeat", + "Code": 0, + "Message": "success", + "Data": { + "Device_id": device_id, + "SSC_Time": ssc_time, + }, + }, ensure_ascii=False) + + +def make_count_off_success() -> str: + return json.dumps({ + "Method": "Count_Off", + "Code": 0, + "Message": "success", + }, ensure_ascii=False) + + +def make_error_response(method: str, device_id: str, message: str) -> str: + return json.dumps({ + "Method": method, + "Code": -1, + "Message": message, + "Data": {"Device_id": device_id}, + }, ensure_ascii=False) diff --git a/src/server.py b/src/server.py new file mode 100644 index 0000000..724a8a2 --- /dev/null +++ b/src/server.py @@ -0,0 +1,193 @@ +"""EDC 服务器主入口 — UDP + TCP 异步服务 + +UDP 端口 5500: 发现设备 (Count_Off)、心跳 (Heartbeat)、信息查询 +UDP 端口 5505: 消息监听 +TCP 端口 5550: 时间同步 (TimeStamp)、设备数据上报 (TSReport, SerialNet) +""" + +import asyncio +import logging +import signal +import sys + +try: + import uvloop # type: ignore +except ImportError: + uvloop = None + +from src.config import ( + UDP_PORT, UDP_MSG_PORT, TCP_PORT, BIND_HOST, LOG_LEVEL, +) +from src.models import init_pool, close_pool +from src.protocol import parse_message, make_error_response +from src.handlers import ( + handle_count_off, + handle_heartbeat, + handle_timestamp, + handle_tsreport, + handle_serial_net, + handle_device_info, + parse_loop, +) + +logging.basicConfig( + level=getattr(logging, LOG_LEVEL), + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) +logger = logging.getLogger("edc") + + +class EDCProtocol: + """asyncio UDP 协议处理器""" + + def __init__(self): + self.transport = None + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data, addr): + asyncio.ensure_future(self._handle(data, addr)) + + async def _handle(self, data: bytes, addr: tuple): + msg = parse_message(data) + if msg is None: + return + + method = msg.get("Method", "") + logger.debug(f"UDP {method} from {addr}") + + try: + response = None + if method == "Count_Off": + response = await handle_count_off(msg, addr) + elif method == "Heartbeat": + response = await handle_heartbeat(msg) + elif method == "Device_Info": + response = await handle_device_info(msg) + elif method == "TSReport": + await handle_tsreport(msg) + elif method == "SerialNet": + await handle_serial_net(msg) + + if response and self.transport: + self.transport.sendto(response.encode("utf-8"), addr) + + except Exception as e: + logger.error(f"处理 {method} 异常: {e}", exc_info=True) + + +async def handle_tcp_client(reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): + """TCP 客户端连接处理""" + addr = writer.get_extra_info("peername") + logger.info(f"TCP 连接: {addr}") + + try: + while True: + line = await reader.readline() + if not line: + break + + msg = parse_message(line) + if msg is None: + continue + + method = msg.get("Method", "") + logger.debug(f"TCP {method} from {addr}") + + try: + response = None + if method == "TimeStamp": + response = await handle_timestamp(msg) + elif method == "TSReport": + await handle_tsreport(msg) + elif method == "SerialNet": + await handle_serial_net(msg) + elif method == "Heartbeat": + response = await handle_heartbeat(msg) + elif method == "Device_Info": + response = await handle_device_info(msg) + + if response: + writer.write((response + "\n").encode("utf-8")) + await writer.drain() + + except Exception as e: + logger.error(f"TCP 处理 {method} 异常: {e}") + device_id = msg.get("Device_id", "") + err = make_error_response(method, device_id, str(e)) + writer.write((err + "\n").encode("utf-8")) + await writer.drain() + + except (ConnectionResetError, BrokenPipeError): + pass + finally: + logger.info(f"TCP 断开: {addr}") + writer.close() + await writer.wait_closed() + + +async def main(): + logger.info("EDC 服务启动中...") + + # 初始化数据库 + await init_pool() + + # 启动业务解析后台任务 + asyncio.create_task(parse_loop()) + + # 启动 UDP 服务 (端口 5500) + loop = asyncio.get_running_loop() + udp_transport, _ = await loop.create_datagram_endpoint( + lambda: EDCProtocol(), # type: ignore[arg-type] + local_addr=(BIND_HOST, UDP_PORT), + ) + logger.info(f"UDP 服务监听 {BIND_HOST}:{UDP_PORT}") + + # 启动 UDP 消息端口 (5505) + udp_msg_transport, _ = await loop.create_datagram_endpoint( + lambda: EDCProtocol(), # type: ignore[arg-type] + local_addr=(BIND_HOST, UDP_MSG_PORT), + ) + logger.info(f"UDP 消息监听 {BIND_HOST}:{UDP_MSG_PORT}") + + # 启动 TCP 服务 (端口 5550) + tcp_server = await asyncio.start_server( + handle_tcp_client, + BIND_HOST, + TCP_PORT, + ) + logger.info(f"TCP 服务监听 {BIND_HOST}:{TCP_PORT}") + + # 优雅退出 + stop_event = asyncio.Event() + + def _shutdown(): + logger.info("收到关闭信号,正在退出...") + stop_event.set() + + loop.add_signal_handler(signal.SIGINT, _shutdown) + loop.add_signal_handler(signal.SIGTERM, _shutdown) + + await stop_event.wait() + + # 清理 + tcp_server.close() + await tcp_server.wait_closed() + udp_transport.close() + udp_msg_transport.close() + await close_pool() + logger.info("EDC 服务已停止") + + +def run(): + """入口函数""" + if uvloop: + uvloop.install() + logger.info("uvloop 已启用") + asyncio.run(main()) + + +if __name__ == "__main__": + run()