feat: 用户登录/管理 + 操作日志模块

- tb_user 用户表、tb_log 日志表
- Flask-Login 认证(login/logout/权限装饰器)
- 用户管理页(admin 专有):增删改查、改密、角色设置
- 操作日志页:分页查询、按用户/类型筛选
- 测试操作区指令自动记录日志
- 所有页面加 @login_required 保护
- 默认管理员 admin/admin123(首次启动自动创建)
This commit is contained in:
wangfq
2026-05-28 13:58:19 +08:00
parent 56e3b03121
commit 322563dab0
19 changed files with 614 additions and 5 deletions

View File

@@ -8,13 +8,43 @@ def create_app() -> Flask:
app = Flask(__name__)
app.config.from_object(Config)
# 初始化认证
from app.auth import init_auth, auth_bp
init_auth(app)
app.register_blueprint(auth_bp)
# 注册蓝图
from app.routes.devices import bp as devices_bp
from app.routes.test_op import bp as test_op_bp
from app.routes.test_data import bp as test_data_bp
from app.routes.users import bp as users_bp
from app.routes.logs import bp as logs_bp
app.register_blueprint(devices_bp)
app.register_blueprint(test_op_bp)
app.register_blueprint(test_data_bp)
app.register_blueprint(users_bp)
app.register_blueprint(logs_bp)
# 初始化默认管理员
_ensure_admin()
return app
def _ensure_admin():
"""如果没有任何用户,创建默认 admin/admin123"""
from app.models import get_conn
from werkzeug.security import generate_password_hash
conn = get_conn()
try:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) as cnt FROM tb_user")
if cur.fetchone()["cnt"] == 0:
cur.execute(
"INSERT INTO tb_user (username, password_hash, role) VALUES (%s,%s,%s)",
("admin", generate_password_hash("admin123"), "admin"),
)
conn.commit()
finally:
conn.close()

88
edc-web/app/auth.py Normal file
View File

@@ -0,0 +1,88 @@
"""认证模块 — Flask-Login 集成"""
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from werkzeug.security import check_password_hash
from app.models import get_user_by_username, insert_log
auth_bp = Blueprint("auth", __name__)
login_manager = LoginManager()
login_manager.login_view = "auth.login"
login_manager.login_message = "请先登录"
class User:
"""Flask-Login 用户对象"""
def __init__(self, user_dict):
self.id = user_dict["id"]
self.username = user_dict["username"]
self.role = user_dict["role"]
self.is_active = bool(user_dict.get("is_active", 1))
@property
def is_authenticated(self):
return True
def get_id(self):
return str(self.id)
@login_manager.user_loader
def load_user(user_id):
conn = __import__("app.models", fromlist=["get_conn"]).get_conn()
try:
with conn.cursor() as cur:
cur.execute("SELECT * FROM tb_user WHERE id=%s", (int(user_id),))
row = cur.fetchone()
finally:
conn.close()
return User(row) if row else None
def init_auth(app):
login_manager.init_app(app)
# ─── 装饰器 ────────────────────────────────────────────────────────
def admin_required(f):
"""要求 admin 角色"""
from functools import wraps
@wraps(f)
@login_required
def wrapper(*args, **kwargs):
if current_user.role != "admin":
return "权限不足", 403
return f(*args, **kwargs)
return wrapper
# ─── 登录 / 登出 ────────────────────────────────────────────────────
@auth_bp.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
user_dict = get_user_by_username(username)
ip = request.remote_addr or ""
if user_dict and user_dict.get("is_active") and check_password_hash(user_dict["password_hash"], password):
user = User(user_dict)
login_user(user)
insert_log(user.id, user.username, "login", ip=ip, result="ok")
next_page = request.args.get("next")
return redirect(next_page or url_for("devices.index"))
else:
insert_log(0, username, "login", detail="密码错误或账号禁用", ip=ip, result="error")
flash("用户名或密码错误")
return render_template("login.html")
@auth_bp.route("/logout")
@login_required
def logout():
insert_log(current_user.id, current_user.username, "logout",
ip=request.remote_addr or "", result="ok")
logout_user()
return redirect(url_for("auth.login"))

View File

