feat: CLI commands for device management, broker monitoring, and serve

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
walkpan
2026-03-29 21:48:08 +08:00
parent 957f489a0d
commit bac099c805
2 changed files with 275 additions and 0 deletions

248
src/mqtt_home/cli.py Normal file
View File

@@ -0,0 +1,248 @@
import asyncio
import click
from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.device_registry import (
list_devices, get_device, create_device, delete_device,
send_command, get_device_logs,
)
from mqtt_home.schemas import DeviceCreate
from mqtt_home.mqtt_client import MqttClient
def run_async(coro):
asyncio.run(coro)
@click.group()
def cli():
"""MQTT 智能家居管理工具"""
pass
@cli.group()
def device():
"""设备管理"""
pass
@device.command("list")
def device_list():
"""列出所有设备"""
async def _run():
await init_db()
factory = get_session_factory()
async with factory() as db:
devices = await list_devices(db)
if not devices:
click.echo("暂无设备")
return
for d in devices:
status = "ON " if d.is_online else "OFF"
state = d.state or "-"
click.echo(f"[{status}] [{d.id[:8]}] {d.name} ({d.type}) state={state}")
run_async(_run())
@device.command("add")
@click.option("--name", required=True, help="设备名称")
@click.option("--type", "device_type", default="switch", help="设备类型")
@click.option("--state-topic", required=True, help="状态主题")
@click.option("--command-topic", default=None, help="命令主题")
def device_add(name, device_type, state_topic, command_topic):
"""手动添加设备"""
async def _run():
await init_db()
factory = get_session_factory()
async with factory() as db:
d = await create_device(db, DeviceCreate(
name=name,
type=device_type,
mqtt_topic=state_topic,
command_topic=command_topic,
))
click.echo(f"设备已创建: {d.id} - {d.name}")
run_async(_run())
@device.command("info")
@click.argument("device_id")
def device_info(device_id):
"""查看设备详情"""
async def _run():
await init_db()
factory = get_session_factory()
async with factory() as db:
d = await get_device(db, device_id)
if not d:
click.echo(f"设备不存在: {device_id}", err=True)
return
click.echo(f"ID: {d.id}")
click.echo(f"名称: {d.name}")
click.echo(f"类型: {d.type}")
click.echo(f"协议: {d.protocol}")
click.echo(f"状态主题: {d.mqtt_topic}")
click.echo(f"命令主题: {d.command_topic or ''}")
click.echo(f"当前状态: {d.state or ''}")
click.echo(f"在线: {'' if d.is_online else ''}")
click.echo(f"最后活跃: {d.last_seen or '从未'}")
run_async(_run())
@device.command("remove")
@click.argument("device_id")
def device_remove(device_id):
"""删除设备"""
async def _run():
await init_db()
factory = get_session_factory()
async with factory() as db:
if await delete_device(db, device_id):
click.echo(f"设备已删除: {device_id}")
else:
click.echo(f"设备不存在: {device_id}", err=True)
run_async(_run())
@device.command("command")
@click.argument("device_id")
@click.option("--payload", required=True, help="命令内容JSON")
def device_command(device_id, payload):
"""向设备发送命令"""
async def _run():
settings = get_settings()
await init_db()
factory = get_session_factory()
import aiomqtt
async with aiomqtt.Client(
hostname=settings.mqtt_host,
port=settings.mqtt_port,
username=settings.mqtt_username or None,
password=settings.mqtt_password or None,
) as client:
async def publish_fn(topic, p):
await client.publish(topic, p, qos=1)
async with factory() as db:
try:
log = await send_command(db, device_id, payload, publish_fn=publish_fn)
if log:
click.echo(f"命令已发送到 {log.topic}: {log.payload}")
else:
click.echo(f"设备不存在: {device_id}", err=True)
except ValueError as e:
click.echo(str(e), err=True)
run_async(_run())
@device.command("logs")
@click.argument("device_id")
@click.option("--limit", default=20, help="日志条数")
def device_logs(device_id, limit):
"""查看设备消息日志"""
async def _run():
await init_db()
factory = get_session_factory()
async with factory() as db:
logs = await get_device_logs(db, device_id, limit)
if not logs:
click.echo("暂无日志")
return
for log in logs:
direction = "RX" if log.direction == "rx" else "TX"
click.echo(f"[{log.timestamp}] {direction} {log.topic} | {log.payload}")
run_async(_run())
@cli.group()
def broker():
"""Broker 管理"""
pass
@broker.command("status")
def broker_status():
"""查看 Broker 状态"""
async def _run():
settings = get_settings()
emqx = EmqxApiClient(settings)
try:
status = await emqx.get_broker_status()
click.echo(f"版本: {status.get('version', 'unknown')}")
click.echo(f"运行时间: {status.get('uptime', 0)}s")
metrics = await emqx.get_metrics()
click.echo(f"连接数: {metrics.get('connections.count', 0)}")
click.echo(f"订阅数: {metrics.get('subscriptions.count', 0)}")
click.echo(f"主题数: {metrics.get('topics.count', 0)}")
except Exception as e:
click.echo(f"连接 EMQX 失败: {e}", err=True)
finally:
await emqx.close()
run_async(_run())
@broker.command("clients")
def broker_clients():
"""列出已连接客户端"""
async def _run():
settings = get_settings()
emqx = EmqxApiClient(settings)
try:
clients = await emqx.get_clients()
if not clients:
click.echo("暂无客户端连接")
return
for c in clients:
status = "在线" if c.get("connected") else "离线"
click.echo(f"[{c.get('clientid')}] {status} ip={c.get('ip_address')}")
except Exception as e:
click.echo(f"连接 EMQX 失败: {e}", err=True)
finally:
await emqx.close()
run_async(_run())
@broker.command("topics")
def broker_topics():
"""列出活跃主题"""
async def _run():
settings = get_settings()
emqx = EmqxApiClient(settings)
try:
topics = await emqx.get_topics()
if not topics:
click.echo("暂无活跃主题")
return
for t in topics:
click.echo(t.get("topic", ""))
except Exception as e:
click.echo(f"连接 EMQX 失败: {e}", err=True)
finally:
await emqx.close()
run_async(_run())
@cli.command("serve")
def serve():
"""启动 Web 服务和 MQTT 客户端"""
import uvicorn
from mqtt_home.config import get_settings
settings = get_settings()
uvicorn.run(
"mqtt_home.main:app",
host=settings.web_host,
port=settings.web_port,
reload=False,
)

27
tests/test_cli.py Normal file
View File

@@ -0,0 +1,27 @@
import pytest
from click.testing import CliRunner
from mqtt_home.cli import cli
def test_device_list_empty(monkeypatch):
runner = CliRunner()
async def mock_init():
pass
async def mock_list(db):
return []
monkeypatch.setattr("mqtt_home.cli.init_db", mock_init)
monkeypatch.setattr("mqtt_home.cli.list_devices", mock_list)
result = runner.invoke(cli, ["device", "list"])
assert result.exit_code == 0
assert "暂无设备" in result.output
def test_cli_groups():
runner = CliRunner()
result = runner.invoke(cli, ["--help"])
assert "device" in result.output
assert "broker" in result.output