Compare commits
6 Commits
ef890fafc6
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3580f89552 | ||
|
|
25aafd57c8 | ||
|
|
cdddfac609 | ||
|
|
944870496a | ||
|
|
c875cf383b | ||
|
|
11f1c4f55b |
16
src/dg430.py
16
src/dg430.py
@@ -280,12 +280,16 @@ def decode_fault_info(fault: int) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def decode_relay_info(relay: int) -> str:
|
def decode_relay_info(relay: int) -> str:
|
||||||
"""解码继电器 bitmask"""
|
"""解码继电器输出状态为可读字符串
|
||||||
items = []
|
|
||||||
for bit, desc in RELAY_BITS.items():
|
0xB2 继电器输出状态原始值 x 的解析规则:
|
||||||
if relay & (1 << bit):
|
- x & 0x01 为真 → "存在继电器有输出",否则 "存在继电器无输出"
|
||||||
items.append(desc)
|
- x & 0x02 为真 → "脉冲继电器有输出",否则 "脉冲继电器无输出"
|
||||||
return "; ".join(items) if items else "无输出"
|
汇总格式: "存在继电器有输出,脉冲继电器有输出"
|
||||||
|
"""
|
||||||
|
exist = "存在继电器有输出" if (relay & 0x01) else "存在继电器无输出"
|
||||||
|
pulse = "脉冲继电器有输出" if (relay & 0x02) else "脉冲继电器无输出"
|
||||||
|
return f"{exist},{pulse}"
|
||||||
|
|
||||||
|
|
||||||
# ─── 0x4A 获取设备版本号响应 ──────────────────────────────────────
|
# ─── 0x4A 获取设备版本号响应 ──────────────────────────────────────
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ from src.models import (
|
|||||||
upsert_fixture_param,
|
upsert_fixture_param,
|
||||||
insert_device_log,
|
insert_device_log,
|
||||||
update_device_status,
|
update_device_status,
|
||||||
|
get_all_device_serials,
|
||||||
)
|
)
|
||||||
from src.dg430 import (
|
from src.dg430 import (
|
||||||
parse_b2_status,
|
parse_b2_status,
|
||||||
@@ -147,6 +148,7 @@ async def handle_count_off(data: dict, addr: tuple):
|
|||||||
_registry[serial] = dnt_id
|
_registry[serial] = dnt_id
|
||||||
_heartbeat[serial] = time.time()
|
_heartbeat[serial] = time.time()
|
||||||
record_interaction(serial)
|
record_interaction(serial)
|
||||||
|
_device_status[serial] = 1 # 登录即视为在线状态
|
||||||
|
|
||||||
await ensure_collect_table(serial)
|
await ensure_collect_table(serial)
|
||||||
|
|
||||||
@@ -185,21 +187,11 @@ async def handle_heartbeat(data: dict) -> str | None:
|
|||||||
|
|
||||||
|
|
||||||
def handle_timestamp(data: dict) -> str:
|
def handle_timestamp(data: dict) -> str:
|
||||||
"""处理时间同步请求
|
"""处理时间同步请求(也是设备交互)"""
|
||||||
|
|
||||||
TimeStamp 请求格式:
|
|
||||||
{
|
|
||||||
"Method": "TimeStamp",
|
|
||||||
"Params": {
|
|
||||||
"Device_id": "2345",
|
|
||||||
"TimeZone": "Asia/Shanghai"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
返回格式参考 PGLC 协议 3.5 节。
|
|
||||||
"""
|
|
||||||
params = data.get("Params", {})
|
params = data.get("Params", {})
|
||||||
device_id = params.get("Device_id", "")
|
device_id = params.get("Device_id", "")
|
||||||
|
if device_id:
|
||||||
|
record_interaction(device_id)
|
||||||
return make_timestamp_response(device_id, int(time.time()))
|
return make_timestamp_response(device_id, int(time.time()))
|
||||||
|
|
||||||
|
|
||||||
@@ -395,6 +387,7 @@ async def parse_loop():
|
|||||||
dpg430_addr=wave.addr,
|
dpg430_addr=wave.addr,
|
||||||
remain_count=wave.remain_count,
|
remain_count=wave.remain_count,
|
||||||
relay_code=wave.relay_out,
|
relay_code=wave.relay_out,
|
||||||
|
relay_out=relay_info,
|
||||||
work_freq=wave.work_freq,
|
work_freq=wave.work_freq,
|
||||||
curr_dist=wave.curr_dist,
|
curr_dist=wave.curr_dist,
|
||||||
speed=wave.speed,
|
speed=wave.speed,
|
||||||
@@ -717,13 +710,14 @@ MONITOR_INTERVAL = 5 # 状态检查间隔 (秒)
|
|||||||
|
|
||||||
|
|
||||||
async def device_status_monitor():
|
async def device_status_monitor():
|
||||||
"""后台轮询:监控所有已注册设备在线/离线状态
|
"""后台轮询:监控所有设备在线/离线状态
|
||||||
|
|
||||||
判定规则:
|
两个阶段:
|
||||||
1. > OFFLINE_IDLE_SEC 无交互 → 离线
|
Phase 1 — 遍历 _registry 中活跃设备,根据交互记录判定状态
|
||||||
2. 最近 OFFLINE_IDLE_SEC 内交互次数 < POOR_MIN_INTERACTIONS → 通信不良
|
Phase 2 — 扫描 dnt_info 全表,修正与实际交互不符的状态:
|
||||||
3. 最近 3 次交互间隔均 < INTERACTION_TIMEOUT → 在线
|
- state=1(在线) 但 >60s 无交互 → 更新为离线
|
||||||
4. 状态变化时写入 tb_device_log + 更新 dnt_info.state
|
- state=0(离线) 但有交互记录 → 根据交互模式更新为在线/通信不良
|
||||||
|
- state 与计算值不一致 → 同步修正
|
||||||
"""
|
"""
|
||||||
logger.info("设备状态监控服务启动")
|
logger.info("设备状态监控服务启动")
|
||||||
await asyncio.sleep(5) # 等其它服务就绪
|
await asyncio.sleep(5) # 等其它服务就绪
|
||||||
@@ -731,16 +725,49 @@ async def device_status_monitor():
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
|
||||||
|
# ── Phase 1: 活跃设备(在 _registry 中)────
|
||||||
for device_id, dnt_id in list(_registry.items()):
|
for device_id, dnt_id in list(_registry.items()):
|
||||||
interactions = _interactions.get(device_id)
|
interactions = _interactions.get(device_id)
|
||||||
if not interactions:
|
actual_state = _calc_device_state(interactions, now) if interactions else 0
|
||||||
|
old_state = _device_status.get(device_id, 1) # 注册时默认在线
|
||||||
|
|
||||||
|
if actual_state != old_state:
|
||||||
|
await _apply_state_change(device_id, actual_state, old_state)
|
||||||
|
|
||||||
|
# ── Phase 2: 扫描 dnt_info 全表,修正不一致 ──
|
||||||
|
try:
|
||||||
|
rows = await get_all_device_serials()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"查询 dnt_info 失败: {e}")
|
||||||
|
rows = []
|
||||||
|
|
||||||
|
for serial, db_state, ip in rows:
|
||||||
|
if not serial:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# 计算新状态
|
interactions = _interactions.get(serial)
|
||||||
new_state = _calc_device_state(interactions, now)
|
actual_state = _calc_device_state(interactions, now) if interactions else 0
|
||||||
old_state = _device_status.get(device_id, 1) # 默认在线
|
memory_state = _device_status.get(serial)
|
||||||
|
|
||||||
if new_state != old_state:
|
# 优先用内存状态做基准(更实时);无内存状态则用 DB 状态
|
||||||
|
tracked_state = memory_state if memory_state is not None else db_state
|
||||||
|
|
||||||
|
if actual_state != tracked_state:
|
||||||
|
await _apply_state_change(serial, actual_state, tracked_state, ip=ip)
|
||||||
|
|
||||||
|
# ── Phase 3: 清理 _registry 中已经不存在的设备 ──
|
||||||
|
# (已注销 / 长期无交互的设备不清除,继续监控)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"设备状态监控异常: {e}")
|
||||||
|
|
||||||
|
await asyncio.sleep(MONITOR_INTERVAL)
|
||||||
|
|
||||||
|
|
||||||
|
async def _apply_state_change(device_id: str, new_state: int, old_state: int,
|
||||||
|
ip: str = ""):
|
||||||
|
"""应用状态变更:更新内存、dnt_info、写入日志"""
|
||||||
_device_status[device_id] = new_state
|
_device_status[device_id] = new_state
|
||||||
logger.info(
|
logger.info(
|
||||||
f"设备 {device_id} 状态变更: "
|
f"设备 {device_id} 状态变更: "
|
||||||
@@ -753,23 +780,24 @@ async def device_status_monitor():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"更新设备状态失败: {e}")
|
logger.error(f"更新设备状态失败: {e}")
|
||||||
|
|
||||||
# 写入事件日志
|
# 获取设备 IP(未传入时从 DB 查)
|
||||||
|
if not ip:
|
||||||
|
try:
|
||||||
dnt = await get_dnt_by_serial(device_id)
|
dnt = await get_dnt_by_serial(device_id)
|
||||||
dev_ip = dnt.get("ip", "") if dnt else ""
|
ip = dnt.get("ip", "") if dnt else ""
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 写入事件日志
|
||||||
event_type, content = _state_event(new_state, old_state)
|
event_type, content = _state_event(new_state, old_state)
|
||||||
try:
|
try:
|
||||||
await insert_device_log(
|
await insert_device_log(
|
||||||
serial=device_id, ip=dev_ip,
|
serial=device_id, ip=ip,
|
||||||
event_type=event_type, content=content,
|
event_type=event_type, content=content,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"写入设备事件日志失败: {e}")
|
logger.error(f"写入设备事件日志失败: {e}")
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"设备状态监控异常: {e}")
|
|
||||||
|
|
||||||
await asyncio.sleep(MONITOR_INTERVAL)
|
|
||||||
|
|
||||||
|
|
||||||
def _calc_device_state(interactions: deque, now: float) -> int:
|
def _calc_device_state(interactions: deque, now: float) -> int:
|
||||||
"""根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)"""
|
"""根据交互记录计算设备状态 (0=离线 1=在线 2=通信不良)"""
|
||||||
|
|||||||
@@ -152,7 +152,7 @@ async def _create_tables(pool: aiomysql.Pool):
|
|||||||
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
`id` INT AUTO_INCREMENT PRIMARY KEY,
|
||||||
`username` VARCHAR(45) UNIQUE NOT NULL,
|
`username` VARCHAR(45) UNIQUE NOT NULL,
|
||||||
`password_hash` VARCHAR(256) NOT NULL,
|
`password_hash` VARCHAR(256) NOT NULL,
|
||||||
`role` VARCHAR(20) DEFAULT 'operator' COMMENT 'admin/operator',
|
`role` VARCHAR(20) DEFAULT 'operator' COMMENT 'admin/manager/operator/analyst',
|
||||||
`is_active` TINYINT DEFAULT 1,
|
`is_active` TINYINT DEFAULT 1,
|
||||||
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP
|
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
|
||||||
@@ -344,6 +344,15 @@ async def _create_tables(pool: aiomysql.Pool):
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# V2.3.0 迁移:tb_user 角色增加 manager
|
||||||
|
try:
|
||||||
|
await cur.execute(
|
||||||
|
"ALTER TABLE tb_user MODIFY COLUMN `role` VARCHAR(20) DEFAULT 'operator' "
|
||||||
|
"COMMENT 'admin/manager/operator/analyst'"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
logger.info("数据库表初始化完成")
|
logger.info("数据库表初始化完成")
|
||||||
|
|
||||||
|
|
||||||
@@ -522,7 +531,8 @@ async def insert_wave_data(dnt_id: int, dpg430_addr: int,
|
|||||||
remain_count: int, relay_code: int,
|
remain_count: int, relay_code: int,
|
||||||
work_freq: float, curr_dist: int, speed: int,
|
work_freq: float, curr_dist: int, speed: int,
|
||||||
near_dist: int, far_dist: int,
|
near_dist: int, far_dist: int,
|
||||||
enter_dist: int, leave_dist: int):
|
enter_dist: int, leave_dist: int,
|
||||||
|
relay_out: str = ""):
|
||||||
"""插入 0xB4 波动测试上报数据到 tb_state_tst"""
|
"""插入 0xB4 波动测试上报数据到 tb_state_tst"""
|
||||||
coil_id, simulate_car_id = await get_fixture_coil_car_ids(dnt_id)
|
coil_id, simulate_car_id = await get_fixture_coil_car_ids(dnt_id)
|
||||||
dev_type = await get_fixture_dev_type(dnt_id)
|
dev_type = await get_fixture_dev_type(dnt_id)
|
||||||
@@ -542,7 +552,7 @@ async def insert_wave_data(dnt_id: int, dpg430_addr: int,
|
|||||||
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
|
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""",
|
||||||
(dnt_id, dpg430_addr, dev_type, str_type,
|
(dnt_id, dpg430_addr, dev_type, str_type,
|
||||||
1, "B4",
|
1, "B4",
|
||||||
"", relay_code,
|
relay_out, relay_code,
|
||||||
remain_count, work_freq, curr_dist, speed,
|
remain_count, work_freq, curr_dist, speed,
|
||||||
near_dist, far_dist, enter_dist, leave_dist,
|
near_dist, far_dist, enter_dist, leave_dist,
|
||||||
coil_id, simulate_car_id),
|
coil_id, simulate_car_id),
|
||||||
@@ -604,6 +614,17 @@ async def update_device_status(serial: str, state: int):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def get_all_device_serials() -> list[tuple[str, int, str]]:
|
||||||
|
"""获取所有设备 (serial, state, ip),用于状态扫描"""
|
||||||
|
pool = await get_pool()
|
||||||
|
async with pool.acquire() as conn:
|
||||||
|
async with conn.cursor() as cur:
|
||||||
|
await cur.execute(
|
||||||
|
"SELECT serial, state, ip FROM dnt_info ORDER BY id"
|
||||||
|
)
|
||||||
|
return await cur.fetchall()
|
||||||
|
|
||||||
|
|
||||||
# ─── tb_serialnet CRUD ─────────────────────────────────────────────
|
# ─── tb_serialnet CRUD ─────────────────────────────────────────────
|
||||||
|
|
||||||
async def get_pending_serialnet(dnt_id: int) -> dict | None:
|
async def get_pending_serialnet(dnt_id: int) -> dict | None:
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ from src.handlers import (
|
|||||||
device_status_monitor,
|
device_status_monitor,
|
||||||
set_udp_sender,
|
set_udp_sender,
|
||||||
)
|
)
|
||||||
|
from src.models import insert_device_log
|
||||||
|
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
level=getattr(logging, LOG_LEVEL),
|
level=getattr(logging, LOG_LEVEL),
|
||||||
@@ -75,6 +76,7 @@ class EDCProtocol:
|
|||||||
logger.info(f"UDP {msg} from {addr}")
|
logger.info(f"UDP {msg} from {addr}")
|
||||||
|
|
||||||
method = msg.get("Method", "")
|
method = msg.get("Method", "")
|
||||||
|
method_lower = method.lower()
|
||||||
logger.debug(f"UDP {method} from {addr}")
|
logger.debug(f"UDP {method} from {addr}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -82,11 +84,11 @@ class EDCProtocol:
|
|||||||
if method == "Count_Off":
|
if method == "Count_Off":
|
||||||
# 设备登录上报,只处理不回复
|
# 设备登录上报,只处理不回复
|
||||||
await handle_count_off(msg, addr)
|
await handle_count_off(msg, addr)
|
||||||
elif method == "Heartbeat":
|
elif method_lower == "heartbeat":
|
||||||
response = await handle_heartbeat(msg)
|
response = await handle_heartbeat(msg)
|
||||||
elif method == "TSReport":
|
elif method_lower == "tsreport":
|
||||||
response = await handle_tsreport(msg)
|
response = await handle_tsreport(msg)
|
||||||
elif method == "SerialNet":
|
elif method_lower == "serialnet":
|
||||||
response = await handle_serial_net(msg)
|
response = await handle_serial_net(msg)
|
||||||
|
|
||||||
if response and self.transport:
|
if response and self.transport:
|
||||||
@@ -105,24 +107,36 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
|
|||||||
- 紧凑 JSON (无换行)
|
- 紧凑 JSON (无换行)
|
||||||
"""
|
"""
|
||||||
addr = writer.get_extra_info("peername")
|
addr = writer.get_extra_info("peername")
|
||||||
|
addr_ip = addr[0] if addr else ""
|
||||||
logger.info(f"TCP 连接: {addr}")
|
logger.info(f"TCP 连接: {addr}")
|
||||||
|
|
||||||
|
# TCP 连接事件日志
|
||||||
|
try:
|
||||||
|
asyncio.ensure_future(insert_device_log(
|
||||||
|
serial="", ip=addr_ip,
|
||||||
|
event_type="tcp_connect",
|
||||||
|
content=f"TCP 连接: {addr}",
|
||||||
|
))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
buffer = b""
|
buffer = b""
|
||||||
|
|
||||||
async def process_message(msg: dict):
|
async def process_message(msg: dict):
|
||||||
"""处理单条消息并返回响应文本"""
|
"""处理单条消息并返回响应文本"""
|
||||||
logger.info(f"TCP get_rcv {msg} from {addr}")
|
logger.info(f"TCP get_rcv {msg} from {addr}")
|
||||||
method = msg.get("Method", "")
|
method = msg.get("Method", "")
|
||||||
|
method_lower = method.lower()
|
||||||
logger.debug(f"TCP {method} from {addr}")
|
logger.debug(f"TCP {method} from {addr}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if method == "TimeStamp":
|
if method_lower == "timestamp":
|
||||||
return handle_timestamp(msg)
|
return handle_timestamp(msg)
|
||||||
elif method == "TSReport":
|
elif method_lower == "tsreport":
|
||||||
return await handle_tsreport(msg)
|
return await handle_tsreport(msg)
|
||||||
elif method == "SerialNet":
|
elif method_lower == "serialnet":
|
||||||
return await handle_serial_net(msg)
|
return await handle_serial_net(msg)
|
||||||
elif method == "Heartbeat":
|
elif method_lower == "heartbeat":
|
||||||
return await handle_heartbeat(msg)
|
return await handle_heartbeat(msg)
|
||||||
else:
|
else:
|
||||||
logger.debug(f"TCP 未知方法: {method}")
|
logger.debug(f"TCP 未知方法: {method}")
|
||||||
@@ -176,6 +190,15 @@ async def handle_tcp_client(reader: asyncio.StreamReader,
|
|||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
logger.info(f"TCP 断开: {addr}")
|
logger.info(f"TCP 断开: {addr}")
|
||||||
|
# TCP 断开事件日志
|
||||||
|
try:
|
||||||
|
asyncio.ensure_future(insert_device_log(
|
||||||
|
serial="", ip=addr_ip,
|
||||||
|
event_type="tcp_disconnect",
|
||||||
|
content=f"TCP 断开: {addr}",
|
||||||
|
))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
writer.close()
|
writer.close()
|
||||||
await writer.wait_closed()
|
await writer.wait_closed()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user