Files
edc_server/src/server.py
wangfq c875cf383b fix: 修复 HeartBeat 大小写不匹配导致交互未被记录的问题
根本原因: 设备发送 Method='HeartBeat'(大写B), 代码匹配'Heartbeat'(小写b),
        心跳包被静默忽略, record_interaction 未调用, 导致监控误判为离线。

修复:
- UDP/TCP 方法匹配改为 case-insensitive (method_lower)
- handle_timestamp 增加 record_interaction 调用 (TimeStamp 也是设备交互)
- TCP 连接/断开时写入 tb_device_log 事件日志 (tcp_connect/tcp_disconnect)
2026-06-10 10:01:07 +08:00

269 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""EDC 服务器主入口 — UDP + TCP 异步服务
UDP :5500 设备登录 (Count_Off)、心跳 (Heartbeat)、传感数据上报 (TSReport)
UDP :5505 消息监听 (同上)
TCP :5550 时间同步 (TimeStamp)、数据上报 (TSReport, SerialNet)
设备接入流程:
1. 设备 TCP 连接 → 发送 TimeStamp 请求 → EDC 返回时间
2. 设备 UDP 上报 Count_Off (设备详细信息) → EDC 注册设备,不回复
3. 设备持续通过 TCP/UDP 上报数据
"""
import asyncio
import json
import logging
import signal
try:
import uvloop # type: ignore
except ImportError:
uvloop = None
from src.config import (
UDP_PORT, UDP_MSG_PORT, TCP_PORT, BIND_HOST, LOG_LEVEL,
)
from src.models import init_pool, close_pool
from src.protocol import parse_message, make_error_response
from src.handlers import (
handle_count_off,
handle_heartbeat,
handle_timestamp,
handle_tsreport,
handle_serial_net,
parse_loop,
serialnet_loop,
serialnet_response_loop,
device_status_monitor,
set_udp_sender,
)
from src.models import insert_device_log
logging.basicConfig(
level=getattr(logging, LOG_LEVEL),
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("edc")
# 全局 UDP transport供 serialnet_loop 发送指令
_udp_transport: asyncio.DatagramTransport | None = None
def send_udp(data: bytes, addr: tuple[str, int]):
"""通过全局 UDP transport 发送数据"""
if _udp_transport:
_udp_transport.sendto(data, addr)
class EDCProtocol:
"""asyncio UDP 协议处理器"""
def __init__(self):
self.transport = None
def connection_made(self, transport):
self.transport = transport
global _udp_transport
_udp_transport = transport
def datagram_received(self, data, addr):
asyncio.ensure_future(self._handle(data, addr))
async def _handle(self, data: bytes, addr: tuple):
msg = parse_message(data)
if msg is None:
return
logger.info(f"UDP {msg} from {addr}")
method = msg.get("Method", "")
method_lower = method.lower()
logger.debug(f"UDP {method} from {addr}")
try:
response = None
if method == "Count_Off":
# 设备登录上报,只处理不回复
await handle_count_off(msg, addr)
elif method_lower == "heartbeat":
response = await handle_heartbeat(msg)
elif method_lower == "tsreport":
response = await handle_tsreport(msg)
elif method_lower == "serialnet":
response = await handle_serial_net(msg)
if response and self.transport:
self.transport.sendto(response.encode("utf-8"), addr)
except Exception as e:
logger.error(f"UDP 处理 {method} 异常: {e}", exc_info=True)
async def handle_tcp_client(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
"""TCP 客户端连接处理
使用缓冲 + JSON 解析方式处理 TCP 流,支持:
- 换行分隔的 JSON (NDJSON)
- 紧凑 JSON (无换行)
"""
addr = writer.get_extra_info("peername")
addr_ip = addr[0] if addr else ""
logger.info(f"TCP 连接: {addr}")
# TCP 连接事件日志
try:
asyncio.ensure_future(insert_device_log(
serial="", ip=addr_ip,
event_type="tcp_connect",
content=f"TCP 连接: {addr}",
))
except Exception:
pass
buffer = b""
async def process_message(msg: dict):
"""处理单条消息并返回响应文本"""
logger.info(f"TCP get_rcv {msg} from {addr}")
method = msg.get("Method", "")
method_lower = method.lower()
logger.debug(f"TCP {method} from {addr}")
try:
if method_lower == "timestamp":
return handle_timestamp(msg)
elif method_lower == "tsreport":
return await handle_tsreport(msg)
elif method_lower == "serialnet":
return await handle_serial_net(msg)
elif method_lower == "heartbeat":
return await handle_heartbeat(msg)
else:
logger.debug(f"TCP 未知方法: {method}")
return None
except Exception as e:
logger.error(f"TCP 处理 {method} 异常: {e}")
return make_error_response(method, msg.get("Device_id", ""), str(e))
try:
while True:
chunk = await reader.read(4096)
if not chunk:
break
buffer += chunk
# 尝试从缓冲区提取完整的 JSON 消息
while buffer:
# 策略 1: 先尝试换行分隔
nl_pos = buffer.find(b"\n")
if nl_pos >= 0:
line = buffer[:nl_pos].strip()
buffer = buffer[nl_pos + 1:]
if not line:
continue
msg = parse_message(line)
if msg:
resp = await process_message(msg)
if resp:
writer.write((resp + "\n").encode("utf-8"))
await writer.drain()
continue
# 策略 2: 尝试直接解析整个缓冲区
try:
decoded = buffer.decode("utf-8")
msg, end = json.JSONDecoder().raw_decode(decoded)
buffer = decoded[end:].encode("utf-8")
resp = await process_message(msg)
if resp:
writer.write((resp + "\n").encode("utf-8"))
await writer.drain()
except json.JSONDecodeError:
# 数据不完整,等待更多数据
if len(buffer) > 65536:
logger.warning(f"TCP {addr} 缓冲区过大 ({len(buffer)}B),丢弃")
buffer = b""
break
except (ConnectionResetError, BrokenPipeError, asyncio.IncompleteReadError):
pass
finally:
logger.info(f"TCP 断开: {addr}")
# TCP 断开事件日志
try:
asyncio.ensure_future(insert_device_log(
serial="", ip=addr_ip,
event_type="tcp_disconnect",
content=f"TCP 断开: {addr}",
))
except Exception:
pass
writer.close()
await writer.wait_closed()
async def main():
logger.info("EDC 服务启动中...")
await init_pool()
asyncio.create_task(parse_loop())
asyncio.create_task(serialnet_loop())
asyncio.create_task(serialnet_response_loop())
asyncio.create_task(device_status_monitor())
loop = asyncio.get_running_loop()
# UDP :5500
udp_transport, _ = await loop.create_datagram_endpoint(
lambda: EDCProtocol(), # type: ignore[arg-type]
local_addr=(BIND_HOST, UDP_PORT),
)
set_udp_sender(send_udp) # 注入到 handlers 供 serialnet_loop 使用
logger.info(f"UDP 服务监听 {BIND_HOST}:{UDP_PORT}")
# UDP :5505
udp_msg_transport, _ = await loop.create_datagram_endpoint(
lambda: EDCProtocol(), # type: ignore[arg-type]
local_addr=(BIND_HOST, UDP_MSG_PORT),
)
logger.info(f"UDP 消息监听 {BIND_HOST}:{UDP_MSG_PORT}")
# TCP :5550
tcp_server = await asyncio.start_server(
handle_tcp_client,
BIND_HOST,
TCP_PORT,
)
logger.info(f"TCP 服务监听 {BIND_HOST}:{TCP_PORT}")
stop_event = asyncio.Event()
def _shutdown():
logger.info("收到关闭信号,正在退出...")
stop_event.set()
loop.add_signal_handler(signal.SIGINT, _shutdown)
loop.add_signal_handler(signal.SIGTERM, _shutdown)
await stop_event.wait()
tcp_server.close()
await tcp_server.wait_closed()
udp_transport.close()
udp_msg_transport.close()
await close_pool()
logger.info("EDC 服务已停止")
def run():
if uvloop:
uvloop.install()
logger.info("uvloop 已启用")
asyncio.run(main())
if __name__ == "__main__":
run()