From bac099c805d73b43429a7039c7a21b50dfa381f2 Mon Sep 17 00:00:00 2001 From: walkpan Date: Sun, 29 Mar 2026 21:48:08 +0800 Subject: [PATCH] feat: CLI commands for device management, broker monitoring, and serve MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/mqtt_home/cli.py | 248 +++++++++++++++++++++++++++++++++++++++++++ tests/test_cli.py | 27 +++++ 2 files changed, 275 insertions(+) create mode 100644 src/mqtt_home/cli.py create mode 100644 tests/test_cli.py diff --git a/src/mqtt_home/cli.py b/src/mqtt_home/cli.py new file mode 100644 index 0000000..6b4cae5 --- /dev/null +++ b/src/mqtt_home/cli.py @@ -0,0 +1,248 @@ +import asyncio +import click + +from mqtt_home.config import get_settings +from mqtt_home.database import init_db, get_session_factory +from mqtt_home.emqx_api import EmqxApiClient +from mqtt_home.device_registry import ( + list_devices, get_device, create_device, delete_device, + send_command, get_device_logs, +) +from mqtt_home.schemas import DeviceCreate +from mqtt_home.mqtt_client import MqttClient + + +def run_async(coro): + asyncio.run(coro) + + +@click.group() +def cli(): + """MQTT 智能家居管理工具""" + pass + + +@cli.group() +def device(): + """设备管理""" + pass + + +@device.command("list") +def device_list(): + """列出所有设备""" + async def _run(): + await init_db() + factory = get_session_factory() + async with factory() as db: + devices = await list_devices(db) + if not devices: + click.echo("暂无设备") + return + for d in devices: + status = "ON " if d.is_online else "OFF" + state = d.state or "-" + click.echo(f"[{status}] [{d.id[:8]}] {d.name} ({d.type}) state={state}") + + run_async(_run()) + + +@device.command("add") +@click.option("--name", required=True, help="设备名称") +@click.option("--type", "device_type", default="switch", help="设备类型") +@click.option("--state-topic", required=True, help="状态主题") +@click.option("--command-topic", default=None, help="命令主题") +def device_add(name, device_type, state_topic, command_topic): + """手动添加设备""" + async def _run(): + await init_db() + factory = get_session_factory() + async with factory() as db: + d = await create_device(db, DeviceCreate( + name=name, + type=device_type, + mqtt_topic=state_topic, + command_topic=command_topic, + )) + click.echo(f"设备已创建: {d.id} - {d.name}") + + run_async(_run()) + + +@device.command("info") +@click.argument("device_id") +def device_info(device_id): + """查看设备详情""" + async def _run(): + await init_db() + factory = get_session_factory() + async with factory() as db: + d = await get_device(db, device_id) + if not d: + click.echo(f"设备不存在: {device_id}", err=True) + return + click.echo(f"ID: {d.id}") + click.echo(f"名称: {d.name}") + click.echo(f"类型: {d.type}") + click.echo(f"协议: {d.protocol}") + click.echo(f"状态主题: {d.mqtt_topic}") + click.echo(f"命令主题: {d.command_topic or '无'}") + click.echo(f"当前状态: {d.state or '无'}") + click.echo(f"在线: {'是' if d.is_online else '否'}") + click.echo(f"最后活跃: {d.last_seen or '从未'}") + + run_async(_run()) + + +@device.command("remove") +@click.argument("device_id") +def device_remove(device_id): + """删除设备""" + async def _run(): + await init_db() + factory = get_session_factory() + async with factory() as db: + if await delete_device(db, device_id): + click.echo(f"设备已删除: {device_id}") + else: + click.echo(f"设备不存在: {device_id}", err=True) + + run_async(_run()) + + +@device.command("command") +@click.argument("device_id") +@click.option("--payload", required=True, help="命令内容(JSON)") +def device_command(device_id, payload): + """向设备发送命令""" + async def _run(): + settings = get_settings() + await init_db() + factory = get_session_factory() + + import aiomqtt + async with aiomqtt.Client( + hostname=settings.mqtt_host, + port=settings.mqtt_port, + username=settings.mqtt_username or None, + password=settings.mqtt_password or None, + ) as client: + async def publish_fn(topic, p): + await client.publish(topic, p, qos=1) + + async with factory() as db: + try: + log = await send_command(db, device_id, payload, publish_fn=publish_fn) + if log: + click.echo(f"命令已发送到 {log.topic}: {log.payload}") + else: + click.echo(f"设备不存在: {device_id}", err=True) + except ValueError as e: + click.echo(str(e), err=True) + + run_async(_run()) + + +@device.command("logs") +@click.argument("device_id") +@click.option("--limit", default=20, help="日志条数") +def device_logs(device_id, limit): + """查看设备消息日志""" + async def _run(): + await init_db() + factory = get_session_factory() + async with factory() as db: + logs = await get_device_logs(db, device_id, limit) + if not logs: + click.echo("暂无日志") + return + for log in logs: + direction = "RX" if log.direction == "rx" else "TX" + click.echo(f"[{log.timestamp}] {direction} {log.topic} | {log.payload}") + + run_async(_run()) + + +@cli.group() +def broker(): + """Broker 管理""" + pass + + +@broker.command("status") +def broker_status(): + """查看 Broker 状态""" + async def _run(): + settings = get_settings() + emqx = EmqxApiClient(settings) + try: + status = await emqx.get_broker_status() + click.echo(f"版本: {status.get('version', 'unknown')}") + click.echo(f"运行时间: {status.get('uptime', 0)}s") + metrics = await emqx.get_metrics() + click.echo(f"连接数: {metrics.get('connections.count', 0)}") + click.echo(f"订阅数: {metrics.get('subscriptions.count', 0)}") + click.echo(f"主题数: {metrics.get('topics.count', 0)}") + except Exception as e: + click.echo(f"连接 EMQX 失败: {e}", err=True) + finally: + await emqx.close() + + run_async(_run()) + + +@broker.command("clients") +def broker_clients(): + """列出已连接客户端""" + async def _run(): + settings = get_settings() + emqx = EmqxApiClient(settings) + try: + clients = await emqx.get_clients() + if not clients: + click.echo("暂无客户端连接") + return + for c in clients: + status = "在线" if c.get("connected") else "离线" + click.echo(f"[{c.get('clientid')}] {status} ip={c.get('ip_address')}") + except Exception as e: + click.echo(f"连接 EMQX 失败: {e}", err=True) + finally: + await emqx.close() + + run_async(_run()) + + +@broker.command("topics") +def broker_topics(): + """列出活跃主题""" + async def _run(): + settings = get_settings() + emqx = EmqxApiClient(settings) + try: + topics = await emqx.get_topics() + if not topics: + click.echo("暂无活跃主题") + return + for t in topics: + click.echo(t.get("topic", "")) + except Exception as e: + click.echo(f"连接 EMQX 失败: {e}", err=True) + finally: + await emqx.close() + + run_async(_run()) + + +@cli.command("serve") +def serve(): + """启动 Web 服务和 MQTT 客户端""" + import uvicorn + from mqtt_home.config import get_settings + settings = get_settings() + uvicorn.run( + "mqtt_home.main:app", + host=settings.web_host, + port=settings.web_port, + reload=False, + ) diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..a120e5c --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,27 @@ +import pytest +from click.testing import CliRunner +from mqtt_home.cli import cli + + +def test_device_list_empty(monkeypatch): + runner = CliRunner() + + async def mock_init(): + pass + + async def mock_list(db): + return [] + + monkeypatch.setattr("mqtt_home.cli.init_db", mock_init) + monkeypatch.setattr("mqtt_home.cli.list_devices", mock_list) + + result = runner.invoke(cli, ["device", "list"]) + assert result.exit_code == 0 + assert "暂无设备" in result.output + + +def test_cli_groups(): + runner = CliRunner() + result = runner.invoke(cli, ["--help"]) + assert "device" in result.output + assert "broker" in result.output