@@ -236,3 +236,108 @@ def get_automation_averages(dnt_id: int) -> dict:
if row:
return {k: round(v, 2) if v else 0 for k, v in row.items()}
return {}
# ─── 用户管理 ──────────────────────────────────────────────────────
def get_user_by_username(username: str) -> dict | None:
conn = get_conn()
try:
with conn.cursor() as cur:
cur.execute("SELECT * FROM tb_user WHERE username=%s", (username,))
return cur.fetchone()
finally:
conn.close()
def get_all_users() -> list[dict]:
conn = get_conn()
try:
with conn.cursor() as cur:
cur.execute("SELECT id, username, role, is_active, create_time FROM tb_user ORDER BY id")
return cur.fetchall()
finally:
conn.close()
def create_user(username: str, password_hash: str, role: str = "operator"):
conn = get_conn()
try:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO tb_user (username, password_hash, role) VALUES (%s,%s,%s)",
(username, password_hash, role),
)
conn.commit()
finally:
conn.close()
def update_user(user_id: int, password_hash: str = None, role: str = None, is_active: bool = None):
conn = get_conn()
try:
with conn.cursor() as cur:
parts = []
params = []
if password_hash is not None:
parts.append("password_hash=%s")
params.append(password_hash)
if role is not None:
parts.append("role=%s")
params.append(role)
if is_active is not None:
parts.append("is_active=%s")
params.append(int(is_active))
if parts:
params.append(user_id)
cur.execute(f"UPDATE tb_user SET {', '.join(parts)} WHERE id=%s", params)
conn.commit()
finally:
conn.close()
# ─── 日志管理 ──────────────────────────────────────────────────────
def insert_log(user_id: int, username: str, action_type: str,
target: str = "", detail: str = "", result: str = "ok",
ip: str = ""):
conn = get_conn()
try:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO tb_log (user_id, username, action_type, target, detail, result, ip) "
"VALUES (%s,%s,%s,%s,%s,%s,%s)",
(user_id, username, action_type, target, detail, result, ip),
)
conn.commit()
finally:
conn.close()
def get_logs(page: int = 1, per_page: int = 30,
username: str = "", action_type: str = "") -> tuple[list[dict], int]:
conn = get_conn()
try:
with conn.cursor() as cur:
where = []
params = []
if username:
where.append("username LIKE %s")
params.append(f"%{username}%")
if action_type:
where.append("action_type=%s")
params.append(action_type)
where_clause = " AND ".join(where) if where else "1=1"
cur.execute(f"SELECT COUNT(*) as total FROM tb_log WHERE {where_clause}", params)
total = cur.fetchone()["total"]
offset = (page - 1) * per_page
cur.execute(
f"SELECT * FROM tb_log WHERE {where_clause} "
f"ORDER BY id DESC LIMIT %s OFFSET %s",
params + [per_page, offset],
)
return cur.fetchall(), total
finally:
conn.close()

View File

@@ -1,12 +1,14 @@
"""设备页面 API"""
from flask import Blueprint, jsonify, render_template, request
from flask_login import login_required
from app.models import get_all_devices, update_device_name
bp = Blueprint("devices", __name__)
@bp.route("/")
@login_required
def index():
"""设备列表页(默认首页)"""
return render_template("devices.html")

View File

@@ -0,0 +1,32 @@
"""日志查询 API"""
from flask import Blueprint, jsonify, render_template, request
from flask_login import login_required
from app.auth import admin_required
from app.models import get_logs
bp = Blueprint("logs", __name__, url_prefix="/logs")
@bp.route("/")
@admin_required
def logs_page():
return render_template("logs.html")
@bp.route("/api/logs")
@admin_required
def api_logs():
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 30, type=int)
username = request.args.get("username", "", type=str)
action_type = request.args.get("action_type", "", type=str)
records, total = get_logs(page, per_page, username, action_type)
return jsonify({
"records": records,
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page if total > 0 else 1,
})

View File

@@ -3,6 +3,7 @@
import csv
import io
from flask import Blueprint, jsonify, render_template, request, Response
from flask_login import login_required
from app.models import get_test_data, get_all_test_data_for_export
bp = Blueprint("test_data", __name__)

View File

