From 8b4404d5b37171e11b9486d41dfdafff94a1bcbe Mon Sep 17 00:00:00 2001 From: wangfq Date: Tue, 30 Jun 2026 15:32:57 +0800 Subject: [PATCH] =?UTF-8?q?feat(DBNetClient):=20TCP=20JSON=20=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=E6=A1=8C=E9=9D=A2=E6=B5=8B=E8=AF=95=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - tcp_json_client.py: 协议客户端库 — 行分隔JSON, 请求-响应, 主动推送接收 - main.py: tkinter 跨平台 GUI — 7个标签页覆盖全部15条命令 - 虚拟环境: venv/, 无额外依赖(tkinter 标准库) - 支持: 鉴权/设备信息/网络配置/IoT配置/线圈参数/系统操作/Raw JSON --- DBNetClient/.gitignore | 3 + DBNetClient/main.py | 491 +++++++++++++++++++++++++++++++++ DBNetClient/requirements.txt | 3 + DBNetClient/tcp_json_client.py | 287 +++++++++++++++++++ 4 files changed, 784 insertions(+) create mode 100644 DBNetClient/.gitignore create mode 100644 DBNetClient/main.py create mode 100644 DBNetClient/requirements.txt create mode 100644 DBNetClient/tcp_json_client.py diff --git a/DBNetClient/.gitignore b/DBNetClient/.gitignore new file mode 100644 index 0000000..d0ee3b1 --- /dev/null +++ b/DBNetClient/.gitignore @@ -0,0 +1,3 @@ +venv/ +__pycache__/ +*.pyc diff --git a/DBNetClient/main.py b/DBNetClient/main.py new file mode 100644 index 0000000..449b29c --- /dev/null +++ b/DBNetClient/main.py @@ -0,0 +1,491 @@ +#!/usr/bin/env python3 +""" +DBNetClient — DLD960 TCP JSON 协议测试工具 +跨平台桌面应用 (tkinter),用于测试 vd960DBN 的网络接口协议。 +""" + +import sys +import os +import threading +import tkinter as tk +from tkinter import ttk, scrolledtext, messagebox +from tcp_json_client import DBNetClient, TcpJsonError + +APP_TITLE = "DBNetClient — DLD960 TCP JSON 协议测试工具" + + +class DBNetApp: + def __init__(self, root: tk.Tk): + self.root = root + self.root.title(APP_TITLE) + self.root.geometry("960x720") + self.root.minsize(800, 600) + + self.client = DBNetClient(log_callback=self.log) + self.client.on_push("loop_data", self.on_loop_data) + self.client.on_push("event_report", self.on_event_report) + + self._build_ui() + self._set_defaults() + + # ================================================================== + # UI + # ================================================================== + + def _build_ui(self): + # ---- Top: Connection bar ---- + top = ttk.Frame(self.root, padding=5) + top.pack(fill=tk.X) + + ttk.Label(top, text="Host:").pack(side=tk.LEFT) + self.host_var = tk.StringVar(value="192.168.1.188") + ttk.Entry(top, textvariable=self.host_var, width=14).pack(side=tk.LEFT, padx=2) + + ttk.Label(top, text="Port:").pack(side=tk.LEFT, padx=(8, 0)) + self.port_var = tk.IntVar(value=5960) + ttk.Entry(top, textvariable=self.port_var, width=6).pack(side=tk.LEFT, padx=2) + + self.btn_conn = ttk.Button(top, text="Connect", command=self._toggle_connect) + self.btn_conn.pack(side=tk.LEFT, padx=8) + + self.conn_status = ttk.Label(top, text="● Disconnected", foreground="red") + self.conn_status.pack(side=tk.LEFT, padx=5) + + ttk.Separator(self.root, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=2) + + # ---- Notebook ---- + nb = ttk.Notebook(self.root) + nb.pack(fill=tk.BOTH, expand=True, padx=5, pady=2) + + nb.add(self._tab_auth(nb), text="鉴权") + nb.add(self._tab_devinfo(nb), text="设备信息") + nb.add(self._tab_network(nb), text="网络配置") + nb.add(self._tab_iot(nb), text="IoT 配置") + nb.add(self._tab_loop(nb), text="线圈参数") + nb.add(self._tab_system(nb), text="系统") + nb.add(self._tab_raw(nb), text="Raw JSON") + + # ---- Bottom: Log ---- + log_frame = ttk.LabelFrame(self.root, text="日志 / 原始响应", padding=2) + log_frame.pack(fill=tk.BOTH, expand=True, padx=5, pady=(0, 5)) + + self.log_text = scrolledtext.ScrolledText(log_frame, height=8, wrap=tk.WORD, + font=("Consolas", 9)) + self.log_text.pack(fill=tk.BOTH, expand=True) + self.log_text.tag_configure("sent", foreground="blue") + self.log_text.tag_configure("recv", foreground="green") + self.log_text.tag_configure("error", foreground="red") + self.log_text.tag_configure("push", foreground="purple") + + # ---- Bottom buttons ---- + btn_frame = ttk.Frame(self.root, padding=5) + btn_frame.pack(fill=tk.X) + ttk.Button(btn_frame, text="Clear Log", command=self._clear_log).pack(side=tk.RIGHT) + + def _set_defaults(self): + self._entry_set(self.pwd_var, "123456") + + # ---- Tab builders ---- + + def _tab_auth(self, parent): + f = ttk.Frame(parent, padding=10) + ttk.Label(f, text="设备密码 (6位数字):").grid(row=0, column=0, sticky=tk.W, pady=5) + self.pwd_var = tk.StringVar() + ttk.Entry(f, textvariable=self.pwd_var, width=12, show="*").grid(row=0, column=1, sticky=tk.W, padx=5) + ttk.Button(f, text="鉴权 (pwd_verify)", command=self._do_pwd_verify).grid(row=0, column=2, padx=10) + + ttk.Separator(f, orient=tk.HORIZONTAL).grid(row=1, column=0, columnspan=3, + sticky=tk.EW, pady=10) + + ttk.Label(f, text="连接后必须先鉴权,才能执行其他命令。\n连续3次错误将锁定60秒。", + foreground="gray").grid(row=2, column=0, columnspan=3, pady=5) + return f + + def _tab_devinfo(self, parent): + f = ttk.Frame(parent, padding=10) + ttk.Button(f, text="查询设备信息", command=self._do_dev_info_query).pack(anchor=tk.W, pady=3) + + ttk.Separator(f, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=8) + + ttk.Label(f, text="更改设备序列码 (12位HEX):").pack(anchor=tk.W) + row = ttk.Frame(f) + row.pack(fill=tk.X, pady=5) + self.serial_var = tk.StringVar() + ttk.Entry(row, textvariable=self.serial_var, width=18).pack(side=tk.LEFT, padx=5) + ttk.Button(row, text="设置", command=self._do_dev_serial_set).pack(side=tk.LEFT) + + ttk.Separator(f, orient=tk.HORIZONTAL).pack(fill=tk.X, pady=8) + + # Response display + self.devinfo_text = scrolledtext.ScrolledText(f, height=10, wrap=tk.WORD, + font=("Consolas", 9)) + self.devinfo_text.pack(fill=tk.BOTH, expand=True) + return f + + def _tab_network(self, parent): + f = ttk.Frame(parent, padding=10) + # SSC Net fields + fields = [ + ("设备 IP:", "dev_ip"), + ("子网掩码:", "subnet"), + ("网关:", "route_ip"), + ("LSSC IP:", "lssc_ip"), + ("DNS:", "dns"), + ("LSSC Port:", "ssc_port"), + ] + self.net_vars = {} + for i, (label, key) in enumerate(fields): + ttk.Label(f, text=label).grid(row=i, column=0, sticky=tk.W, pady=2) + var = tk.StringVar() + self.net_vars[key] = var + ttk.Entry(f, textvariable=var, width=18).grid(row=i, column=1, sticky=tk.W, padx=5) + + btn_row = ttk.Frame(f) + btn_row.grid(row=len(fields), column=0, columnspan=2, pady=10) + ttk.Button(btn_row, text="查询 (ssc_net_query)", command=self._do_ssc_net_query).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_row, text="设置 (ssc_net_set)", command=self._do_ssc_net_set).pack(side=tk.LEFT, padx=5) + + self.net_resp = ttk.Label(f, text="", foreground="gray") + self.net_resp.grid(row=len(fields) + 1, column=0, columnspan=2, pady=5) + return f + + def _tab_iot(self, parent): + f = ttk.Frame(parent, padding=10) + iot_fields = [ + ("MQTT Host:", "iot_host"), + ("MQTT Port:", "iot_port"), + ("Client ID:", "iot_client_id"), + ("Username:", "iot_user"), + ("Password:", "iot_pwd"), + ("Topic Pub:", "topic_pub"), + ("Topic Sub:", "topic_sub"), + ] + self.iot_vars = {} + for i, (label, key) in enumerate(iot_fields): + ttk.Label(f, text=label).grid(row=i, column=0, sticky=tk.W, pady=2) + var = tk.StringVar() + self.iot_vars[key] = var + ttk.Entry(f, textvariable=var, width=24).grid(row=i, column=1, sticky=tk.W, padx=5) + + self.iot_cid_var = tk.BooleanVar(value=False) + ttk.Checkbutton(f, text="Topic 追加 Client ID", + variable=self.iot_cid_var).grid(row=len(iot_fields), column=0, columnspan=2, pady=5) + + btn_row = ttk.Frame(f) + btn_row.grid(row=len(iot_fields) + 1, column=0, columnspan=2, pady=5) + ttk.Button(btn_row, text="IoT 查询", command=self._do_iot_net_query).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_row, text="IoT 设置", command=self._do_iot_net_set).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_row, text="Topic 查询", command=self._do_iot_topic_query).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_row, text="Topic 设置", command=self._do_iot_topic_set).pack(side=tk.LEFT, padx=5) + + self.iot_resp = ttk.Label(f, text="", foreground="gray") + self.iot_resp.grid(row=len(iot_fields) + 2, column=0, columnspan=2, pady=5) + return f + + def _tab_loop(self, parent): + f = ttk.Frame(parent, padding=10) + ttk.Button(f, text="查询线圈参数 (loop_param_query)", + command=self._do_loop_param_query).pack(anchor=tk.W, pady=3) + ttk.Label(f, text="参数设置通过 BLE/小程序,或 Raw JSON 标签页发送。", + foreground="gray").pack(anchor=tk.W, pady=5) + + self.loop_text = scrolledtext.ScrolledText(f, height=12, wrap=tk.WORD, + font=("Consolas", 9)) + self.loop_text.pack(fill=tk.BOTH, expand=True) + return f + + def _tab_system(self, parent): + f = ttk.Frame(parent, padding=10) + + # Change password + ttk.Label(f, text="修改设备密码:").grid(row=0, column=0, sticky=tk.W, pady=3) + ttk.Label(f, text="旧密码:").grid(row=1, column=0, sticky=tk.W) + self.old_pwd_var = tk.StringVar() + ttk.Entry(f, textvariable=self.old_pwd_var, width=12, show="*").grid(row=1, column=1, sticky=tk.W, padx=5) + ttk.Label(f, text="新密码:").grid(row=2, column=0, sticky=tk.W) + self.new_pwd_var = tk.StringVar() + ttk.Entry(f, textvariable=self.new_pwd_var, width=12, show="*").grid(row=2, column=1, sticky=tk.W, padx=5) + ttk.Button(f, text="修改密码", command=self._do_pwd_set).grid(row=2, column=2, padx=10) + + ttk.Separator(f, orient=tk.HORIZONTAL).grid(row=3, column=0, columnspan=3, + sticky=tk.EW, pady=10) + + # Reset + ttk.Label(f, text="⚠ 危险操作:", foreground="red").grid(row=4, column=0, sticky=tk.W) + btn_row = ttk.Frame(f) + btn_row.grid(row=5, column=0, columnspan=3, pady=5) + ttk.Button(btn_row, text="设备复位 (device_reset)", + command=self._do_device_reset).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_row, text="出厂初始化 (factory_reset)", + command=self._do_factory_reset).pack(side=tk.LEFT, padx=5) + + self.sys_resp = ttk.Label(f, text="", foreground="gray") + self.sys_resp.grid(row=6, column=0, columnspan=3, pady=5) + return f + + def _tab_raw(self, parent): + f = ttk.Frame(parent, padding=10) + ttk.Label(f, text="直接输入 JSON 命令(一行一条,不含换行符):").pack(anchor=tk.W) + self.raw_input = scrolledtext.ScrolledText(f, height=5, font=("Consolas", 10)) + self.raw_input.pack(fill=tk.X, pady=5) + self.raw_input.insert(tk.END, + '{"msg_id":1,"cmd":"dev_info_query","ts":0}\n' + '{"msg_id":1,"cmd":"pwd_verify","ts":0,"data":{"password":"123456"}}\n') + ttk.Button(f, text="发送选中行", command=self._do_raw_send).pack(anchor=tk.W) + + self.raw_resp = scrolledtext.ScrolledText(f, height=8, font=("Consolas", 9)) + self.raw_resp.pack(fill=tk.BOTH, expand=True, pady=5) + return f + + # ================================================================== + # Helpers + # ================================================================== + + def _entry_set(self, var, value): + var.set(value) + + def _check_set(self, var, value): + var.set(value) + + def log(self, msg: str): + """线程安全的日志输出""" + self.root.after(0, self._log_append, msg) + + def _log_append(self, msg: str): + tag = "recv" + if msg.startswith(">>>"): + tag = "sent" + elif msg.startswith("<<<"): + tag = "recv" if "code=" in msg else "push" + elif "error" in msg.lower() or "fail" in msg.lower(): + tag = "error" + self.log_text.insert(tk.END, msg + "\n", tag) + self.log_text.see(tk.END) + + def _clear_log(self): + self.log_text.delete("1.0", tk.END) + + def _set_status(self, connected: bool): + if connected: + self.conn_status.config(text="● Connected", foreground="green") + self.btn_conn.config(text="Disconnect") + else: + self.conn_status.config(text="● Disconnected", foreground="red") + self.btn_conn.config(text="Connect") + + def _bg_run(self, fn, on_done=None): + """在后台线程执行 fn,完成后回调 on_done(result)""" + def _run(): + try: + result = fn() + except TcpJsonError as e: + result = e + except Exception as e: + result = e + if on_done: + self.root.after(0, on_done, result) + threading.Thread(target=_run, daemon=True).start() + + def _show_resp(self, widget, resp): + """将响应 dict 显示在 Text widget 中""" + import json as _json + if isinstance(resp, Exception): + widget.delete("1.0", tk.END) + widget.insert(tk.END, f"Error: {resp}") + else: + widget.delete("1.0", tk.END) + widget.insert(tk.END, _json.dumps(resp, indent=2, ensure_ascii=False)) + + def _show_label(self, label_widget, resp): + if isinstance(resp, Exception): + label_widget.config(text=str(resp), foreground="red") + else: + code = resp.get("code", -1) + msg = resp.get("msg", "") + if code == 0: + label_widget.config(text=f"✓ {msg}", foreground="green") + else: + label_widget.config(text=f"✗ [{code}] {msg}", foreground="red") + + # ================================================================== + # Button handlers + # ================================================================== + + def _toggle_connect(self): + if self.client.is_connected: + self.client.disconnect() + self._set_status(False) + else: + host = self.host_var.get().strip() + port = self.port_var.get() + try: + self.client.connect(host, port) + self._set_status(True) + except Exception as e: + messagebox.showerror("Connection Error", str(e)) + + # Auth + def _do_pwd_verify(self): + pwd = self.pwd_var.get().strip() + self._bg_run(lambda: self.client.pwd_verify(pwd), + lambda r: messagebox.showinfo("鉴权结果", + "✓ 鉴权成功" if (not isinstance(r, Exception) and r.get("code") == 0) + else f"✗ {r}")) + + # Device Info + def _do_dev_info_query(self): + self._bg_run(lambda: self.client.dev_info_query(), + lambda r: self._show_resp(self.devinfo_text, r)) + + def _do_dev_serial_set(self): + s = self.serial_var.get().strip() + if len(s) != 12: + messagebox.showerror("Error", "序列码必须为12位HEX") + return + self._bg_run(lambda: self.client.dev_serial_set(s), + lambda r: self._show_resp(self.devinfo_text, r)) + + # SSC Network + def _do_ssc_net_query(self): + self._bg_run(lambda: self.client.ssc_net_query(), + lambda r: self._show_label(self.net_resp, r) if isinstance(r, Exception) + else self._fill_net_vars(r.get("data", {}))) + + def _fill_net_vars(self, data: dict): + self.net_vars["dev_ip"].set(data.get("dev_ip", "")) + self.net_vars["subnet"].set(data.get("subnet_mask", "")) + self.net_vars["route_ip"].set(data.get("route_ip", "")) + self.net_vars["lssc_ip"].set(data.get("lssc_ip", "")) + self.net_vars["dns"].set(data.get("dns", "")) + self.net_vars["ssc_port"].set(str(data.get("port", ""))) + self.net_resp.config(text="✓ 查询成功", foreground="green") + + def _do_ssc_net_set(self): + port_str = self.net_vars["ssc_port"].get().strip() + port = int(port_str) if port_str else 0 + self._bg_run( + lambda: self.client.ssc_net_set( + dev_ip=self.net_vars["dev_ip"].get().strip(), + subnet_mask=self.net_vars["subnet"].get().strip(), + route_ip=self.net_vars["route_ip"].get().strip(), + lssc_ip=self.net_vars["lssc_ip"].get().strip(), + dns=self.net_vars["dns"].get().strip(), + port=port), + lambda r: self._show_label(self.net_resp, r)) + + # IoT + def _do_iot_net_query(self): + self._bg_run(lambda: self.client.iot_net_query(), + lambda r: self._show_label(self.iot_resp, r) if isinstance(r, Exception) + else self._fill_iot_vars(r.get("data", {}))) + + def _fill_iot_vars(self, data: dict): + self.iot_vars["iot_host"].set(data.get("host", "")) + self.iot_vars["iot_port"].set(str(data.get("port", ""))) + self.iot_vars["iot_client_id"].set(data.get("client_id", "")) + self.iot_vars["iot_user"].set(data.get("username", "")) + self.iot_vars["iot_pwd"].set(data.get("password", "")) + self.iot_resp.config(text="✓ 查询成功", foreground="green") + + def _do_iot_net_set(self): + port_str = self.iot_vars["iot_port"].get().strip() + port = int(port_str) if port_str else 0 + self._bg_run( + lambda: self.client.iot_net_set( + host=self.iot_vars["iot_host"].get().strip(), + port=port, + client_id=self.iot_vars["iot_client_id"].get().strip(), + username=self.iot_vars["iot_user"].get().strip(), + password=self.iot_vars["iot_pwd"].get().strip()), + lambda r: self._show_label(self.iot_resp, r)) + + def _do_iot_topic_query(self): + self._bg_run(lambda: self.client.iot_topic_query(), + lambda r: self._show_label(self.iot_resp, r) if isinstance(r, Exception) + else self._fill_topic_vars(r.get("data", {}))) + + def _fill_topic_vars(self, data: dict): + self.iot_vars["topic_pub"].set(data.get("topic_pub", "")) + self.iot_vars["topic_sub"].set(data.get("topic_sub", "")) + self.iot_cid_var.set(data.get("client_id_enable", False)) + self.iot_resp.config(text="✓ 查询成功", foreground="green") + + def _do_iot_topic_set(self): + self._bg_run( + lambda: self.client.iot_topic_set( + client_id_enable=self.iot_cid_var.get(), + topic_pub=self.iot_vars["topic_pub"].get().strip(), + topic_sub=self.iot_vars["topic_sub"].get().strip()), + lambda r: self._show_label(self.iot_resp, r)) + + # Loop + def _do_loop_param_query(self): + self._bg_run(lambda: self.client.loop_param_query(), + lambda r: self._show_resp(self.loop_text, r)) + + # System + def _do_pwd_set(self): + old = self.old_pwd_var.get().strip() + new = self.new_pwd_var.get().strip() + if len(old) != 6 or len(new) != 6: + messagebox.showerror("Error", "密码必须为6位数字") + return + self._bg_run(lambda: self.client.pwd_set(old, new), + lambda r: self._show_label(self.sys_resp, r)) + + def _do_device_reset(self): + if not messagebox.askyesno("确认", "设备复位后连接将断开,确定?"): + return + self._bg_run(lambda: self.client.device_reset(), + lambda r: self._set_status(False)) + + def _do_factory_reset(self): + if not messagebox.askyesno("确认", "恢复出厂设置将清除所有配置并复位,确定?"): + return + self._bg_run(lambda: self.client.factory_reset(), + lambda r: self._set_status(False)) + + # Raw JSON + def _do_raw_send(self): + raw = self.raw_input.get("sel.first", "sel.last") + if not raw: + raw = self.raw_input.get("1.0", tk.END) + for line in raw.strip().split("\n"): + line = line.strip() + if not line: + continue + try: + import json as _json + obj = _json.loads(line) + cmd = obj.get("cmd", "unknown") + data = obj.get("data") + self._bg_run( + lambda c=cmd, d=data: self.client._tcp.send_command(c, d), + lambda r: self._show_resp(self.raw_resp, r)) + except Exception as e: + self.raw_resp.insert(tk.END, f"Parse error: {e}\n") + + # Push handlers + def on_loop_data(self, cmd, data): + self.log(f"<<< PUSH loop_data: {data}") + + def on_event_report(self, cmd, data): + self.log(f"<<< PUSH event_report: {data}") + + +def main(): + root = tk.Tk() + app = DBNetApp(root) + + # 窗口关闭时断开连接 + def on_close(): + if app.client.is_connected: + app.client.disconnect() + root.destroy() + + root.protocol("WM_DELETE_WINDOW", on_close) + root.mainloop() + + +if __name__ == "__main__": + main() diff --git a/DBNetClient/requirements.txt b/DBNetClient/requirements.txt new file mode 100644 index 0000000..4a94f1c --- /dev/null +++ b/DBNetClient/requirements.txt @@ -0,0 +1,3 @@ +# DBNetClient — DLD960 TCP JSON 协议测试工具 +# 无需额外依赖,tkinter 为 Python 标准库 +# 如果 Linux 系统缺少 tkinter:sudo apt install python3-tk diff --git a/DBNetClient/tcp_json_client.py b/DBNetClient/tcp_json_client.py new file mode 100644 index 0000000..49b45bd --- /dev/null +++ b/DBNetClient/tcp_json_client.py @@ -0,0 +1,287 @@ +""" +TCP JSON Protocol Client — DLD960 TCP JSON 协议客户端 + +行分隔 JSON 帧,请求-响应模式,自动 msg_id 递增。 +""" + +import json +import socket +import threading +import time + + +class TcpJsonError(Exception): + """协议错误""" + pass + + +class TcpJsonClient: + """DLD960 TCP JSON 协议客户端""" + + def __init__(self, log_callback=None): + self._sock: socket.socket | None = None + self._msg_id = 0 + self._buf = b"" + self._lock = threading.Lock() + self._running = False + self._recv_thread: threading.Thread | None = None + self._pending: dict[int, dict] = {} # msg_id -> response future + self._push_handlers: dict[str, callable] = {} + self._log = log_callback or (lambda msg: None) + + # ---- Connection ---- + + def connect(self, host: str, port: int = 5960, timeout: float = 5.0) -> None: + """建立 TCP 连接""" + if self._sock: + self.disconnect() + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.settimeout(timeout) + self._sock.connect((host, port)) + self._sock.settimeout(None) # 接收线程用阻塞模式 + self._running = True + self._buf = b"" + self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True) + self._recv_thread.start() + self._log(f"Connected to {host}:{port}") + + def disconnect(self) -> None: + """断开连接""" + self._running = False + if self._sock: + try: + self._sock.shutdown(socket.SHUT_RDWR) + except OSError: + pass + self._sock.close() + self._sock = None + self._buf = b"" + with self._lock: + # 唤醒所有等待的请求 + for mid, fut in self._pending.items(): + fut["error"] = "disconnected" + fut["event"].set() + self._pending.clear() + self._log("Disconnected") + + @property + def is_connected(self) -> bool: + return self._sock is not None and self._running + + # ---- Sending ---- + + def _next_msg_id(self) -> int: + self._msg_id += 1 + return self._msg_id + + def send_command(self, cmd: str, data: dict | None = None, + timeout: float = 5.0) -> dict: + """发送命令并等待响应,返回完整响应 dict""" + if not self.is_connected: + raise TcpJsonError("Not connected") + + msg_id = self._next_msg_id() + frame = {"msg_id": msg_id, "cmd": cmd, "ts": int(time.time())} + if data: + frame["data"] = data + + raw = json.dumps(frame, separators=(",", ":")) + "\n" + event = threading.Event() + with self._lock: + self._pending[msg_id] = {"event": event, "response": None, "error": None} + + try: + self._sock.sendall(raw.encode("utf-8")) + self._log(f">>> {cmd} (msg_id={msg_id})") + except OSError as e: + with self._lock: + self._pending.pop(msg_id, None) + raise TcpJsonError(f"Send failed: {e}") + + if not event.wait(timeout): + with self._lock: + self._pending.pop(msg_id, None) + raise TcpJsonError(f"Timeout waiting for response to {cmd}") + + with self._lock: + fut = self._pending.pop(msg_id, None) + if fut is None: + raise TcpJsonError("Response already consumed") + if fut["error"]: + raise TcpJsonError(fut["error"]) + return fut["response"] + + # ---- Push handlers ---- + + def on_push(self, cmd: str, handler: callable) -> None: + """注册主动推送处理器 handler(cmd, data_dict)""" + self._push_handlers[cmd] = handler + + # ---- Receive loop ---- + + def _recv_loop(self) -> None: + """后台接收线程""" + while self._running: + try: + data = self._sock.recv(4096) + if not data: + self._log("Connection closed by server") + break + self._buf += data + self._parse_frames() + except OSError: + if self._running: + self._log("Receive error") + break + + def _parse_frames(self) -> None: + """从缓冲区提取完整 JSON 帧并分发""" + while True: + idx = self._buf.find(b"\n") + if idx == -1: + break + raw = self._buf[:idx].decode("utf-8", errors="replace") + self._buf = self._buf[idx + 1:] + if not raw.strip(): + continue + try: + obj = json.loads(raw) + except json.JSONDecodeError: + self._log(f"<<< (invalid JSON) {raw[:80]}") + continue + + cmd = obj.get("cmd", "") + msg_id = obj.get("msg_id", 0) + # 检查是否有 code 字段(响应帧) + if "code" in obj: + code = obj.get("code", -1) + self._log(f"<<< {cmd} code={code} msg_id={msg_id}") + with self._lock: + fut = self._pending.get(msg_id) + if fut: + fut["response"] = obj + fut["event"].set() + else: + # 主动推送帧 + self._log(f"<<< PUSH {cmd} msg_id={msg_id}") + handler = self._push_handlers.get(cmd) + if handler: + try: + handler(cmd, obj.get("data", {})) + except Exception as e: + self._log(f"Push handler error: {e}") + + +# ---- 便捷高层 API ---- + +class DBNetClient: + """DLD960 设备 TCP JSON 协议高层接口""" + + def __init__(self, log_callback=None): + self._tcp = TcpJsonClient(log_callback) + + # Connection + def connect(self, host: str, port: int = 5960) -> None: + self._tcp.connect(host, port) + + def disconnect(self) -> None: + self._tcp.disconnect() + + @property + def is_connected(self) -> bool: + return self._tcp.is_connected + + # Auth + def pwd_verify(self, password: str) -> dict: + return self._tcp.send_command("pwd_verify", {"password": password}) + + # Device Info + def dev_info_query(self) -> dict: + return self._tcp.send_command("dev_info_query") + + def dev_serial_set(self, dev_serial: str) -> dict: + return self._tcp.send_command("dev_serial_set", {"dev_serial": dev_serial}) + + # SSC Network + def ssc_net_query(self) -> dict: + return self._tcp.send_command("ssc_net_query") + + def ssc_net_set(self, dev_ip: str = "", subnet_mask: str = "", + route_ip: str = "", lssc_ip: str = "", + dns: str = "", port: int = 0) -> dict: + data = {} + if dev_ip: + data["dev_ip"] = dev_ip + if subnet_mask: + data["subnet_mask"] = subnet_mask + if route_ip: + data["route_ip"] = route_ip + if lssc_ip: + data["lssc_ip"] = lssc_ip + if dns: + data["dns"] = dns + if port: + data["port"] = port + return self._tcp.send_command("ssc_net_set", data) + + # IoT Network + def iot_net_query(self) -> dict: + return self._tcp.send_command("iot_net_query") + + def iot_net_set(self, host: str = "", port: int = 0, + client_id: str = "", username: str = "", + password: str = "") -> dict: + data = {} + if host: + data["host"] = host + if port: + data["port"] = port + if client_id: + data["client_id"] = client_id + if username: + data["username"] = username + if password: + data["password"] = password + return self._tcp.send_command("iot_net_set", data) + + # IoT Topics + def iot_topic_query(self) -> dict: + return self._tcp.send_command("iot_topic_query") + + def iot_topic_set(self, client_id_enable: bool = None, + topic_pub: str = "", topic_sub: str = "") -> dict: + data = {} + if client_id_enable is not None: + data["client_id_enable"] = client_id_enable + if topic_pub: + data["topic_pub"] = topic_pub + if topic_sub: + data["topic_sub"] = topic_sub + return self._tcp.send_command("iot_topic_set", data) + + # Password + def pwd_set(self, old_password: str, new_password: str) -> dict: + return self._tcp.send_command("pwd_set", + {"old_password": old_password, + "new_password": new_password}) + + # System + def factory_reset(self) -> dict: + return self._tcp.send_command("factory_reset") + + def device_reset(self) -> dict: + return self._tcp.send_command("device_reset") + + # Loop + def loop_param_query(self) -> dict: + return self._tcp.send_command("loop_param_query") + + def loop_param_set(self, channels: list[dict], + auto_mode: bool = False) -> dict: + return self._tcp.send_command("loop_param_set", + {"auto_mode": auto_mode, + "channels": channels}) + + # Push + def on_push(self, cmd: str, handler: callable) -> None: + self._tcp.on_push(cmd, handler)