Files
vd_test_fixture/edc-web/app/models.py
wangfq 60215eef48 feat: 自动化增加起止时间 + 本轮每次测试明细表
- 显示自动化开始/结束时间
- 测试信息区新增本轮测试明细表(序号、状态、关键字段)
- 方便对照平均值是否计算正确
2026-05-28 15:19:20 +08:00

378 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MySQL database operations (synchronous, via pymysql)."""
import pymysql
from app.config import Config
def get_conn():
    """Open a new pymysql connection configured from ``Config``.

    Host, port, credentials and database name are read from the
    ``MYSQL_*`` attributes of ``Config``. The connection uses the
    ``utf8mb4`` charset and ``DictCursor`` as its cursor class.
    Every caller in this module closes the returned connection itself.
    """
    settings = {
        "host": Config.MYSQL_HOST,
        "port": Config.MYSQL_PORT,
        "user": Config.MYSQL_USER,
        "password": Config.MYSQL_PASSWORD,
        "database": Config.MYSQL_DB,
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**settings)
# ─── dnt_info ──────────────────────────────────────────────────────
def get_all_devices() -> list[dict]:
    """Return all rows of ``dnt_info`` (selected columns), newest id first."""
    sql = (
        "SELECT id, serial, name, ip, port, state, version, last_login "
        "FROM dnt_info ORDER BY id DESC"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql)
            devices = cursor.fetchall()
    finally:
        # Always release the connection, even if the query failed.
        db.close()
    return devices
def update_device_name(device_id: int, name: str):
    """Set the ``name`` column of the ``dnt_info`` row with id *device_id* and commit."""
    sql = "UPDATE dnt_info SET name=%s WHERE id=%s"
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (name, device_id))
        db.commit()
    finally:
        db.close()
def get_device_by_id(device_id: int) -> dict | None:
    """Return the ``dnt_info`` row whose id is *device_id*, or ``None`` if absent."""
    sql = "SELECT * FROM dnt_info WHERE id=%s"
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (device_id,))
            device = cursor.fetchone()
    finally:
        db.close()
    return device
# ─── tb_serialnet ──────────────────────────────────────────────────
def insert_serialnet(dnt_id: int, send_pkg: str) -> int:
    """Insert a ``tb_serialnet`` row for a device and return its record id.

    Args:
        dnt_id: Value stored in the ``dnt_id`` column.
        send_pkg: Value stored in the ``send_pkg`` column.

    Returns:
        The cursor's ``lastrowid`` after the insert.
    """
    sql = "INSERT INTO tb_serialnet (dnt_id, send_pkg) VALUES (%s, %s)"
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (dnt_id, send_pkg))
            # Captured right after the insert; the commit below does not alter it.
            record_id = cursor.lastrowid
        db.commit()
    finally:
        db.close()
    return record_id
def get_serialnet_stats(dnt_id: int) -> dict:
    """Count a device's ``tb_serialnet`` rows, broken down by state.

    Returns:
        A dict ``{total, pending, sent, done, failed}``. ``total`` sums
        every row of the device; state 0 is reported as ``pending``,
        1 as ``sent``, 2 as ``done`` and 3 as ``failed``. Rows in any
        other state contribute to ``total`` only.
    """
    sql = (
        "SELECT state, COUNT(*) as cnt FROM tb_serialnet "
        "WHERE dnt_id=%s GROUP BY state"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (dnt_id,))
            grouped = cursor.fetchall()
    finally:
        db.close()

    # Maps the numeric state column onto the key used in the result.
    state_names = {0: "pending", 1: "sent", 2: "done", 3: "failed"}
    stats = {"total": 0, "pending": 0, "sent": 0, "done": 0, "failed": 0}
    for row in grouped:
        state = row["state"]
        count = row["cnt"]
        stats["total"] += count
        bucket = state_names.get(state)
        if bucket is not None:
            stats[bucket] = count
    return stats
def get_serialnet_records(dnt_id: int, limit: int = 50) -> list[dict]:
    """Return up to *limit* ``tb_serialnet`` rows of a device, newest id first."""
    sql = (
        "SELECT * FROM tb_serialnet WHERE dnt_id=%s "
        "ORDER BY id DESC LIMIT %s"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (dnt_id, limit))
            records = cursor.fetchall()
    finally:
        db.close()
    return records
def clear_serialnet_records(dnt_id: int):
    """Delete every ``tb_serialnet`` row belonging to the given device and commit."""
    sql = "DELETE FROM tb_serialnet WHERE dnt_id=%s"
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (dnt_id,))
        db.commit()
    finally:
        db.close()
# ─── tb_state_tst ──────────────────────────────────────────────────
def get_latest_test_state(dnt_id: int) -> dict | None:
    """Return the device's ``tb_state_tst`` row with the highest id, or ``None``."""
    sql = (
        "SELECT * FROM tb_state_tst WHERE dnt_id=%s "
        "ORDER BY id DESC LIMIT 1"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (dnt_id,))
            latest = cursor.fetchone()
    finally:
        db.close()
    return latest
