- tb_user 用户表、tb_log 日志表
- Flask-Login 认证(login/logout/权限装饰器)
- 用户管理页(admin 专有):增删改查、改密、角色设置
- 操作日志页:分页查询、按用户/类型筛选
- 测试操作区指令自动记录日志
- 所有页面加 @login_required 保护
- 默认管理员 admin/admin123(首次启动自动创建)
89 lines
3.0 KiB
Python
89 lines
3.0 KiB
Python
"""认证模块 — Flask-Login 集成"""
|
|
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash
|
|
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
|
|
from werkzeug.security import check_password_hash
|
|
from app.models import get_user_by_username, insert_log
|
|
|
|
# Login manager shared by this module; init_auth() attaches it to the app.
login_manager = LoginManager()
# Message flashed when an anonymous visitor hits a protected view.
login_manager.login_message = "请先登录"
# Endpoint that unauthenticated visitors are redirected to.
login_manager.login_view = "auth.login"

# Blueprint that carries the login / logout routes defined in this module.
auth_bp = Blueprint("auth", __name__)
|
|
|
|
|
|
class User:
    """Flask-Login user object built from a user record mapping.

    Provides the attributes and methods Flask-Login reads from a user
    object: ``is_authenticated``, ``is_active``, ``is_anonymous`` and
    ``get_id()``.
    """

    def __init__(self, user_dict):
        """Copy the identity fields out of *user_dict*.

        Args:
            user_dict: Mapping with ``id``, ``username`` and ``role`` keys.
                An optional ``is_active`` key is coerced to ``bool``; the
                account is treated as active when the key is absent.

        Raises:
            KeyError: If ``id``, ``username`` or ``role`` is missing.
        """
        self.id = user_dict["id"]
        self.username = user_dict["username"]
        self.role = user_dict["role"]
        self.is_active = bool(user_dict.get("is_active", 1))

    @property
    def is_authenticated(self):
        """Always ``True``: instances are only created for known users."""
        return True

    @property
    def is_anonymous(self):
        """Always ``False``: this class never represents an anonymous visitor.

        Part of the Flask-Login user interface; without it, code or
        templates reading ``current_user.is_anonymous`` would raise
        ``AttributeError``.
        """
        return False

    def get_id(self):
        """Return the user id as a string, as stored in the session."""
        return str(self.id)

    def __repr__(self):
        return (
            f"{type(self).__name__}(id={self.id!r}, "
            f"username={self.username!r}, role={self.role!r})"
        )
|
|
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Reload a user from ``tb_user`` by the id stored in the session.

    Args:
        user_id: The id value Flask-Login hands back from the session
            (the string produced by ``User.get_id``).

    Returns:
        A ``User`` for the matching row, or ``None`` when the id is not a
        valid integer or no row matches.
    """
    # Function-scope import: resolved at call time, like the original
    # dynamic ``__import__`` lookup, but readable.
    from app.models import get_conn

    # A tampered or stale session may carry a non-numeric id. Flask-Login
    # expects the loader to return None for invalid ids, not to raise.
    try:
        uid = int(user_id)
    except (TypeError, ValueError):
        return None

    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT * FROM tb_user WHERE id=%s", (uid,))
            # Assumes the cursor yields dict-like rows, since User reads
            # fields by key -- TODO confirm against get_conn().
            row = cur.fetchone()
    finally:
        conn.close()

    # NOTE(review): the row's is_active flag is not checked here, so a
    # deactivated account with a live session is still loaded -- confirm
    # whether that is intended.
    return User(row) if row else None
|
|
|
|
|
|
def init_auth(app):
    """Attach the module-level ``login_manager`` to the Flask *app*.

    Args:
        app: The Flask application to initialise authentication on.
    """
    login_manager.init_app(app)
|
|
|
|
|
|
# ─── 装饰器 ────────────────────────────────────────────────────────
|
|
|
|
def admin_required(f):
    """Decorator restricting a view to logged-in users whose role is ``admin``.

    The wrapped view is first protected by ``login_required``; a logged-in
    user with any other role receives a plain 403 response.

    Args:
        f: The view function to protect.

    Returns:
        The guarded view, carrying ``f``'s metadata.
    """
    import functools

    @functools.wraps(f)
    @login_required
    def guarded_view(*args, **kwargs):
        if current_user.role == "admin":
            return f(*args, **kwargs)
        # Authenticated, but not an administrator.
        return "权限不足", 403

    return guarded_view
|
|
|
|
|
|
# ─── 登录 / 登出 ────────────────────────────────────────────────────
|
|
|
|
def _is_safe_redirect_target(target):
    """Return True only when *target* is a same-site, path-only URL."""
    from urllib.parse import urlsplit

    if not target:
        return False
    # Browsers drop whitespace/control characters and treat "\" like "/",
    # so reject the former and normalise the latter before inspecting.
    if any(ch.isspace() or ord(ch) < 0x20 for ch in target):
        return False
    normalized = target.replace("\\", "/")
    # Must be an absolute path; "//host" is a scheme-relative external URL.
    if not normalized.startswith("/") or normalized.startswith("//"):
        return False
    parts = urlsplit(normalized)
    return not parts.scheme and not parts.netloc


@auth_bp.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form (GET) and authenticate credentials (POST).

    On success the user is logged in, a ``login`` log entry with result
    ``ok`` is written, and the client is redirected to the ``next`` query
    parameter when it is a safe same-site path, otherwise to
    ``devices.index``. On failure an ``error`` log entry is written, a
    message is flashed, and the form is rendered again.

    Returns:
        A redirect response after a successful login, else the rendered
        ``login.html`` template.
    """
    if request.method == "POST":
        username = request.form.get("username", "").strip()
        password = request.form.get("password", "")
        user_dict = get_user_by_username(username)
        ip = request.remote_addr or ""

        credentials_ok = (
            user_dict
            and user_dict.get("is_active")
            and check_password_hash(user_dict["password_hash"], password)
        )
        if credentials_ok:
            user = User(user_dict)
            login_user(user)
            insert_log(user.id, user.username, "login", ip=ip, result="ok")
            next_page = request.args.get("next")
            # ``next`` is attacker-controllable: following it blindly is an
            # open redirect, so anything but a local path is ignored.
            if not _is_safe_redirect_target(next_page):
                next_page = None
            return redirect(next_page or url_for("devices.index"))

        # Same generic message for unknown user, wrong password and
        # disabled account, so the response does not reveal which applied.
        insert_log(0, username, "login", detail="密码错误或账号禁用", ip=ip, result="error")
        flash("用户名或密码错误")
    return render_template("login.html")
|
|
|
|
|
|
@auth_bp.route("/logout")
@login_required
def logout():
    """Record a ``logout`` log entry, end the session, and go to the login page.

    Returns:
        A redirect response to the ``auth.login`` endpoint.
    """
    # Capture identity before logout_user() clears the current user.
    uid = current_user.id
    uname = current_user.username
    client_ip = request.remote_addr or ""
    insert_log(uid, uname, "logout", ip=client_ip, result="ok")

    logout_user()
    login_url = url_for("auth.login")
    return redirect(login_url)
|