- 新增 /device-logs 设备事件日志管理页 (admin 权限) - 支持按设备序列号/事件类型筛选查询 - 支持 admin 按条件删除日志 - 不同事件类型彩色标识 (在线=绿, 离线=红, 通信不良=橙) - 新增 /api/devices/<id>/status 设备状态 API - 设备列表页:每 5s 异步刷新所有设备在线状态 - 测试操作页:顶部显示设备状态,每 5s 异步刷新 - dnt_info state 支持三态显示 (在线/离线/通信不良) - 导航栏增加「设备日志」入口 (admin only)
906 lines
31 KiB
Python
906 lines
31 KiB
Python
"""MySQL 数据库操作(同步 pymysql)"""
|
||
|
||
import pymysql
|
||
from app.config import Config
|
||
|
||
|
||
def get_conn():
|
||
return pymysql.connect(
|
||
host=Config.MYSQL_HOST,
|
||
port=Config.MYSQL_PORT,
|
||
user=Config.MYSQL_USER,
|
||
password=Config.MYSQL_PASSWORD,
|
||
database=Config.MYSQL_DB,
|
||
charset="utf8mb4",
|
||
cursorclass=pymysql.cursors.DictCursor,
|
||
)
|
||
|
||
|
||
# ─── dnt_info ──────────────────────────────────────────────────────
|
||
|
||
def get_all_devices() -> list[dict]:
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT id, serial, name, ip, port, state, version, last_login FROM dnt_info ORDER BY id DESC")
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def update_device_name(device_id: int, name: str):
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("UPDATE dnt_info SET name=%s WHERE id=%s", (name, device_id))
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_device_by_id(device_id: int) -> dict | None:
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT * FROM dnt_info WHERE id=%s", (device_id,))
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── tb_serialnet ──────────────────────────────────────────────────
|
||
|
||
def insert_serialnet(dnt_id: int, send_pkg: str) -> int:
|
||
"""返回 record_id"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"INSERT INTO tb_serialnet (dnt_id, send_pkg) VALUES (%s, %s)",
|
||
(dnt_id, send_pkg),
|
||
)
|
||
conn.commit()
|
||
return cur.lastrowid
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_serialnet_stats(dnt_id: int) -> dict:
|
||
"""返回 {total, pending, sent, done, failed}"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT state, COUNT(*) as cnt FROM tb_serialnet "
|
||
"WHERE dnt_id=%s GROUP BY state",
|
||
(dnt_id,),
|
||
)
|
||
rows = cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
stats = {"total": 0, "pending": 0, "sent": 0, "done": 0, "failed": 0}
|
||
for r in rows:
|
||
s = r["state"]
|
||
stats["total"] += r["cnt"]
|
||
if s == 0:
|
||
stats["pending"] = r["cnt"]
|
||
elif s == 1:
|
||
stats["sent"] = r["cnt"]
|
||
elif s == 2:
|
||
stats["done"] = r["cnt"]
|
||
elif s == 3:
|
||
stats["failed"] = r["cnt"]
|
||
return stats
|
||
|
||
|
||
def get_serialnet_records(dnt_id: int, limit: int = 50) -> list[dict]:
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT * FROM tb_serialnet WHERE dnt_id=%s "
|
||
"ORDER BY id DESC LIMIT %s",
|
||
(dnt_id, limit),
|
||
)
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_serialnet_by_id(record_id: int) -> dict | None:
|
||
"""根据 ID 获取 tb_serialnet 记录"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT * FROM tb_serialnet WHERE id=%s", (record_id,),
|
||
)
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def clear_serialnet_records(dnt_id: int):
|
||
"""清除指定设备的所有透传记录"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("DELETE FROM tb_serialnet WHERE dnt_id=%s", (dnt_id,))
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── tb_state_tst ──────────────────────────────────────────────────
|
||
|
||
def get_latest_test_state(dnt_id: int) -> dict | None:
|
||
"""获取设备最新一条测试状态"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT * FROM tb_state_tst WHERE dnt_id=%s "
|
||
"ORDER BY id DESC LIMIT 1",
|
||
(dnt_id,),
|
||
)
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_test_data(page: int = 1, per_page: int = 20,
|
||
serial: str = "", date_from: str = "",
|
||
date_to: str = "", test_mode: str = "",
|
||
data_source: str = "") -> tuple[list[dict], int]:
|
||
"""分页查询测试数据(JOIN dnt_info),返回 (records, total)
|
||
|
||
test_mode: ''=全部, '0'=灵敏度, '1'=波动
|
||
data_source: ''=全部, 'B2', 'B4'
|
||
"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
where = []
|
||
params = []
|
||
if serial:
|
||
where.append("d.serial LIKE %s")
|
||
params.append(f"%{serial}%")
|
||
if date_from:
|
||
where.append("t.create_time >= %s")
|
||
params.append(date_from if len(date_from) > 10 else date_from)
|
||
if date_to:
|
||
where.append("t.create_time <= %s")
|
||
params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59")
|
||
if test_mode:
|
||
where.append("t.test_mode = %s")
|
||
params.append(int(test_mode))
|
||
if data_source:
|
||
where.append("t.data_source = %s")
|
||
params.append(data_source)
|
||
|
||
where_clause = " AND ".join(where) if where else "1=1"
|
||
|
||
# count
|
||
cur.execute(
|
||
f"SELECT COUNT(*) as total FROM tb_state_tst t "
|
||
f"JOIN dnt_info d ON t.dnt_id = d.id WHERE {where_clause}",
|
||
params,
|
||
)
|
||
total = cur.fetchone()["total"]
|
||
|
||
# data
|
||
offset = (page - 1) * per_page
|
||
cur.execute(
|
||
f"SELECT t.*, d.serial, "
|
||
f"c.coil_num, c.name as coil_name, "
|
||
f"sc.simulate_num, sc.name as car_name "
|
||
f"FROM tb_state_tst t "
|
||
f"JOIN dnt_info d ON t.dnt_id = d.id "
|
||
f"LEFT JOIN tb_coil_info c ON t.coil_id = c.id "
|
||
f"LEFT JOIN tb_simulate_car sc ON t.simulate_car_id = sc.id "
|
||
f"WHERE {where_clause} "
|
||
f"ORDER BY t.id DESC LIMIT %s OFFSET %s",
|
||
params + [per_page, offset],
|
||
)
|
||
records = cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
return records, total
|
||
|
||
|
||
def get_all_test_data_for_export(serial: str = "", date_from: str = "",
|
||
date_to: str = "", test_mode: str = "",
|
||
data_source: str = "") -> list[dict]:
|
||
"""导出全部数据
|
||
|
||
test_mode: ''=全部, '0'=灵敏度, '1'=波动
|
||
data_source: ''=全部, 'B2', 'B4'
|
||
"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
where = []
|
||
params = []
|
||
if serial:
|
||
where.append("d.serial LIKE %s")
|
||
params.append(f"%{serial}%")
|
||
if date_from:
|
||
where.append("t.create_time >= %s")
|
||
params.append(date_from if len(date_from) > 10 else date_from)
|
||
if date_to:
|
||
where.append("t.create_time <= %s")
|
||
params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59")
|
||
if test_mode:
|
||
where.append("t.test_mode = %s")
|
||
params.append(int(test_mode))
|
||
if data_source:
|
||
where.append("t.data_source = %s")
|
||
params.append(data_source)
|
||
|
||
where_clause = " AND ".join(where) if where else "1=1"
|
||
cur.execute(
|
||
f"SELECT t.*, d.serial, "
|
||
f"c.coil_num, c.name as coil_name, "
|
||
f"sc.simulate_num, sc.name as car_name "
|
||
f"FROM tb_state_tst t "
|
||
f"JOIN dnt_info d ON t.dnt_id = d.id "
|
||
f"LEFT JOIN tb_coil_info c ON t.coil_id = c.id "
|
||
f"LEFT JOIN tb_simulate_car sc ON t.simulate_car_id = sc.id "
|
||
f"WHERE {where_clause} ORDER BY t.id DESC",
|
||
params,
|
||
)
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_automation_averages(dnt_id: int, since: str = None) -> dict:
|
||
"""获取本次自动化测试的平均值
|
||
|
||
since: ISO 时间字符串,只统计此时间之后的测试记录
|
||
速度从 dm/s 转换为 m/s(÷10)
|
||
"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
where = "dnt_id=%s"
|
||
params = [dnt_id]
|
||
if since:
|
||
where += " AND create_time >= %s"
|
||
params.append(since)
|
||
cur.execute(
|
||
f"SELECT AVG(ppvalue) as avg_ppvalue, "
|
||
"AVG(idle_freq) as avg_idle_freq, "
|
||
"AVG(enter_freq) as avg_enter_freq, "
|
||
"AVG(exit_freq) as avg_exit_freq, "
|
||
"AVG(enter_dist) as avg_enter_dist, "
|
||
"AVG(exit_dist) as avg_exit_dist, "
|
||
"AVG(enter_speed) as avg_enter_speed, "
|
||
"AVG(exit_speed) as avg_exit_speed "
|
||
f"FROM tb_state_tst WHERE {where}",
|
||
params,
|
||
)
|
||
row = cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
if row:
|
||
result = {k: round(v, 2) if v else 0 for k, v in row.items()}
|
||
# 速度 dm/s → m/s
|
||
if result.get("avg_enter_speed"):
|
||
result["avg_enter_speed"] = round(result["avg_enter_speed"] / 10, 2)
|
||
if result.get("avg_exit_speed"):
|
||
result["avg_exit_speed"] = round(result["avg_exit_speed"] / 10, 2)
|
||
return result
|
||
return {}
|
||
|
||
|
||
def get_automation_records(dnt_id: int, since: str) -> list[dict]:
|
||
"""获取本轮自动化测试的所有记录(含 serialnet 状态)"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT t.*, sn.state as sn_state "
|
||
"FROM tb_state_tst t "
|
||
"LEFT JOIN tb_serialnet sn ON sn.dnt_id = t.dnt_id "
|
||
" AND sn.state IN (2,3) "
|
||
" AND sn.update_time >= t.create_time "
|
||
" AND sn.update_time < DATE_ADD(t.create_time, INTERVAL 1 SECOND) "
|
||
"WHERE t.dnt_id=%s AND t.create_time >= %s "
|
||
"ORDER BY t.id ASC",
|
||
(dnt_id, since),
|
||
)
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_latest_wave_data(dnt_id: int) -> dict | None:
|
||
"""获取设备最新一条 B4 波动测试数据"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT * FROM tb_state_tst WHERE dnt_id=%s AND data_source='B4' "
|
||
"ORDER BY id DESC LIMIT 1",
|
||
(dnt_id,),
|
||
)
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_wave_records(dnt_id: int, since: str) -> list[dict]:
|
||
"""获取本轮 B4 波动测试明细"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT * FROM tb_state_tst "
|
||
"WHERE dnt_id=%s AND data_source='B4' AND create_time >= %s "
|
||
"ORDER BY id ASC",
|
||
(dnt_id, since),
|
||
)
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── 用户管理 ──────────────────────────────────────────────────────
|
||
|
||
def get_user_by_username(username: str) -> dict | None:
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT * FROM tb_user WHERE username=%s", (username,))
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_all_users() -> list[dict]:
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT id, username, role, is_active, create_time FROM tb_user ORDER BY id")
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def create_user(username: str, password_hash: str, role: str = "operator"):
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"INSERT INTO tb_user (username, password_hash, role) VALUES (%s,%s,%s)",
|
||
(username, password_hash, role),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def update_user(user_id: int, password_hash: str = None, role: str = None, is_active: bool = None):
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
parts = []
|
||
params = []
|
||
if password_hash is not None:
|
||
parts.append("password_hash=%s")
|
||
params.append(password_hash)
|
||
if role is not None:
|
||
parts.append("role=%s")
|
||
params.append(role)
|
||
if is_active is not None:
|
||
parts.append("is_active=%s")
|
||
params.append(int(is_active))
|
||
if parts:
|
||
params.append(user_id)
|
||
cur.execute(f"UPDATE tb_user SET {', '.join(parts)} WHERE id=%s", params)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── 日志管理 ──────────────────────────────────────────────────────
|
||
|
||
def insert_log(user_id: int, username: str, action_type: str,
|
||
target: str = "", detail: str = "", result: str = "ok",
|
||
ip: str = ""):
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"INSERT INTO tb_log (user_id, username, action_type, target, detail, result, ip) "
|
||
"VALUES (%s,%s,%s,%s,%s,%s,%s)",
|
||
(user_id, username, action_type, target, detail, result, ip),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_logs(page: int = 1, per_page: int = 30,
|
||
username: str = "", action_type: str = "") -> tuple[list[dict], int]:
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
where = []
|
||
params = []
|
||
if username:
|
||
where.append("username LIKE %s")
|
||
params.append(f"%{username}%")
|
||
if action_type:
|
||
where.append("action_type=%s")
|
||
params.append(action_type)
|
||
where_clause = " AND ".join(where) if where else "1=1"
|
||
|
||
cur.execute(f"SELECT COUNT(*) as total FROM tb_log WHERE {where_clause}", params)
|
||
total = cur.fetchone()["total"]
|
||
|
||
offset = (page - 1) * per_page
|
||
cur.execute(
|
||
f"SELECT * FROM tb_log WHERE {where_clause} "
|
||
f"ORDER BY id DESC LIMIT %s OFFSET %s",
|
||
params + [per_page, offset],
|
||
)
|
||
return cur.fetchall(), total
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── tb_fixture_param ──────────────────────────────────────────────
|
||
|
||
def get_fixture_param(dnt_id: int) -> dict | None:
|
||
"""获取设备的工装测试参数(含线圈和模拟车辆信息)"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT fp.*, "
|
||
"c.coil_num, c.name as coil_name, c.shape as coil_shape, "
|
||
"c.length as coil_length, c.width as coil_width, c.radius as coil_radius, "
|
||
"c.turns as coil_turns, c.resistance as coil_resistance, "
|
||
"c.material as coil_material, "
|
||
"sc.simulate_num, sc.name as car_name, sc.shape as car_shape, "
|
||
"sc.length as car_length, sc.width as car_width, sc.radius as car_radius, "
|
||
"sc.material as car_material "
|
||
"FROM tb_fixture_param fp "
|
||
"LEFT JOIN tb_coil_info c ON fp.coil_id = c.id "
|
||
"LEFT JOIN tb_simulate_car sc ON fp.simulate_car_id = sc.id "
|
||
"WHERE fp.dnt_id=%s",
|
||
(dnt_id,),
|
||
)
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def upsert_fixture_param(dnt_id: int, **kwargs):
|
||
"""插入或更新工装测试参数"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT id FROM tb_fixture_param WHERE dnt_id=%s", (dnt_id,),
|
||
)
|
||
existing = cur.fetchone()
|
||
# 主线参数字段(不含 coil_id/simulate_car_id,后面单独处理)
|
||
fields = [
|
||
"Addr", "DevType", "TestMode", "RestDis", "MinusDis",
|
||
"SensMin", "SensMax", "FreMin", "FreMax", "PeakMin", "PeakMax",
|
||
"FarTol", "NearTol", "StepTol", "BackForth", "NearStay", "FarStay",
|
||
]
|
||
if existing:
|
||
sets = ", ".join(f"`{f}`=%s" for f in fields)
|
||
values = [kwargs.get(f, 0) for f in fields] + [dnt_id]
|
||
cur.execute(
|
||
f"UPDATE tb_fixture_param SET {sets} WHERE dnt_id=%s",
|
||
values,
|
||
)
|
||
else:
|
||
placeholders = ", ".join(["%s"] * len(fields))
|
||
col_names = ", ".join(f"`{f}`" for f in fields)
|
||
values = [kwargs.get(f, 0) for f in fields]
|
||
cur.execute(
|
||
f"INSERT INTO tb_fixture_param (dnt_id, {col_names}) "
|
||
f"VALUES (%s, {placeholders})",
|
||
[dnt_id] + values,
|
||
)
|
||
# 单独处理线圈/模拟车辆关联(可选,不覆盖已有值)
|
||
if "coil_id" in kwargs:
|
||
cur.execute(
|
||
"UPDATE tb_fixture_param SET coil_id=%s WHERE dnt_id=%s",
|
||
(kwargs["coil_id"], dnt_id),
|
||
)
|
||
if "simulate_car_id" in kwargs:
|
||
cur.execute(
|
||
"UPDATE tb_fixture_param SET simulate_car_id=%s WHERE dnt_id=%s",
|
||
(kwargs["simulate_car_id"], dnt_id),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── tb_vechicle_base_test ─────────────────────────────────────────
|
||
|
||
def get_vehicle_base_tests(search: str = "") -> list[dict]:
|
||
"""获取车检器测试基准参数列表"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
if search:
|
||
cur.execute(
|
||
"SELECT * FROM tb_vechicle_base_test "
|
||
"WHERE dev_name LIKE %s OR type_num LIKE %s "
|
||
"ORDER BY type_num ASC",
|
||
(f"%{search}%", f"%{search}%"),
|
||
)
|
||
else:
|
||
cur.execute(
|
||
"SELECT * FROM tb_vechicle_base_test ORDER BY type_num ASC",
|
||
)
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_vehicle_base_test_by_id(test_id: int) -> dict | None:
|
||
"""根据 ID 获取车检器测试基准"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"SELECT * FROM tb_vechicle_base_test WHERE id=%s", (test_id,),
|
||
)
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def create_vehicle_base_test(**kwargs) -> int:
|
||
"""创建车检器测试基准,返回新记录 ID"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
fields = [
|
||
"dev_name", "type_num", "SensMin", "SensMax",
|
||
"FreMin", "FreMax", "PeakMin", "PeakMax", "remark",
|
||
]
|
||
col_names = ", ".join(f"`{f}`" for f in fields)
|
||
placeholders = ", ".join(["%s"] * len(fields))
|
||
values = [kwargs.get(f, "" if f in ("dev_name", "remark") else 0) for f in fields]
|
||
cur.execute(
|
||
f"INSERT INTO tb_vechicle_base_test ({col_names}) VALUES ({placeholders})",
|
||
values,
|
||
)
|
||
conn.commit()
|
||
return cur.lastrowid
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def update_vehicle_base_test(test_id: int, **kwargs):
|
||
"""更新车检器测试基准"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
fields = [
|
||
"dev_name", "type_num", "SensMin", "SensMax",
|
||
"FreMin", "FreMax", "PeakMin", "PeakMax", "remark",
|
||
]
|
||
sets = ", ".join(f"`{f}`=%s" for f in fields)
|
||
values = [kwargs.get(f, "" if f in ("dev_name", "remark") else 0) for f in fields] + [test_id]
|
||
cur.execute(
|
||
f"UPDATE tb_vechicle_base_test SET {sets} WHERE id=%s",
|
||
values,
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def delete_vehicle_base_test(test_id: int):
|
||
"""删除车检器测试基准"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"DELETE FROM tb_vechicle_base_test WHERE id=%s", (test_id,),
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── 线圈参数 CRUD ──────────────────────────────────────────────────
|
||
|
||
def get_coil_info_list(search: str = "") -> list[dict]:
|
||
"""获取线圈参数列表"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
if search:
|
||
cur.execute(
|
||
"SELECT * FROM tb_coil_info WHERE coil_num LIKE %s OR name LIKE %s "
|
||
"ORDER BY id DESC",
|
||
(f"%{search}%", f"%{search}%"),
|
||
)
|
||
else:
|
||
cur.execute("SELECT * FROM tb_coil_info ORDER BY id DESC")
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_coil_info_by_id(coil_id: int) -> dict | None:
|
||
"""获取单个线圈参数"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT * FROM tb_coil_info WHERE id=%s", (coil_id,))
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def create_coil_info(**kwargs) -> int:
|
||
"""创建线圈参数,返回新 ID"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
fields = [
|
||
"coil_num", "name", "induct", "shape", "length", "width",
|
||
"radius", "turns", "resistance", "material", "remark",
|
||
]
|
||
col_names = ", ".join(f"`{f}`" for f in fields)
|
||
placeholders = ", ".join(["%s"] * len(fields))
|
||
values = [kwargs.get(f, "" if f in ("coil_num", "name", "shape", "material", "remark") else 0) for f in fields]
|
||
cur.execute(
|
||
f"INSERT INTO tb_coil_info ({col_names}) VALUES ({placeholders})",
|
||
values,
|
||
)
|
||
conn.commit()
|
||
return cur.lastrowid
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def update_coil_info(coil_id: int, **kwargs):
|
||
"""更新线圈参数"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
fields = [
|
||
"coil_num", "name", "induct", "shape", "length", "width",
|
||
"radius", "turns", "resistance", "material", "remark",
|
||
]
|
||
sets = ", ".join(f"`{f}`=%s" for f in fields)
|
||
values = [kwargs.get(f, "" if f in ("coil_num", "name", "shape", "material", "remark") else 0) for f in fields] + [coil_id]
|
||
cur.execute(
|
||
f"UPDATE tb_coil_info SET {sets} WHERE id=%s", values,
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def delete_coil_info(coil_id: int):
|
||
"""删除线圈参数"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("DELETE FROM tb_coil_info WHERE id=%s", (coil_id,))
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── 模拟车辆参数 CRUD ──────────────────────────────────────────────
|
||
|
||
def get_simulate_car_list(search: str = "") -> list[dict]:
|
||
"""获取模拟车辆参数列表"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
if search:
|
||
cur.execute(
|
||
"SELECT * FROM tb_simulate_car WHERE simulate_num LIKE %s OR name LIKE %s "
|
||
"ORDER BY id DESC",
|
||
(f"%{search}%", f"%{search}%"),
|
||
)
|
||
else:
|
||
cur.execute("SELECT * FROM tb_simulate_car ORDER BY id DESC")
|
||
return cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_simulate_car_by_id(car_id: int) -> dict | None:
|
||
"""获取单个模拟车辆参数"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT * FROM tb_simulate_car WHERE id=%s", (car_id,))
|
||
return cur.fetchone()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def create_simulate_car(**kwargs) -> int:
|
||
"""创建模拟车辆参数,返回新 ID"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
fields = [
|
||
"simulate_num", "name", "shape", "length", "width",
|
||
"radius", "material", "remark",
|
||
]
|
||
col_names = ", ".join(f"`{f}`" for f in fields)
|
||
placeholders = ", ".join(["%s"] * len(fields))
|
||
values = [kwargs.get(f, "" if f in ("simulate_num", "name", "shape", "material", "remark") else 0) for f in fields]
|
||
cur.execute(
|
||
f"INSERT INTO tb_simulate_car ({col_names}) VALUES ({placeholders})",
|
||
values,
|
||
)
|
||
conn.commit()
|
||
return cur.lastrowid
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def update_simulate_car(car_id: int, **kwargs):
|
||
"""更新模拟车辆参数"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
fields = [
|
||
"simulate_num", "name", "shape", "length", "width",
|
||
"radius", "material", "remark",
|
||
]
|
||
sets = ", ".join(f"`{f}`=%s" for f in fields)
|
||
values = [kwargs.get(f, "" if f in ("simulate_num", "name", "shape", "material", "remark") else 0) for f in fields] + [car_id]
|
||
cur.execute(
|
||
f"UPDATE tb_simulate_car SET {sets} WHERE id=%s", values,
|
||
)
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def delete_simulate_car(car_id: int):
|
||
"""删除模拟车辆参数"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("DELETE FROM tb_simulate_car WHERE id=%s", (car_id,))
|
||
conn.commit()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── 测试数据删除 ──────────────────────────────────────────────
|
||
|
||
def delete_test_data(serial: str = "", date_from: str = "",
|
||
date_to: str = "", data_source: str = "") -> int:
|
||
"""删除符合条件的测试数据,返回删除行数
|
||
|
||
必须至少提供一个条件(serial / date范围 / data_source),不允许无条件全删。
|
||
"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
where = []
|
||
params = []
|
||
if serial:
|
||
where.append("t.dnt_id IN (SELECT id FROM dnt_info WHERE serial LIKE %s)")
|
||
params.append(f"%{serial}%")
|
||
if date_from:
|
||
where.append("t.create_time >= %s")
|
||
params.append(date_from if len(date_from) > 10 else date_from)
|
||
if date_to:
|
||
where.append("t.create_time <= %s")
|
||
params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59")
|
||
if data_source:
|
||
where.append("t.data_source = %s")
|
||
params.append(data_source)
|
||
|
||
if not where:
|
||
return 0 # 拒绝无条件全删
|
||
|
||
where_clause = " AND ".join(where)
|
||
cur.execute(
|
||
f"SELECT COUNT(*) as cnt FROM tb_state_tst t WHERE {where_clause}",
|
||
params,
|
||
)
|
||
cnt = cur.fetchone()["cnt"]
|
||
|
||
cur.execute(
|
||
f"DELETE t FROM tb_state_tst t WHERE {where_clause}", params,
|
||
)
|
||
conn.commit()
|
||
return cnt
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─── tb_device_log ─────────────────────────────────────────────────
|
||
|
||
def get_device_logs(page: int = 1, per_page: int = 30,
|
||
serial: str = "", event_type: str = "") -> tuple[list[dict], int]:
|
||
"""分页查询设备事件日志,返回 (records, total)"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
where = []
|
||
params = []
|
||
if serial:
|
||
where.append("device_serial LIKE %s")
|
||
params.append(f"%{serial}%")
|
||
if event_type:
|
||
where.append("event_type = %s")
|
||
params.append(event_type)
|
||
|
||
where_clause = " AND ".join(where) if where else "1=1"
|
||
|
||
cur.execute(
|
||
f"SELECT COUNT(*) as total FROM tb_device_log WHERE {where_clause}",
|
||
params,
|
||
)
|
||
total = cur.fetchone()["total"]
|
||
|
||
offset = (page - 1) * per_page
|
||
cur.execute(
|
||
f"SELECT * FROM tb_device_log WHERE {where_clause} "
|
||
f"ORDER BY id DESC LIMIT %s OFFSET %s",
|
||
params + [per_page, offset],
|
||
)
|
||
return cur.fetchall(), total
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def delete_device_logs(serial: str = "", event_type: str = "",
|
||
date_from: str = "", date_to: str = "") -> int:
|
||
"""删除符合条件的设备日志,返回删除行数。至少需要一个条件。"""
|
||
conn = get_conn()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
where = []
|
||
params = []
|
||
if serial:
|
||
where.append("device_serial LIKE %s")
|
||
params.append(f"%{serial}%")
|
||
if event_type:
|
||
where.append("event_type = %s")
|
||
params.append(event_type)
|
||
if date_from:
|
||
where.append("create_time >= %s")
|
||
params.append(date_from if len(date_from) > 10 else date_from)
|
||
if date_to:
|
||
where.append("create_time <= %s")
|
||
params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59")
|
||
|
||
if not where:
|
||
return 0
|
||
|
||
where_clause = " AND ".join(where)
|
||
cur.execute(
|
||
f"SELECT COUNT(*) as cnt FROM tb_device_log WHERE {where_clause}",
|
||
params,
|
||
)
|
||
cnt = cur.fetchone()["cnt"]
|
||
|
||
cur.execute(
|
||
f"DELETE FROM tb_device_log WHERE {where_clause}", params,
|
||
)
|
||
conn.commit()
|
||
return cnt
|
||
finally:
|
||
conn.close()
|