"""工装配置 API""" from flask import Blueprint, jsonify, render_template, request from flask_login import login_required, current_user from app.models import ( get_device_by_id, insert_serialnet, get_fixture_param, upsert_fixture_param, get_vehicle_base_tests, get_vehicle_base_test_by_id, create_vehicle_base_test, update_vehicle_base_test, delete_vehicle_base_test, get_serialnet_by_id, insert_log, ) bp = Blueprint("fixture", __name__) # DG430 工装配置指令 (addr=0x01) FIXTURE_COMMANDS = { "4A": "7F81014ACACC", # 获取设备版本号 "4C": "7F81014CCCCE", # 查询设备测试参数 "4D": "7F81014DCDCF", # 出厂初始化 "4E": "7F81014ECED0", # 设备复位 } CMD_NAMES = { "4A": "获取设备版本号", "4B": "配置设备测试参数", "4C": "查询设备测试参数", "4D": "出厂初始化", "4E": "设备复位", } def _xor_sum(data: bytes) -> tuple[int, int]: """计算异或和校验 (从 ADDR 到 DATA 末)""" xor = 0 s = 0 for b in data: xor ^= b s += b return xor & 0xFF, s & 0xFF def _le16(val: int) -> bytes: """int → 小端 2 字节""" return bytes([val & 0xFF, (val >> 8) & 0xFF]) def _be16(val: int) -> bytes: """int → 大端 2 字节""" return bytes([(val >> 8) & 0xFF, val & 0xFF]) def build_4b_packet(addr: int, dev_type: int, test_mode: int, reset_dis: int, minus_dis: int, sens_min: int, sens_max: int, fre_min: int, fre_max: int, peak_min: int, peak_max: int) -> str: """构造 0x4B 配置指令 hex 字符串 格式: 7F | 81 | 12 | 4B | Addr(1) | DevType(1) | TestMode(1) | ResetDis(1) | MinusDis(1) | SensMin(2 LE) | SensMax(2 LE) | FreMin(2 LE) | FreMax(2 LE) | PeakMin(2 LE) | PeakMax(2 LE) | XOR | SUM """ payload = bytes([ 0x4B, # CMD addr & 0xFF, dev_type & 0xFF, test_mode & 0xFF, reset_dis & 0xFF, minus_dis & 0xFF, ]) payload += (_le16(sens_min) + _le16(sens_max) + _le16(fre_min) + _le16(fre_max) + _le16(peak_min) + _le16(peak_max)) pkt = bytes([0x7F, 0x81, len(payload)]) + payload xor, total = _xor_sum(pkt[1:]) pkt += bytes([xor, total]) return pkt.hex().upper() # ─── 页面 ─────────────────────────────────────────────────────────── @bp.route("/fixture/") @login_required def fixture_page(dnt_id): """工装配置页面""" device = get_device_by_id(dnt_id) if not device: return "设备不存在", 404 return render_template("fixture.html", device=device) @bp.route("/vehicle-base-test") @login_required def vehicle_base_test_page(): """车检器测试基准参数管理页面""" return render_template("vehicle_base_test.html") # ─── 工装配置指令 API ────────────────────────────────────────────── @bp.route("/api/fixture/command", methods=["POST"]) @login_required def api_fixture_command(): """发送工装配置指令 (0x4A/0x4B/0x4C/0x4D/0x4E)""" data = request.get_json() dnt_id = data.get("dnt_id") cmd = data.get("cmd", "").upper() device = get_device_by_id(dnt_id) target = f"{device['serial']}" if device else f"dnt_id={dnt_id}" if cmd == "4B": # 动态构造 0x4B 指令 params = data.get("params", {}) send_pkg = build_4b_packet( addr=params.get("addr", 1), dev_type=params.get("dev_type", 0), test_mode=params.get("test_mode", 0), reset_dis=params.get("reset_dis", 0), minus_dis=params.get("minus_dis", 0), sens_min=params.get("sens_min", 0), sens_max=params.get("sens_max", 0), fre_min=params.get("fre_min", 0), fre_max=params.get("fre_max", 0), peak_min=params.get("peak_min", 0), peak_max=params.get("peak_max", 0), ) elif cmd in FIXTURE_COMMANDS: send_pkg = FIXTURE_COMMANDS[cmd] else: return jsonify({"ok": False, "error": f"未知指令: {cmd}"}), 400 cmd_name = CMD_NAMES.get(cmd, cmd) try: record_id = insert_serialnet(dnt_id, send_pkg) insert_log( current_user.id, current_user.username, "command", target=target, detail=f"工装配置: {cmd_name}(0x{cmd}) → {send_pkg}", result="ok", ip=request.remote_addr or "", ) return jsonify({"ok": True, "record_id": record_id, "send_pkg": send_pkg}) except Exception as e: insert_log( current_user.id, current_user.username, "command", target=target, detail=f"工装配置 {cmd_name}(0x{cmd}) 失败: {e}", result="error", ip=request.remote_addr or "", ) return jsonify({"ok": False, "error": str(e)}), 500 @bp.route("/api/fixture/serialnet/") @login_required def api_get_serialnet(record_id): """查询 tb_serialnet 记录状态和返回数据""" rec = get_serialnet_by_id(record_id) if not rec: return jsonify({"error": "记录不存在"}), 404 return jsonify({ "id": rec["id"], "state": rec["state"], "send_pkg": rec.get("send_pkg", ""), "rcv_pkg": rec.get("rcv_pkg", ""), "create_time": str(rec.get("create_time", "")), "update_time": str(rec.get("update_time", "")), }) # ─── 工装参数 CRUD API ───────────────────────────────────────────── @bp.route("/api/fixture/param/") @login_required def api_get_fixture_param(dnt_id): """获取工装测试参数""" param = get_fixture_param(dnt_id) return jsonify(param or {}) @bp.route("/api/fixture/param/", methods=["POST"]) @login_required def api_save_fixture_param(dnt_id): """保存工装测试参数(仅数据库,不下发设备)""" data = request.get_json() if not data: return jsonify({"ok": False, "error": "数据为空"}), 400 upsert_fixture_param(dnt_id, **data) return jsonify({"ok": True}) # ─── 车检器测试基准参数 CRUD API ────────────────────────────────── @bp.route("/api/vehicle-base-test") @login_required def api_list_vehicle_base_tests(): """列出车检器测试基准参数""" search = request.args.get("search", "") tests = get_vehicle_base_tests(search) return jsonify(tests) @bp.route("/api/vehicle-base-test/") @login_required def api_get_vehicle_base_test(test_id): """获取单个车检器测试基准""" test = get_vehicle_base_test_by_id(test_id) if not test: return jsonify({"error": "不存在"}), 404 return jsonify(test) @bp.route("/api/vehicle-base-test", methods=["POST"]) @login_required def api_create_vehicle_base_test(): """创建车检器测试基准""" data = request.get_json() if not data: return jsonify({"ok": False, "error": "数据为空"}), 400 try: test_id = create_vehicle_base_test(**data) return jsonify({"ok": True, "id": test_id}) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 @bp.route("/api/vehicle-base-test/", methods=["PUT"]) @login_required def api_update_vehicle_base_test(test_id): """更新车检器测试基准""" data = request.get_json() if not data: return jsonify({"ok": False, "error": "数据为空"}), 400 try: update_vehicle_base_test(test_id, **data) return jsonify({"ok": True}) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500 @bp.route("/api/vehicle-base-test/", methods=["DELETE"]) @login_required def api_delete_vehicle_base_test(test_id): """删除车检器测试基准""" try: delete_vehicle_base_test(test_id) return jsonify({"ok": True}) except Exception as e: return jsonify({"ok": False, "error": str(e)}), 500