#!/usr/bin/env python3 """ DBNetClient — DLD960 TCP JSON 协议测试工具 跨平台桌面应用 (tkinter),用于测试 vd960DBN 的网络接口协议。 """ import sys import os import threading import tkinter as tk from tkinter import ttk, scrolledtext, messagebox from tcp_json_client import DBNetClient, TcpJsonError APP_TITLE = "DBNetClient — DLD960 TCP JSON 协议测试工具" class DBNetApp: def __init__(self, root: tk.Tk): self.root = root self.root.title(APP_TITLE) self.root.geometry("960x720") self.root.minsize(800, 600) self.client = DBNetClient(log_callback=self.log) self.client.on_push("loop_data", self.on_loop_data) self.client.on_push("event_report", self.on_event_report) self._build_ui() self._set_defaults() # ================================================================== # UI # ================================================================== def _build_ui(self): # ---- Top: Connection bar ---- top = ttk.Frame(self.root, padding=5) top.pack(fill=tk.X) ttk.Label(top, text="Host:").pack(side=tk.LEFT) self.host_var = tk.StringVar(value="192.168.1.188") ttk.Entry(top, textvariable=self.host_var, width=14).pack(side=tk.LEFT, padx=2) ttk.Label(top, text="Port:").pack(side=tk.LEFT, padx=(8, 0)) self.port_var = tk.IntVar(value=5960) ttk.Entry(top, textvariable=self.port_var, width=6).pack(side=tk.LEFT, padx=2) self.btn_conn = ttk.Button(top, text="Connect", command=self._toggle_connect) self.btn_conn.pack(side=tk.LEFT, padx=8) self.conn_status = ttk.Label(top, text="● Disconnected", foreground="red") self.conn_status.pack(side=tk.LEFT, padx=5) ttk.Separator(self.root, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=2) # ---- Notebook ---- nb = ttk.Notebook(self.root) nb.pack(fill=tk.BOTH, expand=True, padx=5, pady=2) nb.add(self._tab_auth(nb), text="鉴权") nb.add(self._tab_devinfo(nb), text="设备信息") nb.add(self._tab_network(nb), text="网络配置") nb.add(self._tab_iot(nb), text="IoT 配置") nb.add(self._tab_loop(nb), text="线圈参数") nb.add(self._tab_system(nb), text="系统") nb.add(self._tab_raw(nb), text="Raw JSON") # ---- Bottom: Log ---- log_frame = ttk.LabelFrame(self.root, text="日志 / 原始响应", padding=2) log_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=(0, 5)) self.log_text = scrolledtext.ScrolledText(log_frame, height=8, wrap=tk.WORD, font=("Consolas", 9)) self.log_text.pack(fill=tk.BOTH, expand=True) self.log_text.tag_configure("sent", foreground="blue") self.log_text.tag_configure("recv", foreground="green") self.log_text.tag_configure("error", foreground="red") self.log_text.tag_configure("push", foreground="purple") # ---- Bottom buttons ---- btn_frame = ttk.Frame(self.root, padding=5) btn_frame.pack(fill=tk.X) ttk.Button(btn_frame, text="Clear Log", command=self._clear_log).pack(side=tk.RIGHT) def _set_defaults(self): self._entry_set(self.pwd_var, "123456") # ---- Tab builders ---- def _tab_auth(self, parent): f = ttk.Frame(parent, padding=10) ttk.Label(f, text="设备密码 (6位数字):").grid(row=0, column=0, sticky=tk.W, pady=5) self.pwd_var = tk.StringVar() ttk.Entry(f, textvariable=self.pwd_var, width=12, show="*").grid(row=0, column=1, sticky=tk.W, padx=5) ttk.Button(f, text="鉴权 (pwd_verify)", command=self._do_pwd_verify).grid(row=0, column=2, padx=10) ttk.Separator(f, orient=tk.HORIZONTAL).grid(row=1, column=0, columnspan=3, sticky=tk.EW, pady=10) ttk.Label(f, text="连接后必须先鉴权,才能执行其他命令。\n连续3次错误将锁定60秒。", foreground="gray").grid(row=2, column=0, columnspan=3, pady=5) return f def _tab_devinfo(self, parent): f = ttk.Frame(parent, padding=10) ttk.Button(f, text="查询设备信息", command=self._do_dev_info_query).pack(anchor=tk.W, pady=3) ttk.Separator(f, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=8) ttk.Label(f, text="更改设备序列码 (12位HEX):").pack(anchor=tk.W) row = ttk.Frame(f) row.pack(fill=tk.X, pady=5) self.serial_var = tk.StringVar() ttk.Entry(row, textvariable=self.serial_var, width=18).pack(side=tk.LEFT, padx=5) ttk.Button(row, text="设置", command=self._do_dev_serial_set).pack(side=tk.LEFT) ttk.Separator(f, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=8) # Response display self.devinfo_text = scrolledtext.ScrolledText(f, height=10, wrap=tk.WORD, font=("Consolas", 9)) self.devinfo_text.pack(fill=tk.BOTH, expand=True) return f def _tab_network(self, parent): f = ttk.Frame(parent, padding=10) # SSC Net fields fields = [ ("设备 IP:", "dev_ip"), ("子网掩码:", "subnet"), ("网关:", "route_ip"), ("LSSC IP:", "lssc_ip"), ("DNS:", "dns"), ("LSSC Port:", "ssc_port"), ] self.net_vars = {} for i, (label, key) in enumerate(fields): ttk.Label(f, text=label).grid(row=i, column=0, sticky=tk.W, pady=2) var = tk.StringVar() self.net_vars[key] = var ttk.Entry(f, textvariable=var, width=18).grid(row=i, column=1, sticky=tk.W, padx=5) btn_row = ttk.Frame(f) btn_row.grid(row=len(fields), column=0, columnspan=2, pady=10) ttk.Button(btn_row, text="查询 (ssc_net_query)", command=self._do_ssc_net_query).pack(side=tk.LEFT, padx=5) ttk.Button(btn_row, text="设置 (ssc_net_set)", command=self._do_ssc_net_set).pack(side=tk.LEFT, padx=5) self.net_resp = ttk.Label(f, text="", foreground="gray") self.net_resp.grid(row=len(fields) + 1, column=0, columnspan=2, pady=5) return f def _tab_iot(self, parent): f = ttk.Frame(parent, padding=10) iot_fields = [ ("MQTT Host:", "iot_host"), ("MQTT Port:", "iot_port"), ("Client ID:", "iot_client_id"), ("Username:", "iot_user"), ("Password:", "iot_pwd"), ("Topic Pub:", "topic_pub"), ("Topic Sub:", "topic_sub"), ] self.iot_vars = {} for i, (label, key) in enumerate(iot_fields): ttk.Label(f, text=label).grid(row=i, column=0, sticky=tk.W, pady=2) var = tk.StringVar() self.iot_vars[key] = var ttk.Entry(f, textvariable=var, width=24).grid(row=i, column=1, sticky=tk.W, padx=5) self.iot_cid_var = tk.BooleanVar(value=False) ttk.Checkbutton(f, text="Topic 追加 Client ID", variable=self.iot_cid_var).grid(row=len(iot_fields), column=0, columnspan=2, pady=5) btn_row = ttk.Frame(f) btn_row.grid(row=len(iot_fields) + 1, column=0, columnspan=2, pady=5) ttk.Button(btn_row, text="IoT 查询", command=self._do_iot_net_query).pack(side=tk.LEFT, padx=5) ttk.Button(btn_row, text="IoT 设置", command=self._do_iot_net_set).pack(side=tk.LEFT, padx=5) ttk.Button(btn_row, text="Topic 查询", command=self._do_iot_topic_query).pack(side=tk.LEFT, padx=5) ttk.Button(btn_row, text="Topic 设置", command=self._do_iot_topic_set).pack(side=tk.LEFT, padx=5) self.iot_resp = ttk.Label(f, text="", foreground="gray") self.iot_resp.grid(row=len(iot_fields) + 2, column=0, columnspan=2, pady=5) return f def _tab_loop(self, parent): f = ttk.Frame(parent, padding=10) ttk.Button(f, text="查询线圈参数 (loop_param_query)", command=self._do_loop_param_query).pack(anchor=tk.W, pady=3) ttk.Label(f, text="参数设置通过 BLE/小程序,或 Raw JSON 标签页发送。", foreground="gray").pack(anchor=tk.W, pady=5) self.loop_text = scrolledtext.ScrolledText(f, height=12, wrap=tk.WORD, font=("Consolas", 9)) self.loop_text.pack(fill=tk.BOTH, expand=True) return f def _tab_system(self, parent): f = ttk.Frame(parent, padding=10) # Change password ttk.Label(f, text="修改设备密码:").grid(row=0, column=0, sticky=tk.W, pady=3) ttk.Label(f, text="旧密码:").grid(row=1, column=0, sticky=tk.W) self.old_pwd_var = tk.StringVar() ttk.Entry(f, textvariable=self.old_pwd_var, width=12, show="*").grid(row=1, column=1, sticky=tk.W, padx=5) ttk.Label(f, text="新密码:").grid(row=2, column=0, sticky=tk.W) self.new_pwd_var = tk.StringVar() ttk.Entry(f, textvariable=self.new_pwd_var, width=12, show="*").grid(row=2, column=1, sticky=tk.W, padx=5) ttk.Button(f, text="修改密码", command=self._do_pwd_set).grid(row=2, column=2, padx=10) ttk.Separator(f, orient=tk.HORIZONTAL).grid(row=3, column=0, columnspan=3, sticky=tk.EW, pady=10) # Reset ttk.Label(f, text="⚠ 危险操作:", foreground="red").grid(row=4, column=0, sticky=tk.W) btn_row = ttk.Frame(f) btn_row.grid(row=5, column=0, columnspan=3, pady=5) ttk.Button(btn_row, text="设备复位 (device_reset)", command=self._do_device_reset).pack(side=tk.LEFT, padx=5) ttk.Button(btn_row, text="出厂初始化 (factory_reset)", command=self._do_factory_reset).pack(side=tk.LEFT, padx=5) self.sys_resp = ttk.Label(f, text="", foreground="gray") self.sys_resp.grid(row=6, column=0, columnspan=3, pady=5) return f def _tab_raw(self, parent): f = ttk.Frame(parent, padding=10) ttk.Label(f, text="直接输入 JSON 命令(一行一条,不含换行符):").pack(anchor=tk.W) self.raw_input = scrolledtext.ScrolledText(f, height=5, font=("Consolas", 10)) self.raw_input.pack(fill=tk.X, pady=5) self.raw_input.insert(tk.END, '{"msg_id":1,"cmd":"dev_info_query","ts":0}\n' '{"msg_id":1,"cmd":"pwd_verify","ts":0,"data":{"password":"123456"}}\n') ttk.Button(f, text="发送选中行", command=self._do_raw_send).pack(anchor=tk.W) self.raw_resp = scrolledtext.ScrolledText(f, height=8, font=("Consolas", 9)) self.raw_resp.pack(fill=tk.BOTH, expand=True, pady=5) return f # ================================================================== # Helpers # ================================================================== def _entry_set(self, var, value): var.set(value) def _check_set(self, var, value): var.set(value) def log(self, msg: str): """线程安全的日志输出""" self.root.after(0, self._log_append, msg) def _log_append(self, msg: str): tag = "recv" if msg.startswith(">>>"): tag = "sent" elif msg.startswith("<<<"): tag = "recv" if "code=" in msg else "push" elif "error" in msg.lower() or "fail" in msg.lower(): tag = "error" self.log_text.insert(tk.END, msg + "\n", tag) self.log_text.see(tk.END) def _clear_log(self): self.log_text.delete("1.0", tk.END) def _set_status(self, connected: bool): if connected: self.conn_status.config(text="● Connected", foreground="green") self.btn_conn.config(text="Disconnect") else: self.conn_status.config(text="● Disconnected", foreground="red") self.btn_conn.config(text="Connect") def _bg_run(self, fn, on_done=None): """在后台线程执行 fn,完成后回调 on_done(result)""" def _run(): try: result = fn() except TcpJsonError as e: result = e except Exception as e: result = e if on_done: self.root.after(0, on_done, result) threading.Thread(target=_run, daemon=True).start() def _show_resp(self, widget, resp): """将响应 dict 显示在 Text widget 中""" import json as _json if isinstance(resp, Exception): widget.delete("1.0", tk.END) widget.insert(tk.END, f"Error: {resp}") else: widget.delete("1.0", tk.END) widget.insert(tk.END, _json.dumps(resp, indent=2, ensure_ascii=False)) def _show_label(self, label_widget, resp): if isinstance(resp, Exception): label_widget.config(text=str(resp), foreground="red") else: code = resp.get("code", -1) msg = resp.get("msg", "") if code == 0: label_widget.config(text=f"✓ {msg}", foreground="green") else: label_widget.config(text=f"✗ [{code}] {msg}", foreground="red") # ================================================================== # Button handlers # ================================================================== def _toggle_connect(self): if self.client.is_connected: self.client.disconnect() self._set_status(False) else: host = self.host_var.get().strip() port = self.port_var.get() try: self.client.connect(host, port) self._set_status(True) except Exception as e: messagebox.showerror("Connection Error", str(e)) # Auth def _do_pwd_verify(self): pwd = self.pwd_var.get().strip() self._bg_run(lambda: self.client.pwd_verify(pwd), lambda r: messagebox.showinfo("鉴权结果", "✓ 鉴权成功" if (not isinstance(r, Exception) and r.get("code") == 0) else f"✗ {r}")) # Device Info def _do_dev_info_query(self): self._bg_run(lambda: self.client.dev_info_query(), lambda r: self._show_resp(self.devinfo_text, r)) def _do_dev_serial_set(self): s = self.serial_var.get().strip() if len(s) != 12: messagebox.showerror("Error", "序列码必须为12位HEX") return self._bg_run(lambda: self.client.dev_serial_set(s), lambda r: self._show_resp(self.devinfo_text, r)) # SSC Network def _do_ssc_net_query(self): self._bg_run(lambda: self.client.ssc_net_query(), lambda r: self._show_label(self.net_resp, r) if isinstance(r, Exception) else self._fill_net_vars(r.get("data", {}))) def _fill_net_vars(self, data: dict): self.net_vars["dev_ip"].set(data.get("dev_ip", "")) self.net_vars["subnet"].set(data.get("subnet_mask", "")) self.net_vars["route_ip"].set(data.get("route_ip", "")) self.net_vars["lssc_ip"].set(data.get("lssc_ip", "")) self.net_vars["dns"].set(data.get("dns", "")) self.net_vars["ssc_port"].set(str(data.get("port", ""))) self.net_resp.config(text="✓ 查询成功", foreground="green") def _do_ssc_net_set(self): port_str = self.net_vars["ssc_port"].get().strip() port = int(port_str) if port_str else 0 self._bg_run( lambda: self.client.ssc_net_set( dev_ip=self.net_vars["dev_ip"].get().strip(), subnet_mask=self.net_vars["subnet"].get().strip(), route_ip=self.net_vars["route_ip"].get().strip(), lssc_ip=self.net_vars["lssc_ip"].get().strip(), dns=self.net_vars["dns"].get().strip(), port=port), lambda r: self._show_label(self.net_resp, r)) # IoT def _do_iot_net_query(self): self._bg_run(lambda: self.client.iot_net_query(), lambda r: self._show_label(self.iot_resp, r) if isinstance(r, Exception) else self._fill_iot_vars(r.get("data", {}))) def _fill_iot_vars(self, data: dict): self.iot_vars["iot_host"].set(data.get("host", "")) self.iot_vars["iot_port"].set(str(data.get("port", ""))) self.iot_vars["iot_client_id"].set(data.get("client_id", "")) self.iot_vars["iot_user"].set(data.get("username", "")) self.iot_vars["iot_pwd"].set(data.get("password", "")) self.iot_resp.config(text="✓ 查询成功", foreground="green") def _do_iot_net_set(self): port_str = self.iot_vars["iot_port"].get().strip() port = int(port_str) if port_str else 0 self._bg_run( lambda: self.client.iot_net_set( host=self.iot_vars["iot_host"].get().strip(), port=port, client_id=self.iot_vars["iot_client_id"].get().strip(), username=self.iot_vars["iot_user"].get().strip(), password=self.iot_vars["iot_pwd"].get().strip()), lambda r: self._show_label(self.iot_resp, r)) def _do_iot_topic_query(self): self._bg_run(lambda: self.client.iot_topic_query(), lambda r: self._show_label(self.iot_resp, r) if isinstance(r, Exception) else self._fill_topic_vars(r.get("data", {}))) def _fill_topic_vars(self, data: dict): self.iot_vars["topic_pub"].set(data.get("topic_pub", "")) self.iot_vars["topic_sub"].set(data.get("topic_sub", "")) self.iot_cid_var.set(data.get("client_id_enable", False)) self.iot_resp.config(text="✓ 查询成功", foreground="green") def _do_iot_topic_set(self): self._bg_run( lambda: self.client.iot_topic_set( client_id_enable=self.iot_cid_var.get(), topic_pub=self.iot_vars["topic_pub"].get().strip(), topic_sub=self.iot_vars["topic_sub"].get().strip()), lambda r: self._show_label(self.iot_resp, r)) # Loop def _do_loop_param_query(self): self._bg_run(lambda: self.client.loop_param_query(), lambda r: self._show_resp(self.loop_text, r)) # System def _do_pwd_set(self): old = self.old_pwd_var.get().strip() new = self.new_pwd_var.get().strip() if len(old) != 6 or len(new) != 6: messagebox.showerror("Error", "密码必须为6位数字") return self._bg_run(lambda: self.client.pwd_set(old, new), lambda r: self._show_label(self.sys_resp, r)) def _do_device_reset(self): if not messagebox.askyesno("确认", "设备复位后连接将断开,确定?"): return self._bg_run(lambda: self.client.device_reset(), lambda r: self._set_status(False)) def _do_factory_reset(self): if not messagebox.askyesno("确认", "恢复出厂设置将清除所有配置并复位,确定?"): return self._bg_run(lambda: self.client.factory_reset(), lambda r: self._set_status(False)) # Raw JSON def _do_raw_send(self): raw = self.raw_input.get("sel.first", "sel.last") if not raw: raw = self.raw_input.get("1.0", tk.END) for line in raw.strip().split("\n"): line = line.strip() if not line: continue try: import json as _json obj = _json.loads(line) cmd = obj.get("cmd", "unknown") data = obj.get("data") self._bg_run( lambda c=cmd, d=data: self.client._tcp.send_command(c, d), lambda r: self._show_resp(self.raw_resp, r)) except Exception as e: self.raw_resp.insert(tk.END, f"Parse error: {e}\n") # Push handlers def on_loop_data(self, cmd, data): self.log(f"<<< PUSH loop_data: {data}") def on_event_report(self, cmd, data): self.log(f"<<< PUSH event_report: {data}") def main(): root = tk.Tk() app = DBNetApp(root) # 窗口关闭时断开连接 def on_close(): if app.client.is_connected: app.client.disconnect() root.destroy() root.protocol("WM_DELETE_WINDOW", on_close) root.mainloop() if __name__ == "__main__": main()