MQTT Smart Home Management System - Backend Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Implement the core backend: a FastAPI REST API, an MQTT client, an EMQX API wrapper, CLI commands, and a SQLite database.
Architecture: A monolithic FastAPI application that connects to the EMQX broker through aiomqtt and stores device data in SQLite via SQLAlchemy. The CLI and the REST API share the device_registry business-logic layer.
Tech Stack: Python 3.11+, FastAPI, uvicorn, aiomqtt, SQLAlchemy 2.0 + aiosqlite, pydantic-settings, click, httpx
File structure overview
mqtt-home/
├── pyproject.toml
├── .env.example
├── .gitignore
├── src/
│   └── mqtt_home/
│       ├── __init__.py
│       ├── config.py            # Settings (pydantic-settings)
│       ├── database.py          # SQLite engine, async session
│       ├── models.py            # SQLAlchemy ORM (Device, DeviceLog)
│       ├── schemas.py           # Pydantic schemas (request/response)
│       ├── emqx_api.py          # EMQX REST API client (httpx)
│       ├── mqtt_client.py       # MQTT connection lifecycle (aiomqtt)
│       ├── discovery.py         # HA Discovery protocol handler
│       ├── device_registry.py   # Business logic (CRUD + control)
│       ├── main.py              # FastAPI app + lifespan
│       ├── cli.py               # click CLI commands
│       └── api/
│           ├── __init__.py      # Router aggregation
│           ├── devices.py       # Device REST routes
│           ├── broker.py        # Broker REST routes
│           └── dashboard.py     # Dashboard REST routes
└── tests/
    ├── conftest.py
    ├── test_config.py
    ├── test_models.py
    ├── test_device_registry.py
    ├── test_emqx_api.py
    ├── test_discovery.py
    ├── test_api_devices.py
    ├── test_api_broker.py
    └── test_cli.py
Task 1: Project skeleton and configuration
Files:
- Create: pyproject.toml
- Create: .env.example
- Create: .gitignore
- Create: src/mqtt_home/__init__.py
- Create: src/mqtt_home/config.py
- Create: tests/conftest.py
- Create: tests/test_config.py
- Step 1: Create pyproject.toml
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "mqtt-home"
version = "0.1.0"
description = "Lightweight smart-home MQTT device management system"
requires-python = ">=3.11"
dependencies = [
"fastapi>=0.115.0",
"uvicorn[standard]>=0.30.0",
"aiomqtt>=2.0.0",
"sqlalchemy[asyncio]>=2.0.0",
"aiosqlite>=0.20.0",
"pydantic-settings>=2.0.0",
"click>=8.1.0",
"httpx>=0.27.0",
"websockets>=12.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"pytest-httpx>=0.30.0",
"httpx>=0.27.0",
]
[project.scripts]
mqtt-home = "mqtt_home.cli:cli"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
- Step 2: Create .env.example
MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=your-api-key
EMQX_API_SECRET=your-secret
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
- Step 3: Create .gitignore
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.venv/
.env
data/
*.db
frontend/node_modules/
frontend/dist/
- Step 4: Create src/mqtt_home/__init__.py (empty file)
- Step 5: Create tests/conftest.py
import pytest
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import StaticPool

from mqtt_home.config import Settings

TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        mqtt_port=1883,
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url=TEST_DATABASE_URL,
    )


@pytest.fixture
async def engine():
    # Imported lazily: database.py is only created in Task 2, and a top-level
    # import here would break collection of tests/test_config.py in Task 1.
    from mqtt_home.database import Base

    engine = create_async_engine(
        TEST_DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()


@pytest.fixture
async def db_session(engine):
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with async_session() as session:
        yield session
- Step 6: Create src/mqtt_home/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    mqtt_host: str = "localhost"
    mqtt_port: int = 1883
    mqtt_username: str = ""
    mqtt_password: str = ""
    emqx_api_url: str = "http://localhost:18083/api/v5"
    emqx_api_key: str = ""
    emqx_api_secret: str = ""
    database_url: str = "sqlite+aiosqlite:///./data/mqtt_home.db"
    web_host: str = "0.0.0.0"
    web_port: int = 8000

    model_config = {"env_file": ".env", "env_prefix": ""}


def get_settings() -> Settings:
    return Settings()
- Step 7: Create tests/test_config.py
from mqtt_home.config import Settings


def test_default_settings():
    s = Settings()
    assert s.mqtt_host == "localhost"
    assert s.mqtt_port == 1883
    assert s.web_port == 8000


def test_settings_from_env(monkeypatch):
    monkeypatch.setenv("MQTT_HOST", "192.168.1.1")
    monkeypatch.setenv("MQTT_PORT", "1884")
    s = Settings()
    assert s.mqtt_host == "192.168.1.1"
    assert s.mqtt_port == 1884
- Step 8: Install the dependencies and run the tests
Run: pip install -e ".[dev]" (from the D:\home\mqtt directory)
Run: pytest tests/test_config.py -v
Expected: 2 passed
- Step 9: Initialize the git repository and commit
Run:
cd D:\home\mqtt
git init
git add -A
git commit -m "feat: project skeleton with config, dependencies, and test setup"
Task 2: Database and ORM models
Files:
- Create: src/mqtt_home/database.py
- Create: src/mqtt_home/models.py
- Create: tests/test_models.py
- Step 1: Create src/mqtt_home/database.py
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

from mqtt_home.config import get_settings


class Base(DeclarativeBase):
    pass


def get_engine():
    settings = get_settings()
    return create_async_engine(
        settings.database_url,
        echo=False,
    )


def get_session_factory(engine=None):
    _engine = engine or get_engine()
    return async_sessionmaker(_engine, expire_on_commit=False)


async def init_db():
    engine = get_engine()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    await engine.dispose()


async def get_db() -> AsyncIterator[AsyncSession]:
    # FastAPI dependency: yields one session per request
    factory = get_session_factory()
    async with factory() as session:
        yield session
- Step 2: Create src/mqtt_home/models.py
import uuid
from datetime import datetime, timezone

from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import relationship

from mqtt_home.database import Base


def _utcnow():
    return datetime.now(timezone.utc)


def _new_id():
    return str(uuid.uuid4())


class Device(Base):
    __tablename__ = "devices"

    id = Column(String(36), primary_key=True, default=_new_id)
    name = Column(String(200), nullable=False)
    type = Column(String(50), nullable=False, default="switch")
    protocol = Column(String(20), nullable=False, default="custom")
    mqtt_topic = Column(String(500), nullable=False)
    command_topic = Column(String(500), nullable=True)
    discovery_topic = Column(String(500), nullable=True)
    discovery_payload = Column(Text, nullable=True)
    attributes = Column(Text, nullable=True, default="{}")
    state = Column(String(500), nullable=True, default=None)
    is_online = Column(Boolean, nullable=False, default=False)
    last_seen = Column(DateTime(timezone=True), nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow)
    updated_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow, onupdate=_utcnow)

    logs = relationship("DeviceLog", back_populates="device", lazy="dynamic")


class DeviceLog(Base):
    __tablename__ = "device_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    device_id = Column(String(36), ForeignKey("devices.id", ondelete="CASCADE"), nullable=False)
    direction = Column(String(10), nullable=False)  # "rx" or "tx"
    topic = Column(String(500), nullable=False)
    payload = Column(Text, nullable=False)
    timestamp = Column(DateTime(timezone=True), nullable=False, default=_utcnow)

    device = relationship("Device", back_populates="logs")
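Note that DeviceLog.device_id declares ondelete="CASCADE", but SQLite only enforces foreign keys on connections where PRAGMA foreign_keys = ON has been issued. The standalone sketch below uses the standard-library sqlite3 module with simplified versions of the two tables (not the plan's SQLAlchemy engine) to show the difference; the function name is illustrative, and wiring the pragma into the async engine is left out.

```python
import sqlite3


def logs_left_after_device_delete(enforce_fk: bool) -> int:
    """Delete a device and return how many of its log rows remain."""
    conn = sqlite3.connect(":memory:")
    if enforce_fk:
        # Must be set per connection; most SQLite builds default to OFF
        conn.execute("PRAGMA foreign_keys = ON")
    conn.execute("CREATE TABLE devices (id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE device_logs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " device_id TEXT NOT NULL REFERENCES devices(id) ON DELETE CASCADE)"
    )
    conn.execute("INSERT INTO devices (id) VALUES ('dev-1')")
    conn.execute("INSERT INTO device_logs (device_id) VALUES ('dev-1')")
    conn.execute("DELETE FROM devices WHERE id = 'dev-1'")
    remaining = conn.execute("SELECT COUNT(*) FROM device_logs").fetchone()[0]
    conn.close()
    return remaining
```

With enforcement on, the log row is removed together with its device; with it off, the row is left orphaned.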
- Step 3: Create tests/test_models.py
from sqlalchemy import func, select

from mqtt_home.models import Device, DeviceLog


async def test_create_device(db_session):
    device = Device(
        name="客厅灯",
        type="light",
        protocol="custom",
        mqtt_topic="home/living/light",
        command_topic="home/living/light/set",
    )
    db_session.add(device)
    await db_session.commit()
    await db_session.refresh(device)
    assert device.id is not None
    assert len(device.id) == 36
    assert device.name == "客厅灯"
    assert device.state is None
    assert device.is_online is False


async def test_create_device_log(db_session):
    device = Device(
        name="温度传感器",
        type="sensor",
        protocol="custom",
        mqtt_topic="home/temperature",
    )
    db_session.add(device)
    await db_session.flush()
    log = DeviceLog(
        device_id=device.id,
        direction="rx",
        topic="home/temperature",
        payload='{"temperature": 25.5}',
    )
    db_session.add(log)
    await db_session.commit()
    # Count with an explicit query: the lazy="dynamic" relationship issues
    # synchronous queries and cannot be used directly under AsyncSession.
    count = await db_session.scalar(
        select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id)
    )
    assert count == 1
- Step 4: Run the tests
Run: pytest tests/test_models.py -v
Expected: 2 passed
- Step 5: Commit
git add -A
git commit -m "feat: database setup with SQLAlchemy ORM models for Device and DeviceLog"
Task 3: Pydantic Schemas
Files:
- Create: src/mqtt_home/schemas.py
- Step 1: Create src/mqtt_home/schemas.py
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field


class DeviceCreate(BaseModel):
    name: str
    type: str = "switch"
    protocol: str = "custom"
    mqtt_topic: str
    command_topic: Optional[str] = None


class DeviceUpdate(BaseModel):
    name: Optional[str] = None
    type: Optional[str] = None
    command_topic: Optional[str] = None


class DeviceCommand(BaseModel):
    payload: str


class DeviceResponse(BaseModel):
    id: str
    name: str
    type: str
    protocol: str
    mqtt_topic: str
    command_topic: Optional[str] = None
    state: Optional[str] = None
    is_online: bool
    last_seen: Optional[datetime] = None
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class DeviceLogResponse(BaseModel):
    id: int
    device_id: str
    direction: str
    topic: str
    payload: str
    timestamp: datetime

    model_config = {"from_attributes": True}


class BrokerClient(BaseModel):
    clientid: str
    username: Optional[str] = None
    ip_address: Optional[str] = None
    connected: bool
    keepalive: int
    proto_ver: Optional[int] = None
    clean_start: Optional[bool] = None


class BrokerTopic(BaseModel):
    topic: str
    node: Optional[str] = None


class DashboardStats(BaseModel):
    total_devices: int
    online_devices: int
    offline_devices: int
    recent_logs: list[DeviceLogResponse]
- Step 2: Commit
git add -A
git commit -m "feat: Pydantic schemas for API request/response models"
Task 4: EMQX REST API client
Files:
- Create: src/mqtt_home/emqx_api.py
- Create: tests/test_emqx_api.py
- Step 1: Create src/mqtt_home/emqx_api.py
from typing import Any

import httpx

from mqtt_home.config import Settings


class EmqxApiClient:
    def __init__(self, settings: Settings):
        self._client = httpx.AsyncClient(
            base_url=settings.emqx_api_url,
            auth=(settings.emqx_api_key, settings.emqx_api_secret),
            timeout=10.0,
        )

    async def close(self):
        await self._client.aclose()

    async def get_broker_status(self) -> dict[str, Any]:
        resp = await self._client.get("/status")
        resp.raise_for_status()
        return resp.json().get("data", {})

    async def get_metrics(self) -> dict[str, Any]:
        resp = await self._client.get("/metrics")
        resp.raise_for_status()
        return resp.json().get("data", {})

    async def get_clients(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/clients", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])

    async def get_subscriptions(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/subscriptions", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])

    async def get_topics(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/topics", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])
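Passing auth=(key, secret) to httpx.AsyncClient makes httpx send HTTP Basic authentication with every request. As a sketch of what that amounts to on the wire, built with the standard library only (the helper name basic_auth_header is illustrative and not part of the plan):

```python
import base64


def basic_auth_header(api_key: str, api_secret: str) -> dict[str, str]:
    """Build the Authorization header that HTTP Basic auth sends."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}
```

This can be handy when reproducing a request against the broker with another tool while debugging credentials.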
- Step 2: Create tests/test_emqx_api.py
from unittest.mock import AsyncMock, patch

import httpx
import pytest

from mqtt_home.config import Settings
from mqtt_home.emqx_api import EmqxApiClient


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )


@pytest.fixture
def emqx_client(settings):
    return EmqxApiClient(settings)


def _json_response(payload) -> httpx.Response:
    # raise_for_status() needs a request attached to the response
    return httpx.Response(200, json=payload, request=httpx.Request("GET", "http://test"))


async def test_get_broker_status(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": {"uptime": 12345, "version": "5.0.0"}})
        result = await emqx_client.get_broker_status()
        assert result["uptime"] == 12345


async def test_get_clients(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": [{"clientid": "dev1", "connected": True}]})
        result = await emqx_client.get_clients()
        assert len(result) == 1
        assert result[0]["clientid"] == "dev1"


async def test_get_topics(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": [{"topic": "home/light"}]})
        result = await emqx_client.get_topics()
        assert len(result) == 1
- Step 3: Run the tests
Run: pytest tests/test_emqx_api.py -v
Expected: 3 passed
- Step 4: Commit
git add -A
git commit -m "feat: EMQX REST API client for broker status, clients, and topics"
Task 5: MQTT client
Files:
- Create: src/mqtt_home/mqtt_client.py
- Create: tests/test_mqtt_client.py
- Step 1: Create src/mqtt_home/mqtt_client.py
import asyncio
import logging
import uuid
from typing import Awaitable, Callable

import aiomqtt

from mqtt_home.config import Settings

logger = logging.getLogger(__name__)

MessageCallback = Callable[[str, str], Awaitable[None]]


class MqttClient:
    def __init__(self, settings: Settings):
        self._settings = settings
        self._client: aiomqtt.Client | None = None
        self._callbacks: dict[str, list[MessageCallback]] = {}
        self._connected = False
        self._task: asyncio.Task | None = None
        self._client_id = f"mqtt-home-{uuid.uuid4().hex[:8]}"

    @property
    def is_connected(self) -> bool:
        return self._connected

    def on_message(self, topic_filter: str, callback: MessageCallback):
        self._callbacks.setdefault(topic_filter, []).append(callback)

    async def start(self):
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def publish(self, topic: str, payload: str, qos: int = 1, retain: bool = False):
        if not self._client:
            raise RuntimeError("MQTT client not connected")
        await self._client.publish(topic, payload, qos=qos, retain=retain)

    async def subscribe(self, topic: str, qos: int = 1):
        if not self._client:
            raise RuntimeError("MQTT client not connected")
        await self._client.subscribe(topic, qos=qos)

    async def _run(self):
        retry_delay = 1
        max_delay = 60
        while True:
            try:
                async with aiomqtt.Client(
                    hostname=self._settings.mqtt_host,
                    port=self._settings.mqtt_port,
                    username=self._settings.mqtt_username or None,
                    password=self._settings.mqtt_password or None,
                    identifier=self._client_id,
                    clean_session=True,
                ) as client:
                    self._client = client
                    self._connected = True
                    logger.info("MQTT connected to %s:%d", self._settings.mqtt_host, self._settings.mqtt_port)
                    retry_delay = 1
                    # Re-subscribe to every registered topic filter
                    for topic_filter in self._callbacks:
                        await client.subscribe(topic_filter, qos=1)
                    async for message in client.messages:
                        topic = str(message.topic)
                        payload = message.payload.decode("utf-8", errors="replace")
                        await self._dispatch(topic, payload)
            except asyncio.CancelledError:
                self._client = None
                self._connected = False
                raise
            except Exception as e:
                # Drop the stale client so publish()/subscribe() fail fast while disconnected
                self._client = None
                self._connected = False
                logger.warning("MQTT connection failed: %s, retrying in %ds", e, retry_delay)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_delay)

    async def _dispatch(self, topic: str, payload: str):
        matched = False
        for topic_filter, callbacks in self._callbacks.items():
            if self._topic_matches(topic, topic_filter):
                matched = True
                for cb in callbacks:
                    try:
                        await cb(topic, payload)
                    except Exception as e:
                        logger.error("MQTT callback error for %s: %s", topic, e)
        if not matched:
            logger.debug("No callback for topic: %s", topic)

    @staticmethod
    def _topic_matches(topic: str, pattern: str) -> bool:
        """Simple MQTT wildcard matching (supports + and #)."""
        topic_parts = topic.split("/")
        pattern_parts = pattern.split("/")
        for i, part in enumerate(pattern_parts):
            if part == "#":
                return True
            if i >= len(topic_parts):
                return False
            if part != "+" and part != topic_parts[i]:
                return False
        return len(topic_parts) == len(pattern_parts)
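The reconnect loop in _run doubles retry_delay after each failed attempt, caps it at max_delay, and resets it to 1 once a connection succeeds. The delay sequence for consecutive failures can be sketched as a small generator; backoff_delays is an illustrative name, not part of the plan:

```python
from collections.abc import Iterator
from itertools import islice


def backoff_delays(initial: int = 1, max_delay: int = 60) -> Iterator[int]:
    """Yield the wait time (seconds) before each consecutive reconnect attempt."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, max_delay)


print(list(islice(backoff_delays(), 8)))  # [1, 2, 4, 8, 16, 32, 60, 60]
```

With these defaults the client settles at one attempt per minute after about a minute of continuous failures.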
- Step 2: Create tests/test_mqtt_client.py
import pytest

from mqtt_home.config import Settings
from mqtt_home.mqtt_client import MqttClient


@pytest.fixture
def mqtt_client():
    settings = Settings(
        mqtt_host="localhost",
        mqtt_port=1883,
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )
    return MqttClient(settings)


def test_topic_matches_exact(mqtt_client):
    assert MqttClient._topic_matches("home/light", "home/light") is True
    assert MqttClient._topic_matches("home/light", "home/switch") is False


def test_topic_matches_single_level_wildcard(mqtt_client):
    assert MqttClient._topic_matches("home/light/status", "home/+/status") is True
    assert MqttClient._topic_matches("home/switch/status", "home/+/status") is True
    assert MqttClient._topic_matches("home/light", "home/+") is True
    assert MqttClient._topic_matches("home/light/extra", "home/+") is False


def test_topic_matches_multi_level_wildcard(mqtt_client):
    assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/#") is True
    assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/light/#") is True
    assert MqttClient._topic_matches("other/topic", "homeassistant/#") is False


def test_register_callback(mqtt_client):
    async def cb(topic, payload):
        pass

    mqtt_client.on_message("home/#", cb)
    assert "home/#" in mqtt_client._callbacks
    assert len(mqtt_client._callbacks["home/#"]) == 1


async def test_publish_not_connected(mqtt_client):
    # asyncio_mode = "auto" runs async tests directly, so no manual event loop is needed
    with pytest.raises(RuntimeError, match="not connected"):
        await mqtt_client.publish("test", "payload")
- Step 3: Run the tests
Run: pytest tests/test_mqtt_client.py -v
Expected: 5 passed
- Step 4: Commit
git add -A
git commit -m "feat: MQTT client with reconnection, topic matching, and callback dispatch"
Task 6: Device registry (core business logic)
Files:
- Create: src/mqtt_home/device_registry.py
- Create: tests/test_device_registry.py
- Step 1: Create src/mqtt_home/device_registry.py
import logging
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import desc, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.models import Device, DeviceLog
from mqtt_home.schemas import DeviceCreate, DeviceUpdate

logger = logging.getLogger(__name__)

MAX_LOGS_PER_DEVICE = 100


async def list_devices(db: AsyncSession) -> list[Device]:
    result = await db.execute(select(Device).order_by(Device.created_at.desc()))
    return list(result.scalars().all())


async def get_device(db: AsyncSession, device_id: str) -> Optional[Device]:
    return await db.get(Device, device_id)


async def create_device(db: AsyncSession, data: DeviceCreate) -> Device:
    device = Device(
        name=data.name,
        type=data.type,
        protocol=data.protocol,
        mqtt_topic=data.mqtt_topic,
        command_topic=data.command_topic,
    )
    db.add(device)
    await db.commit()
    await db.refresh(device)
    logger.info("Device created: %s (%s)", device.name, device.id)
    return device


async def update_device(db: AsyncSession, device_id: str, data: DeviceUpdate) -> Optional[Device]:
    device = await db.get(Device, device_id)
    if not device:
        return None
    # Only apply the fields the caller actually sent
    update_data = data.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(device, key, value)
    await db.commit()
    await db.refresh(device)
    return device


async def delete_device(db: AsyncSession, device_id: str) -> bool:
    device = await db.get(Device, device_id)
    if not device:
        return False
    await db.delete(device)
    await db.commit()
    logger.info("Device deleted: %s", device_id)
    return True


async def send_command(db: AsyncSession, device_id: str, payload: str, publish_fn=None) -> Optional[DeviceLog]:
    device = await db.get(Device, device_id)
    if not device:
        return None
    if not device.command_topic:
        raise ValueError(f"Device {device_id} has no command_topic configured")
    log = DeviceLog(
        device_id=device_id,
        direction="tx",
        topic=device.command_topic,
        payload=payload,
    )
    db.add(log)
    # publish_fn must be an async callable taking (topic, payload)
    if publish_fn:
        await publish_fn(device.command_topic, payload)
    await db.commit()
    await db.refresh(log)
    logger.info("Command sent to %s: %s", device.command_topic, payload)
    return log
async def handle_state_update(db: AsyncSession, topic: str, payload: str) -> Optional[Device]:
    """Handle a state update received over MQTT."""
    result = await db.execute(select(Device).where(Device.mqtt_topic == topic))
    device = result.scalar_one_or_none()
    if not device:
        return None
    device.state = payload
    device.last_seen = datetime.now(timezone.utc)
    device.is_online = True
    log = DeviceLog(
        device_id=device.id,
        direction="rx",
        topic=topic,
        payload=payload,
    )
    db.add(log)
    # Prune old logs, keeping the newest MAX_LOGS_PER_DEVICE entries.
    # The count query autoflushes the session, so it already includes the new log.
    count_result = await db.execute(
        select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id)
    )
    count = count_result.scalar() or 0
    if count > MAX_LOGS_PER_DEVICE:
        oldest = await db.execute(
            select(DeviceLog)
            .where(DeviceLog.device_id == device.id)
            .order_by(DeviceLog.timestamp.asc(), DeviceLog.id.asc())
            .limit(count - MAX_LOGS_PER_DEVICE)
        )
        for old_log in oldest.scalars().all():
            await db.delete(old_log)
    await db.commit()
    await db.refresh(device)
    return device


async def get_device_logs(db: AsyncSession, device_id: str, limit: int = 20) -> list[DeviceLog]:
    result = await db.execute(
        select(DeviceLog)
        .where(DeviceLog.device_id == device_id)
        .order_by(desc(DeviceLog.timestamp))
        .limit(limit)
    )
    return list(result.scalars().all())
async def update_device_online_status(db: AsyncSession, topic: str, payload: str) -> Optional[Device]:
    """Handle a message on an HA availability topic."""
    result = await db.execute(select(Device).where(Device.mqtt_topic == topic))
    device = result.scalar_one_or_none()
    if not device:
        return None
    device.is_online = payload.lower() == "online"
    device.last_seen = datetime.now(timezone.utc)
    await db.commit()
    await db.refresh(device)
    return device


async def get_dashboard_stats(db: AsyncSession) -> dict:
    # Read each count exactly once: Result.scalar() closes the result set,
    # so it must not be called a second time on the same result.
    total = (await db.execute(select(func.count()).select_from(Device))).scalar() or 0
    online = (
        await db.execute(
            select(func.count()).select_from(Device).where(Device.is_online.is_(True))
        )
    ).scalar() or 0
    recent_logs_result = await db.execute(
        select(DeviceLog).order_by(desc(DeviceLog.timestamp)).limit(10)
    )
    return {
        "total_devices": total,
        "online_devices": online,
        "offline_devices": total - online,
        "recent_logs": list(recent_logs_result.scalars().all()),
    }
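handle_state_update caps the stored history per device at MAX_LOGS_PER_DEVICE by selecting the oldest rows and deleting them one by one. The same retention rule can also be written as a single DELETE statement. The sketch below is an alternative formulation, not the plan's SQLAlchemy code: it uses the standard-library sqlite3 module with a simplified device_logs table, and it assumes the autoincrement id is a good enough proxy for recency.

```python
import sqlite3


def prune_logs(conn: sqlite3.Connection, device_id: str, keep: int) -> int:
    """Delete all but the `keep` newest log rows of one device; return rows deleted."""
    cur = conn.execute(
        """
        DELETE FROM device_logs
        WHERE device_id = ?
          AND id NOT IN (
              SELECT id FROM device_logs
              WHERE device_id = ?
              ORDER BY id DESC
              LIMIT ?
          )
        """,
        (device_id, device_id, keep),
    )
    return cur.rowcount


# Demo: 105 rows for one device, pruned down to the newest 100
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE device_logs ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " device_id TEXT NOT NULL,"
    " payload TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO device_logs (device_id, payload) VALUES (?, ?)",
    [("dev-1", str(i)) for i in range(105)],
)
deleted = prune_logs(conn, "dev-1", keep=100)
remaining = conn.execute("SELECT COUNT(*) FROM device_logs").fetchone()[0]
```

A single statement avoids loading the rows into Python first, at the cost of bypassing the ORM session for those objects.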
- Step 2: Create tests/test_device_registry.py
import pytest

from mqtt_home.device_registry import (
    create_device, get_device, list_devices, delete_device,
    update_device, send_command, handle_state_update, get_device_logs,
    get_dashboard_stats,
)
from mqtt_home.schemas import DeviceCreate, DeviceUpdate


async def _noop_publish(topic, payload):
    # send_command awaits publish_fn, so the stub must be a coroutine function
    return None


async def test_create_and_get_device(db_session):
    data = DeviceCreate(name="客厅灯", type="light", mqtt_topic="home/light")
    device = await create_device(db_session, data)
    assert device.name == "客厅灯"
    fetched = await get_device(db_session, device.id)
    assert fetched is not None
    assert fetched.id == device.id


async def test_list_devices(db_session):
    await create_device(db_session, DeviceCreate(name="设备1", type="switch", mqtt_topic="t1"))
    await create_device(db_session, DeviceCreate(name="设备2", type="sensor", mqtt_topic="t2"))
    devices = await list_devices(db_session)
    assert len(devices) == 2


async def test_update_device(db_session):
    device = await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t"))
    updated = await update_device(db_session, device.id, DeviceUpdate(name="新名字"))
    assert updated.name == "新名字"


async def test_delete_device(db_session):
    device = await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t"))
    assert await delete_device(db_session, device.id) is True
    assert await get_device(db_session, device.id) is None


async def test_delete_nonexistent_device(db_session):
    assert await delete_device(db_session, "nonexistent") is False


async def test_send_command(db_session):
    device = await create_device(
        db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t", command_topic="t/set")
    )
    published = {}

    async def mock_publish(topic, payload):
        published[topic] = payload

    log = await send_command(db_session, device.id, '{"state":"on"}', mock_publish)
    assert log is not None
    assert log.direction == "tx"
    assert published["t/set"] == '{"state":"on"}'


async def test_send_command_no_command_topic(db_session):
    device = await create_device(db_session, DeviceCreate(name="传感器", type="sensor", mqtt_topic="t"))
    with pytest.raises(ValueError, match="no command_topic"):
        await send_command(db_session, device.id, '{"value":1}')


async def test_handle_state_update(db_session):
    await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="home/light"))
    updated = await handle_state_update(db_session, "home/light", '{"state":"on"}')
    assert updated is not None
    assert updated.state == '{"state":"on"}'
    assert updated.is_online is True


async def test_handle_state_update_unknown_topic(db_session):
    result = await handle_state_update(db_session, "unknown/topic", "on")
    assert result is None


async def test_get_device_logs(db_session):
    device = await create_device(
        db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t", command_topic="t/set")
    )
    await send_command(db_session, device.id, '{"state":"on"}', _noop_publish)
    await send_command(db_session, device.id, '{"state":"off"}', _noop_publish)
    logs = await get_device_logs(db_session, device.id, limit=10)
    assert len(logs) == 2


async def test_dashboard_stats(db_session):
    await create_device(db_session, DeviceCreate(name="在线设备", type="switch", mqtt_topic="t1"))
    await create_device(db_session, DeviceCreate(name="离线设备", type="sensor", mqtt_topic="t2"))
    await handle_state_update(db_session, "t1", "online")
    stats = await get_dashboard_stats(db_session)
    assert stats["total_devices"] == 2
    assert stats["online_devices"] == 1
    assert stats["offline_devices"] == 1
- Step 3: Run the tests
Run: pytest tests/test_device_registry.py -v
Expected: 11 passed
- Step 4: Commit
git add -A
git commit -m "feat: device registry with CRUD, state tracking, command sending, and log management"
Task 7: HA Discovery protocol handler
Files:
- Create: src/mqtt_home/discovery.py
- Create: tests/test_discovery.py
- Step 1: Create src/mqtt_home/discovery.py
import json
import logging

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.device_registry import create_device
from mqtt_home.models import Device
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.schemas import DeviceCreate

logger = logging.getLogger(__name__)

# HA Discovery config topic format: homeassistant/<component>/<node_id>/config
DISCOVERY_TOPIC_PREFIX = "homeassistant/"


def parse_discovery_topic(topic: str) -> dict[str, str] | None:
    """Parse an HA Discovery topic and return its component and node_id."""
    parts = topic.split("/")
    if len(parts) < 4 or parts[0] != DISCOVERY_TOPIC_PREFIX.strip("/"):
        return None
    if parts[-1] != "config":
        return None
    return {
        "component": parts[1],
        "node_id": "/".join(parts[2:-1]),
    }


async def handle_discovery_message(
    topic: str, payload: str, db: AsyncSession, mqtt_client: MqttClient
) -> Device | None:
    """Handle an HA Discovery config message: register the device and subscribe to its state topic."""
    parsed = parse_discovery_topic(topic)
    if not parsed:
        return None
    try:
        config = json.loads(payload)
    except json.JSONDecodeError:
        logger.warning("Invalid JSON in discovery payload for %s", topic)
        return None
    state_topic = config.get("state_topic")
    command_topic = config.get("command_topic")
    device_name = config.get("name", config.get("device", {}).get("name", parsed["node_id"]))
    device_class = config.get("device_class", "")
    if not state_topic:
        logger.warning("Discovery config for %s has no state_topic", topic)
        return None
    # Skip devices that are already registered (deduplicated by discovery_topic)
    result = await db.execute(
        select(Device).where(Device.discovery_topic == topic)
    )
    existing = result.scalar_one_or_none()
    if existing:
        logger.debug("Device already registered: %s", topic)
        return existing
    device = await create_device(db, DeviceCreate(
        name=device_name,
        type=parsed["component"],
        protocol="ha_discovery",
        mqtt_topic=state_topic,
        command_topic=command_topic,
    ))
    # Fill in the discovery-related fields
    device.discovery_topic = topic
    device.discovery_payload = payload
    device.attributes = json.dumps({
        "device_class": device_class,
        "unit_of_measurement": config.get("unit_of_measurement"),
        "icon": config.get("icon"),
    })
    await db.commit()
    await db.refresh(device)
    # Subscribe to the device's state topic
    await mqtt_client.subscribe(state_topic, qos=1)
    logger.info("HA Discovery device registered: %s -> %s", device.name, state_topic)
    return device
- Step 2: Create tests/test_discovery.py
import json
from unittest.mock import AsyncMock

import pytest

from mqtt_home.config import Settings
from mqtt_home.discovery import parse_discovery_topic, handle_discovery_message
from mqtt_home.mqtt_client import MqttClient


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )


def test_parse_discovery_topic_valid():
    result = parse_discovery_topic("homeassistant/light/abc123/config")
    assert result is not None
    assert result["component"] == "light"
    assert result["node_id"] == "abc123"


def test_parse_discovery_topic_nested_node():
    result = parse_discovery_topic("homeassistant/sensor/room/temperature/config")
    assert result is not None
    assert result["component"] == "sensor"
    assert result["node_id"] == "room/temperature"


def test_parse_discovery_topic_invalid():
    assert parse_discovery_topic("other/topic/config") is None
    assert parse_discovery_topic("homeassistant/light/abc") is None
    assert parse_discovery_topic("homeassistant/light/abc/status") is None


async def test_handle_discovery_creates_device(db_session, settings):
    mqtt_client = MqttClient(settings)
    mqtt_client.subscribe = AsyncMock()
    payload = json.dumps({
        "name": "客厅灯",
        "state_topic": "home/living/light/status",
        "command_topic": "home/living/light/set",
        "device_class": "light",
    })
    device = await handle_discovery_message(
        "homeassistant/light/living_room/config",
        payload,
        db_session,
        mqtt_client,
    )
    assert device is not None
    assert device.name == "客厅灯"
    assert device.protocol == "ha_discovery"
    assert device.mqtt_topic == "home/living/light/status"
    mqtt_client.subscribe.assert_called_once_with("home/living/light/status", qos=1)


async def test_handle_discovery_duplicate(db_session, settings):
    mqtt_client = MqttClient(settings)
    mqtt_client.subscribe = AsyncMock()
    payload = json.dumps({
        "name": "灯",
        "state_topic": "home/light",
    })
    await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client)
    device2 = await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client)
    # The same discovery_topic must not create a duplicate device
    assert device2 is not None
    mqtt_client.subscribe.assert_called_once()  # subscribed exactly once
- Step 3: Run the tests
Run: pytest tests/test_discovery.py -v
Expected: 5 passed
- Step 4: Commit
git add -A
git commit -m "feat: HA Discovery protocol handler for auto-registering devices"
Task 8: FastAPI application and REST API routes
Files:
- Create: src/mqtt_home/main.py
- Create: src/mqtt_home/api/__init__.py
- Create: src/mqtt_home/api/devices.py
- Create: src/mqtt_home/api/broker.py
- Create: src/mqtt_home/api/dashboard.py
- Create: tests/test_api_devices.py
- Create: tests/test_api_broker.py
- Step 1: Create src/mqtt_home/api/__init__.py
from fastapi import APIRouter
from mqtt_home.api.devices import router as devices_router
from mqtt_home.api.broker import router as broker_router
from mqtt_home.api.dashboard import router as dashboard_router
api_router = APIRouter(prefix="/api")
api_router.include_router(devices_router, prefix="/devices", tags=["devices"])
api_router.include_router(broker_router, prefix="/broker", tags=["broker"])
api_router.include_router(dashboard_router, prefix="/dashboard", tags=["dashboard"])
- Step 2: Create src/mqtt_home/api/devices.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from mqtt_home.database import get_db
from mqtt_home.device_registry import (
    list_devices, get_device, create_device, update_device,
    delete_device, send_command, get_device_logs,
)
from mqtt_home.schemas import (
    DeviceCreate, DeviceUpdate, DeviceCommand,
    DeviceResponse, DeviceLogResponse,
)

router = APIRouter()


@router.get("", response_model=list[DeviceResponse])
async def get_devices(db: AsyncSession = Depends(get_db)):
    return await list_devices(db)


@router.post("", response_model=DeviceResponse, status_code=201)
async def add_device(data: DeviceCreate, db: AsyncSession = Depends(get_db)):
    return await create_device(db, data)


@router.get("/{device_id}", response_model=DeviceResponse)
async def get_device_detail(device_id: str, db: AsyncSession = Depends(get_db)):
    device = await get_device(db, device_id)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device


@router.put("/{device_id}", response_model=DeviceResponse)
async def patch_device(device_id: str, data: DeviceUpdate, db: AsyncSession = Depends(get_db)):
    device = await update_device(db, device_id, data)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device


@router.delete("/{device_id}", status_code=204)
async def remove_device(device_id: str, db: AsyncSession = Depends(get_db)):
    if not await delete_device(db, device_id):
        raise HTTPException(status_code=404, detail="Device not found")


@router.post("/{device_id}/command", response_model=DeviceLogResponse)
async def command_device(
    device_id: str, data: DeviceCommand,
    db: AsyncSession = Depends(get_db),
):
    # Fetch mqtt_client from the application state; its publish method is the publish_fn.
    # The import is local to avoid a circular import with mqtt_home.main.
    from mqtt_home.main import app
    mqtt_client = getattr(app.state, "mqtt_client", None)
    if not mqtt_client or not mqtt_client.is_connected:
        raise HTTPException(status_code=503, detail="MQTT not connected")
    try:
        log = await send_command(
            db, device_id, data.payload,
            publish_fn=mqtt_client.publish,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not log:
        raise HTTPException(status_code=404, detail="Device not found")
    return log


@router.get("/{device_id}/logs", response_model=list[DeviceLogResponse])
async def read_device_logs(device_id: str, limit: int = 20, db: AsyncSession = Depends(get_db)):
    return await get_device_logs(db, device_id, limit)
- Step 3: Create src/mqtt_home/api/broker.py
from fastapi import APIRouter, HTTPException
from mqtt_home.schemas import BrokerClient, BrokerTopic

router = APIRouter()


@router.get("/status")
async def broker_status():
    from mqtt_home.main import app
    emqx = getattr(app.state, "emqx_client", None)
    if not emqx:
        raise HTTPException(status_code=503, detail="EMQX API client not configured")
    try:
        status = await emqx.get_broker_status()
        metrics = await emqx.get_metrics()
        return {"status": status, "metrics": metrics}
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e


@router.get("/clients", response_model=list[BrokerClient])
async def broker_clients(limit: int = 100):
    from mqtt_home.main import app
    emqx = getattr(app.state, "emqx_client", None)
    if not emqx:
        raise HTTPException(status_code=503, detail="EMQX API client not configured")
    try:
        return await emqx.get_clients(limit)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e


@router.get("/topics", response_model=list[BrokerTopic])
async def broker_topics(limit: int = 100):
    from mqtt_home.main import app
    emqx = getattr(app.state, "emqx_client", None)
    if not emqx:
        raise HTTPException(status_code=503, detail="EMQX API client not configured")
    try:
        return await emqx.get_topics(limit)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e
- Step 4: Create src/mqtt_home/api/dashboard.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from mqtt_home.database import get_db
from mqtt_home.device_registry import get_dashboard_stats
from mqtt_home.schemas import DashboardStats

router = APIRouter()


@router.get("", response_model=DashboardStats)
async def dashboard(db: AsyncSession = Depends(get_db)):
    return await get_dashboard_stats(db)
- Step 5: Create src/mqtt_home/main.py
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory, Base
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.discovery import handle_discovery_message
from mqtt_home.device_registry import handle_state_update
from mqtt_home.api import api_router

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()
    # Initialize the database
    await init_db()
    logger.info("Database initialized")
    # Initialize the EMQX API client
    emqx = EmqxApiClient(settings)
    app.state.emqx_client = emqx
    logger.info("EMQX API client initialized")
    # Initialize the MQTT client
    mqtt = MqttClient(settings)
    app.state.mqtt_client = mqtt
    session_factory = get_session_factory()

    # Callback for HA Discovery messages
    async def on_discovery(topic: str, payload: str):
        async with session_factory() as db:
            await handle_discovery_message(topic, payload, db, mqtt)

    # Callback for device state updates
    async def on_state(topic: str, payload: str):
        async with session_factory() as db:
            device = await handle_state_update(db, topic, payload)
            # WebSocket push is added in Task 9

    mqtt.on_message("homeassistant/#", on_discovery)
    mqtt.on_message("home/#", on_state)
    await mqtt.start()
    logger.info("MQTT client started")
    yield
    await mqtt.stop()
    await emqx.close()
    logger.info("Shutdown complete")


app = FastAPI(title="MQTT Home", version="0.1.0", lifespan=lifespan)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.include_router(api_router)


@app.get("/health")
async def health():
    mqtt = getattr(app.state, "mqtt_client", None)
    return {
        "status": "ok",
        "mqtt_connected": mqtt.is_connected if mqtt else False,
    }
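The lifespan above registers callbacks under the topic filters homeassistant/# and home/#. How MqttClient dispatches messages to those callbacks is defined in Task 5; the following is only a minimal sketch of the MQTT wildcard semantics such a dispatcher relies on. The function name topic_matches is hypothetical, and the sketch ignores the special rule for topics that start with $.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`.

    '+' matches exactly one level. '#' matches its parent level plus any
    number of child levels and is only valid as the last filter level.
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: everything from here on matches,
            # provided '#' really is the final level of the filter.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # No '#' was seen, so the level counts must be equal.
    return len(filter_levels) == len(topic_levels)
```

Because matching works level by level, home/# does not match homeassistant/light/living_room/config, so the two registrations do not overlap.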
- Step 6: Create tests/test_api_devices.py
import pytest
from httpx import AsyncClient, ASGITransport
from mqtt_home.main import app
from mqtt_home.database import Base, get_db
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.pool import StaticPool

TEST_DB = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
async def client():
    engine = create_async_engine(TEST_DB, connect_args={"check_same_thread": False}, poolclass=StaticPool)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    test_factory = async_sessionmaker(engine, expire_on_commit=False)

    async def override_get_db():
        async with test_factory() as session:
            yield session

    # Route every request to the in-memory test database
    app.dependency_overrides[get_db] = override_get_db
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()
    await engine.dispose()


async def test_get_devices_empty(client):
    resp = await client.get("/api/devices")
    assert resp.status_code == 200
    assert resp.json() == []


async def test_create_device(client):
    resp = await client.post("/api/devices", json={
        "name": "客厅灯",
        "type": "light",
        "mqtt_topic": "home/light",
        "command_topic": "home/light/set",
    })
    assert resp.status_code == 201
    data = resp.json()
    assert data["name"] == "客厅灯"
    assert data["type"] == "light"


async def test_get_device_detail(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]
    resp = await client.get(f"/api/devices/{device_id}")
    assert resp.status_code == 200
    assert resp.json()["name"] == "灯"


async def test_get_device_not_found(client):
    resp = await client.get("/api/devices/nonexistent")
    assert resp.status_code == 404


async def test_update_device(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]
    resp = await client.put(f"/api/devices/{device_id}", json={"name": "新名字"})
    assert resp.status_code == 200
    assert resp.json()["name"] == "新名字"


async def test_delete_device(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]
    resp = await client.delete(f"/api/devices/{device_id}")
    assert resp.status_code == 204


async def test_delete_device_not_found(client):
    resp = await client.delete("/api/devices/nonexistent")
    assert resp.status_code == 404
- Step 7: Create tests/test_api_broker.py
import pytest
from httpx import AsyncClient, ASGITransport
from unittest.mock import AsyncMock, patch
from mqtt_home.main import app


@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


async def test_broker_status_no_client(client):
    # Without an emqx_client on app.state the route should return 503
    resp = await client.get("/api/broker/status")
    assert resp.status_code == 503


async def test_broker_clients_no_client(client):
    resp = await client.get("/api/broker/clients")
    assert resp.status_code == 503


async def test_health_endpoint(client):
    resp = await client.get("/health")
    assert resp.status_code == 200
    data = resp.json()
    assert "status" in data
    assert "mqtt_connected" in data
- Step 8: Run all tests
Run: pytest tests/ -v
Expected: All tests pass
- Step 9: Commit
git add -A
git commit -m "feat: FastAPI app with device, broker, dashboard REST API routes"
Task 9: WebSocket real-time push
Files:
- Modify: src/mqtt_home/main.py
- Create: src/mqtt_home/ws.py
- Step 1: Create src/mqtt_home/ws.py
import logging
from fastapi import WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)


class ConnectionManager:
    def __init__(self):
        self._connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self._connections.append(ws)
        logger.info("WebSocket connected, total: %d", len(self._connections))

    def disconnect(self, ws: WebSocket):
        # Idempotent: broadcast() may already have dropped this socket
        if ws in self._connections:
            self._connections.remove(ws)
        logger.info("WebSocket disconnected, total: %d", len(self._connections))

    async def broadcast(self, message: dict):
        dead = []
        # Iterate over a copy so connects/disconnects during an await are safe
        for ws in list(self._connections):
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)


ws_manager = ConnectionManager()


async def websocket_endpoint(ws: WebSocket):
    await ws_manager.connect(ws)
    try:
        while True:
            await ws.receive_text()  # keep the connection open; client messages are ignored
    except WebSocketDisconnect:
        ws_manager.disconnect(ws)


async def broadcast_device_update(device_id: str, data: dict):
    await ws_manager.broadcast({
        "type": "device_update",
        "device_id": device_id,
        **data,
    })
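To see how broadcast prunes dead connections without starting a server, the logic can be exercised against fake sockets. This is a minimal sketch: FakeSocket and Manager are hypothetical stand-ins (not fastapi.WebSocket or the real ConnectionManager), and the sketch makes disconnect idempotent so that removing the same socket twice is harmless.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for fastapi.WebSocket, used only in this sketch."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.sent: list[dict] = []

    async def send_json(self, message: dict) -> None:
        if self.fail:
            raise RuntimeError("peer went away")
        self.sent.append(message)


class Manager:
    """Broadcast-and-prune logic only; the accept() handshake is left out."""

    def __init__(self):
        self._connections: list[FakeSocket] = []

    def add(self, ws: FakeSocket) -> None:
        self._connections.append(ws)

    def disconnect(self, ws: FakeSocket) -> None:
        # Removing a socket that is already gone is a no-op.
        if ws in self._connections:
            self._connections.remove(ws)

    async def broadcast(self, message: dict) -> None:
        dead = []
        # Iterate over a copy so the list can change while we await.
        for ws in list(self._connections):
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)


async def demo() -> tuple[int, list[dict]]:
    manager = Manager()
    good, bad = FakeSocket(), FakeSocket(fail=True)
    manager.add(good)
    manager.add(bad)
    await manager.broadcast({"type": "device_update", "device_id": "abc", "state": "ON"})
    manager.disconnect(bad)  # already pruned by broadcast; must not raise
    return len(manager._connections), good.sent


remaining, delivered = asyncio.run(demo())
```

After the run, only the healthy socket remains registered and it has received the single broadcast message.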
- Step 2: Modify src/mqtt_home/main.py to add the WebSocket route and broadcast
After the line from mqtt_home.api import api_router, add:
from mqtt_home.ws import websocket_endpoint, broadcast_device_update
After the line app.include_router(api_router), add:
app.websocket("/ws/devices")(websocket_endpoint)
Update the on_state callback so that it broadcasts over WebSocket:
    async def on_state(topic: str, payload: str):
        async with session_factory() as db:
            device = await handle_state_update(db, topic, payload)
            # Read the attributes while the session is still open
            if device:
                await broadcast_device_update(device.id, {
                    "state": device.state,
                    "is_online": device.is_online,
                    "last_seen": device.last_seen.isoformat() if device.last_seen else None,
                })
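For anyone writing the WebSocket consumer, it may help to see the message that on_state ends up broadcasting. The sketch below rebuilds that dict from a plain dataclass; DeviceSnapshot and device_update_message are hypothetical names introduced for illustration, and the field values are made up.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class DeviceSnapshot:
    """Hypothetical stand-in for the ORM Device; only the fields used here."""
    id: str
    state: Optional[str]
    is_online: bool
    last_seen: Optional[datetime]


def device_update_message(device: DeviceSnapshot) -> dict:
    """Build the dict that broadcast_device_update sends as JSON."""
    return {
        "type": "device_update",
        "device_id": device.id,
        "state": device.state,
        "is_online": device.is_online,
        # datetime is not JSON-serializable, so it travels as an ISO 8601 string
        "last_seen": device.last_seen.isoformat() if device.last_seen else None,
    }


snapshot = DeviceSnapshot("abc123", "ON", True, datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc))
message = device_update_message(snapshot)
encoded = json.dumps(message)
```

A client can switch on the type field and use device_id to find the record to update; last_seen is null for a device that has never reported.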
- Step 3: Commit
git add -A
git commit -m "feat: WebSocket endpoint for real-time device state updates"
Task 10: CLI commands
Files:
- Create: src/mqtt_home/cli.py
- Create: tests/test_cli.py
- Step 1: Create src/mqtt_home/cli.py
import asyncio
import json
import click
from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.device_registry import (
    list_devices, get_device, create_device, delete_device,
    send_command, get_device_logs,
)
from mqtt_home.schemas import DeviceCreate
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.discovery import handle_discovery_message, DISCOVERY_TOPIC_PREFIX
from mqtt_home.device_registry import handle_state_update


def run_async(coro):
    # asyncio.run creates a fresh event loop and closes it afterwards;
    # asyncio.get_event_loop() is deprecated when no loop is running.
    return asyncio.run(coro)


@click.group()
def cli():
    """MQTT 智能家居管理工具"""
    pass


@cli.group()
def device():
    """设备管理"""
    pass
@device.command("list")
def device_list():
    """列出所有设备"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            devices = await list_devices(db)
            if not devices:
                click.echo("暂无设备")
                return
            for d in devices:
                status = "🟢" if d.is_online else "🔴"
                state = d.state or "-"
                click.echo(f"{status} [{d.id[:8]}] {d.name} ({d.type}) state={state}")
    run_async(_run())


@device.command("add")
@click.option("--name", required=True, help="设备名称")
@click.option("--type", "device_type", default="switch", help="设备类型")
@click.option("--state-topic", required=True, help="状态主题")
@click.option("--command-topic", default=None, help="命令主题")
def device_add(name, device_type, state_topic, command_topic):
    """手动添加设备"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            d = await create_device(db, DeviceCreate(
                name=name,
                type=device_type,
                mqtt_topic=state_topic,
                command_topic=command_topic,
            ))
            click.echo(f"设备已创建: {d.id} - {d.name}")
    run_async(_run())


@device.command("info")
@click.argument("device_id")
def device_info(device_id):
    """查看设备详情"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            d = await get_device(db, device_id)
            if not d:
                click.echo(f"设备不存在: {device_id}", err=True)
                return
            click.echo(f"ID: {d.id}")
            click.echo(f"名称: {d.name}")
            click.echo(f"类型: {d.type}")
            click.echo(f"协议: {d.protocol}")
            click.echo(f"状态主题: {d.mqtt_topic}")
            click.echo(f"命令主题: {d.command_topic or '无'}")
            click.echo(f"当前状态: {d.state or '无'}")
            click.echo(f"在线: {'是' if d.is_online else '否'}")
            click.echo(f"最后活跃: {d.last_seen or '从未'}")
    run_async(_run())


@device.command("remove")
@click.argument("device_id")
def device_remove(device_id):
    """删除设备"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            if await delete_device(db, device_id):
                click.echo(f"设备已删除: {device_id}")
            else:
                click.echo(f"设备不存在: {device_id}", err=True)
    run_async(_run())
@device.command("command")
@click.argument("device_id")
@click.option("--payload", required=True, help="命令内容(JSON)")
def device_command(device_id, payload):
    """向设备发送命令"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        # Publish over a short-lived aiomqtt connection; no message loop is started
        import aiomqtt
        async with aiomqtt.Client(
            hostname=settings.mqtt_host,
            port=settings.mqtt_port,
            username=settings.mqtt_username or None,
            password=settings.mqtt_password or None,
        ) as client:
            async with factory() as db:
                try:
                    log = await send_command(
                        db, device_id, payload,
                        publish_fn=lambda topic, p: client.publish(topic, p, qos=1),
                    )
                    if log:
                        click.echo(f"命令已发送到 {log.topic}: {log.payload}")
                    else:
                        click.echo(f"设备不存在: {device_id}", err=True)
                except ValueError as e:
                    click.echo(str(e), err=True)
    run_async(_run())
@device.command("logs")
@click.argument("device_id")
@click.option("--limit", default=20, help="日志条数")
def device_logs(device_id, limit):
    """查看设备消息日志"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            logs = await get_device_logs(db, device_id, limit)
            if not logs:
                click.echo("暂无日志")
                return
            for log in logs:
                direction = "RX" if log.direction == "rx" else "TX"
                click.echo(f"[{log.timestamp}] {direction} {log.topic} | {log.payload}")
    run_async(_run())
@cli.group()
def broker():
    """Broker 管理"""
    pass


@broker.command("status")
def broker_status():
    """查看 Broker 状态"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            status = await emqx.get_broker_status()
            click.echo(f"版本: {status.get('version', 'unknown')}")
            click.echo(f"运行时间: {status.get('uptime', 0)}s")
            metrics = await emqx.get_metrics()
            click.echo(f"连接数: {metrics.get('connections.count', 0)}")
            click.echo(f"订阅数: {metrics.get('subscriptions.count', 0)}")
            click.echo(f"主题数: {metrics.get('topics.count', 0)}")
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()
    run_async(_run())


@broker.command("clients")
def broker_clients():
    """列出已连接客户端"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            clients = await emqx.get_clients()
            if not clients:
                click.echo("暂无客户端连接")
                return
            for c in clients:
                status = "在线" if c.get("connected") else "离线"
                click.echo(f"[{c.get('clientid')}] {status} ip={c.get('ip_address')}")
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()
    run_async(_run())


@broker.command("topics")
def broker_topics():
    """列出活跃主题"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            topics = await emqx.get_topics()
            if not topics:
                click.echo("暂无活跃主题")
                return
            for t in topics:
                click.echo(t.get("topic", ""))
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()
    run_async(_run())
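Every command above defines an inner _run coroutine and hands it to run_async. One possible way to cut that repetition is a small decorator that turns a coroutine function into a regular function. This is only a sketch using the standard library; the name sync is hypothetical and the plan itself does not define it.

```python
import asyncio
import functools


def sync(async_fn):
    """Wrap a coroutine function so it can be called from synchronous code."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # asyncio.run creates a fresh event loop and closes it afterwards.
        return asyncio.run(async_fn(*args, **kwargs))

    return wrapper


@sync
async def add_later(a: int, b: int) -> int:
    # Stand-in for real async work such as a database query.
    await asyncio.sleep(0)
    return a + b
```

With click, such a decorator would sit directly above the async def and below the @device.command and option decorators, so click still sees an ordinary callable with the original name and docstring.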
- Step 2: Create tests/test_cli.py
import pytest
from click.testing import CliRunner
from mqtt_home.cli import cli


def test_device_list_empty(monkeypatch):
    """The list command with no devices registered."""
    runner = CliRunner()

    async def mock_init():
        pass

    async def mock_list(db):
        return []

    monkeypatch.setattr("mqtt_home.cli.init_db", mock_init)
    monkeypatch.setattr("mqtt_home.cli.list_devices", mock_list)
    result = runner.invoke(cli, ["device", "list"])
    assert result.exit_code == 0
    assert "暂无设备" in result.output


def test_cli_groups():
    runner = CliRunner()
    result = runner.invoke(cli, ["--help"])
    assert "device" in result.output
    assert "broker" in result.output
- Step 3: Run the tests
Run: pytest tests/test_cli.py -v
Expected: 2 passed
- Step 4: Commit
git add -A
git commit -m "feat: CLI commands for device management and broker monitoring"
Task 11: Create .env and verify end to end
Files:
- Create: .env
- Step 1: Create .env (copy .env.example and fill in real values)
MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=<your-emqx-api-key>
EMQX_API_SECRET=<your-emqx-api-secret>
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
- Step 2: Start the service and verify
Run: uvicorn mqtt_home.main:app --host 0.0.0.0 --port 8000
Expected: the service starts and the log shows "MQTT connected" and "Database initialized"
- Step 3: Verify the broker connection with the CLI
Run: mqtt-home broker status
Expected: prints the EMQX version and statistics
- Step 4: Verify through the API
Run: curl http://localhost:8000/health
Expected: {"status":"ok","mqtt_connected":true}
Run: curl http://localhost:8000/api/broker/clients
Expected: a JSON list of clients
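If the curl check needs to be scripted, for example in a smoke test, the same probe can be written with the standard library. This is a sketch under the assumption that /health returns the JSON shape shown in Step 4; is_ready and probe are hypothetical helper names.

```python
import json
import urllib.request


def is_ready(body: bytes) -> bool:
    """True when the /health body reports status ok and a live MQTT connection."""
    data = json.loads(body)
    return data.get("status") == "ok" and data.get("mqtt_connected") is True


def probe(url: str = "http://localhost:8000/health", timeout: float = 3.0) -> bool:
    """Fetch the health endpoint and evaluate it; raises URLError if unreachable."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.status == 200 and is_ready(resp.read())
```

Calling probe() against a running service returns a boolean; a stopped service raises urllib.error.URLError instead, so a caller can tell "not ready" apart from "not reachable".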
- Step 5: Commit (.env is already listed in .gitignore)
git add -A
git commit -m "feat: verify end-to-end backend with EMQX broker connection"
Task 12: Full test run
- Step 1: Run all tests
Run: pytest tests/ -v
Expected: all tests pass
- Step 2: Fix any failing tests
Fix problems based on the actual output.
- Step 3: Final commit
git add -A
git commit -m "test: ensure all backend tests pass"
Self-check results
| Design document section | Corresponding task |
|---|---|
| Project structure | Task 1 |
| Data models (devices, device_logs) | Task 2 |
| Pydantic schemas | Task 3 |
| EMQX REST API wrapper | Task 4 |
| MQTT client + reconnect + wildcards | Task 5 |
| Device registry (CRUD + state + commands) | Task 6 |
| HA Discovery protocol | Task 7 |
| REST API routes (devices, broker, dashboard) | Task 8 |
| WebSocket real-time push | Task 9 |
| CLI commands | Task 10 |
| Configuration / .env | Task 11 |