@@ -1,7 +1,7 @@
"""测试操作 API"""
import time
from flask import Blueprint, jsonify, render_template, request
from flask_login import login_required, current_user
from app.models import (
get_device_by_id,
insert_serialnet,
@@ -9,12 +9,12 @@ from app.models import (
get_latest_test_state,
get_automation_averages,
clear_serialnet_records,
insert_log,
)
bp = Blueprint("test_op", __name__)
# DG430 指令 (addr=0x01, ADDR=0x81)
# XOR/SUM 预计算值
COMMANDS = {
"B0": "7F8101B03032", # 开始测试
"B1": "7F8101B13133", # 测试复原
@@ -23,8 +23,17 @@ COMMANDS = {
"BC": "7F8101BC3C3E", # 电机停止
}
CMD_NAMES = {
"B0": "开始测试",
"B1": "测试复原",
"BA": "电机前进",
"BB": "电机后退",
"BC": "电机停止",
}
@bp.route("/test/<int:dnt_id>")
@login_required
def test_page(dnt_id):
"""测试操作页面"""
device = get_device_by_id(dnt_id)
@@ -34,6 +43,7 @@ def test_page(dnt_id):
@bp.route("/api/command", methods=["POST"])
@login_required
def api_command():
"""发送单次指令"""
data = request.get_json()
@@ -43,21 +53,54 @@ def api_command():
if cmd not in COMMANDS:
return jsonify({"ok": False, "error": f"未知指令: {cmd}"}), 400
device = get_device_by_id(dnt_id)
target = f"{device['serial']}" if device else f"dnt_id={dnt_id}"
send_pkg = COMMANDS[cmd]
record_id = insert_serialnet(dnt_id, send_pkg)
return jsonify({"ok": True, "record_id": record_id, "send_pkg": send_pkg})
cmd_name = CMD_NAMES.get(cmd, cmd)
try:
record_id = insert_serialnet(dnt_id, send_pkg)
insert_log(
current_user.id, current_user.username, "command",
target=target,
detail=f"{cmd_name}({cmd}) → {send_pkg}",
result="ok",
ip=request.remote_addr or "",
)
return jsonify({"ok": True, "record_id": record_id, "send_pkg": send_pkg})
except Exception as e:
insert_log(
current_user.id, current_user.username, "command",
target=target,
detail=f"{cmd_name}({cmd}) 失败: {e}",
result="error",
ip=request.remote_addr or "",
)
return jsonify({"ok": False, "error": str(e)}), 500
@bp.route("/api/automation/start", methods=["POST"])
@login_required
def api_automation_start():
"""开始自动化测试"""
data = request.get_json()
dnt_id = data.get("dnt_id")
count = int(data.get("count", 1))
device = get_device_by_id(dnt_id)
target = f"{device['serial']}" if device else f"dnt_id={dnt_id}"
# 清除旧记录,然后插入第一条 0xB0
clear_serialnet_records(dnt_id)
record_id = insert_serialnet(dnt_id, COMMANDS["B0"])
insert_log(
current_user.id, current_user.username, "command",
target=target,
detail=f"自动化测试开始 ×{count}",
result="ok",
ip=request.remote_addr or "",
)
return jsonify({
"ok": True,
"total": count,
@@ -66,6 +109,7 @@ def api_automation_start():
@bp.route("/api/automation/<int:dnt_id>/progress")
@login_required
def api_automation_progress(dnt_id):
"""获取自动化进度"""
stats = get_serialnet_stats(dnt_id)

View File

@@ -0,0 +1,54 @@
"""用户管理 API"""
from flask import Blueprint, jsonify, render_template, request
from flask_login import login_required, current_user
from werkzeug.security import generate_password_hash
from app.auth import admin_required
from app.models import get_all_users, create_user, update_user, get_user_by_username
bp = Blueprint("users", __name__, url_prefix="/users")
@bp.route("/")
@admin_required
def users_page():
return render_template("users.html")
@bp.route("/api/users")
@admin_required
def api_users():
return jsonify(get_all_users())
@bp.route("/api/users", methods=["POST"])
@admin_required
def api_create_user():
data = request.get_json()
username = data.get("username", "").strip()
password = data.get("password", "").strip()
role = data.get("role", "operator")
if not username or not password:
return jsonify({"ok": False, "error": "用户名和密码不能为空"}), 400
if get_user_by_username(username):
return jsonify({"ok": False, "error": "用户名已存在"}), 400
create_user(username, generate_password_hash(password), role)
return jsonify({"ok": True})
@bp.route("/api/users/<int:user_id>", methods=["PUT"])
@admin_required
def api_update_user(user_id):
data = request.get_json()
kwargs = {}
if "password" in data and data["password"]:
kwargs["password_hash"] = generate_password_hash(data["password"])
if "role" in data:
kwargs["role"] = data["role"]
if "is_active" in data:
kwargs["is_active"] = data["is_active"]
if kwargs:
update_user(user_id, **kwargs)
return jsonify({"ok": True})

View File

@@ -7,6 +7,9 @@ body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; b
.top-menu a { color: #bdc3c7; text-decoration: none; padding: 14px 24px; font-size: 15px; transition: .2s; }
.top-menu a:hover { color: #fff; background: #34495e; }
.top-menu a.active { color: #fff; background: #3498db; }
.top-menu .user-info { margin-left: auto; color: #bdc3c7; padding: 14px 0; font-size: 13px; display: flex; align-items: center; gap: 12px; }
.top-menu .user-info a { padding: 4px 12px; background: #e74c3c; border-radius: 4px; font-size: 12px; }
.top-menu .user-info a:hover { background: #c0392b; }
/* === Container === */
.container { max-width: 1400px; margin: 24px auto; padding: 0 24px; }

View File

@@ -10,6 +10,16 @@
<nav class="top-menu">
<a href="/" class="{% if request.path == '/' %}active{% endif %}">设备</a>
<a href="/test-data" class="{% if request.path == '/test-data' %}active{% endif %}">测试信息</a>
{% if current_user.is_authenticated and current_user.role == 'admin' %}
<a href="/logs/" class="{% if request.path == '/logs/' %}active{% endif %}">操作日志</a>
<a href="/users/" class="{% if request.path == '/users/' %}active{% endif %}">用户管理</a>
{% endif %}
<span class="user-info">
{% if current_user.is_authenticated %}
{{ current_user.username }} ({{ current_user.role }})
<a href="/logout">退出</a>
{% endif %}
</span>
</nav>
<main class="container">
{% block content %}{% endblock %}

View File

@@ -0,0 +1,36 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>登录 - EDC 工装管理系统</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; background: #f5f6fa; display: flex; justify-content: center; align-items: center; min-height: 100vh; }
.login-box { background: #fff; padding: 40px; border-radius: 12px; box-shadow: 0 4px 20px rgba(0,0,0,.1); width: 360px; }
.login-box h2 { text-align: center; margin-bottom: 24px; color: #2c3e50; }
.login-box label { display: block; font-size: 13px; margin-bottom: 4px; color: #555; }
.login-box input { width: 100%; padding: 10px 12px; margin-bottom: 16px; border: 1px solid #ddd; border-radius: 6px; font-size: 14px; }
.login-box button { width: 100%; padding: 10px; background: #3498db; color: #fff; border: none; border-radius: 6px; font-size: 15px; cursor: pointer; }
.login-box button:hover { background: #2980b9; }
.flash-error { color: #e74c3c; font-size: 13px; text-align: center; margin-bottom: 12px; }
</style>
</head>
<body>
<div class="login-box">
<h2>EDC 工装管理系统</h2>
{% with messages = get_flashed_messages() %}
{% if messages %}
<div class="flash-error">{{ messages[0] }}</div>
{% endif %}
{% endwith %}
<form method="post">
<label>用户名</label>
<input type="text" name="username" required autofocus>
<label>密码</label>
<input type="password" name="password" required>
<button type="submit">登录</button>
</form>
</div>
</body>
</html>

View File

@@ -0,0 +1,92 @@
{% extends "base.html" %}
{% block title %}操作日志 - EDC 工装管理系统{% endblock %}
{% block content %}
<h2>操作日志</h2>
<div class="search-bar">
<label>用户名:<input type="text" id="search-username" placeholder="筛选用户..."></label>
<label>操作类型:
<select id="search-action">
<option value="">全部</option>
<option value="login">登录</option>
<option value="logout">登出</option>
<option value="command">指令操作</option>
</select>
</label>
<button onclick="searchLogs(1)" class="btn-search">查询</button>
</div>
<table>
<thead>
<tr>
<th>ID</th>
<th>用户</th>
<th>操作类型</th>
<th>对象</th>
<th>详情</th>
<th>结果</th>
<th>IP</th>
<th>时间</th>
</tr>
</thead>
<tbody id="log-tbody"></tbody>
</table>
<div class="pagination" id="pagination"></div>
{% endblock %}
{% block scripts %}
<script>
let currentPage = 1, totalPages = 1;
async function searchLogs(page = 1) {
currentPage = page;
const username = document.getElementById("search-username").value;
const action_type = document.getElementById("search-action").value;
const params = new URLSearchParams({page, per_page: 30});
if (username) params.set("username", username);
if (action_type) params.set("action_type", action_type);
const resp = await fetch(`/logs/api/logs?${params}`);
const data = await resp.json();
renderTable(data.records);
totalPages = data.pages;
renderPagination();
}
function renderTable(records) {
const tbody = document.getElementById("log-tbody");
if (!records.length) {
tbody.innerHTML = '<tr><td colspan="8" style="text-align:center;color:#999;">暂无记录</td></tr>';
return;
}
tbody.innerHTML = records.map(r => `
<tr>
<td>${r.id}</td>
<td>${r.username || '-'}</td>
<td>${r.action_type}</td>
<td>${r.target || '-'}</td>
<td style="max-width:250px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap;" title="${esc(r.detail)}">${r.detail || '-'}</td>
<td style="color:${r.result==='ok'?'#27ae60':'#e74c3c'}">${r.result}</td>
<td>${r.ip || '-'}</td>
<td>${r.create_time || '-'}</td>
</tr>
`).join("");
}
function renderPagination() {
const div = document.getElementById("pagination");
let html = `<button onclick="searchLogs(${currentPage-1})" ${currentPage<=1?'disabled':''}>上一页</button>`;
for (let i = 1; i <= totalPages; i++) {
html += `<button onclick="searchLogs(${i})" class="${i===currentPage?'active':''}">${i}</button>`;
}
html += `<button onclick="searchLogs(${currentPage+1})" ${currentPage>=totalPages?'disabled':''}>下一页</button>`;
div.innerHTML = html;
}
function esc(s) { return s.replace(/"/g, '&quot;'); }
searchLogs(1);
</script>
{% endblock %}

View File

@@ -0,0 +1,112 @@
{% extends "base.html" %}
{% block title %}用户管理 - EDC 工装管理系统{% endblock %}
{% block content %}
<h2>用户管理</h2>
<div style="margin-bottom:16px;">
<button onclick="showAddForm()" class="btn-search">新增用户</button>
<div id="add-form" style="display:none;margin-top:12px;background:#fff;padding:16px;border-radius:8px;">
<label>用户名:<input type="text" id="new-username" style="margin:0 8px;"></label>
<label>密码:<input type="password" id="new-password" style="margin:0 8px;"></label>
<label>角色:
<select id="new-role" style="margin:0 8px;">
<option value="operator">operator</option>
<option value="admin">admin</option>
</select>
</label>
<button onclick="addUser()" class="btn-search">确认</button>
<button onclick="document.getElementById('add-form').style.display='none'" class="btn-export">取消</button>
</div>
</div>
<table>
<thead>
<tr>
<th>ID</th>
<th>用户名</th>
<th>角色</th>
<th>状态</th>
<th>创建时间</th>
<th>操作</th>
</tr>
</thead>
<tbody id="user-tbody"></tbody>
</table>
{% endblock %}
{% block scripts %}
<script>
async function loadUsers() {
const resp = await fetch("/users/api/users");
const users = await resp.json();
const tbody = document.getElementById("user-tbody");
tbody.innerHTML = users.map(u => `
<tr>
<td>${u.id}</td>
<td>${u.username}</td>
<td>${u.role}</td>
<td>${u.is_active ? '启用' : '禁用'}</td>
<td>${u.create_time || '-'}</td>
<td>
<select onchange="updateUser(${u.id}, this, 'role')" data-field="role">
<option value="operator" ${u.role==='operator'?'selected':''}>operator</option>
<option value="admin" ${u.role==='admin'?'selected':''}>admin</option>
</select>
<select onchange="updateUser(${u.id}, this, 'is_active')" data-field="is_active">
<option value="1" ${u.is_active?'selected':''}>启用</option>
<option value="0" ${!u.is_active?'selected':''}>禁用</option>
</select>
<button onclick="resetPwd(${u.id})" class="btn-search" style="font-size:11px;">改密</button>
</td>
</tr>
`).join("");
}
function showAddForm() {
document.getElementById("add-form").style.display = "block";
document.getElementById("new-username").value = "";
document.getElementById("new-password").value = "";
}
async function addUser() {
const username = document.getElementById("new-username").value.trim();
const password = document.getElementById("new-password").value;
const role = document.getElementById("new-role").value;
if (!username || !password) { alert("用户名和密码不能为空"); return; }
const resp = await fetch("/users/api/users", {
method: "POST",
headers: {"Content-Type": "application/json"},
body: JSON.stringify({username, password, role}),
});
const data = await resp.json();
if (data.ok) { document.getElementById("add-form").style.display = "none"; loadUsers(); }
else { alert(data.error || "创建失败"); }
}
async function updateUser(id, el, field) {
const value = el.value;
const body = {};
body[field] = field === 'is_active' ? parseInt(value) : value;
await fetch(`/users/api/users/${id}`, {
method: "PUT",
headers: {"Content-Type": "application/json"},
body: JSON.stringify(body),
});
loadUsers();
}
async function resetPwd(id) {
const pwd = prompt("输入新密码至少6位");
if (!pwd || pwd.length < 6) { alert("密码至少6位"); return; }
await fetch(`/users/api/users/${id}`, {
method: "PUT",
headers: {"Content-Type": "application/json"},
body: JSON.stringify({password: pwd}),
});
alert("密码已更新");
}
loadUsers();
</script>
{% endblock %}