def get_test_data(page: int = 1, per_page: int = 20,
                  serial: str = "", date_from: str = "",
                  date_to: str = "") -> tuple[list[dict], int]:
    """Query test records page by page, joined with ``dnt_info``.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; negative values are treated as 0.
        serial: If non-empty, keep rows whose device serial contains it.
        date_from: If non-empty, keep rows with ``create_time >= date_from``.
        date_to: If non-empty, keep rows with ``create_time`` up to
            ``date_to`` followed by `` 23:59:59``.

    Returns:
        ``(records, total)``: the rows of the requested page (newest id
        first, each with the device ``serial`` added) and the number of
        rows matching the filters across all pages.
    """
    # MySQL rejects a negative LIMIT or OFFSET, so an out-of-range page
    # number or page size must not reach the query.
    page = max(page, 1)
    per_page = max(per_page, 0)
    offset = (page - 1) * per_page

    where = []
    params = []
    if serial:
        where.append("d.serial LIKE %s")
        params.append(f"%{serial}%")
    if date_from:
        where.append("t.create_time >= %s")
        params.append(date_from)
    if date_to:
        where.append("t.create_time <= %s")
        params.append(date_to + " 23:59:59")
    # Only the fixed fragments above are interpolated into the SQL text;
    # all user-supplied values travel as bound parameters.
    where_clause = " AND ".join(where) if where else "1=1"

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            # Total number of matching rows, for the pager.
            cur.execute(
                f"SELECT COUNT(*) as total FROM tb_state_tst t "
                f"JOIN dnt_info d ON t.dnt_id = d.id WHERE {where_clause}",
                params,
            )
            total = cur.fetchone()["total"]
            # Rows of the requested page.
            cur.execute(
                f"SELECT t.*, d.serial FROM tb_state_tst t "
                f"JOIN dnt_info d ON t.dnt_id = d.id "
                f"WHERE {where_clause} "
                f"ORDER BY t.id DESC LIMIT %s OFFSET %s",
                params + [per_page, offset],
            )
            records = cur.fetchall()
    finally:
        conn.close()
    return records, total
def get_all_test_data_for_export(serial: str = "", date_from: str = "",
                                 date_to: str = "") -> list[dict]:
    """Return every test record matching the filters, for export.

    Each non-empty argument adds one condition: *serial* is matched as
    a substring of the device serial, *date_from* is a lower bound on
    ``create_time`` and *date_to* (with `` 23:59:59`` appended) an
    upper bound. Rows come back newest id first, each with the device
    ``serial`` added from ``dnt_info``.
    """
    # (SQL fragment, bound value) pairs, one per active filter.
    filters = []
    if serial:
        filters.append(("d.serial LIKE %s", f"%{serial}%"))
    if date_from:
        filters.append(("t.create_time >= %s", date_from))
    if date_to:
        filters.append(("t.create_time <= %s", date_to + " 23:59:59"))

    where_clause = " AND ".join(fragment for fragment, _ in filters) or "1=1"
    params = [value for _, value in filters]

    sql = (
        f"SELECT t.*, d.serial FROM tb_state_tst t "
        f"JOIN dnt_info d ON t.dnt_id = d.id "
        f"WHERE {where_clause} ORDER BY t.id DESC"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, params)
            rows = cursor.fetchall()
    finally:
        db.close()
    return rows
