Files
vd_test_fixture/edc-web/app/models.py
wangfq 70dd3f8246 feat: 新增 edc-web Flask 前端管理系统 + 需求文档
- edc-web: Flask 项目骨架(设备管理、测试操作、测试信息三大页面)
- edc_server: 升级子模块(tb_serialnet 透传支持)
- docs: 测试工装EDC管理系统需求文档
2026-05-28 09:40:45 +08:00

228 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""MySQL database operations (synchronous, via pymysql)."""
import pymysql
from app.config import Config
def get_conn():
    """Open a new pymysql connection configured from ``Config``.

    The connection uses the ``utf8mb4`` charset and ``DictCursor``, so rows
    fetched through its cursors are dicts keyed by column name. Nothing here
    closes the connection; that is left to the caller.
    """
    settings = {
        "host": Config.MYSQL_HOST,
        "port": Config.MYSQL_PORT,
        "user": Config.MYSQL_USER,
        "password": Config.MYSQL_PASSWORD,
        "database": Config.MYSQL_DB,
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**settings)
# ─── dnt_info ──────────────────────────────────────────────────────
def get_all_devices() -> list[dict]:
    """Return all ``dnt_info`` rows, ordered by ``id`` descending.

    Only the columns ``id``, ``serial``, ``name``, ``ip``, ``port``,
    ``state``, ``version`` and ``last_login`` are selected.
    """
    query = (
        "SELECT id, serial, name, ip, port, state, version, last_login "
        "FROM dnt_info ORDER BY id DESC"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(query)
            devices = cursor.fetchall()
    finally:
        # Always release the connection, even if the query fails.
        db.close()
    return devices
def update_device_name(device_id: int, name: str):
    """Set ``name`` on the ``dnt_info`` row whose ``id`` is *device_id*.

    The change is committed before the connection is closed.
    """
    statement = "UPDATE dnt_info SET name=%s WHERE id=%s"
    values = (name, device_id)
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(statement, values)
        db.commit()
    finally:
        # Always release the connection, even if the update fails.
        db.close()
def get_device_by_id(device_id: int) -> dict | None:
    """Fetch the ``dnt_info`` row whose ``id`` equals *device_id*.

    Returns whatever ``fetchone()`` yields for the query: the row as a dict,
    or ``None`` when no row matches.
    """
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute("SELECT * FROM dnt_info WHERE id=%s", (device_id,))
            device = cursor.fetchone()
    finally:
        # Always release the connection, even if the query fails.
        db.close()
    return device
# ─── tb_serialnet ──────────────────────────────────────────────────
def insert_serialnet(dnt_id: int, send_pkg: str) -> int:
    """Insert one ``tb_serialnet`` row and return its record id.

    Only ``dnt_id`` and ``send_pkg`` are supplied; every other column is left
    to the table's defaults. The insert is committed before returning, and
    the returned value is the cursor's ``lastrowid``.
    """
    statement = "INSERT INTO tb_serialnet (dnt_id, send_pkg) VALUES (%s, %s)"
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(statement, (dnt_id, send_pkg))
            record_id = cursor.lastrowid
        db.commit()
    finally:
        # Always release the connection, even if the insert fails.
        db.close()
    return record_id
def get_serialnet_stats(dnt_id: int) -> dict:
    """Count one device's ``tb_serialnet`` rows, broken down by ``state``.

    Returns a dict with the keys ``total``, ``pending``, ``sent``, ``done``
    and ``failed``. State values 0, 1, 2 and 3 feed ``pending``, ``sent``,
    ``done`` and ``failed`` respectively; rows in any other state are counted
    in ``total`` only.
    """
    bucket_by_state = {0: "pending", 1: "sent", 2: "done", 3: "failed"}

    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(
                "SELECT state, COUNT(*) as cnt FROM tb_serialnet "
                "WHERE dnt_id=%s GROUP BY state",
                (dnt_id,),
            )
            grouped = cursor.fetchall()
    finally:
        # Always release the connection, even if the query fails.
        db.close()

    counts = dict.fromkeys(("total", "pending", "sent", "done", "failed"), 0)
    for group in grouped:
        state = group["state"]
        counts["total"] += group["cnt"]
        bucket = bucket_by_state.get(state)
        if bucket is not None:
            counts[bucket] = group["cnt"]
    return counts
def get_serialnet_records(dnt_id: int, limit: int = 50) -> list[dict]:
    """Return the most recent ``tb_serialnet`` rows of one device.

    Rows are ordered by ``id`` descending and capped at *limit* rows.
    """
    query = (
        "SELECT * FROM tb_serialnet WHERE dnt_id=%s "
        "ORDER BY id DESC LIMIT %s"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(query, (dnt_id, limit))
            records = cursor.fetchall()
    finally:
        # Always release the connection, even if the query fails.
        db.close()
    return records
# ─── tb_state_tst ──────────────────────────────────────────────────
def get_latest_test_state(dnt_id: int) -> dict | None:
    """Return the ``tb_state_tst`` row with the highest ``id`` for a device.

    Returns ``None`` when the device has no rows in ``tb_state_tst``.
    """
    query = (
        "SELECT * FROM tb_state_tst WHERE dnt_id=%s "
        "ORDER BY id DESC LIMIT 1"
    )
    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(query, (dnt_id,))
            latest = cursor.fetchone()
    finally:
        # Always release the connection, even if the query fails.
        db.close()
    return latest
def get_test_data(page: int = 1, per_page: int = 20,
                  serial: str = "", date_from: str = "",
                  date_to: str = "") -> tuple[list[dict], int]:
    """Return one page of ``tb_state_tst`` rows joined with ``dnt_info``.

    Args:
        page: 1-based page number. Values below 1 are treated as 1.
        per_page: Maximum number of rows in the page. Negative values are
            treated as 0.
        serial: When non-empty, keep only rows whose ``d.serial`` contains
            this text (``LIKE %serial%``).
        date_from: When non-empty, keep only rows with
            ``t.create_time >= date_from``.
        date_to: When non-empty, keep only rows with ``t.create_time`` up to
            ``date_to`` with ``" 23:59:59"`` appended. NOTE(review): this
            presumably expects a date-only string such as ``YYYY-MM-DD`` --
            confirm against the callers.

    Returns:
        ``(records, total)``: the rows of the requested page ordered by
        ``t.id`` descending (all ``tb_state_tst`` columns plus ``d.serial``),
        and the number of rows matching the filters across all pages.
    """
    # A page below 1 or a negative page size would yield a negative
    # OFFSET/LIMIT, which MySQL rejects with a syntax error; clamp instead.
    page = max(page, 1)
    per_page = max(per_page, 0)
    offset = (page - 1) * per_page

    # Only fixed SQL fragments go into the statement text; every
    # caller-supplied value travels separately as a bound parameter.
    conditions = []
    params = []
    if serial:
        conditions.append("d.serial LIKE %s")
        params.append(f"%{serial}%")
    if date_from:
        conditions.append("t.create_time >= %s")
        params.append(date_from)
    if date_to:
        conditions.append("t.create_time <= %s")
        params.append(date_to + " 23:59:59")
    where_clause = " AND ".join(conditions) if conditions else "1=1"

    joined_tables = "tb_state_tst t JOIN dnt_info d ON t.dnt_id = d.id"
    count_sql = (
        f"SELECT COUNT(*) as total FROM {joined_tables} WHERE {where_clause}"
    )
    page_sql = (
        f"SELECT t.*, d.serial FROM {joined_tables} "
        f"WHERE {where_clause} "
        "ORDER BY t.id DESC LIMIT %s OFFSET %s"
    )

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            # Total across all pages, using the same filters as the page query.
            cur.execute(count_sql, params)
            total = cur.fetchone()["total"]

            cur.execute(page_sql, params + [per_page, offset])
            records = cur.fetchall()
    finally:
        # Always release the connection, even if a query fails.
        conn.close()
    return records, total
def get_all_test_data_for_export(serial: str = "", date_from: str = "",
                                 date_to: str = "") -> list[dict]:
    """Return every ``tb_state_tst`` row matching the filters, unpaginated.

    Rows are joined with ``dnt_info`` (adding ``d.serial``) and ordered by
    ``t.id`` descending. Each filter applies only when its argument is
    non-empty: *serial* is a substring match on ``d.serial``, *date_from* is
    a lower bound on ``t.create_time``, and *date_to* is an upper bound with
    ``" 23:59:59"`` appended.
    """
    db = get_conn()
    try:
        # Collect (SQL fragment, bound value) pairs for the active filters.
        filters = []
        if serial:
            filters.append(("d.serial LIKE %s", f"%{serial}%"))
        if date_from:
            filters.append(("t.create_time >= %s", date_from))
        if date_to:
            filters.append(("t.create_time <= %s", date_to + " 23:59:59"))

        # With no active filter, fall back to a condition that is always true.
        condition = " AND ".join(fragment for fragment, _ in filters) or "1=1"
        args = [value for _, value in filters]

        query = (
            "SELECT t.*, d.serial FROM tb_state_tst t "
            "JOIN dnt_info d ON t.dnt_id = d.id "
            f"WHERE {condition} ORDER BY t.id DESC"
        )
        with db.cursor() as cursor:
            cursor.execute(query, args)
            rows = cursor.fetchall()
    finally:
        # Always release the connection, even if the query fails.
        db.close()
    return rows
def get_automation_averages(dnt_id: int) -> dict:
    """Return per-column averages over one device's ``tb_state_tst`` rows.

    The averaged columns are ``ppvalue``, ``idle_freq``, ``enter_freq``,
    ``exit_freq``, ``enter_dist``, ``exit_dist``, ``enter_speed`` and
    ``exit_speed``; each result is keyed ``avg_<column>`` and rounded to two
    decimals. An average that is ``NULL`` or zero is reported as ``0``.
    Returns an empty dict if the query yields no row.

    NOTE(review): the earlier docstring and comments said failed records
    (state=3) are excluded and that only the latest automation batch
    (rows matching ``tb_serialnet`` state=2) is averaged. The query applies
    no such filter -- it averages all rows for *dnt_id*. Confirm which
    behavior is intended.
    """
    metrics = (
        "ppvalue",
        "idle_freq",
        "enter_freq",
        "exit_freq",
        "enter_dist",
        "exit_dist",
        "enter_speed",
        "exit_speed",
    )
    select_list = ", ".join(f"AVG({name}) as avg_{name}" for name in metrics)
    query = f"SELECT {select_list} FROM tb_state_tst WHERE dnt_id=%s"

    db = get_conn()
    try:
        with db.cursor() as cursor:
            cursor.execute(query, (dnt_id,))
            averages_row = cursor.fetchone()
    finally:
        # Always release the connection, even if the query fails.
        db.close()

    if not averages_row:
        return {}

    averages = {}
    for column, value in averages_row.items():
        # Falsy values (NULL from an empty set, or an exact zero) become 0.
        averages[column] = round(value, 2) if value else 0
    return averages