"""MySQL 数据库操作(同步 pymysql)""" import pymysql from app.config import Config def get_conn(): return pymysql.connect( host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER, password=Config.MYSQL_PASSWORD, database=Config.MYSQL_DB, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor, ) # ─── dnt_info ────────────────────────────────────────────────────── def get_all_devices() -> list[dict]: conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT id, serial, name, ip, port, state, version, last_login FROM dnt_info ORDER BY id DESC") return cur.fetchall() finally: conn.close() def update_device_name(device_id: int, name: str): conn = get_conn() try: with conn.cursor() as cur: cur.execute("UPDATE dnt_info SET name=%s WHERE id=%s", (name, device_id)) conn.commit() finally: conn.close() def get_device_by_id(device_id: int) -> dict | None: conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT * FROM dnt_info WHERE id=%s", (device_id,)) return cur.fetchone() finally: conn.close() # ─── tb_serialnet ────────────────────────────────────────────────── def insert_serialnet(dnt_id: int, send_pkg: str) -> int: """返回 record_id""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "INSERT INTO tb_serialnet (dnt_id, send_pkg) VALUES (%s, %s)", (dnt_id, send_pkg), ) conn.commit() return cur.lastrowid finally: conn.close() def get_serialnet_stats(dnt_id: int) -> dict: """返回 {total, pending, sent, done, failed}""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT state, COUNT(*) as cnt FROM tb_serialnet " "WHERE dnt_id=%s GROUP BY state", (dnt_id,), ) rows = cur.fetchall() finally: conn.close() stats = {"total": 0, "pending": 0, "sent": 0, "done": 0, "failed": 0} for r in rows: s = r["state"] stats["total"] += r["cnt"] if s == 0: stats["pending"] = r["cnt"] elif s == 1: stats["sent"] = r["cnt"] elif s == 2: stats["done"] = r["cnt"] elif s == 3: stats["failed"] = r["cnt"] return stats def get_serialnet_records(dnt_id: int, limit: int = 50) -> list[dict]: conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT * FROM tb_serialnet WHERE dnt_id=%s " "ORDER BY id DESC LIMIT %s", (dnt_id, limit), ) return cur.fetchall() finally: conn.close() def clear_serialnet_records(dnt_id: int): """清除指定设备的所有透传记录""" conn = get_conn() try: with conn.cursor() as cur: cur.execute("DELETE FROM tb_serialnet WHERE dnt_id=%s", (dnt_id,)) conn.commit() finally: conn.close() # ─── tb_state_tst ────────────────────────────────────────────────── def get_latest_test_state(dnt_id: int) -> dict | None: """获取设备最新一条测试状态""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT * FROM tb_state_tst WHERE dnt_id=%s " "ORDER BY id DESC LIMIT 1", (dnt_id,), ) return cur.fetchone() finally: conn.close() def get_test_data(page: int = 1, per_page: int = 20, serial: str = "", date_from: str = "", date_to: str = "") -> tuple[list[dict], int]: """分页查询测试数据(JOIN dnt_info),返回 (records, total)""" conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if serial: where.append("d.serial LIKE %s") params.append(f"%{serial}%") if date_from: where.append("t.create_time >= %s") params.append(date_from) if date_to: where.append("t.create_time <= %s") params.append(date_to + " 23:59:59") where_clause = " AND ".join(where) if where else "1=1" # count cur.execute( f"SELECT COUNT(*) as total FROM tb_state_tst t " f"JOIN dnt_info d ON t.dnt_id = d.id WHERE {where_clause}", params, ) total = cur.fetchone()["total"] # data offset = (page - 1) * per_page cur.execute( f"SELECT t.*, d.serial FROM tb_state_tst t " f"JOIN dnt_info d ON t.dnt_id = d.id " f"WHERE {where_clause} " f"ORDER BY t.id DESC LIMIT %s OFFSET %s", params + [per_page, offset], ) records = cur.fetchall() finally: conn.close() return records, total def get_all_test_data_for_export(serial: str = "", date_from: str = "", date_to: str = "") -> list[dict]: """导出全部数据""" conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if serial: where.append("d.serial LIKE %s") params.append(f"%{serial}%") if date_from: where.append("t.create_time >= %s") params.append(date_from) if date_to: where.append("t.create_time <= %s") params.append(date_to + " 23:59:59") where_clause = " AND ".join(where) if where else "1=1" cur.execute( f"SELECT t.*, d.serial FROM tb_state_tst t " f"JOIN dnt_info d ON t.dnt_id = d.id " f"WHERE {where_clause} ORDER BY t.id DESC", params, ) return cur.fetchall() finally: conn.close() def get_automation_averages(dnt_id: int) -> dict: """获取自动化测试的平均值(排除失败记录 state=3)""" conn = get_conn() try: with conn.cursor() as cur: # 只计算最近一批自动化测试的平均值 # 取该设备 tb_state_tst 中与 tb_serialnet state=2 对应的记录 cur.execute( "SELECT AVG(ppvalue) as avg_ppvalue, " "AVG(idle_freq) as avg_idle_freq, " "AVG(enter_freq) as avg_enter_freq, " "AVG(exit_freq) as avg_exit_freq, " "AVG(enter_dist) as avg_enter_dist, " "AVG(exit_dist) as avg_exit_dist, " "AVG(enter_speed) as avg_enter_speed, " "AVG(exit_speed) as avg_exit_speed " "FROM tb_state_tst WHERE dnt_id=%s", (dnt_id,), ) row = cur.fetchone() finally: conn.close() if row: return {k: round(v, 2) if v else 0 for k, v in row.items()} return {}