"""MySQL 数据库操作(同步 pymysql)""" import pymysql from app.config import Config def get_conn(): return pymysql.connect( host=Config.MYSQL_HOST, port=Config.MYSQL_PORT, user=Config.MYSQL_USER, password=Config.MYSQL_PASSWORD, database=Config.MYSQL_DB, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor, ) # ─── dnt_info ────────────────────────────────────────────────────── def get_all_devices() -> list[dict]: conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT id, serial, name, ip, port, state, version, last_login FROM dnt_info ORDER BY id DESC") return cur.fetchall() finally: conn.close() def update_device_name(device_id: int, name: str): conn = get_conn() try: with conn.cursor() as cur: cur.execute("UPDATE dnt_info SET name=%s WHERE id=%s", (name, device_id)) conn.commit() finally: conn.close() def get_device_by_id(device_id: int) -> dict | None: conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT * FROM dnt_info WHERE id=%s", (device_id,)) return cur.fetchone() finally: conn.close() # ─── tb_serialnet ────────────────────────────────────────────────── def insert_serialnet(dnt_id: int, send_pkg: str) -> int: """返回 record_id""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "INSERT INTO tb_serialnet (dnt_id, send_pkg) VALUES (%s, %s)", (dnt_id, send_pkg), ) conn.commit() return cur.lastrowid finally: conn.close() def get_serialnet_stats(dnt_id: int) -> dict: """返回 {total, pending, sent, done, failed}""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT state, COUNT(*) as cnt FROM tb_serialnet " "WHERE dnt_id=%s GROUP BY state", (dnt_id,), ) rows = cur.fetchall() finally: conn.close() stats = {"total": 0, "pending": 0, "sent": 0, "done": 0, "failed": 0} for r in rows: s = r["state"] stats["total"] += r["cnt"] if s == 0: stats["pending"] = r["cnt"] elif s == 1: stats["sent"] = r["cnt"] elif s == 2: stats["done"] = r["cnt"] elif s == 3: stats["failed"] = r["cnt"] return stats def get_serialnet_records(dnt_id: int, limit: int = 50) -> list[dict]: conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT * FROM tb_serialnet WHERE dnt_id=%s " "ORDER BY id DESC LIMIT %s", (dnt_id, limit), ) return cur.fetchall() finally: conn.close() def get_serialnet_by_id(record_id: int) -> dict | None: """根据 ID 获取 tb_serialnet 记录""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT * FROM tb_serialnet WHERE id=%s", (record_id,), ) return cur.fetchone() finally: conn.close() def clear_serialnet_records(dnt_id: int): """清除指定设备的所有透传记录""" conn = get_conn() try: with conn.cursor() as cur: cur.execute("DELETE FROM tb_serialnet WHERE dnt_id=%s", (dnt_id,)) conn.commit() finally: conn.close() # ─── tb_state_tst ────────────────────────────────────────────────── def get_latest_test_state(dnt_id: int) -> dict | None: """获取设备最新一条测试状态""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT * FROM tb_state_tst WHERE dnt_id=%s " "ORDER BY id DESC LIMIT 1", (dnt_id,), ) return cur.fetchone() finally: conn.close() def get_test_data(page: int = 1, per_page: int = 20, serial: str = "", date_from: str = "", date_to: str = "", test_mode: str = "", data_source: str = "") -> tuple[list[dict], int]: """分页查询测试数据(JOIN dnt_info),返回 (records, total) test_mode: ''=全部, '0'=灵敏度, '1'=波动 data_source: ''=全部, 'B2', 'B4' """ conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if serial: where.append("d.serial LIKE %s") params.append(f"%{serial}%") if date_from: where.append("t.create_time >= %s") params.append(date_from if len(date_from) > 10 else date_from) if date_to: where.append("t.create_time <= %s") params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59") if test_mode: where.append("t.test_mode = %s") params.append(int(test_mode)) if data_source: where.append("t.data_source = %s") params.append(data_source) where_clause = " AND ".join(where) if where else "1=1" # count cur.execute( f"SELECT COUNT(*) as total FROM tb_state_tst t " f"JOIN dnt_info d ON t.dnt_id = d.id WHERE {where_clause}", params, ) total = cur.fetchone()["total"] # data offset = (page - 1) * per_page cur.execute( f"SELECT t.*, d.serial, " f"c.coil_num, c.name as coil_name, " f"sc.simulate_num, sc.name as car_name " f"FROM tb_state_tst t " f"JOIN dnt_info d ON t.dnt_id = d.id " f"LEFT JOIN tb_coil_info c ON t.coil_id = c.id " f"LEFT JOIN tb_simulate_car sc ON t.simulate_car_id = sc.id " f"WHERE {where_clause} " f"ORDER BY t.id DESC LIMIT %s OFFSET %s", params + [per_page, offset], ) records = cur.fetchall() finally: conn.close() return records, total def get_all_test_data_for_export(serial: str = "", date_from: str = "", date_to: str = "", test_mode: str = "", data_source: str = "") -> list[dict]: """导出全部数据 test_mode: ''=全部, '0'=灵敏度, '1'=波动 data_source: ''=全部, 'B2', 'B4' """ conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if serial: where.append("d.serial LIKE %s") params.append(f"%{serial}%") if date_from: where.append("t.create_time >= %s") params.append(date_from if len(date_from) > 10 else date_from) if date_to: where.append("t.create_time <= %s") params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59") if test_mode: where.append("t.test_mode = %s") params.append(int(test_mode)) if data_source: where.append("t.data_source = %s") params.append(data_source) where_clause = " AND ".join(where) if where else "1=1" cur.execute( f"SELECT t.*, d.serial, " f"c.coil_num, c.name as coil_name, " f"sc.simulate_num, sc.name as car_name " f"FROM tb_state_tst t " f"JOIN dnt_info d ON t.dnt_id = d.id " f"LEFT JOIN tb_coil_info c ON t.coil_id = c.id " f"LEFT JOIN tb_simulate_car sc ON t.simulate_car_id = sc.id " f"WHERE {where_clause} ORDER BY t.id DESC", params, ) return cur.fetchall() finally: conn.close() def get_automation_averages(dnt_id: int, since: str = None) -> dict: """获取本次自动化测试的平均值 since: ISO 时间字符串,只统计此时间之后的测试记录 速度从 dm/s 转换为 m/s(÷10) """ conn = get_conn() try: with conn.cursor() as cur: where = "dnt_id=%s" params = [dnt_id] if since: where += " AND create_time >= %s" params.append(since) cur.execute( f"SELECT AVG(ppvalue) as avg_ppvalue, " "AVG(idle_freq) as avg_idle_freq, " "AVG(enter_freq) as avg_enter_freq, " "AVG(exit_freq) as avg_exit_freq, " "AVG(enter_dist) as avg_enter_dist, " "AVG(exit_dist) as avg_exit_dist, " "AVG(enter_speed) as avg_enter_speed, " "AVG(exit_speed) as avg_exit_speed " f"FROM tb_state_tst WHERE {where}", params, ) row = cur.fetchone() finally: conn.close() if row: result = {k: round(v, 2) if v else 0 for k, v in row.items()} # 速度 dm/s → m/s if result.get("avg_enter_speed"): result["avg_enter_speed"] = round(result["avg_enter_speed"] / 10, 2) if result.get("avg_exit_speed"): result["avg_exit_speed"] = round(result["avg_exit_speed"] / 10, 2) return result return {} def get_automation_records(dnt_id: int, since: str) -> list[dict]: """获取本轮自动化测试的所有记录(含 serialnet 状态)""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT t.*, sn.state as sn_state " "FROM tb_state_tst t " "LEFT JOIN tb_serialnet sn ON sn.dnt_id = t.dnt_id " " AND sn.state IN (2,3) " " AND sn.update_time >= t.create_time " " AND sn.update_time < DATE_ADD(t.create_time, INTERVAL 1 SECOND) " "WHERE t.dnt_id=%s AND t.create_time >= %s " "ORDER BY t.id ASC", (dnt_id, since), ) return cur.fetchall() finally: conn.close() def get_latest_wave_data(dnt_id: int) -> dict | None: """获取设备最新一条 B4 波动测试数据""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT * FROM tb_state_tst WHERE dnt_id=%s AND data_source='B4' " "ORDER BY id DESC LIMIT 1", (dnt_id,), ) return cur.fetchone() finally: conn.close() def get_wave_records(dnt_id: int, since: str) -> list[dict]: """获取本轮 B4 波动测试明细""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT * FROM tb_state_tst " "WHERE dnt_id=%s AND data_source='B4' AND create_time >= %s " "ORDER BY id ASC", (dnt_id, since), ) return cur.fetchall() finally: conn.close() # ─── 用户管理 ────────────────────────────────────────────────────── def get_user_by_username(username: str) -> dict | None: conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT * FROM tb_user WHERE username=%s", (username,)) return cur.fetchone() finally: conn.close() def get_all_users() -> list[dict]: conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT id, username, role, is_active, create_time FROM tb_user ORDER BY id") return cur.fetchall() finally: conn.close() def create_user(username: str, password_hash: str, role: str = "operator"): conn = get_conn() try: with conn.cursor() as cur: cur.execute( "INSERT INTO tb_user (username, password_hash, role) VALUES (%s,%s,%s)", (username, password_hash, role), ) conn.commit() finally: conn.close() def update_user(user_id: int, password_hash: str = None, role: str = None, is_active: bool = None): conn = get_conn() try: with conn.cursor() as cur: parts = [] params = [] if password_hash is not None: parts.append("password_hash=%s") params.append(password_hash) if role is not None: parts.append("role=%s") params.append(role) if is_active is not None: parts.append("is_active=%s") params.append(int(is_active)) if parts: params.append(user_id) cur.execute(f"UPDATE tb_user SET {', '.join(parts)} WHERE id=%s", params) conn.commit() finally: conn.close() # ─── 日志管理 ────────────────────────────────────────────────────── def insert_log(user_id: int, username: str, action_type: str, target: str = "", detail: str = "", result: str = "ok", ip: str = ""): conn = get_conn() try: with conn.cursor() as cur: cur.execute( "INSERT INTO tb_log (user_id, username, action_type, target, detail, result, ip) " "VALUES (%s,%s,%s,%s,%s,%s,%s)", (user_id, username, action_type, target, detail, result, ip), ) conn.commit() finally: conn.close() def get_logs(page: int = 1, per_page: int = 30, username: str = "", action_type: str = "") -> tuple[list[dict], int]: conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if username: where.append("username LIKE %s") params.append(f"%{username}%") if action_type: where.append("action_type=%s") params.append(action_type) where_clause = " AND ".join(where) if where else "1=1" cur.execute(f"SELECT COUNT(*) as total FROM tb_log WHERE {where_clause}", params) total = cur.fetchone()["total"] offset = (page - 1) * per_page cur.execute( f"SELECT * FROM tb_log WHERE {where_clause} " f"ORDER BY id DESC LIMIT %s OFFSET %s", params + [per_page, offset], ) return cur.fetchall(), total finally: conn.close() # ─── tb_fixture_param ────────────────────────────────────────────── def get_fixture_param(dnt_id: int) -> dict | None: """获取设备的工装测试参数(含线圈和模拟车辆信息)""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT fp.*, " "c.coil_num, c.name as coil_name, c.shape as coil_shape, " "c.length as coil_length, c.width as coil_width, c.radius as coil_radius, " "c.turns as coil_turns, c.resistance as coil_resistance, " "c.material as coil_material, " "sc.simulate_num, sc.name as car_name, sc.shape as car_shape, " "sc.length as car_length, sc.width as car_width, sc.radius as car_radius, " "sc.material as car_material " "FROM tb_fixture_param fp " "LEFT JOIN tb_coil_info c ON fp.coil_id = c.id " "LEFT JOIN tb_simulate_car sc ON fp.simulate_car_id = sc.id " "WHERE fp.dnt_id=%s", (dnt_id,), ) return cur.fetchone() finally: conn.close() def upsert_fixture_param(dnt_id: int, **kwargs): """插入或更新工装测试参数""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT id FROM tb_fixture_param WHERE dnt_id=%s", (dnt_id,), ) existing = cur.fetchone() # 主线参数字段(不含 coil_id/simulate_car_id,后面单独处理) fields = [ "Addr", "DevType", "TestMode", "RestDis", "MinusDis", "SensMin", "SensMax", "FreMin", "FreMax", "PeakMin", "PeakMax", "FarTol", "NearTol", "StepTol", "BackForth", "NearStay", "FarStay", ] if existing: sets = ", ".join(f"`{f}`=%s" for f in fields) values = [kwargs.get(f, 0) for f in fields] + [dnt_id] cur.execute( f"UPDATE tb_fixture_param SET {sets} WHERE dnt_id=%s", values, ) else: placeholders = ", ".join(["%s"] * len(fields)) col_names = ", ".join(f"`{f}`" for f in fields) values = [kwargs.get(f, 0) for f in fields] cur.execute( f"INSERT INTO tb_fixture_param (dnt_id, {col_names}) " f"VALUES (%s, {placeholders})", [dnt_id] + values, ) # 单独处理线圈/模拟车辆关联(可选,不覆盖已有值) if "coil_id" in kwargs: cur.execute( "UPDATE tb_fixture_param SET coil_id=%s WHERE dnt_id=%s", (kwargs["coil_id"], dnt_id), ) if "simulate_car_id" in kwargs: cur.execute( "UPDATE tb_fixture_param SET simulate_car_id=%s WHERE dnt_id=%s", (kwargs["simulate_car_id"], dnt_id), ) conn.commit() finally: conn.close() # ─── tb_vechicle_base_test ───────────────────────────────────────── def get_vehicle_base_tests(search: str = "") -> list[dict]: """获取车检器测试基准参数列表""" conn = get_conn() try: with conn.cursor() as cur: if search: cur.execute( "SELECT * FROM tb_vechicle_base_test " "WHERE dev_name LIKE %s OR type_num LIKE %s " "ORDER BY type_num ASC", (f"%{search}%", f"%{search}%"), ) else: cur.execute( "SELECT * FROM tb_vechicle_base_test ORDER BY type_num ASC", ) return cur.fetchall() finally: conn.close() def get_vehicle_base_test_by_id(test_id: int) -> dict | None: """根据 ID 获取车检器测试基准""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "SELECT * FROM tb_vechicle_base_test WHERE id=%s", (test_id,), ) return cur.fetchone() finally: conn.close() def create_vehicle_base_test(**kwargs) -> int: """创建车检器测试基准,返回新记录 ID""" conn = get_conn() try: with conn.cursor() as cur: fields = [ "dev_name", "type_num", "SensMin", "SensMax", "FreMin", "FreMax", "PeakMin", "PeakMax", "remark", ] col_names = ", ".join(f"`{f}`" for f in fields) placeholders = ", ".join(["%s"] * len(fields)) values = [kwargs.get(f, "" if f in ("dev_name", "remark") else 0) for f in fields] cur.execute( f"INSERT INTO tb_vechicle_base_test ({col_names}) VALUES ({placeholders})", values, ) conn.commit() return cur.lastrowid finally: conn.close() def update_vehicle_base_test(test_id: int, **kwargs): """更新车检器测试基准""" conn = get_conn() try: with conn.cursor() as cur: fields = [ "dev_name", "type_num", "SensMin", "SensMax", "FreMin", "FreMax", "PeakMin", "PeakMax", "remark", ] sets = ", ".join(f"`{f}`=%s" for f in fields) values = [kwargs.get(f, "" if f in ("dev_name", "remark") else 0) for f in fields] + [test_id] cur.execute( f"UPDATE tb_vechicle_base_test SET {sets} WHERE id=%s", values, ) conn.commit() finally: conn.close() def delete_vehicle_base_test(test_id: int): """删除车检器测试基准""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( "DELETE FROM tb_vechicle_base_test WHERE id=%s", (test_id,), ) conn.commit() finally: conn.close() # ─── 线圈参数 CRUD ────────────────────────────────────────────────── def get_coil_info_list(search: str = "") -> list[dict]: """获取线圈参数列表""" conn = get_conn() try: with conn.cursor() as cur: if search: cur.execute( "SELECT * FROM tb_coil_info WHERE coil_num LIKE %s OR name LIKE %s " "ORDER BY id DESC", (f"%{search}%", f"%{search}%"), ) else: cur.execute("SELECT * FROM tb_coil_info ORDER BY id DESC") return cur.fetchall() finally: conn.close() def get_coil_info_by_id(coil_id: int) -> dict | None: """获取单个线圈参数""" conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT * FROM tb_coil_info WHERE id=%s", (coil_id,)) return cur.fetchone() finally: conn.close() def create_coil_info(**kwargs) -> int: """创建线圈参数,返回新 ID""" conn = get_conn() try: with conn.cursor() as cur: fields = [ "coil_num", "name", "induct", "shape", "length", "width", "radius", "turns", "resistance", "material", "remark", ] col_names = ", ".join(f"`{f}`" for f in fields) placeholders = ", ".join(["%s"] * len(fields)) values = [kwargs.get(f, "" if f in ("coil_num", "name", "shape", "material", "remark") else 0) for f in fields] cur.execute( f"INSERT INTO tb_coil_info ({col_names}) VALUES ({placeholders})", values, ) conn.commit() return cur.lastrowid finally: conn.close() def update_coil_info(coil_id: int, **kwargs): """更新线圈参数""" conn = get_conn() try: with conn.cursor() as cur: fields = [ "coil_num", "name", "induct", "shape", "length", "width", "radius", "turns", "resistance", "material", "remark", ] sets = ", ".join(f"`{f}`=%s" for f in fields) values = [kwargs.get(f, "" if f in ("coil_num", "name", "shape", "material", "remark") else 0) for f in fields] + [coil_id] cur.execute( f"UPDATE tb_coil_info SET {sets} WHERE id=%s", values, ) conn.commit() finally: conn.close() def delete_coil_info(coil_id: int): """删除线圈参数""" conn = get_conn() try: with conn.cursor() as cur: cur.execute("DELETE FROM tb_coil_info WHERE id=%s", (coil_id,)) conn.commit() finally: conn.close() # ─── 模拟车辆参数 CRUD ────────────────────────────────────────────── def get_simulate_car_list(search: str = "") -> list[dict]: """获取模拟车辆参数列表""" conn = get_conn() try: with conn.cursor() as cur: if search: cur.execute( "SELECT * FROM tb_simulate_car WHERE simulate_num LIKE %s OR name LIKE %s " "ORDER BY id DESC", (f"%{search}%", f"%{search}%"), ) else: cur.execute("SELECT * FROM tb_simulate_car ORDER BY id DESC") return cur.fetchall() finally: conn.close() def get_simulate_car_by_id(car_id: int) -> dict | None: """获取单个模拟车辆参数""" conn = get_conn() try: with conn.cursor() as cur: cur.execute("SELECT * FROM tb_simulate_car WHERE id=%s", (car_id,)) return cur.fetchone() finally: conn.close() def create_simulate_car(**kwargs) -> int: """创建模拟车辆参数,返回新 ID""" conn = get_conn() try: with conn.cursor() as cur: fields = [ "simulate_num", "name", "shape", "length", "width", "radius", "material", "remark", ] col_names = ", ".join(f"`{f}`" for f in fields) placeholders = ", ".join(["%s"] * len(fields)) values = [kwargs.get(f, "" if f in ("simulate_num", "name", "shape", "material", "remark") else 0) for f in fields] cur.execute( f"INSERT INTO tb_simulate_car ({col_names}) VALUES ({placeholders})", values, ) conn.commit() return cur.lastrowid finally: conn.close() def update_simulate_car(car_id: int, **kwargs): """更新模拟车辆参数""" conn = get_conn() try: with conn.cursor() as cur: fields = [ "simulate_num", "name", "shape", "length", "width", "radius", "material", "remark", ] sets = ", ".join(f"`{f}`=%s" for f in fields) values = [kwargs.get(f, "" if f in ("simulate_num", "name", "shape", "material", "remark") else 0) for f in fields] + [car_id] cur.execute( f"UPDATE tb_simulate_car SET {sets} WHERE id=%s", values, ) conn.commit() finally: conn.close() def delete_simulate_car(car_id: int): """删除模拟车辆参数""" conn = get_conn() try: with conn.cursor() as cur: cur.execute("DELETE FROM tb_simulate_car WHERE id=%s", (car_id,)) conn.commit() finally: conn.close() # ─── 测试数据删除 ────────────────────────────────────────────── def delete_test_data(serial: str = "", date_from: str = "", date_to: str = "", data_source: str = "") -> int: """删除符合条件的测试数据,返回删除行数 必须至少提供一个条件(serial / date范围 / data_source),不允许无条件全删。 """ conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if serial: where.append("t.dnt_id IN (SELECT id FROM dnt_info WHERE serial LIKE %s)") params.append(f"%{serial}%") if date_from: where.append("t.create_time >= %s") params.append(date_from if len(date_from) > 10 else date_from) if date_to: where.append("t.create_time <= %s") params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59") if data_source: where.append("t.data_source = %s") params.append(data_source) if not where: return 0 # 拒绝无条件全删 where_clause = " AND ".join(where) cur.execute( f"SELECT COUNT(*) as cnt FROM tb_state_tst t WHERE {where_clause}", params, ) cnt = cur.fetchone()["cnt"] cur.execute( f"DELETE t FROM tb_state_tst t WHERE {where_clause}", params, ) conn.commit() return cnt finally: conn.close() # ─── tb_device_log ───────────────────────────────────────────────── def get_device_logs(page: int = 1, per_page: int = 30, serial: str = "", event_type: str = "", date_from: str = "", date_to: str = "") -> tuple[list[dict], int]: """分页查询设备事件日志,返回 (records, total)""" conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if serial: where.append("device_serial LIKE %s") params.append(f"%{serial}%") if event_type: where.append("event_type = %s") params.append(event_type) if date_from: where.append("create_time >= %s") params.append(date_from if len(date_from) > 10 else date_from) if date_to: where.append("create_time <= %s") params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59") where_clause = " AND ".join(where) if where else "1=1" cur.execute( f"SELECT COUNT(*) as total FROM tb_device_log WHERE {where_clause}", params, ) total = cur.fetchone()["total"] offset = (page - 1) * per_page cur.execute( f"SELECT * FROM tb_device_log WHERE {where_clause} " f"ORDER BY id DESC LIMIT %s OFFSET %s", params + [per_page, offset], ) return cur.fetchall(), total finally: conn.close() def export_device_logs(serial: str = "", event_type: str = "", date_from: str = "", date_to: str = "") -> list[dict]: """导出全部设备事件日志(不分页)""" conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if serial: where.append("device_serial LIKE %s") params.append(f"%{serial}%") if event_type: where.append("event_type = %s") params.append(event_type) if date_from: where.append("create_time >= %s") params.append(date_from if len(date_from) > 10 else date_from) if date_to: where.append("create_time <= %s") params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59") where_clause = " AND ".join(where) if where else "1=1" cur.execute( f"SELECT * FROM tb_device_log WHERE {where_clause} " f"ORDER BY id DESC", params, ) return cur.fetchall() finally: conn.close() def delete_device_logs(serial: str = "", event_type: str = "", date_from: str = "", date_to: str = "") -> int: """删除符合条件的设备日志,返回删除行数。至少需要一个条件。""" conn = get_conn() try: with conn.cursor() as cur: where = [] params = [] if serial: where.append("device_serial LIKE %s") params.append(f"%{serial}%") if event_type: where.append("event_type = %s") params.append(event_type) if date_from: where.append("create_time >= %s") params.append(date_from if len(date_from) > 10 else date_from) if date_to: where.append("create_time <= %s") params.append(date_to if len(date_to) > 10 else date_to + " 23:59:59") if not where: return 0 where_clause = " AND ".join(where) cur.execute( f"SELECT COUNT(*) as cnt FROM tb_device_log WHERE {where_clause}", params, ) cnt = cur.fetchone()["cnt"] cur.execute( f"DELETE FROM tb_device_log WHERE {where_clause}", params, ) conn.commit() return cnt finally: conn.close() # ─── tb_pending_detector ─────────────────────────────────────────── def set_pending_detector_serial(dnt_id: int, detector_serial: str): """设置待插入的车检器序列号(UPSERT)""" conn = get_conn() try: with conn.cursor() as cur: cur.execute( """INSERT INTO tb_pending_detector (dnt_id, detector_serial) VALUES (%s, %s) ON DUPLICATE KEY UPDATE detector_serial = VALUES(detector_serial)""", (dnt_id, detector_serial), ) conn.commit() finally: conn.close()