Files
vd_960/DBNetClient/main.py
wangfq 8b4404d5b3 feat(DBNetClient): TCP JSON 协议桌面测试工具
- tcp_json_client.py: 协议客户端库 — 行分隔JSON, 请求-响应, 主动推送接收
- main.py: tkinter 跨平台 GUI — 7个标签页覆盖全部15条命令
- 虚拟环境: venv/, 无额外依赖(tkinter 标准库)
- 支持: 鉴权/设备信息/网络配置/IoT配置/线圈参数/系统操作/Raw JSON
2026-06-30 15:32:57 +08:00

492 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
DBNetClient — DLD960 TCP JSON 协议测试工具
跨平台桌面应用 (tkinter),用于测试 vd960DBN 的网络接口协议。
"""
import sys
import os
import threading
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
from tcp_json_client import DBNetClient, TcpJsonError
APP_TITLE = "DBNetClient — DLD960 TCP JSON 协议测试工具"
class DBNetApp:
def __init__(self, root: tk.Tk):
self.root = root
self.root.title(APP_TITLE)
self.root.geometry("960x720")
self.root.minsize(800, 600)
self.client = DBNetClient(log_callback=self.log)
self.client.on_push("loop_data", self.on_loop_data)
self.client.on_push("event_report", self.on_event_report)
self._build_ui()
self._set_defaults()
# ==================================================================
# UI
# ==================================================================
def _build_ui(self):
# ---- Top: Connection bar ----
top = ttk.Frame(self.root, padding=5)
top.pack(fill=tk.X)
ttk.Label(top, text="Host:").pack(side=tk.LEFT)
self.host_var = tk.StringVar(value="192.168.1.188")
ttk.Entry(top, textvariable=self.host_var, width=14).pack(side=tk.LEFT, padx=2)
ttk.Label(top, text="Port:").pack(side=tk.LEFT, padx=(8, 0))
self.port_var = tk.IntVar(value=5960)
ttk.Entry(top, textvariable=self.port_var, width=6).pack(side=tk.LEFT, padx=2)
self.btn_conn = ttk.Button(top, text="Connect", command=self._toggle_connect)
self.btn_conn.pack(side=tk.LEFT, padx=8)
self.conn_status = ttk.Label(top, text="● Disconnected", foreground="red")
self.conn_status.pack(side=tk.LEFT, padx=5)
ttk.Separator(self.root, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=2)
# ---- Notebook ----
nb = ttk.Notebook(self.root)
nb.pack(fill=tk.BOTH, expand=True, padx=5, pady=2)
nb.add(self._tab_auth(nb), text="鉴权")
nb.add(self._tab_devinfo(nb), text="设备信息")
nb.add(self._tab_network(nb), text="网络配置")
nb.add(self._tab_iot(nb), text="IoT 配置")
nb.add(self._tab_loop(nb), text="线圈参数")
nb.add(self._tab_system(nb), text="系统")
nb.add(self._tab_raw(nb), text="Raw JSON")
# ---- Bottom: Log ----
log_frame = ttk.LabelFrame(self.root, text="日志 / 原始响应", padding=2)
log_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=(0, 5))
self.log_text = scrolledtext.ScrolledText(log_frame, height=8, wrap=tk.WORD,
font=("Consolas", 9))
self.log_text.pack(fill=tk.BOTH, expand=True)
self.log_text.tag_configure("sent", foreground="blue")
self.log_text.tag_configure("recv", foreground="green")
self.log_text.tag_configure("error", foreground="red")
self.log_text.tag_configure("push", foreground="purple")
# ---- Bottom buttons ----
btn_frame = ttk.Frame(self.root, padding=5)
btn_frame.pack(fill=tk.X)
ttk.Button(btn_frame, text="Clear Log", command=self._clear_log).pack(side=tk.RIGHT)
def _set_defaults(self):
self._entry_set(self.pwd_var, "123456")
# ---- Tab builders ----
def _tab_auth(self, parent):
f = ttk.Frame(parent, padding=10)
ttk.Label(f, text="设备密码 (6位数字):").grid(row=0, column=0, sticky=tk.W, pady=5)
self.pwd_var = tk.StringVar()
ttk.Entry(f, textvariable=self.pwd_var, width=12, show="*").grid(row=0, column=1, sticky=tk.W, padx=5)
ttk.Button(f, text="鉴权 (pwd_verify)", command=self._do_pwd_verify).grid(row=0, column=2, padx=10)
ttk.Separator(f, orient=tk.HORIZONTAL).grid(row=1, column=0, columnspan=3,
sticky=tk.EW, pady=10)
ttk.Label(f, text="连接后必须先鉴权,才能执行其他命令。\n连续3次错误将锁定60秒。",
foreground="gray").grid(row=2, column=0, columnspan=3, pady=5)
return f
def _tab_devinfo(self, parent):
f = ttk.Frame(parent, padding=10)
ttk.Button(f, text="查询设备信息", command=self._do_dev_info_query).pack(anchor=tk.W, pady=3)
ttk.Separator(f, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=8)
ttk.Label(f, text="更改设备序列码 (12位HEX):").pack(anchor=tk.W)
row = ttk.Frame(f)
row.pack(fill=tk.X, pady=5)
self.serial_var = tk.StringVar()
ttk.Entry(row, textvariable=self.serial_var, width=18).pack(side=tk.LEFT, padx=5)
ttk.Button(row, text="设置", command=self._do_dev_serial_set).pack(side=tk.LEFT)
ttk.Separator(f, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=8)
# Response display
self.devinfo_text = scrolledtext.ScrolledText(f, height=10, wrap=tk.WORD,
font=("Consolas", 9))
self.devinfo_text.pack(fill=tk.BOTH, expand=True)
return f
def _tab_network(self, parent):
f = ttk.Frame(parent, padding=10)
# SSC Net fields
fields = [
("设备 IP:", "dev_ip"),
("子网掩码:", "subnet"),
("网关:", "route_ip"),
("LSSC IP:", "lssc_ip"),
("DNS:", "dns"),
("LSSC Port:", "ssc_port"),
]
self.net_vars = {}
for i, (label, key) in enumerate(fields):
ttk.Label(f, text=label).grid(row=i, column=0, sticky=tk.W, pady=2)
var = tk.StringVar()
self.net_vars[key] = var
ttk.Entry(f, textvariable=var, width=18).grid(row=i, column=1, sticky=tk.W, padx=5)
btn_row = ttk.Frame(f)
btn_row.grid(row=len(fields), column=0, columnspan=2, pady=10)
ttk.Button(btn_row, text="查询 (ssc_net_query)", command=self._do_ssc_net_query).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_row, text="设置 (ssc_net_set)", command=self._do_ssc_net_set).pack(side=tk.LEFT, padx=5)
self.net_resp = ttk.Label(f, text="", foreground="gray")
self.net_resp.grid(row=len(fields) + 1, column=0, columnspan=2, pady=5)
return f
def _tab_iot(self, parent):
f = ttk.Frame(parent, padding=10)
iot_fields = [
("MQTT Host:", "iot_host"),
("MQTT Port:", "iot_port"),
("Client ID:", "iot_client_id"),
("Username:", "iot_user"),
("Password:", "iot_pwd"),
("Topic Pub:", "topic_pub"),
("Topic Sub:", "topic_sub"),
]
self.iot_vars = {}
for i, (label, key) in enumerate(iot_fields):
ttk.Label(f, text=label).grid(row=i, column=0, sticky=tk.W, pady=2)
var = tk.StringVar()
self.iot_vars[key] = var
ttk.Entry(f, textvariable=var, width=24).grid(row=i, column=1, sticky=tk.W, padx=5)
self.iot_cid_var = tk.BooleanVar(value=False)
ttk.Checkbutton(f, text="Topic 追加 Client ID",
variable=self.iot_cid_var).grid(row=len(iot_fields), column=0, columnspan=2, pady=5)
btn_row = ttk.Frame(f)
btn_row.grid(row=len(iot_fields) + 1, column=0, columnspan=2, pady=5)
ttk.Button(btn_row, text="IoT 查询", command=self._do_iot_net_query).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_row, text="IoT 设置", command=self._do_iot_net_set).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_row, text="Topic 查询", command=self._do_iot_topic_query).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_row, text="Topic 设置", command=self._do_iot_topic_set).pack(side=tk.LEFT, padx=5)
self.iot_resp = ttk.Label(f, text="", foreground="gray")
self.iot_resp.grid(row=len(iot_fields) + 2, column=0, columnspan=2, pady=5)
return f
def _tab_loop(self, parent):
f = ttk.Frame(parent, padding=10)
ttk.Button(f, text="查询线圈参数 (loop_param_query)",
command=self._do_loop_param_query).pack(anchor=tk.W, pady=3)
ttk.Label(f, text="参数设置通过 BLE/小程序,或 Raw JSON 标签页发送。",
foreground="gray").pack(anchor=tk.W, pady=5)
self.loop_text = scrolledtext.ScrolledText(f, height=12, wrap=tk.WORD,
font=("Consolas", 9))
self.loop_text.pack(fill=tk.BOTH, expand=True)
return f
def _tab_system(self, parent):
f = ttk.Frame(parent, padding=10)
# Change password
ttk.Label(f, text="修改设备密码:").grid(row=0, column=0, sticky=tk.W, pady=3)
ttk.Label(f, text="旧密码:").grid(row=1, column=0, sticky=tk.W)
self.old_pwd_var = tk.StringVar()
ttk.Entry(f, textvariable=self.old_pwd_var, width=12, show="*").grid(row=1, column=1, sticky=tk.W, padx=5)
ttk.Label(f, text="新密码:").grid(row=2, column=0, sticky=tk.W)
self.new_pwd_var = tk.StringVar()
ttk.Entry(f, textvariable=self.new_pwd_var, width=12, show="*").grid(row=2, column=1, sticky=tk.W, padx=5)
ttk.Button(f, text="修改密码", command=self._do_pwd_set).grid(row=2, column=2, padx=10)
ttk.Separator(f, orient=tk.HORIZONTAL).grid(row=3, column=0, columnspan=3,
sticky=tk.EW, pady=10)
# Reset
ttk.Label(f, text="⚠ 危险操作:", foreground="red").grid(row=4, column=0, sticky=tk.W)
btn_row = ttk.Frame(f)
btn_row.grid(row=5, column=0, columnspan=3, pady=5)
ttk.Button(btn_row, text="设备复位 (device_reset)",
command=self._do_device_reset).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_row, text="出厂初始化 (factory_reset)",
command=self._do_factory_reset).pack(side=tk.LEFT, padx=5)
self.sys_resp = ttk.Label(f, text="", foreground="gray")
self.sys_resp.grid(row=6, column=0, columnspan=3, pady=5)
return f
def _tab_raw(self, parent):
f = ttk.Frame(parent, padding=10)
ttk.Label(f, text="直接输入 JSON 命令(一行一条,不含换行符):").pack(anchor=tk.W)
self.raw_input = scrolledtext.ScrolledText(f, height=5, font=("Consolas", 10))
self.raw_input.pack(fill=tk.X, pady=5)
self.raw_input.insert(tk.END,
'{"msg_id":1,"cmd":"dev_info_query","ts":0}\n'
'{"msg_id":1,"cmd":"pwd_verify","ts":0,"data":{"password":"123456"}}\n')
ttk.Button(f, text="发送选中行", command=self._do_raw_send).pack(anchor=tk.W)
self.raw_resp = scrolledtext.ScrolledText(f, height=8, font=("Consolas", 9))
self.raw_resp.pack(fill=tk.BOTH, expand=True, pady=5)
return f
# ==================================================================
# Helpers
# ==================================================================
def _entry_set(self, var, value):
var.set(value)
def _check_set(self, var, value):
var.set(value)
def log(self, msg: str):
"""线程安全的日志输出"""
self.root.after(0, self._log_append, msg)
def _log_append(self, msg: str):
tag = "recv"
if msg.startswith(">>>"):
tag = "sent"
elif msg.startswith("<<<"):
tag = "recv" if "code=" in msg else "push"
elif "error" in msg.lower() or "fail" in msg.lower():
tag = "error"
self.log_text.insert(tk.END, msg + "\n", tag)
self.log_text.see(tk.END)
def _clear_log(self):
self.log_text.delete("1.0", tk.END)
def _set_status(self, connected: bool):
if connected:
self.conn_status.config(text="● Connected", foreground="green")
self.btn_conn.config(text="Disconnect")
else:
self.conn_status.config(text="● Disconnected", foreground="red")
self.btn_conn.config(text="Connect")
def _bg_run(self, fn, on_done=None):
"""在后台线程执行 fn完成后回调 on_done(result)"""
def _run():
try:
result = fn()
except TcpJsonError as e:
result = e
except Exception as e:
result = e
if on_done:
self.root.after(0, on_done, result)
threading.Thread(target=_run, daemon=True).start()
def _show_resp(self, widget, resp):
"""将响应 dict 显示在 Text widget 中"""
import json as _json
if isinstance(resp, Exception):
widget.delete("1.0", tk.END)
widget.insert(tk.END, f"Error: {resp}")
else:
widget.delete("1.0", tk.END)
widget.insert(tk.END, _json.dumps(resp, indent=2, ensure_ascii=False))
def _show_label(self, label_widget, resp):
if isinstance(resp, Exception):
label_widget.config(text=str(resp), foreground="red")
else:
code = resp.get("code", -1)
msg = resp.get("msg", "")
if code == 0:
label_widget.config(text=f"{msg}", foreground="green")
else:
label_widget.config(text=f"✗ [{code}] {msg}", foreground="red")
# ==================================================================
# Button handlers
# ==================================================================
def _toggle_connect(self):
if self.client.is_connected:
self.client.disconnect()
self._set_status(False)
else:
host = self.host_var.get().strip()
port = self.port_var.get()
try:
self.client.connect(host, port)
self._set_status(True)
except Exception as e:
messagebox.showerror("Connection Error", str(e))
# Auth
def _do_pwd_verify(self):
pwd = self.pwd_var.get().strip()
self._bg_run(lambda: self.client.pwd_verify(pwd),
lambda r: messagebox.showinfo("鉴权结果",
"✓ 鉴权成功" if (not isinstance(r, Exception) and r.get("code") == 0)
else f"{r}"))
# Device Info
def _do_dev_info_query(self):
self._bg_run(lambda: self.client.dev_info_query(),
lambda r: self._show_resp(self.devinfo_text, r))
def _do_dev_serial_set(self):
s = self.serial_var.get().strip()
if len(s) != 12:
messagebox.showerror("Error", "序列码必须为12位HEX")
return
self._bg_run(lambda: self.client.dev_serial_set(s),
lambda r: self._show_resp(self.devinfo_text, r))
# SSC Network
def _do_ssc_net_query(self):
self._bg_run(lambda: self.client.ssc_net_query(),
lambda r: self._show_label(self.net_resp, r) if isinstance(r, Exception)
else self._fill_net_vars(r.get("data", {})))
def _fill_net_vars(self, data: dict):
self.net_vars["dev_ip"].set(data.get("dev_ip", ""))
self.net_vars["subnet"].set(data.get("subnet_mask", ""))
self.net_vars["route_ip"].set(data.get("route_ip", ""))
self.net_vars["lssc_ip"].set(data.get("lssc_ip", ""))
self.net_vars["dns"].set(data.get("dns", ""))
self.net_vars["ssc_port"].set(str(data.get("port", "")))
self.net_resp.config(text="✓ 查询成功", foreground="green")
def _do_ssc_net_set(self):
port_str = self.net_vars["ssc_port"].get().strip()
port = int(port_str) if port_str else 0
self._bg_run(
lambda: self.client.ssc_net_set(
dev_ip=self.net_vars["dev_ip"].get().strip(),
subnet_mask=self.net_vars["subnet"].get().strip(),
route_ip=self.net_vars["route_ip"].get().strip(),
lssc_ip=self.net_vars["lssc_ip"].get().strip(),
dns=self.net_vars["dns"].get().strip(),
port=port),
lambda r: self._show_label(self.net_resp, r))
# IoT
def _do_iot_net_query(self):
self._bg_run(lambda: self.client.iot_net_query(),
lambda r: self._show_label(self.iot_resp, r) if isinstance(r, Exception)
else self._fill_iot_vars(r.get("data", {})))
def _fill_iot_vars(self, data: dict):
self.iot_vars["iot_host"].set(data.get("host", ""))
self.iot_vars["iot_port"].set(str(data.get("port", "")))
self.iot_vars["iot_client_id"].set(data.get("client_id", ""))
self.iot_vars["iot_user"].set(data.get("username", ""))
self.iot_vars["iot_pwd"].set(data.get("password", ""))
self.iot_resp.config(text="✓ 查询成功", foreground="green")
def _do_iot_net_set(self):
port_str = self.iot_vars["iot_port"].get().strip()
port = int(port_str) if port_str else 0
self._bg_run(
lambda: self.client.iot_net_set(
host=self.iot_vars["iot_host"].get().strip(),
port=port,
client_id=self.iot_vars["iot_client_id"].get().strip(),
username=self.iot_vars["iot_user"].get().strip(),
password=self.iot_vars["iot_pwd"].get().strip()),
lambda r: self._show_label(self.iot_resp, r))
def _do_iot_topic_query(self):
self._bg_run(lambda: self.client.iot_topic_query(),
lambda r: self._show_label(self.iot_resp, r) if isinstance(r, Exception)
else self._fill_topic_vars(r.get("data", {})))
def _fill_topic_vars(self, data: dict):
self.iot_vars["topic_pub"].set(data.get("topic_pub", ""))
self.iot_vars["topic_sub"].set(data.get("topic_sub", ""))
self.iot_cid_var.set(data.get("client_id_enable", False))
self.iot_resp.config(text="✓ 查询成功", foreground="green")
def _do_iot_topic_set(self):
self._bg_run(
lambda: self.client.iot_topic_set(
client_id_enable=self.iot_cid_var.get(),
topic_pub=self.iot_vars["topic_pub"].get().strip(),
topic_sub=self.iot_vars["topic_sub"].get().strip()),
lambda r: self._show_label(self.iot_resp, r))
# Loop
def _do_loop_param_query(self):
self._bg_run(lambda: self.client.loop_param_query(),
lambda r: self._show_resp(self.loop_text, r))
# System
def _do_pwd_set(self):
old = self.old_pwd_var.get().strip()
new = self.new_pwd_var.get().strip()
if len(old) != 6 or len(new) != 6:
messagebox.showerror("Error", "密码必须为6位数字")
return
self._bg_run(lambda: self.client.pwd_set(old, new),
lambda r: self._show_label(self.sys_resp, r))
def _do_device_reset(self):
if not messagebox.askyesno("确认", "设备复位后连接将断开,确定?"):
return
self._bg_run(lambda: self.client.device_reset(),
lambda r: self._set_status(False))
def _do_factory_reset(self):
if not messagebox.askyesno("确认", "恢复出厂设置将清除所有配置并复位,确定?"):
return
self._bg_run(lambda: self.client.factory_reset(),
lambda r: self._set_status(False))
# Raw JSON
def _do_raw_send(self):
raw = self.raw_input.get("sel.first", "sel.last")
if not raw:
raw = self.raw_input.get("1.0", tk.END)
for line in raw.strip().split("\n"):
line = line.strip()
if not line:
continue
try:
import json as _json
obj = _json.loads(line)
cmd = obj.get("cmd", "unknown")
data = obj.get("data")
self._bg_run(
lambda c=cmd, d=data: self.client._tcp.send_command(c, d),
lambda r: self._show_resp(self.raw_resp, r))
except Exception as e:
self.raw_resp.insert(tk.END, f"Parse error: {e}\n")
# Push handlers
def on_loop_data(self, cmd, data):
self.log(f"<<< PUSH loop_data: {data}")
def on_event_report(self, cmd, data):
self.log(f"<<< PUSH event_report: {data}")
def main():
root = tk.Tk()
app = DBNetApp(root)
# 窗口关闭时断开连接
def on_close():
if app.client.is_connected:
app.client.disconnect()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_close)
root.mainloop()
if __name__ == "__main__":
main()