def get_automation_averages(dnt_id: int, since: str | None = None) -> dict:
    """Return the averages of a device's test records for the current run.

    Args:
        dnt_id: Device id matched against ``tb_state_tst.dnt_id``.
        since: Optional time string; when given, only records with
            ``create_time >= since`` are averaged.

    Returns:
        A dict keyed ``avg_<column>`` for ppvalue, idle/enter/exit
        frequency, enter/exit distance and enter/exit speed. Each value
        is rounded to two decimals, or is 0 when the average is NULL or
        zero. The two speed averages are converted from dm/s to m/s
        (divided by 10). An empty dict is returned if the query yields
        no row.
    """
    columns = (
        "ppvalue",
        "idle_freq",
        "enter_freq",
        "exit_freq",
        "enter_dist",
        "exit_dist",
        "enter_speed",
        "exit_speed",
    )
    select_list = ", ".join(f"AVG({col}) as avg_{col}" for col in columns)

    where = "dnt_id=%s"
    params = [dnt_id]
    if since:
        where += " AND create_time >= %s"
        params.append(since)

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT {select_list} FROM tb_state_tst WHERE {where}",
                params,
            )
            row = cur.fetchone()
    finally:
        conn.close()

    if not row:
        return {}

    speed_keys = {"avg_enter_speed", "avg_exit_speed"}
    result = {}
    for key, value in row.items():
        if not value:
            result[key] = 0
        elif key in speed_keys:
            # Convert dm/s -> m/s BEFORE rounding. Rounding to two
            # decimals first and again after the division rounds twice
            # and can shift the last digit of the reported average.
            result[key] = round(value / 10, 2)
        else:
            result[key] = round(value, 2)
    return result
def get_automation_records(dnt_id: int, since: str) -> list[dict]:
    """Return the device's test records of the current run, with serialnet state.

    Selects ``tb_state_tst`` rows of *dnt_id* with ``create_time >= since``,
    oldest id first. Each row is left-joined to ``tb_serialnet`` rows of
    the same device that are in state 2 or 3 and whose ``update_time``
    lies within one second from the test row's ``create_time``; the
    matched state is exposed as ``sn_state`` (NULL when nothing matches).

    NOTE(review): because this is a plain LEFT JOIN, a test row that
    matches several serialnet rows in that window would appear once per
    match -- confirm whether that can happen in practice.
    """
    sql = (
        "SELECT t.*, sn.state as sn_state "
        "FROM tb_state_tst t "
        "LEFT JOIN tb_serialnet sn ON sn.dnt_id = t.dnt_id "
        " AND sn.state IN (2,3) "
        " AND sn.update_time >= t.create_time "
        " AND sn.update_time < DATE_ADD(t.create_time, INTERVAL 1 SECOND) "
        "WHERE t.dnt_id=%s AND t.create_time >= %s "
        "ORDER BY t.id ASC"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (dnt_id, since))
            records = cursor.fetchall()
    finally:
        db.close()
    return records
# ─── 用户管理 ──────────────────────────────────────────────────────
def get_user_by_username(username: str) -> dict | None:
    """Return the ``tb_user`` row with exactly this username, or ``None``."""
    sql = "SELECT * FROM tb_user WHERE username=%s"
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (username,))
            user = cursor.fetchone()
    finally:
        db.close()
    return user
