Files
vd_test_fixture/edc-web/app/routes/test_op.py
wangfq 521cbe4107 feat: edc-web 支持车检器序列号输入与显示
- 自动化测试区域新增「车检器序列号」输入框
- 回车键自动触发「开始」按钮
- /api/automation/start 接收 detector_serial,写入 tb_pending_detector
- 测试操作页 + 测试信息页(全部/B2/B4)显示序列号,空时显示 '-'
- 页面加载和测试结束后焦点自动回到序列号输入框(全选),方便连续测试
2026-06-15 10:02:51 +08:00

139 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""测试操作 API"""
from flask import Blueprint, jsonify, render_template, request
from flask_login import login_required, current_user
from app.models import (
get_device_by_id,
insert_serialnet,
get_serialnet_stats,
get_latest_test_state,
get_automation_averages,
get_automation_records,
get_latest_wave_data,
get_wave_records,
clear_serialnet_records,
insert_log,
set_pending_detector_serial,
)
bp = Blueprint("test_op", __name__)
# DG430 指令 (addr=0x01, ADDR=0x81)
COMMANDS = {
"B0": "7F8101B03032", # 开始测试
"B1": "7F8101B13133", # 测试复原
"BA": "7F8101BA3A3C", # 电机前进
"BB": "7F8101BB3B3D", # 电机后退
"BC": "7F8101BC3C3E", # 电机停止
}
CMD_NAMES = {
"B0": "开始测试",
"B1": "测试复原",
"BA": "电机前进",
"BB": "电机后退",
"BC": "电机停止",
}
@bp.route("/test/<int:dnt_id>")
@login_required
def test_page(dnt_id):
"""测试操作页面"""
device = get_device_by_id(dnt_id)
if not device:
return "设备不存在", 404
return render_template("test_op.html", device=device)
@bp.route("/api/command", methods=["POST"])
@login_required
def api_command():
"""发送单次指令"""
data = request.get_json()
dnt_id = data.get("dnt_id")
cmd = data.get("cmd", "").upper()
if cmd not in COMMANDS:
return jsonify({"ok": False, "error": f"未知指令: {cmd}"}), 400
device = get_device_by_id(dnt_id)
target = f"{device['serial']}" if device else f"dnt_id={dnt_id}"
send_pkg = COMMANDS[cmd]
cmd_name = CMD_NAMES.get(cmd, cmd)
try:
record_id = insert_serialnet(dnt_id, send_pkg)
insert_log(
current_user.id, current_user.username, "command",
target=target,
detail=f"{cmd_name}({cmd}) → {send_pkg}",
result="ok",
ip=request.remote_addr or "",
)
return jsonify({"ok": True, "record_id": record_id, "send_pkg": send_pkg})
except Exception as e:
insert_log(
current_user.id, current_user.username, "command",
target=target,
detail=f"{cmd_name}({cmd}) 失败: {e}",
result="error",
ip=request.remote_addr or "",
)
return jsonify({"ok": False, "error": str(e)}), 500
@bp.route("/api/automation/start", methods=["POST"])
@login_required
def api_automation_start():
"""开始自动化测试"""
data = request.get_json()
dnt_id = data.get("dnt_id")
count = int(data.get("count", 1))
detector_serial = (data.get("detector_serial") or "").strip()
device = get_device_by_id(dnt_id)
target = f"{device['serial']}" if device else f"dnt_id={dnt_id}"
# 存储待插入的车检器序列号
if detector_serial:
set_pending_detector_serial(dnt_id, detector_serial)
# 清除旧记录,然后插入第一条 0xB0
clear_serialnet_records(dnt_id)
record_id = insert_serialnet(dnt_id, COMMANDS["B0"])
insert_log(
current_user.id, current_user.username, "command",
target=target,
detail=f"自动化测试开始 ×{count}",
result="ok",
ip=request.remote_addr or "",
)
return jsonify({
"ok": True,
"total": count,
"first_record_id": record_id,
})
@bp.route("/api/automation/<int:dnt_id>/progress")
@login_required
def api_automation_progress(dnt_id):
"""获取自动化进度"""
since = request.args.get("since", "", type=str)
stats = get_serialnet_stats(dnt_id)
latest = get_latest_test_state(dnt_id)
averages = get_automation_averages(dnt_id, since if since else None)
records = get_automation_records(dnt_id, since) if since else []
latest_wave = get_latest_wave_data(dnt_id)
wave_records = get_wave_records(dnt_id, since) if since else []
return jsonify({
"stats": stats,
"latest": latest,
"averages": averages,
"records": records,
"latest_wave": latest_wave,
"wave_records": wave_records,
})