Files
vd_test_fixture/edc-web/app/routes/test_op.py

79 lines
2.1 KiB
Python

"""测试操作 API"""
import time
from flask import Blueprint, jsonify, render_template, request
from app.models import (
get_device_by_id,
insert_serialnet,
get_serialnet_stats,
get_latest_test_state,
get_automation_averages,
clear_serialnet_records,
)
# Blueprint on which the route decorators in this module register their views.
bp = Blueprint("test_op", __name__)
# DG430 command frames (addr=0x01, ADDR=0x81), stored as upper-case hex text.
# Frame layout: 7F | ADDR | addr | cmd | XOR | SUM, where XOR and SUM are
# taken over the ADDR, addr and cmd bytes (SUM keeps only the low byte).
_DG430_HEAD = 0x7F
_DG430_ADDR_HI = 0x81
_DG430_ADDR_LO = 0x01


def _dg430_frame(opcode: int) -> str:
    """Return the hex-encoded DG430 frame for a one-byte *opcode*."""
    payload = (_DG430_ADDR_HI, _DG430_ADDR_LO, opcode)
    xor_byte = 0
    for value in payload:
        xor_byte ^= value
    sum_byte = sum(payload) & 0xFF
    frame = (_DG430_HEAD, *payload, xor_byte, sum_byte)
    return "".join(f"{value:02X}" for value in frame)


_DG430_OPCODES = (
    0xB0,  # start test
    0xB1,  # reset test
    0xBA,  # motor forward
    0xBB,  # motor backward
    0xBC,  # motor stop
)

# Maps the two-digit hex opcode name (e.g. "B0") to its full frame.
COMMANDS = {f"{opcode:02X}": _dg430_frame(opcode) for opcode in _DG430_OPCODES}
@bp.route("/test/<int:dnt_id>")
def test_page(dnt_id):
    """Render the test-operation page for the device with id *dnt_id*.

    Responds with a plain-text 404 when ``get_device_by_id`` returns a
    falsy value for the id.
    """
    device = get_device_by_id(dnt_id)
    if device:
        return render_template("test_op.html", device=device)
    return "设备不存在", 404
@bp.route("/api/command", methods=["POST"])
def api_command():
    """Send a single command for a device.

    Expects a JSON object body with:
        dnt_id: device identifier, passed through to ``insert_serialnet``.
        cmd: a key of ``COMMANDS``; matched case-insensitively.

    Returns:
        ``{"ok": True, "record_id": ..., "send_pkg": ...}`` on success, or
        ``{"ok": False, "error": ...}`` with HTTP 400 when the body is not a
        JSON object, the command is unknown, or ``dnt_id`` is missing.
    """
    # silent=True makes get_json return None instead of raising on a missing
    # or malformed body, so every bad request gets the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400

    raw_cmd = data.get("cmd", "")
    # A non-string cmd (null, number, ...) has no .upper(); treat it as unknown
    # rather than letting an AttributeError surface as a 500.
    cmd = raw_cmd.upper() if isinstance(raw_cmd, str) else None
    if cmd not in COMMANDS:
        return jsonify({"ok": False, "error": f"未知指令: {raw_cmd}"}), 400

    dnt_id = data.get("dnt_id")
    if dnt_id is None:
        # Never store a record that is not attached to a device.
        return jsonify({"ok": False, "error": "缺少 dnt_id"}), 400

    send_pkg = COMMANDS[cmd]
    record_id = insert_serialnet(dnt_id, send_pkg)
    return jsonify({"ok": True, "record_id": record_id, "send_pkg": send_pkg})
@bp.route("/api/automation/start", methods=["POST"])
def api_automation_start():
    """Start an automated test run for a device.

    Expects a JSON object body with:
        dnt_id: device identifier (required).
        count: number of runs requested; optional, defaults to 1 and must
            convert to an integer >= 1.

    On success the device's existing serialnet records are cleared, the first
    ``B0`` (start test) frame is inserted, and the response is
    ``{"ok": True, "total": count, "first_record_id": ...}``.

    Invalid input yields ``{"ok": False, "error": ...}`` with HTTP 400, and
    nothing is cleared or inserted in that case.
    """
    # silent=True makes get_json return None instead of raising on a missing
    # or malformed body, so every bad request gets the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"ok": False, "error": "请求体必须是 JSON 对象"}), 400

    dnt_id = data.get("dnt_id")
    if dnt_id is None:
        # Validate before the destructive clear below.
        return jsonify({"ok": False, "error": "缺少 dnt_id"}), 400

    try:
        count = int(data.get("count", 1))
    except (TypeError, ValueError, OverflowError):
        count = 0  # falls through to the range check below
    if count < 1:
        return jsonify({"ok": False, "error": "count 必须是正整数"}), 400

    # Clear the old records, then insert the first 0xB0 frame.
    clear_serialnet_records(dnt_id)
    record_id = insert_serialnet(dnt_id, COMMANDS["B0"])
    return jsonify({
        "ok": True,
        "total": count,
        "first_record_id": record_id,
    })
@bp.route("/api/automation/<int:dnt_id>/progress")
def api_automation_progress(dnt_id):
    """Report automation progress for the device with id *dnt_id*.

    The JSON response combines the results of ``get_serialnet_stats``,
    ``get_latest_test_state`` and ``get_automation_averages`` under the keys
    ``stats``, ``latest`` and ``averages``.
    """
    # Dict literals evaluate in source order, so the three lookups run in the
    # order stats -> latest -> averages.
    payload = {
        "stats": get_serialnet_stats(dnt_id),
        "latest": get_latest_test_state(dnt_id),
        "averages": get_automation_averages(dnt_id),
    }
    return jsonify(payload)