fix: 修正设备接入流程
- Count_Off 改为读取 Data.Device_id (设备返回格式), 不再回复设备 - TimeStamp 改为同步函数 (无异步 IO) - TCP 改用 JSON 流解析 (raw_decode), 支持紧凑 JSON 和 NDJSON - TSReport/SerialNet 返回确认消息
This commit is contained in:
133
src/server.py
133
src/server.py
@@ -1,14 +1,19 @@
|
||||
"""EDC 服务器主入口 — UDP + TCP 异步服务
|
||||
|
||||
UDP 端口 5500: 发现设备 (Count_Off)、心跳 (Heartbeat)、信息查询
|
||||
UDP 端口 5505: 消息监听
|
||||
TCP 端口 5550: 时间同步 (TimeStamp)、设备数据上报 (TSReport, SerialNet)
|
||||
UDP :5500 设备登录 (Count_Off)、心跳 (Heartbeat)、传感数据上报 (TSReport)
|
||||
UDP :5505 消息监听 (同上)
|
||||
TCP :5550 时间同步 (TimeStamp)、数据上报 (TSReport, SerialNet)
|
||||
|
||||
设备接入流程:
|
||||
1. 设备 TCP 连接 → 发送 TimeStamp 请求 → EDC 返回时间
|
||||
2. 设备 UDP 上报 Count_Off (设备详细信息) → EDC 注册设备,不回复
|
||||
3. 设备持续通过 TCP/UDP 上报数据
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
|
||||
try:
|
||||
import uvloop # type: ignore
|
||||
@@ -26,7 +31,6 @@ from src.handlers import (
|
||||
handle_timestamp,
|
||||
handle_tsreport,
|
||||
handle_serial_net,
|
||||
handle_device_info,
|
||||
parse_loop,
|
||||
)
|
||||
|
||||
@@ -60,67 +64,98 @@ class EDCProtocol:
|
||||
try:
|
||||
response = None
|
||||
if method == "Count_Off":
|
||||
response = await handle_count_off(msg, addr)
|
||||
# 设备登录上报,只处理不回复
|
||||
await handle_count_off(msg, addr)
|
||||
elif method == "Heartbeat":
|
||||
response = await handle_heartbeat(msg)
|
||||
elif method == "Device_Info":
|
||||
response = await handle_device_info(msg)
|
||||
elif method == "TSReport":
|
||||
await handle_tsreport(msg)
|
||||
response = await handle_tsreport(msg)
|
||||
elif method == "SerialNet":
|
||||
await handle_serial_net(msg)
|
||||
response = await handle_serial_net(msg)
|
||||
|
||||
if response and self.transport:
|
||||
self.transport.sendto(response.encode("utf-8"), addr)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"处理 {method} 异常: {e}", exc_info=True)
|
||||
logger.error(f"UDP 处理 {method} 异常: {e}", exc_info=True)
|
||||
|
||||
|
||||
async def handle_tcp_client(reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter):
|
||||
"""TCP 客户端连接处理"""
|
||||
"""TCP 客户端连接处理
|
||||
|
||||
使用缓冲 + JSON 解析方式处理 TCP 流,支持:
|
||||
- 换行分隔的 JSON (NDJSON)
|
||||
- 紧凑 JSON (无换行)
|
||||
"""
|
||||
addr = writer.get_extra_info("peername")
|
||||
logger.info(f"TCP 连接: {addr}")
|
||||
|
||||
buffer = b""
|
||||
|
||||
async def process_message(msg: dict):
|
||||
"""处理单条消息并返回响应文本"""
|
||||
method = msg.get("Method", "")
|
||||
logger.debug(f"TCP {method} from {addr}")
|
||||
|
||||
try:
|
||||
if method == "TimeStamp":
|
||||
return handle_timestamp(msg)
|
||||
elif method == "TSReport":
|
||||
return await handle_tsreport(msg)
|
||||
elif method == "SerialNet":
|
||||
return await handle_serial_net(msg)
|
||||
elif method == "Heartbeat":
|
||||
return await handle_heartbeat(msg)
|
||||
else:
|
||||
logger.debug(f"TCP 未知方法: {method}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"TCP 处理 {method} 异常: {e}")
|
||||
return make_error_response(method, msg.get("Device_id", ""), str(e))
|
||||
|
||||
try:
|
||||
while True:
|
||||
line = await reader.readline()
|
||||
if not line:
|
||||
chunk = await reader.read(4096)
|
||||
if not chunk:
|
||||
break
|
||||
|
||||
msg = parse_message(line)
|
||||
if msg is None:
|
||||
continue
|
||||
buffer += chunk
|
||||
|
||||
method = msg.get("Method", "")
|
||||
logger.debug(f"TCP {method} from {addr}")
|
||||
# 尝试从缓冲区提取完整的 JSON 消息
|
||||
while buffer:
|
||||
# 策略 1: 先尝试换行分隔
|
||||
nl_pos = buffer.find(b"\n")
|
||||
if nl_pos >= 0:
|
||||
line = buffer[:nl_pos].strip()
|
||||
buffer = buffer[nl_pos + 1:]
|
||||
if not line:
|
||||
continue
|
||||
msg = parse_message(line)
|
||||
if msg:
|
||||
resp = await process_message(msg)
|
||||
if resp:
|
||||
writer.write((resp + "\n").encode("utf-8"))
|
||||
await writer.drain()
|
||||
continue
|
||||
|
||||
try:
|
||||
response = None
|
||||
if method == "TimeStamp":
|
||||
response = await handle_timestamp(msg)
|
||||
elif method == "TSReport":
|
||||
await handle_tsreport(msg)
|
||||
elif method == "SerialNet":
|
||||
await handle_serial_net(msg)
|
||||
elif method == "Heartbeat":
|
||||
response = await handle_heartbeat(msg)
|
||||
elif method == "Device_Info":
|
||||
response = await handle_device_info(msg)
|
||||
# 策略 2: 尝试直接解析整个缓冲区
|
||||
try:
|
||||
decoded = buffer.decode("utf-8")
|
||||
msg, end = json.JSONDecoder().raw_decode(decoded)
|
||||
buffer = decoded[end:].encode("utf-8")
|
||||
resp = await process_message(msg)
|
||||
if resp:
|
||||
writer.write((resp + "\n").encode("utf-8"))
|
||||
await writer.drain()
|
||||
except json.JSONDecodeError:
|
||||
# 数据不完整,等待更多数据
|
||||
if len(buffer) > 65536:
|
||||
logger.warning(f"TCP {addr} 缓冲区过大 ({len(buffer)}B),丢弃")
|
||||
buffer = b""
|
||||
break
|
||||
|
||||
if response:
|
||||
writer.write((response + "\n").encode("utf-8"))
|
||||
await writer.drain()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"TCP 处理 {method} 异常: {e}")
|
||||
device_id = msg.get("Device_id", "")
|
||||
err = make_error_response(method, device_id, str(e))
|
||||
writer.write((err + "\n").encode("utf-8"))
|
||||
await writer.drain()
|
||||
|
||||
except (ConnectionResetError, BrokenPipeError):
|
||||
except (ConnectionResetError, BrokenPipeError, asyncio.IncompleteReadError):
|
||||
pass
|
||||
finally:
|
||||
logger.info(f"TCP 断开: {addr}")
|
||||
@@ -131,28 +166,27 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
|
||||
async def main():
|
||||
logger.info("EDC 服务启动中...")
|
||||
|
||||
# 初始化数据库
|
||||
await init_pool()
|
||||
|
||||
# 启动业务解析后台任务
|
||||
asyncio.create_task(parse_loop())
|
||||
|
||||
# 启动 UDP 服务 (端口 5500)
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
# UDP :5500
|
||||
udp_transport, _ = await loop.create_datagram_endpoint(
|
||||
lambda: EDCProtocol(), # type: ignore[arg-type]
|
||||
local_addr=(BIND_HOST, UDP_PORT),
|
||||
)
|
||||
logger.info(f"UDP 服务监听 {BIND_HOST}:{UDP_PORT}")
|
||||
|
||||
# 启动 UDP 消息端口 (5505)
|
||||
# UDP :5505
|
||||
udp_msg_transport, _ = await loop.create_datagram_endpoint(
|
||||
lambda: EDCProtocol(), # type: ignore[arg-type]
|
||||
local_addr=(BIND_HOST, UDP_MSG_PORT),
|
||||
)
|
||||
logger.info(f"UDP 消息监听 {BIND_HOST}:{UDP_MSG_PORT}")
|
||||
|
||||
# 启动 TCP 服务 (端口 5550)
|
||||
# TCP :5550
|
||||
tcp_server = await asyncio.start_server(
|
||||
handle_tcp_client,
|
||||
BIND_HOST,
|
||||
@@ -160,7 +194,6 @@ async def main():
|
||||
)
|
||||
logger.info(f"TCP 服务监听 {BIND_HOST}:{TCP_PORT}")
|
||||
|
||||
# 优雅退出
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
def _shutdown():
|
||||
@@ -172,7 +205,6 @@ async def main():
|
||||
|
||||
await stop_event.wait()
|
||||
|
||||
# 清理
|
||||
tcp_server.close()
|
||||
await tcp_server.wait_closed()
|
||||
udp_transport.close()
|
||||
@@ -182,7 +214,6 @@ async def main():
|
||||
|
||||
|
||||
def run():
|
||||
"""入口函数"""
|
||||
if uvloop:
|
||||
uvloop.install()
|
||||
logger.info("uvloop 已启用")
|
||||
|
||||
Reference in New Issue
Block a user