From ce10907619d4b8ad01d34d55f045ff5179ddf5ca Mon Sep 17 00:00:00 2001 From: walkpan Date: Sun, 29 Mar 2026 21:21:44 +0800 Subject: [PATCH] docs: design spec and backend implementation plan --- .../plans/2026-03-29-mqtt-home-backend.md | 2189 +++++++++++++++++ .../specs/2026-03-29-mqtt-home-design.md | 232 ++ 2 files changed, 2421 insertions(+) create mode 100644 docs/superpowers/plans/2026-03-29-mqtt-home-backend.md create mode 100644 docs/superpowers/specs/2026-03-29-mqtt-home-design.md diff --git a/docs/superpowers/plans/2026-03-29-mqtt-home-backend.md b/docs/superpowers/plans/2026-03-29-mqtt-home-backend.md new file mode 100644 index 0000000..561a9e8 --- /dev/null +++ b/docs/superpowers/plans/2026-03-29-mqtt-home-backend.md @@ -0,0 +1,2189 @@ +# MQTT 智能家居管理系统 - 后端实现计划 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 实现后端核心功能,包括 FastAPI REST API、MQTT 客户端、EMQX API 封装、CLI 命令、SQLite 数据库。 + +**Architecture:** 单体 FastAPI 应用,通过 aiomqtt 连接 EMQX Broker,使用 SQLite + SQLAlchemy 存储设备数据。CLI 和 REST API 共享 device_registry 业务逻辑层。 + +**Tech Stack:** Python 3.11+, FastAPI, uvicorn, aiomqtt, SQLAlchemy 2.0 + aiosqlite, pydantic-settings, click, httpx + +--- + +## 文件结构总览 + +``` +mqtt-home/ +├── pyproject.toml +├── .env.example +├── .gitignore +├── src/ +│ └── mqtt_home/ +│ ├── __init__.py +│ ├── config.py # Settings (pydantic-settings) +│ ├── database.py # SQLite engine, async session +│ ├── models.py # SQLAlchemy ORM (Device, DeviceLog) +│ ├── schemas.py # Pydantic schemas (request/response) +│ ├── emqx_api.py # EMQX REST API client (httpx) +│ ├── mqtt_client.py # MQTT connection lifecycle (aiomqtt) +│ ├── discovery.py # HA Discovery protocol handler +│ ├── device_registry.py # Business logic (CRUD + control) +│ ├── main.py # FastAPI app + lifespan +│ ├── cli.py # click CLI commands +│ └── api/ +│ ├── __init__.py # Router aggregation +│ ├── devices.py # Device REST routes +│ ├── broker.py # Broker REST routes +│ └── dashboard.py # Dashboard REST routes +└── tests/ + ├── conftest.py + ├── test_config.py + ├── test_models.py + ├── test_device_registry.py + ├── test_emqx_api.py + ├── test_discovery.py + ├── test_api_devices.py + ├── test_api_broker.py + └── test_cli.py +``` + +--- + +### Task 1: 项目骨架与配置 + +**Files:** +- Create: `pyproject.toml` +- Create: `.env.example` +- Create: `.gitignore` +- Create: `src/mqtt_home/__init__.py` +- Create: `src/mqtt_home/config.py` +- Create: `tests/conftest.py` +- Create: `tests/test_config.py` + +- [ ] **Step 1: 创建 pyproject.toml** + +```toml +[build-system] +requires = ["setuptools>=68.0", "wheel"] +build-backend = "setuptools.backends._legacy:_Backend" + +[project] +name = "mqtt-home" +version = "0.1.0" +description = "轻量级智能家居 MQTT 设备管理系统" +requires-python = ">=3.11" +dependencies = [ + "fastapi>=0.115.0", + "uvicorn[standard]>=0.30.0", + "aiomqtt>=2.0.0", + "sqlalchemy[asyncio]>=2.0.0", + "aiosqlite>=0.20.0", + "pydantic-settings>=2.0.0", + "click>=8.1.0", + "httpx>=0.27.0", + "websockets>=12.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0.0", + "pytest-asyncio>=0.23.0", + "pytest-httpx>=0.30.0", + "httpx>=0.27.0", +] + +[project.scripts] +mqtt-home = "mqtt_home.cli:cli" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +``` + +- [ ] **Step 2: 创建 .env.example** + +``` +MQTT_HOST=192.168.0.31 +MQTT_PORT=1883 +MQTT_USERNAME= +MQTT_PASSWORD= +EMQX_API_URL=http://192.168.0.31:18083/api/v5 +EMQX_API_KEY=your-api-key +EMQX_API_SECRET=your-secret +DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db +WEB_HOST=0.0.0.0 +WEB_PORT=8000 +``` + +- [ ] **Step 3: 创建 .gitignore** + +``` +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ +.venv/ +.env +data/ +*.db +frontend/node_modules/ +frontend/dist/ +``` + +- [ ] **Step 4: 创建 src/mqtt_home/__init__.py** + +```python +``` + +- [ ] **Step 5: 创建 tests/conftest.py** + +```python +import pytest +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession +from sqlalchemy.pool import StaticPool + +from mqtt_home.database import Base +from mqtt_home.config import Settings + +TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" + + +@pytest.fixture +def settings(): + return Settings( + mqtt_host="localhost", + mqtt_port=1883, + emqx_api_url="http://localhost:18083/api/v5", + emqx_api_key="test-key", + emqx_api_secret="test-secret", + database_url=TEST_DATABASE_URL, + ) + + +@pytest.fixture +async def engine(): + engine = create_async_engine( + TEST_DATABASE_URL, + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + yield engine + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + await engine.dispose() + + +@pytest.fixture +async def db_session(engine): + async_session = async_sessionmaker(engine, expire_on_commit=False) + async with async_session() as session: + yield session +``` + +- [ ] **Step 6: 创建 src/mqtt_home/config.py** + +```python +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + mqtt_host: str = "localhost" + mqtt_port: int = 1883 + mqtt_username: str = "" + mqtt_password: str = "" + emqx_api_url: str = "http://localhost:18083/api/v5" + emqx_api_key: str = "" + emqx_api_secret: str = "" + database_url: str = "sqlite+aiosqlite:///./data/mqtt_home.db" + web_host: str = "0.0.0.0" + web_port: int = 8000 + + model_config = {"env_file": ".env", "env_prefix": ""} + + +def get_settings() -> Settings: + return Settings() +``` + +- [ ] **Step 7: 创建 tests/test_config.py** + +```python +import os +from mqtt_home.config import Settings + + +def test_default_settings(): + s = Settings() + assert s.mqtt_host == "localhost" + assert s.mqtt_port == 1883 + assert s.web_port == 8000 + + +def test_settings_from_env(monkeypatch): + monkeypatch.setenv("MQTT_HOST", "192.168.1.1") + monkeypatch.setenv("MQTT_PORT", "1884") + s = Settings() + assert s.mqtt_host == "192.168.1.1" + assert s.mqtt_port == 1884 +``` + +- [ ] **Step 8: 安装依赖并运行测试** + +Run: `pip install -e ".[dev]"` (在 `D:\home\mqtt` 目录下) +Run: `pytest tests/test_config.py -v` +Expected: 2 passed + +- [ ] **Step 9: 初始化 git 仓库并提交** + +Run: +```bash +cd D:\home\mqtt +git init +git add -A +git commit -m "feat: project skeleton with config, dependencies, and test setup" +``` + +--- + +### Task 2: 数据库与 ORM 模型 + +**Files:** +- Create: `src/mqtt_home/database.py` +- Create: `src/mqtt_home/models.py` +- Create: `tests/test_models.py` + +- [ ] **Step 1: 创建 src/mqtt_home/database.py** + +```python +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession, async_session +from sqlalchemy.orm import DeclarativeBase +from mqtt_home.config import get_settings + + +class Base(DeclarativeBase): + pass + + +def get_engine(): + settings = get_settings() + return create_async_engine( + settings.database_url, + echo=False, + ) + + +def get_session_factory(engine=None): + _engine = engine or get_engine() + return async_sessionmaker(_engine, expire_on_commit=False) + + +async def init_db(): + engine = get_engine() + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + await engine.dispose() + + +async def get_db() -> async_session: + factory = get_session_factory() + async with factory() as session: + yield session +``` + +- [ ] **Step 2: 创建 src/mqtt_home/models.py** + +```python +import uuid +from datetime import datetime, timezone + +from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, ForeignKey +from sqlalchemy.orm import relationship + +from mqtt_home.database import Base + + +def _utcnow(): + return datetime.now(timezone.utc) + + +def _new_id(): + return str(uuid.uuid4()) + + +class Device(Base): + __tablename__ = "devices" + + id = Column(String(36), primary_key=True, default=_new_id) + name = Column(String(200), nullable=False) + type = Column(String(50), nullable=False, default="switch") + protocol = Column(String(20), nullable=False, default="custom") + mqtt_topic = Column(String(500), nullable=False) + command_topic = Column(String(500), nullable=True) + discovery_topic = Column(String(500), nullable=True) + discovery_payload = Column(Text, nullable=True) + attributes = Column(Text, nullable=True, default="{}") + state = Column(String(500), nullable=True, default=None) + is_online = Column(Boolean, nullable=False, default=False) + last_seen = Column(DateTime(timezone=True), nullable=True) + created_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow) + updated_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow, onupdate=_utcnow) + + logs = relationship("DeviceLog", back_populates="device", lazy="dynamic") + + +class DeviceLog(Base): + __tablename__ = "device_logs" + + id = Column(Integer, primary_key=True, autoincrement=True) + device_id = Column(String(36), ForeignKey("devices.id", ondelete="CASCADE"), nullable=False) + direction = Column(String(10), nullable=False) # "rx" or "tx" + topic = Column(String(500), nullable=False) + payload = Column(Text, nullable=False) + timestamp = Column(DateTime(timezone=True), nullable=False, default=_utcnow) + + device = relationship("Device", back_populates="logs") +``` + +- [ ] **Step 3: 创建 tests/test_models.py** + +```python +import pytest +from mqtt_home.models import Device, DeviceLog + + +async def test_create_device(db_session): + device = Device( + name="客厅灯", + type="light", + protocol="custom", + mqtt_topic="home/living/light", + command_topic="home/living/light/set", + ) + db_session.add(device) + await db_session.commit() + await db_session.refresh(device) + + assert device.id is not None + assert len(device.id) == 36 + assert device.name == "客厅灯" + assert device.state is None + assert device.is_online is False + + +async def test_create_device_log(db_session): + device = Device( + name="温度传感器", + type="sensor", + protocol="custom", + mqtt_topic="home/temperature", + ) + db_session.add(device) + await db_session.flush() + + log = DeviceLog( + device_id=device.id, + direction="rx", + topic="home/temperature", + payload='{"temperature": 25.5}', + ) + db_session.add(log) + await db_session.commit() + + fetched_device = await db_session.get(Device, device.id) + assert fetched_device.logs.count() == 1 +``` + +- [ ] **Step 4: 运行测试** + +Run: `pytest tests/test_models.py -v` +Expected: 2 passed + +- [ ] **Step 5: 提交** + +```bash +git add -A +git commit -m "feat: database setup with SQLAlchemy ORM models for Device and DeviceLog" +``` + +--- + +### Task 3: Pydantic Schemas + +**Files:** +- Create: `src/mqtt_home/schemas.py` + +- [ ] **Step 1: 创建 src/mqtt_home/schemas.py** + +```python +from datetime import datetime +from typing import Optional +from pydantic import BaseModel, Field + + +class DeviceCreate(BaseModel): + name: str + type: str = "switch" + protocol: str = "custom" + mqtt_topic: str + command_topic: Optional[str] = None + + +class DeviceUpdate(BaseModel): + name: Optional[str] = None + type: Optional[str] = None + command_topic: Optional[str] = None + + +class DeviceCommand(BaseModel): + payload: str + + +class DeviceResponse(BaseModel): + id: str + name: str + type: str + protocol: str + mqtt_topic: str + command_topic: Optional[str] = None + state: Optional[str] = None + is_online: bool + last_seen: Optional[datetime] = None + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +class DeviceLogResponse(BaseModel): + id: int + device_id: str + direction: str + topic: str + payload: str + timestamp: datetime + + model_config = {"from_attributes": True} + + +class BrokerClient(BaseModel): + clientid: str + username: Optional[str] = None + ip_address: Optional[str] = None + connected: bool + keepalive: int + proto_ver: Optional[int] = None + clean_start: Optional[bool] = None + + +class BrokerTopic(BaseModel): + topic: str + node: Optional[str] = None + + +class DashboardStats(BaseModel): + total_devices: int + online_devices: int + offline_devices: int + recent_logs: list[DeviceLogResponse] +``` + +- [ ] **Step 2: 提交** + +```bash +git add -A +git commit -m "feat: Pydantic schemas for API request/response models" +``` + +--- + +### Task 4: EMQX REST API 客户端 + +**Files:** +- Create: `src/mqtt_home/emqx_api.py` +- Create: `tests/test_emqx_api.py` + +- [ ] **Step 1: 创建 src/mqtt_home/emqx_api.py** + +```python +import httpx +from typing import Any + +from mqtt_home.config import Settings + + +class EmqxApiClient: + def __init__(self, settings: Settings): + self._client = httpx.AsyncClient( + base_url=settings.emqx_api_url, + auth=(settings.emqx_api_key, settings.emqx_api_secret), + timeout=10.0, + ) + + async def close(self): + await self._client.aclose() + + async def get_broker_status(self) -> dict[str, Any]: + resp = await self._client.get("/status") + resp.raise_for_status() + return resp.json().get("data", {}) + + async def get_metrics(self) -> dict[str, Any]: + resp = await self._client.get("/metrics") + resp.raise_for_status() + return resp.json().get("data", {}) + + async def get_clients(self, limit: int = 100) -> list[dict[str, Any]]: + resp = await self._client.get("/clients", params={"limit": limit}) + resp.raise_for_status() + return resp.json().get("data", []) + + async def get_subscriptions(self, limit: int = 100) -> list[dict[str, Any]]: + resp = await self._client.get("/subscriptions", params={"limit": limit}) + resp.raise_for_status() + return resp.json().get("data", []) + + async def get_topics(self, limit: int = 100) -> list[dict[str, Any]]: + resp = await self._client.get("/topics", params={"limit": limit}) + resp.raise_for_status() + return resp.json().get("data", []) +``` + +- [ ] **Step 2: 创建 tests/test_emqx_api.py** + +```python +import pytest +import httpx +from unittest.mock import AsyncMock, patch + +from mqtt_home.emqx_api import EmqxApiClient +from mqtt_home.config import Settings + + +@pytest.fixture +def settings(): + return Settings( + mqtt_host="localhost", + emqx_api_url="http://localhost:18083/api/v5", + emqx_api_key="test-key", + emqx_api_secret="test-secret", + database_url="sqlite+aiosqlite:///:memory:", + ) + + +@pytest.fixture +def emqx_client(settings): + return EmqxApiClient(settings) + + +async def test_get_broker_status(emqx_client): + with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get: + mock_get.return_value = httpx.Response( + 200, json={"data": {"uptime": 12345, "version": "5.0.0"}} + ) + result = await emqx_client.get_broker_status() + assert result["uptime"] == 12345 + + +async def test_get_clients(emqx_client): + with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get: + mock_get.return_value = httpx.Response( + 200, json={"data": [{"clientid": "dev1", "connected": True}]} + ) + result = await emqx_client.get_clients() + assert len(result) == 1 + assert result[0]["clientid"] == "dev1" + + +async def test_get_topics(emqx_client): + with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get: + mock_get.return_value = httpx.Response( + 200, json={"data": [{"topic": "home/light"}]} + ) + result = await emqx_client.get_topics() + assert len(result) == 1 +``` + +- [ ] **Step 3: 运行测试** + +Run: `pytest tests/test_emqx_api.py -v` +Expected: 3 passed + +- [ ] **Step 4: 提交** + +```bash +git add -A +git commit -m "feat: EMQX REST API client for broker status, clients, and topics" +``` + +--- + +### Task 5: MQTT 客户端 + +**Files:** +- Create: `src/mqtt_home/mqtt_client.py` +- Create: `tests/test_mqtt_client.py` + +- [ ] **Step 1: 创建 src/mqtt_home/mqtt_client.py** + +```python +import asyncio +import uuid +import logging +from typing import Callable, Awaitable + +import aiomqtt + +from mqtt_home.config import Settings + +logger = logging.getLogger(__name__) + +MessageCallback = Callable[[str, str], Awaitable[None]] + + +class MqttClient: + def __init__(self, settings: Settings): + self._settings = settings + self._client: aiomqtt.Client | None = None + self._callbacks: dict[str, list[MessageCallback]] = {} + self._connected = False + self._task: asyncio.Task | None = None + self._client_id = f"mqtt-home-{uuid.uuid4().hex[:8]}" + + @property + def is_connected(self) -> bool: + return self._connected + + def on_message(self, topic_filter: str, callback: MessageCallback): + self._callbacks.setdefault(topic_filter, []).append(callback) + + async def start(self): + self._task = asyncio.create_task(self._run()) + + async def stop(self): + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + async def publish(self, topic: str, payload: str, qos: int = 1, retain: bool = False): + if not self._client: + raise RuntimeError("MQTT client not connected") + await self._client.publish(topic, payload, qos=qos, retain=retain) + + async def subscribe(self, topic: str, qos: int = 1): + if not self._client: + raise RuntimeError("MQTT client not connected") + await self._client.subscribe(topic, qos=qos) + + async def _run(self): + retry_delay = 1 + max_delay = 60 + while True: + try: + async with aiomqtt.Client( + hostname=self._settings.mqtt_host, + port=self._settings.mqtt_port, + username=self._settings.mqtt_username or None, + password=self._settings.mqtt_password or None, + identifier=self._client_id, + clean_session=True, + ) as client: + self._client = client + self._connected = True + logger.info("MQTT connected to %s:%d", self._settings.mqtt_host, self._settings.mqtt_port) + retry_delay = 1 + + # 重新订阅所有已注册的主题 + for topic_filter in self._callbacks: + await client.subscribe(topic_filter, qos=1) + + async for message in client.messages: + topic = str(message.topic) + payload = message.payload.decode("utf-8", errors="replace") + await self._dispatch(topic, payload) + + except asyncio.CancelledError: + self._connected = False + raise + except Exception as e: + self._connected = False + logger.warning("MQTT connection failed: %s, retrying in %ds", e, retry_delay) + await asyncio.sleep(retry_delay) + retry_delay = min(retry_delay * 2, max_delay) + + async def _dispatch(self, topic: str, payload: str): + matched = False + for topic_filter, callbacks in self._callbacks.items(): + if self._topic_matches(topic, topic_filter): + matched = True + for cb in callbacks: + try: + await cb(topic, payload) + except Exception as e: + logger.error("MQTT callback error for %s: %s", topic, e) + if not matched: + logger.debug("No callback for topic: %s", topic) + + @staticmethod + def _topic_matches(topic: str, pattern: str) -> bool: + """简单 MQTT 通配符匹配(支持 + 和 #)""" + topic_parts = topic.split("/") + pattern_parts = pattern.split("/") + pi = 0 + for ti, tp in enumerate(pattern_parts): + if tp == "#": + return True + if ti >= len(topic_parts): + return False + if tp != "+" and tp != topic_parts[ti]: + return False + pi = ti + return len(topic_parts) == len(pattern_parts) +``` + +- [ ] **Step 2: 创建 tests/test_mqtt_client.py** + +```python +import pytest +from mqtt_home.mqtt_client import MqttClient +from mqtt_home.config import Settings + + +@pytest.fixture +def mqtt_client(): + settings = Settings( + mqtt_host="localhost", + mqtt_port=1883, + emqx_api_url="http://localhost:18083/api/v5", + emqx_api_key="test-key", + emqx_api_secret="test-secret", + database_url="sqlite+aiosqlite:///:memory:", + ) + return MqttClient(settings) + + +def test_topic_matches_exact(mqtt_client): + assert MqttClient._topic_matches("home/light", "home/light") is True + assert MqttClient._topic_matches("home/light", "home/switch") is False + + +def test_topic_matches_single_level_wildcard(mqtt_client): + assert MqttClient._topic_matches("home/light/status", "home/+/status") is True + assert MqttClient._topic_matches("home/switch/status", "home/+/status") is True + assert MqttClient._topic_matches("home/light", "home/+") is True + assert MqttClient._topic_matches("home/light/extra", "home/+") is False + + +def test_topic_matches_multi_level_wildcard(mqtt_client): + assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/#") is True + assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/light/#") is True + assert Mqtt_client._topic_matches("other/topic", "homeassistant/#") is False + + +def test_register_callback(mqtt_client): + async def cb(topic, payload): + pass + + mqtt_client.on_message("home/#", cb) + assert "home/#" in mqtt_client._callbacks + assert len(mqtt_client._callbacks["home/#"]) == 1 + + +def test_publish_not_connected(mqtt_client): + with pytest.raises(RuntimeError, match="not connected"): + import asyncio + asyncio.get_event_loop().run_until_complete( + mqtt_client.publish("test", "payload") + ) +``` + +- [ ] **Step 3: 运行测试** + +Run: `pytest tests/test_mqtt_client.py -v` +Expected: 5 passed + +- [ ] **Step 4: 提交** + +```bash +git add -A +git commit -m "feat: MQTT client with reconnection, topic matching, and callback dispatch" +``` + +--- + +### Task 6: 设备注册表(核心业务逻辑) + +**Files:** +- Create: `src/mqtt_home/device_registry.py` +- Create: `tests/test_device_registry.py` + +- [ ] **Step 1: 创建 src/mqtt_home/device_registry.py** + +```python +import json +import logging +from datetime import datetime, timezone +from typing import Optional + +from sqlalchemy import select, func, desc +from sqlalchemy.ext.asyncio import AsyncSession + +from mqtt_home.models import Device, DeviceLog +from mqtt_home.schemas import DeviceCreate, DeviceUpdate + +logger = logging.getLogger(__name__) + +MAX_LOGS_PER_DEVICE = 100 + + +async def list_devices(db: AsyncSession) -> list[Device]: + result = await db.execute(select(Device).order_by(Device.created_at.desc())) + return list(result.scalars().all()) + + +async def get_device(db: AsyncSession, device_id: str) -> Optional[Device]: + return await db.get(Device, device_id) + + +async def create_device(db: AsyncSession, data: DeviceCreate) -> Device: + device = Device( + name=data.name, + type=data.type, + protocol=data.protocol, + mqtt_topic=data.mqtt_topic, + command_topic=data.command_topic, + ) + db.add(device) + await db.commit() + await db.refresh(device) + logger.info("Device created: %s (%s)", device.name, device.id) + return device + + +async def update_device(db: AsyncSession, device_id: str, data: DeviceUpdate) -> Optional[Device]: + device = await db.get(Device, device_id) + if not device: + return None + update_data = data.model_dump(exclude_unset=True) + for key, value in update_data.items(): + setattr(device, key, value) + await db.commit() + await db.refresh(device) + return device + + +async def delete_device(db: AsyncSession, device_id: str) -> bool: + device = await db.get(Device, device_id) + if not device: + return False + await db.delete(device) + await db.commit() + logger.info("Device deleted: %s", device_id) + return True + + +async def send_command(db: AsyncSession, device_id: str, payload: str, publish_fn=None) -> Optional[DeviceLog]: + device = await db.get(Device, device_id) + if not device: + return None + if not device.command_topic: + raise ValueError(f"Device {device_id} has no command_topic configured") + + log = DeviceLog( + device_id=device_id, + direction="tx", + topic=device.command_topic, + payload=payload, + ) + db.add(log) + + if publish_fn: + await publish_fn(device.command_topic, payload) + + await db.commit() + await db.refresh(log) + logger.info("Command sent to %s: %s", device.command_topic, payload) + return log + + +async def handle_state_update(db: AsyncSession, topic: str, payload: str) -> Optional[Device]: + """处理从 MQTT 收到的状态更新""" + result = await db.execute(select(Device).where(Device.mqtt_topic == topic)) + device = result.scalar_one_or_none() + if not device: + return None + + old_state = device.state + device.state = payload + device.last_seen = datetime.now(timezone.utc) + device.is_online = True + + log = DeviceLog( + device_id=device.id, + direction="rx", + topic=topic, + payload=payload, + ) + db.add(log) + + # 清理旧日志,保留最近 MAX_LOGS_PER_DEVICE 条 + count_result = await db.execute( + select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id) + ) + count = count_result.scalar() or 0 + if count >= MAX_LOGS_PER_DEVICE: + oldest = await db.execute( + select(DeviceLog).where(DeviceLog.device_id == device.id) + .order_by(DeviceLog.timestamp.asc()) + .limit(count - MAX_LOGS_PER_DEVICE + 1) + ) + for old_log in oldest.scalars().all(): + await db.delete(old_log) + + await db.commit() + await db.refresh(device) + return device + + +async def get_device_logs(db: AsyncSession, device_id: str, limit: int = 20) -> list[DeviceLog]: + result = await db.execute( + select(DeviceLog) + .where(DeviceLog.device_id == device_id) + .order_by(desc(DeviceLog.timestamp)) + .limit(limit) + ) + return list(result.scalars().all()) + + +async def update_device_online_status(db: AsyncSession, topic: str, payload: str) -> Optional[Device]: + """处理 HA availability 主题消息""" + result = await db.execute(select(Device).where(Device.mqtt_topic == topic)) + device = result.scalar_one_or_none() + if not device: + return None + device.is_online = payload.lower() == "online" + device.last_seen = datetime.now(timezone.utc) + await db.commit() + await db.refresh(device) + return device + + +async def get_dashboard_stats(db: AsyncSession) -> dict: + total = await db.execute(select(func.count()).select_from(Device)) + online = await db.execute(select(func.count()).select_from(Device).where(Device.is_online == True)) + recent_logs_result = await db.execute( + select(DeviceLog).order_by(desc(DeviceLog.timestamp)).limit(10) + ) + return { + "total_devices": total.scalar() or 0, + "online_devices": online.scalar() or 0, + "offline_devices": (total.scalar() or 0) - (online.scalar() or 0), + "recent_logs": list(recent_logs_result.scalars().all()), + } +``` + +- [ ] **Step 2: 创建 tests/test_device_registry.py** + +```python +import pytest +from mqtt_home.device_registry import ( + create_device, get_device, list_devices, delete_device, + update_device, send_command, handle_state_update, get_device_logs, + get_dashboard_stats, +) +from mqtt_home.schemas import DeviceCreate, DeviceUpdate + + +async def test_create_and_get_device(db_session): + data = DeviceCreate(name="客厅灯", type="light", mqtt_topic="home/light") + device = await create_device(db_session, data) + assert device.name == "客厅灯" + + fetched = await get_device(db_session, device.id) + assert fetched is not None + assert fetched.id == device.id + + +async def test_list_devices(db_session): + await create_device(db_session, DeviceCreate(name="设备1", type="switch", mqtt_topic="t1")) + await create_device(db_session, DeviceCreate(name="设备2", type="sensor", mqtt_topic="t2")) + devices = await list_devices(db_session) + assert len(devices) == 2 + + +async def test_update_device(db_session): + device = await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t")) + updated = await update_device(db_session, device.id, DeviceUpdate(name="新名字")) + assert updated.name == "新名字" + + +async def test_delete_device(db_session): + device = await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t")) + assert await delete_device(db_session, device.id) is True + assert await get_device(db_session, device.id) is None + + +async def test_delete_nonexistent_device(db_session): + assert await delete_device(db_session, "nonexistent") is False + + +async def test_send_command(db_session): + device = await create_device( + db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t", command_topic="t/set") + ) + published = {} + + async def mock_publish(topic, payload): + published[topic] = payload + + log = await send_command(db_session, device.id, '{"state":"on"}', mock_publish) + assert log is not None + assert log.direction == "tx" + assert published["t/set"] == '{"state":"on"}' + + +async def test_send_command_no_command_topic(db_session): + device = await create_device(db_session, DeviceCreate(name="传感器", type="sensor", mqtt_topic="t")) + with pytest.raises(ValueError, match="no command_topic"): + await send_command(db_session, device.id, '{"value":1}') + + +async def test_handle_state_update(db_session): + device = await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="home/light")) + updated = await handle_state_update(db_session, "home/light", '{"state":"on"}') + assert updated is not None + assert updated.state == '{"state":"on"}' + assert updated.is_online is True + + +async def test_handle_state_update_unknown_topic(db_session): + result = await handle_state_update(db_session, "unknown/topic", "on") + assert result is None + + +async def test_get_device_logs(db_session): + device = await create_device( + db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t", command_topic="t/set") + ) + await send_command(db_session, device.id, '{"state":"on"}', lambda t, p: None) + await send_command(db_session, device.id, '{"state":"off"}', lambda t, p: None) + + logs = await get_device_logs(db_session, device.id, limit=10) + assert len(logs) == 2 + + +async def test_dashboard_stats(db_session): + await create_device(db_session, DeviceCreate(name="在线设备", type="switch", mqtt_topic="t1")) + device2 = await create_device(db_session, DeviceCreate(name="离线设备", type="sensor", mqtt_topic="t2")) + await handle_state_update(db_session, "t1", "online") + + stats = await get_dashboard_stats(db_session) + assert stats["total_devices"] == 2 + assert stats["online_devices"] == 1 + assert stats["offline_devices"] == 1 +``` + +- [ ] **Step 3: 运行测试** + +Run: `pytest tests/test_device_registry.py -v` +Expected: 11 passed + +- [ ] **Step 4: 提交** + +```bash +git add -A +git commit -m "feat: device registry with CRUD, state tracking, command sending, and log management" +``` + +--- + +### Task 7: HA Discovery 协议处理器 + +**Files:** +- Create: `src/mqtt_home/discovery.py` +- Create: `tests/test_discovery.py` + +- [ ] **Step 1: 创建 src/mqtt_home/discovery.py** + +```python +import json +import logging +from sqlalchemy.ext.asyncio import AsyncSession + +from mqtt_home.models import Device +from mqtt_home.mqtt_client import MqttClient +from mqtt_home.device_registry import create_device, handle_state_update + +logger = logging.getLogger(__name__) + +# HA Discovery 配置主题格式: homeassistant///config +DISCOVERY_TOPIC_PREFIX = "homeassistant/" + + +def parse_discovery_topic(topic: str) -> dict[str, str] | None: + """解析 HA Discovery 主题,返回 component 和 node_id""" + parts = topic.split("/") + if len(parts) < 4 or parts[0] != DISCOVERY_TOPIC_PREFIX.strip("/"): + return None + if parts[-1] != "config": + return None + return { + "component": parts[1], + "node_id": "/".join(parts[2:-1]), + } + + +async def handle_discovery_message( + topic: str, payload: str, db: AsyncSession, mqtt_client: MqttClient +) -> Device | None: + """处理 HA Discovery config 消息,自动注册设备并订阅状态主题""" + parsed = parse_discovery_topic(topic) + if not parsed: + return None + + try: + config = json.loads(payload) + except json.JSONDecodeError: + logger.warning("Invalid JSON in discovery payload for %s", topic) + return None + + state_topic = config.get("state_topic") + command_topic = config.get("command_topic") + device_name = config.get("name", config.get("device", {}).get("name", parsed["node_id"])) + device_class = config.get("device_class", "") + + if not state_topic: + logger.warning("Discovery config for %s has no state_topic", topic) + return None + + # 检查是否已存在(按 discovery_topic 去重) + from sqlalchemy import select + result = await db.execute( + select(Device).where(Device.discovery_topic == topic) + ) + existing = result.scalar_one_or_none() + if existing: + logger.debug("Device already registered: %s", topic) + return existing + + from mqtt_home.schemas import DeviceCreate + device = await create_device(db, DeviceCreate( + name=device_name, + type=parsed["component"], + protocol="ha_discovery", + mqtt_topic=state_topic, + command_topic=command_topic, + )) + + # 更新 discovery 相关字段 + device.discovery_topic = topic + device.discovery_payload = payload + device.attributes = json.dumps({ + "device_class": device_class, + "unit_of_measurement": config.get("unit_of_measurement"), + "icon": config.get("icon"), + }) + await db.commit() + await db.refresh(device) + + # 订阅状态主题 + await mqtt_client.subscribe(state_topic, qos=1) + logger.info("HA Discovery device registered: %s -> %s", device.name, state_topic) + + return device +``` + +- [ ] **Step 2: 创建 tests/test_discovery.py** + +```python +import json +import pytest +from unittest.mock import AsyncMock + +from mqtt_home.discovery import parse_discovery_topic, handle_discovery_message +from mqtt_home.config import Settings +from mqtt_home.mqtt_client import MqttClient + + +@pytest.fixture +def settings(): + return Settings( + mqtt_host="localhost", + emqx_api_url="http://localhost:18083/api/v5", + emqx_api_key="test-key", + emqx_api_secret="test-secret", + database_url="sqlite+aiosqlite:///:memory:", + ) + + +def test_parse_discovery_topic_valid(): + result = parse_discovery_topic("homeassistant/light/abc123/config") + assert result is not None + assert result["component"] == "light" + assert result["node_id"] == "abc123" + + +def test_parse_discovery_topic_nested_node(): + result = parse_discovery_topic("homeassistant/sensor/room/temperature/config") + assert result is not None + assert result["component"] == "sensor" + assert result["node_id"] == "room/temperature" + + +def test_parse_discovery_topic_invalid(): + assert parse_discovery_topic("other/topic/config") is None + assert parse_discovery_topic("homeassistant/light/abc") is None + assert parse_discovery_topic("homeassistant/light/abc/status") is None + + +async def test_handle_discovery_creates_device(db_session, settings): + mqtt_client = MqttClient(settings) + mqtt_client.subscribe = AsyncMock() + + payload = json.dumps({ + "name": "客厅灯", + "state_topic": "home/living/light/status", + "command_topic": "home/living/light/set", + "device_class": "light", + }) + + device = await handle_discovery_message( + "homeassistant/light/living_room/config", + payload, + db_session, + mqtt_client, + ) + assert device is not None + assert device.name == "客厅灯" + assert device.protocol == "ha_discovery" + assert device.mqtt_topic == "home/living/light/status" + mqtt_client.subscribe.assert_called_once_with("home/living/light/status", qos=1) + + +async def test_handle_discovery_duplicate(db_session, settings): + mqtt_client = MqttClient(settings) + mqtt_client.subscribe = AsyncMock() + + payload = json.dumps({ + "name": "灯", + "state_topic": "home/light", + }) + + await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client) + device2 = await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client) + # 同一个 discovery_topic 不应该创建重复设备 + assert device2 is not None + mqtt_client.subscribe.assert_called_once() # 只调用一次 +``` + +- [ ] **Step 3: 运行测试** + +Run: `pytest tests/test_discovery.py -v` +Expected: 5 passed + +- [ ] **Step 4: 提交** + +```bash +git add -A +git commit -m "feat: HA Discovery protocol handler for auto-registering devices" +``` + +--- + +### Task 8: FastAPI 应用与 REST API 路由 + +**Files:** +- Create: `src/mqtt_home/main.py` +- Create: `src/mqtt_home/api/__init__.py` +- Create: `src/mqtt_home/api/devices.py` +- Create: `src/mqtt_home/api/broker.py` +- Create: `src/mqtt_home/api/dashboard.py` +- Create: `tests/test_api_devices.py` +- Create: `tests/test_api_broker.py` + +- [ ] **Step 1: 创建 src/mqtt_home/api/__init__.py** + +```python +from fastapi import APIRouter +from mqtt_home.api.devices import router as devices_router +from mqtt_home.api.broker import router as broker_router +from mqtt_home.api.dashboard import router as dashboard_router + +api_router = APIRouter(prefix="/api") +api_router.include_router(devices_router, prefix="/devices", tags=["devices"]) +api_router.include_router(broker_router, prefix="/broker", tags=["broker"]) +api_router.include_router(dashboard_router, prefix="/dashboard", tags=["dashboard"]) +``` + +- [ ] **Step 2: 创建 src/mqtt_home/api/devices.py** + +```python +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.ext.asyncio import AsyncSession + +from mqtt_home.database import get_db +from mqtt_home.device_registry import ( + list_devices, get_device, create_device, update_device, + delete_device, send_command, get_device_logs, +) +from mqtt_home.schemas import ( + DeviceCreate, DeviceUpdate, DeviceCommand, + DeviceResponse, DeviceLogResponse, +) + +router = APIRouter() + + +@router.get("", response_model=list[DeviceResponse]) +async def get_devices(db: AsyncSession = Depends(get_db)): + return await list_devices(db) + + +@router.post("", response_model=DeviceResponse, status_code=201) +async def add_device(data: DeviceCreate, db: AsyncSession = Depends(get_db)): + return await create_device(db, data) + + +@router.get("/{device_id}", response_model=DeviceResponse) +async def get_device_detail(device_id: str, db: AsyncSession = Depends(get_db)): + device = await get_device(db, device_id) + if not device: + raise HTTPException(status_code=404, detail="Device not found") + return device + + +@router.put("/{device_id}", response_model=DeviceResponse) +async def patch_device(device_id: str, data: DeviceUpdate, db: AsyncSession = Depends(get_db)): + device = await update_device(db, device_id, data) + if not device: + raise HTTPException(status_code=404, detail="Device not found") + return device + + +@router.delete("/{device_id}", status_code=204) +async def remove_device(device_id: str, db: AsyncSession = Depends(get_db)): + if not await delete_device(db, device_id): + raise HTTPException(status_code=404, detail="Device not found") + + +@router.post("/{device_id}/command", response_model=DeviceLogResponse) +async def command_device( + device_id: str, data: DeviceCommand, + db: AsyncSession = Depends(get_db), +): + # 从应用状态获取 mqtt_client 和 publish_fn + from mqtt_home.main import app + mqtt_client = getattr(app.state, "mqtt_client", None) + if not mqtt_client or not mqtt_client.is_connected: + raise HTTPException(status_code=503, detail="MQTT not connected") + + try: + log = await send_command( + db, device_id, data.payload, + publish_fn=mqtt_client.publish, + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + if not log: + raise HTTPException(status_code=404, detail="Device not found") + return log + + +@router.get("/{device_id}/logs", response_model=list[DeviceLogResponse]) +async def read_device_logs(device_id: str, limit: int = 20, db: AsyncSession = Depends(get_db)): + return await get_device_logs(db, device_id, limit) +``` + +- [ ] **Step 3: 创建 src/mqtt_home/api/broker.py** + +```python +from fastapi import APIRouter, Depends, HTTPException + +from mqtt_home.schemas import BrokerClient, BrokerTopic + +router = APIRouter() + + +@router.get("/status") +async def broker_status(): + from mqtt_home.main import app + emqx = getattr(app.state, "emqx_client", None) + if not emqx: + raise HTTPException(status_code=503, detail="EMQX API client not configured") + try: + status = await emqx.get_broker_status() + metrics = await emqx.get_metrics() + return {"status": status, "metrics": metrics} + except Exception as e: + raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") + + +@router.get("/clients", response_model=list[BrokerClient]) +async def broker_clients(limit: int = 100): + from mqtt_home.main import app + emqx = getattr(app.state, "emqx_client", None) + if not emqx: + raise HTTPException(status_code=503, detail="EMQX API client not configured") + try: + return await emqx.get_clients(limit) + except Exception as e: + raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") + + +@router.get("/topics", response_model=list[BrokerTopic]) +async def broker_topics(limit: int = 100): + from mqtt_home.main import app + emqx = getattr(app.state, "emqx_client", None) + if not emqx: + raise HTTPException(status_code=503, detail="EMQX API client not configured") + try: + return await emqx.get_topics(limit) + except Exception as e: + raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") +``` + +- [ ] **Step 4: 创建 src/mqtt_home/api/dashboard.py** + +```python +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +from mqtt_home.database import get_db +from mqtt_home.device_registry import get_dashboard_stats +from mqtt_home.schemas import DashboardStats + +router = APIRouter() + + +@router.get("", response_model=DashboardStats) +async def dashboard(db: AsyncSession = Depends(get_db)): + return await get_dashboard_stats(db) +``` + +- [ ] **Step 5: 创建 src/mqtt_home/main.py** + +```python +import asyncio +import logging +from contextlib import asynccontextmanager + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from mqtt_home.config import get_settings +from mqtt_home.database import init_db, get_session_factory, Base +from mqtt_home.mqtt_client import MqttClient +from mqtt_home.emqx_api import EmqxApiClient +from mqtt_home.discovery import handle_discovery_message +from mqtt_home.device_registry import handle_state_update +from mqtt_home.api import api_router + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger(__name__) + + +@asynccontextmanager +async def lifespan(app: FastAPI): + settings = get_settings() + + # 初始化数据库 + await init_db() + logger.info("Database initialized") + + # 初始化 EMQX API 客户端 + emqx = EmqxApiClient(settings) + app.state.emqx_client = emqx + logger.info("EMQX API client initialized") + + # 初始化 MQTT 客户端 + mqtt = MqttClient(settings) + app.state.mqtt_client = mqtt + + session_factory = get_session_factory() + + # 注册 HA Discovery 回调 + async def on_discovery(topic: str, payload: str): + async with session_factory() as db: + await handle_discovery_message(topic, payload, db, mqtt) + + # 注册状态更新回调 + async def on_state(topic: str, payload: str): + async with session_factory() as db: + device = await handle_state_update(db, topic, payload) + # WebSocket 推送会在前端 Task 中添加 + + mqtt.on_message("homeassistant/#", on_discovery) + mqtt.on_message("home/#", on_state) + + await mqtt.start() + logger.info("MQTT client started") + + yield + + await mqtt.stop() + await emqx.close() + logger.info("Shutdown complete") + + +app = FastAPI(title="MQTT Home", version="0.1.0", lifespan=lifespan) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +app.include_router(api_router) + + +@app.get("/health") +async def health(): + mqtt = getattr(app.state, "mqtt_client", None) + return { + "status": "ok", + "mqtt_connected": mqtt.is_connected if mqtt else False, + } +``` + +- [ ] **Step 6: 创建 tests/test_api_devices.py** + +```python +import pytest +from httpx import AsyncClient, ASGITransport + +from mqtt_home.main import app +from mqtt_home.database import Base, get_engine, get_session_factory +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker +from sqlalchemy.pool import StaticPool + + +TEST_DB = "sqlite+aiosqlite:///:memory:" + + +@pytest.fixture +async def client(): + engine = create_async_engine(TEST_DB, connect_args={"check_same_thread": False}, poolclass=StaticPool) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + test_factory = async_sessionmaker(engine, expire_on_commit=False) + + async def override_get_db(): + async with test_factory() as session: + yield session + + app.dependency_overrides[get_db.__wrapped__ if hasattr(get_db, '__wrapped__') else get_db] = override_get_db + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + yield ac + + app.dependency_overrides.clear() + await engine.dispose() + + +async def test_get_devices_empty(client): + resp = await client.get("/api/devices") + assert resp.status_code == 200 + assert resp.json() == [] + + +async def test_create_device(client): + resp = await client.post("/api/devices", json={ + "name": "客厅灯", + "type": "light", + "mqtt_topic": "home/light", + "command_topic": "home/light/set", + }) + assert resp.status_code == 201 + data = resp.json() + assert data["name"] == "客厅灯" + assert data["type"] == "light" + + +async def test_get_device_detail(client): + create_resp = await client.post("/api/devices", json={ + "name": "灯", "type": "switch", "mqtt_topic": "t" + }) + device_id = create_resp.json()["id"] + + resp = await client.get(f"/api/devices/{device_id}") + assert resp.status_code == 200 + assert resp.json()["name"] == "灯" + + +async def test_get_device_not_found(client): + resp = await client.get("/api/devices/nonexistent") + assert resp.status_code == 404 + + +async def test_update_device(client): + create_resp = await client.post("/api/devices", json={ + "name": "灯", "type": "switch", "mqtt_topic": "t" + }) + device_id = create_resp.json()["id"] + + resp = await client.put(f"/api/devices/{device_id}", json={"name": "新名字"}) + assert resp.status_code == 200 + assert resp.json()["name"] == "新名字" + + +async def test_delete_device(client): + create_resp = await client.post("/api/devices", json={ + "name": "灯", "type": "switch", "mqtt_topic": "t" + }) + device_id = create_resp.json()["id"] + + resp = await client.delete(f"/api/devices/{device_id}") + assert resp.status_code == 204 + + +async def test_delete_device_not_found(client): + resp = await client.delete("/api/devices/nonexistent") + assert resp.status_code == 404 +``` + +- [ ] **Step 7: 创建 tests/test_api_broker.py** + +```python +import pytest +from httpx import AsyncClient, ASGITransport +from unittest.mock import AsyncMock, patch + +from mqtt_home.main import app + + +@pytest.fixture +async def client(): + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as ac: + yield ac + + +async def test_broker_status_no_client(client): + # 没有 mqtt_client 时应返回 503 + resp = await client.get("/api/broker/status") + assert resp.status_code == 503 + + +async def test_broker_clients_no_client(client): + resp = await client.get("/api/broker/clients") + assert resp.status_code == 503 + + +async def test_health_endpoint(client): + resp = await client.get("/health") + assert resp.status_code == 200 + data = resp.json() + assert "status" in data + assert "mqtt_connected" in data +``` + +- [ ] **Step 8: 运行所有测试** + +Run: `pytest tests/ -v` +Expected: All tests pass + +- [ ] **Step 9: 提交** + +```bash +git add -A +git commit -m "feat: FastAPI app with device, broker, dashboard REST API routes" +``` + +--- + +### Task 9: WebSocket 实时推送 + +**Files:** +- Modify: `src/mqtt_home/main.py` +- Create: `src/mqtt_home/ws.py` + +- [ ] **Step 1: 创建 src/mqtt_home/ws.py** + +```python +import asyncio +import json +import logging +from fastapi import WebSocket, WebSocketDisconnect + +logger = logging.getLogger(__name__) + + +class ConnectionManager: + def __init__(self): + self._connections: list[WebSocket] = [] + + async def connect(self, ws: WebSocket): + await ws.accept() + self._connections.append(ws) + logger.info("WebSocket connected, total: %d", len(self._connections)) + + def disconnect(self, ws: WebSocket): + self._connections.remove(ws) + logger.info("WebSocket disconnected, total: %d", len(self._connections)) + + async def broadcast(self, message: dict): + dead = [] + for ws in self._connections: + try: + await ws.send_json(message) + except Exception: + dead.append(ws) + for ws in dead: + self.disconnect(ws) + + +ws_manager = ConnectionManager() + + +async def websocket_endpoint(ws: WebSocket): + await ws_manager.connect(ws) + try: + while True: + await ws.receive_text() # 保持连接,忽略客户端消息 + except WebSocketDisconnect: + ws_manager.disconnect(ws) + + +async def broadcast_device_update(device_id: str, data: dict): + await ws_manager.broadcast({ + "type": "device_update", + "device_id": device_id, + **data, + }) +``` + +- [ ] **Step 2: 修改 src/mqtt_home/main.py,添加 WebSocket 路由和广播** + +在 `from mqtt_home.api import api_router` 后面添加: +```python +from mqtt_home.ws import websocket_endpoint, broadcast_device_update +``` + +在 `app.include_router(api_router)` 后面添加: +```python +from mqtt_home.ws import ws_manager + +app.websocket("/ws/devices")(websocket_endpoint) +``` + +修改 `on_state` 回调,添加 WebSocket 广播: +```python + async def on_state(topic: str, payload: str): + async with session_factory() as db: + device = await handle_state_update(db, topic, payload) + if device: + await broadcast_device_update(device.id, { + "state": device.state, + "is_online": device.is_online, + "last_seen": device.last_seen.isoformat() if device.last_seen else None, + }) +``` + +- [ ] **Step 3: 提交** + +```bash +git add -A +git commit -m "feat: WebSocket endpoint for real-time device state updates" +``` + +--- + +### Task 10: CLI 命令 + +**Files:** +- Create: `src/mqtt_home/cli.py` +- Create: `tests/test_cli.py` + +- [ ] **Step 1: 创建 src/mqtt_home/cli.py** + +```python +import asyncio +import json +import click + +from mqtt_home.config import get_settings +from mqtt_home.database import init_db, get_session_factory +from mqtt_home.emqx_api import EmqxApiClient +from mqtt_home.device_registry import ( + list_devices, get_device, create_device, delete_device, + send_command, get_device_logs, +) +from mqtt_home.schemas import DeviceCreate +from mqtt_home.mqtt_client import MqttClient +from mqtt_home.discovery import handle_discovery_message, DISCOVERY_TOPIC_PREFIX +from mqtt_home.device_registry import handle_state_update + + +def run_async(coro): + return asyncio.get_event_loop().run_until_complete(coro) + + +@click.group() +def cli(): + """MQTT 智能家居管理工具""" + pass + + +@cli.group() +def device(): + """设备管理""" + pass + + +@device.command("list") +def device_list(): + """列出所有设备""" + async def _run(): + settings = get_settings() + await init_db() + factory = get_session_factory() + async with factory() as db: + devices = await list_devices(db) + if not devices: + click.echo("暂无设备") + return + for d in devices: + status = "🟢" if d.is_online else "🔴" + state = d.state or "-" + click.echo(f"{status} [{d.id[:8]}] {d.name} ({d.type}) state={state}") + + run_async(_run()) + + +@device.command("add") +@click.option("--name", required=True, help="设备名称") +@click.option("--type", "device_type", default="switch", help="设备类型") +@click.option("--state-topic", required=True, help="状态主题") +@click.option("--command-topic", default=None, help="命令主题") +def device_add(name, device_type, state_topic, command_topic): + """手动添加设备""" + async def _run(): + settings = get_settings() + await init_db() + factory = get_session_factory() + async with factory() as db: + d = await create_device(db, DeviceCreate( + name=name, + type=device_type, + mqtt_topic=state_topic, + command_topic=command_topic, + )) + click.echo(f"设备已创建: {d.id} - {d.name}") + + run_async(_run()) + + +@device.command("info") +@click.argument("device_id") +def device_info(device_id): + """查看设备详情""" + async def _run(): + settings = get_settings() + await init_db() + factory = get_session_factory() + async with factory() as db: + d = await get_device(db, device_id) + if not d: + click.echo(f"设备不存在: {device_id}", err=True) + return + click.echo(f"ID: {d.id}") + click.echo(f"名称: {d.name}") + click.echo(f"类型: {d.type}") + click.echo(f"协议: {d.protocol}") + click.echo(f"状态主题: {d.mqtt_topic}") + click.echo(f"命令主题: {d.command_topic or '无'}") + click.echo(f"当前状态: {d.state or '无'}") + click.echo(f"在线: {'是' if d.is_online else '否'}") + click.echo(f"最后活跃: {d.last_seen or '从未'}") + + run_async(_run()) + + +@device.command("remove") +@click.argument("device_id") +def device_remove(device_id): + """删除设备""" + async def _run(): + settings = get_settings() + await init_db() + factory = get_session_factory() + async with factory() as db: + if await delete_device(db, device_id): + click.echo(f"设备已删除: {device_id}") + else: + click.echo(f"设备不存在: {device_id}", err=True) + + run_async(_run()) + + +@device.command("command") +@click.argument("device_id") +@click.option("--payload", required=True, help="命令内容(JSON)") +def device_command(device_id, payload): + """向设备发送命令""" + async def _run(): + settings = get_settings() + await init_db() + factory = get_session_factory() + + # 连接 MQTT + mqtt = MqttClient(settings) + await mqtt._run.__wrapped__(mqtt) if False else None # 不启动消息循环 + + # 直接使用 aiomqtt 发送 + import aiomqtt + async with aiomqtt.Client( + hostname=settings.mqtt_host, + port=settings.mqtt_port, + username=settings.mqtt_username or None, + password=settings.mqtt_password or None, + ) as client: + async with factory() as db: + try: + log = await send_command( + db, device_id, payload, + publish_fn=lambda topic, p: client.publish(topic, p, qos=1), + ) + if log: + click.echo(f"命令已发送到 {log.topic}: {log.payload}") + else: + click.echo(f"设备不存在: {device_id}", err=True) + except ValueError as e: + click.echo(str(e), err=True) + + run_async(_run()) + + +@device.command("logs") +@click.argument("device_id") +@click.option("--limit", default=20, help="日志条数") +def device_logs(device_id, limit): + """查看设备消息日志""" + async def _run(): + settings = get_settings() + await init_db() + factory = get_session_factory() + async with factory() as db: + logs = await get_device_logs(db, device_id, limit) + if not logs: + click.echo("暂无日志") + return + for log in logs: + direction = "RX" if log.direction == "rx" else "TX" + click.echo(f"[{log.timestamp}] {direction} {log.topic} | {log.payload}") + + run_async(_run()) + + +@cli.group() +def broker(): + """Broker 管理""" + pass + + +@broker.command("status") +def broker_status(): + """查看 Broker 状态""" + async def _run(): + settings = get_settings() + emqx = EmqxApiClient(settings) + try: + status = await emqx.get_broker_status() + click.echo(f"版本: {status.get('version', 'unknown')}") + click.echo(f"运行时间: {status.get('uptime', 0)}s") + metrics = await emqx.get_metrics() + click.echo(f"连接数: {metrics.get('connections.count', 0)}") + click.echo(f"订阅数: {metrics.get('subscriptions.count', 0)}") + click.echo(f"主题数: {metrics.get('topics.count', 0)}") + except Exception as e: + click.echo(f"连接 EMQX 失败: {e}", err=True) + finally: + await emqx.close() + + run_async(_run()) + + +@broker.command("clients") +def broker_clients(): + """列出已连接客户端""" + async def _run(): + settings = get_settings() + emqx = EmqxApiClient(settings) + try: + clients = await emqx.get_clients() + if not clients: + click.echo("暂无客户端连接") + return + for c in clients: + status = "在线" if c.get("connected") else "离线" + click.echo(f"[{c.get('clientid')}] {status} ip={c.get('ip_address')}") + except Exception as e: + click.echo(f"连接 EMQX 失败: {e}", err=True) + finally: + await emqx.close() + + run_async(_run()) + + +@broker.command("topics") +def broker_topics(): + """列出活跃主题""" + async def _run(): + settings = get_settings() + emqx = EmqxApiClient(settings) + try: + topics = await emqx.get_topics() + if not topics: + click.echo("暂无活跃主题") + return + for t in topics: + click.echo(t.get("topic", "")) + except Exception as e: + click.echo(f"连接 EMQX 失败: {e}", err=True) + finally: + await emqx.close() + + run_async(_run()) +``` + +- [ ] **Step 2: 创建 tests/test_cli.py** + +```python +import pytest +from click.testing import CliRunner +from mqtt_home.cli import cli + + +def test_device_list_empty(monkeypatch): + """测试无设备时的 list 命令""" + runner = CliRunner() + + async def mock_init(): + pass + + async def mock_list(db): + return [] + + monkeypatch.setattr("mqtt_home.cli.init_db", mock_init) + monkeypatch.setattr("mqtt_home.cli.list_devices", mock_list) + + result = runner.invoke(cli, ["device", "list"]) + assert result.exit_code == 0 + assert "暂无设备" in result.output + + +def test_cli_groups(): + runner = CliRunner() + result = runner.invoke(cli, ["--help"]) + assert "device" in result.output + assert "broker" in result.output +``` + +- [ ] **Step 3: 运行测试** + +Run: `pytest tests/test_cli.py -v` +Expected: 2 passed + +- [ ] **Step 4: 提交** + +```bash +git add -A +git commit -m "feat: CLI commands for device management and broker monitoring" +``` + +--- + +### Task 11: 创建 .env 并验证端到端 + +**Files:** +- Create: `.env` + +- [ ] **Step 1: 创建 .env(从 .env.example 复制并填入实际值)** + +``` +MQTT_HOST=192.168.0.31 +MQTT_PORT=1883 +MQTT_USERNAME= +MQTT_PASSWORD= +EMQX_API_URL=http://192.168.0.31:18083/api/v5 +EMQX_API_KEY=ed74cf28e117c5f6 +EMQX_API_SECRET=fED31TaybOUXx9CtkabICmT0wEZuUPXSU0vPwIekSbbC +DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db +WEB_HOST=0.0.0.0 +WEB_PORT=8000 +``` + +- [ ] **Step 2: 启动服务验证** + +Run: `python -m mqtt_home serve` +Expected: 服务启动成功,日志显示 "MQTT connected" 和 "Database initialized" + +- [ ] **Step 3: 用 CLI 验证 broker 连接** + +Run: `mqtt-home broker status` +Expected: 显示 EMQX 版本和统计信息 + +- [ ] **Step 4: 用 API 验证** + +Run: `curl http://localhost:8000/health` +Expected: `{"status":"ok","mqtt_connected":true}` + +Run: `curl http://localhost:8000/api/broker/clients` +Expected: 客户端列表 JSON + +- [ ] **Step 5: 提交(.env 已在 .gitignore 中)** + +```bash +git add -A +git commit -m "feat: verify end-to-end backend with EMQX broker connection" +``` + +--- + +### Task 12: 全量测试确认 + +- [ ] **Step 1: 运行所有测试** + +Run: `pytest tests/ -v` +Expected: 所有测试通过 + +- [ ] **Step 2: 修复任何失败的测试** + +根据实际输出修复问题。 + +- [ ] **Step 3: 最终提交** + +```bash +git add -A +git commit -m "test: ensure all backend tests pass" +``` + +--- + +## 自检结果 + +| 设计文档章节 | 对应 Task | +|---|---| +| 项目结构 | Task 1 | +| 数据模型 (devices, device_logs) | Task 2 | +| Pydantic Schemas | Task 3 | +| EMQX REST API 封装 | Task 4 | +| MQTT 客户端 + 重连 + 通配符 | Task 5 | +| 设备注册表 (CRUD + 状态 + 命令) | Task 6 | +| HA Discovery 协议 | Task 7 | +| REST API 路由 (devices, broker, dashboard) | Task 8 | +| WebSocket 实时推送 | Task 9 | +| CLI 命令 | Task 10 | +| 配置 / .env | Task 11 | diff --git a/docs/superpowers/specs/2026-03-29-mqtt-home-design.md b/docs/superpowers/specs/2026-03-29-mqtt-home-design.md new file mode 100644 index 0000000..f556c0e --- /dev/null +++ b/docs/superpowers/specs/2026-03-29-mqtt-home-design.md @@ -0,0 +1,232 @@ +# MQTT 智能家居管理系统 - 设计文档 + +## 概述 + +一个轻量级的智能家居管理项目,通过自建 EMQX Broker 管理 MQTT 设备。提供 Web 管理页面和 CLI 命令行工具,支持 Home Assistant Discovery 协议和自定义协议。 + +## 技术选型 + +| 组件 | 选择 | 理由 | +|------|------|------| +| 后端框架 | FastAPI | 原生异步,适合 MQTT 长连接;自带 API 文档 | +| MQTT 客户端 | aiomqtt (paho-mqtt) | 异步兼容 FastAPI | +| 前端 | Vue 3 + Vite + Tailwind CSS + Pinia | 现代单页应用,响应式 | +| 数据库 | SQLite + aiosqlite + SQLAlchemy | 零配置,单文件,适合单进程 | +| CLI | click | 与 Web API 共享业务逻辑 | +| 配置管理 | pydantic-settings | 类型安全的环境变量加载 | + +## 架构方案:单体 FastAPI + +采用单体架构,FastAPI 同时处理 REST API 和 MQTT 连接。单进程部署,Web 和 CLI 共享同一套业务逻辑。 + +``` +EMQX Broker <--MQTT--> FastAPI MQTT Client <---> SQLite + | ^ + REST API CLI (click) + | + Vue SPA +``` + +**优势:** 最简部署(一个 Python 进程 + 一个 SQLite 文件),零逻辑重复。 +**局限:** MQTT 客户端和 Web 服务共享进程,不适合多实例水平扩展。 + +## 项目结构 + +``` +mqtt-home/ +├── pyproject.toml # 依赖管理、入口点 +├── .env # EMQX 凭据(gitignore) +├── .env.example # 配置模板 +├── src/ +│ └── mqtt_home/ +│ ├── __init__.py +│ ├── config.py # 环境变量配置(pydantic-settings) +│ ├── main.py # FastAPI 应用 + 生命周期(启停 MQTT) +│ ├── database.py # SQLite 引擎、会话、Base +│ ├── models.py # SQLAlchemy ORM 模型 +│ ├── schemas.py # Pydantic 请求/响应模型 +│ ├── mqtt_client.py # aiomqtt 客户端:订阅、发布、回调 +│ ├── device_registry.py # 业务逻辑:添加/删除/控制设备 +│ ├── discovery.py # HA Discovery 协议处理器 +│ ├── emqx_api.py # EMQX REST API 封装 +│ ├── cli.py # click CLI 命令 +│ └── api/ +│ ├── __init__.py +│ ├── devices.py # REST 路由:设备增删改查 + 控制 +│ ├── broker.py # REST 路由:Broker 状态、客户端、主题 +│ └── dashboard.py # REST 路由:聚合统计 +└── frontend/ # Vue SPA + └── ... +``` + +**模块职责:** +- `device_registry.py` — 设备业务逻辑的唯一入口,API 路由和 CLI 都从此导入 +- `mqtt_client.py` — MQTT 连接生命周期管理 +- `discovery.py` — 在 mqtt_client 上注册 HA Discovery 主题回调 +- `emqx_api.py` — EMQX REST API 的薄封装,处理认证和端点映射 +- 模块间通过直接函数调用通信,无消息总线或事件队列 + +## 数据模型 + +### devices 设备表 + +| 字段 | 类型 | 说明 | +|------|------|------| +| id | TEXT (UUID) | 主键 | +| name | TEXT | 设备名称 | +| type | TEXT | 设备类型:switch, light, sensor, binary_sensor, climate 等 | +| protocol | TEXT | ha_discovery 或 custom | +| mqtt_topic | TEXT | 状态主题(如 homeassistant/light/living_room/status) | +| command_topic | TEXT | 命令主题(发送控制指令) | +| discovery_topic | TEXT | 原始 HA 发现主题(可为空) | +| discovery_payload | TEXT (JSON) | 原始发现配置(可为空) | +| attributes | TEXT (JSON) | 设备属性 | +| state | TEXT | 当前状态(on, off, 温度值等) | +| is_online | BOOLEAN | 在线状态(根据 last_seen 超时判断) | +| last_seen | DATETIME | 最后收到 MQTT 消息的时间 | +| created_at | DATETIME | 创建时间 | +| updated_at | DATETIME | 更新时间 | + +### device_logs 设备日志表 + +| 字段 | 类型 | 说明 | +|------|------|------| +| id | INTEGER (自增) | 主键 | +| device_id | TEXT (外键) | 关联设备 | +| direction | TEXT | rx(收到)或 tx(发送命令) | +| topic | TEXT | MQTT 主题 | +| payload | TEXT | 消息内容 | +| timestamp | DATETIME | 时间戳 | + +每设备保留最近 100 条日志,超出自动清理。 + +### 设备状态流转 + +1. **HA Discovery 自动注册** — 收到 `homeassistant///config` 消息时自动创建设备记录 +2. **手动添加** — 通过 API/CLI 指定主题、类型、命令主题 +3. **状态更新** — 设备状态主题收到消息 → 更新 state + last_seen +4. **设备控制** — API/CLI 向命令主题发布消息 → 同时记录日志 +5. **在线判断** — last_seen 超过 60 秒标记为离线 + +## MQTT 集成 + +### HA Discovery 协议(被动,自动) + +- 启动时订阅 `homeassistant/#` +- 解析 `config` 主题 → 自动创建设备记录 +- 自动订阅每个设备的 `state_topic` +- 处理 `availability` 主题判断设备上下线 +- 支持标准 HA Discovery 所有字段:device class, unit of measurement, state class 等 + +### 自定义协议(手动,用户定义) + +- 用户通过 CLI 或 Web UI 添加设备,指定: + - 状态主题(监听状态更新的位置) + - 命令主题(发送控制指令的位置) + - JSON 负载格式(简单模板,如 `{"state": "{{value}}"}`) +- 系统订阅状态主题并追踪变化 + +### 通用 MQTT 行为 + +- 使用 `aiomqtt`(基于 `paho-mqtt` 的异步封装) +- 所有订阅使用 QoS 1(至少一次投递) +- 使用 Retained 消息保持状态持久化(订阅时读取保留消息) +- 连接断开时指数退避重连(初始 1 秒,最大 60 秒) +- 客户端 ID:`mqtt-home-{uuid}` + +## API 设计 + +### REST API(FastAPI 路由) + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/devices` | 获取所有设备列表 | +| POST | `/api/devices` | 手动添加设备 | +| GET | `/api/devices/{id}` | 获取设备详情 | +| PUT | `/api/devices/{id}` | 更新设备配置 | +| DELETE | `/api/devices/{id}` | 删除设备 | +| POST | `/api/devices/{id}/command` | 向设备发送控制命令 | +| GET | `/api/devices/{id}/logs` | 获取设备最近消息日志 | +| GET | `/api/broker/status` | EMQX Broker 统计信息 | +| GET | `/api/broker/clients` | 已连接 MQTT 客户端列表 | +| GET | `/api/broker/topics` | 活跃主题列表 | +| GET | `/api/dashboard` | 聚合统计(设备在线/离线数、最近活动) | + +### WebSocket + +- `/ws/devices` — 设备状态变化实时推送(state, is_online 变更时推送) + +### CLI 命令 + +```bash +# 设备管理 +mqtt-home device list +mqtt-home device add --name "客厅灯" --type light --state-topic home/living/light --command-topic home/living/light/set +mqtt-home device info +mqtt-home device remove +mqtt-home device command --payload '{"state":"on"}' +mqtt-home device logs [--limit 20] + +# Broker 管理 +mqtt-home broker status +mqtt-home broker clients +mqtt-home broker topics + +# 服务启动 +python -m mqtt_home serve +``` + +## 前端设计 + +### 页面 + +- **仪表盘** — 设备数量统计、在线/离线状态、最近活动时间线 +- **设备列表** — 网格视图,每个设备卡片显示类型图标、名称、当前状态、在线指示灯、快捷切换 +- **设备详情** — 状态展示、控制面板(根据类型显示开关/滑块/数值输入)、消息日志 +- **Broker 管理** — 已连接客户端列表、活跃主题、Broker 健康状态 + +### 关键交互 + +- WebSocket 实时推送设备状态变化 +- 设备卡片根据设备类型显示对应图标和颜色 +- 控制面板根据设备类型自适应(switch → 切换按钮,light → 开关+亮度滑块,sensor → 只读数值显示) +- 响应式布局,支持移动端访问 + +## 配置 + +### 环境变量 + +``` +MQTT_HOST=192.168.0.31 +MQTT_PORT=1883 +MQTT_USERNAME= +MQTT_PASSWORD= +EMQX_API_URL=http://192.168.0.31:18083/api/v5 +EMQX_API_KEY= +EMQX_API_SECRET= +DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db +WEB_HOST=0.0.0.0 +WEB_PORT=8000 +``` + +### EMQX 连接信息 + +- MQTT Broker:`192.168.0.31:1883` +- EMQX Dashboard:`http://192.168.0.31:18083` +- EMQX REST API:`http://192.168.0.31:18083/api/v5` +- 认证方式:API Key + Secret Key(HTTP Basic Auth) + +## 错误处理 + +- MQTT 连接失败:记录日志,自动重连,Web 端显示连接状态 +- EMQX API 请求失败:返回错误信息,不阻塞核心功能 +- 设备命令发送失败:记录日志,返回错误给用户 +- 数据库操作失败:事务回滚,返回 500 错误 + +## 部署 + +- Python 3.11+ +- `pip install -e .` 安装 +- `python -m mqtt_home serve` 启动(Web + MQTT) +- CLI 可独立使用,不需要启动 Web 服务 +- 可选 docker-compose 部署