def get_all_users() -> list[dict]:
    """Return all users ordered by id; the password hash column is not selected."""
    sql = (
        "SELECT id, username, role, is_active, create_time "
        "FROM tb_user ORDER BY id"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql)
            users = cursor.fetchall()
    finally:
        db.close()
    return users
def create_user(username: str, password_hash: str, role: str = "operator"):
    """Insert a new ``tb_user`` row and commit.

    Args:
        username: Value for the ``username`` column.
        password_hash: Value for the ``password_hash`` column; it is
            stored as given, so hashing is left to the caller.
        role: Value for the ``role`` column; defaults to ``"operator"``.
    """
    sql = "INSERT INTO tb_user (username, password_hash, role) VALUES (%s,%s,%s)"
    new_user = (username, password_hash, role)
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, new_user)
        db.commit()
    finally:
        db.close()
def update_user(user_id: int, password_hash: str = None, role: str = None, is_active: bool = None):
    """Update selected columns of one ``tb_user`` row and commit.

    Only arguments that are not ``None`` are written; *is_active* is
    stored as an int. When every optional argument is ``None`` no
    UPDATE statement is executed.
    """
    db = get_conn()
    try:
        with db.cursor() as cursor:
            # (SET fragment, bound value) for each column the caller supplied.
            assignments = []
            if password_hash is not None:
                assignments.append(("password_hash=%s", password_hash))
            if role is not None:
                assignments.append(("role=%s", role))
            if is_active is not None:
                assignments.append(("is_active=%s", int(is_active)))
            if assignments:
                set_clause = ", ".join(fragment for fragment, _ in assignments)
                values = [value for _, value in assignments]
                values.append(user_id)
                cursor.execute(f"UPDATE tb_user SET {set_clause} WHERE id=%s", values)
        db.commit()
    finally:
        db.close()
# ─── 日志管理 ──────────────────────────────────────────────────────
def insert_log(user_id: int, username: str, action_type: str,
               target: str = "", detail: str = "", result: str = "ok",
               ip: str = ""):
    """Append one row to ``tb_log`` and commit.

    The arguments map one-to-one onto the ``user_id``, ``username``,
    ``action_type``, ``target``, ``detail``, ``result`` and ``ip``
    columns.
    """
    sql = (
        "INSERT INTO tb_log (user_id, username, action_type, target, detail, result, ip) "
        "VALUES (%s,%s,%s,%s,%s,%s,%s)"
    )
    entry = (user_id, username, action_type, target, detail, result, ip)
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, entry)
        db.commit()
    finally:
        db.close()
def get_logs(page: int = 1, per_page: int = 30,
             username: str = "", action_type: str = "") -> tuple[list[dict], int]:
    """Query ``tb_log`` page by page.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: Page size; negative values are treated as 0.
        username: If non-empty, keep rows whose username contains it.
        action_type: If non-empty, keep rows with exactly this action type.

    Returns:
        ``(logs, total)``: the rows of the requested page (newest id
        first) and the number of rows matching the filters across all
        pages.
    """
    # MySQL rejects a negative LIMIT or OFFSET, so an out-of-range page
    # number or page size must not reach the query.
    page = max(page, 1)
    per_page = max(per_page, 0)
    offset = (page - 1) * per_page

    where = []
    params = []
    if username:
        where.append("username LIKE %s")
        params.append(f"%{username}%")
    if action_type:
        where.append("action_type=%s")
        params.append(action_type)
    # Only the fixed fragments above are interpolated into the SQL text;
    # all user-supplied values travel as bound parameters.
    where_clause = " AND ".join(where) if where else "1=1"

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) as total FROM tb_log WHERE {where_clause}", params)
            total = cur.fetchone()["total"]
            cur.execute(
                f"SELECT * FROM tb_log WHERE {where_clause} "
                f"ORDER BY id DESC LIMIT %s OFFSET %s",
                params + [per_page, offset],
            )
            return cur.fetchall(), total
    finally:
        conn.close()