MQTT Smart Home Management System - Backend Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Implement the core backend: a FastAPI REST API, an MQTT client, an EMQX API wrapper, CLI commands, and a SQLite database.

Architecture: A monolithic FastAPI application that connects to the EMQX broker through aiomqtt and stores device data in SQLite via SQLAlchemy. The CLI and the REST API share the device_registry business-logic layer.

Tech Stack: Python 3.11+, FastAPI, uvicorn, aiomqtt, SQLAlchemy 2.0 + aiosqlite, pydantic-settings, click, httpx


File Structure Overview

mqtt-home/
├── pyproject.toml
├── .env.example
├── .gitignore
├── src/
│   └── mqtt_home/
│       ├── __init__.py
│       ├── config.py           # Settings (pydantic-settings)
│       ├── database.py         # SQLite engine, async session
│       ├── models.py           # SQLAlchemy ORM (Device, DeviceLog)
│       ├── schemas.py          # Pydantic schemas (request/response)
│       ├── emqx_api.py         # EMQX REST API client (httpx)
│       ├── mqtt_client.py      # MQTT connection lifecycle (aiomqtt)
│       ├── discovery.py        # HA Discovery protocol handler
│       ├── device_registry.py  # Business logic (CRUD + control)
│       ├── main.py             # FastAPI app + lifespan
│       ├── cli.py              # click CLI commands
│       └── api/
│           ├── __init__.py     # Router aggregation
│           ├── devices.py      # Device REST routes
│           ├── broker.py       # Broker REST routes
│           └── dashboard.py    # Dashboard REST routes
└── tests/
    ├── conftest.py
    ├── test_config.py
    ├── test_models.py
    ├── test_device_registry.py
    ├── test_emqx_api.py
    ├── test_discovery.py
    ├── test_api_devices.py
    ├── test_api_broker.py
    └── test_cli.py

Task 1: Project Skeleton and Configuration

Files:

  • Create: pyproject.toml

  • Create: .env.example

  • Create: .gitignore

  • Create: src/mqtt_home/__init__.py

  • Create: src/mqtt_home/config.py

  • Create: tests/conftest.py

  • Create: tests/test_config.py

  • Step 1: Create pyproject.toml

[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "mqtt-home"
version = "0.1.0"
description = "Lightweight smart-home MQTT device management system"
requires-python = ">=3.11"
dependencies = [
    "fastapi>=0.115.0",
    "uvicorn[standard]>=0.30.0",
    "aiomqtt>=2.0.0",
    "sqlalchemy[asyncio]>=2.0.0",
    "aiosqlite>=0.20.0",
    "pydantic-settings>=2.0.0",
    "click>=8.1.0",
    "httpx>=0.27.0",
    "websockets>=12.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "pytest-httpx>=0.30.0",
    "httpx>=0.27.0",
]

[project.scripts]
mqtt-home = "mqtt_home.cli:cli"

[tool.setuptools.packages.find]
where = ["src"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
  • Step 2: Create .env.example
MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=your-api-key
EMQX_API_SECRET=your-secret
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
  • Step 3: Create .gitignore
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.venv/
.env
data/
*.db
frontend/node_modules/
frontend/dist/
  • Step 4: Create src/mqtt_home/__init__.py
  • Step 5: Create tests/conftest.py
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import StaticPool

from mqtt_home.config import Settings

TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        mqtt_port=1883,
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url=TEST_DATABASE_URL,
    )


@pytest.fixture
async def engine():
    # Import lazily: mqtt_home.database is only created in Task 2, and a
    # module-level import would break collection of tests/test_config.py in Task 1.
    from mqtt_home.database import Base

    engine = create_async_engine(
        TEST_DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()


@pytest.fixture
async def db_session(engine):
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with async_session() as session:
        yield session
  • Step 6: Create src/mqtt_home/config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    mqtt_host: str = "localhost"
    mqtt_port: int = 1883
    mqtt_username: str = ""
    mqtt_password: str = ""
    emqx_api_url: str = "http://localhost:18083/api/v5"
    emqx_api_key: str = ""
    emqx_api_secret: str = ""
    database_url: str = "sqlite+aiosqlite:///./data/mqtt_home.db"
    web_host: str = "0.0.0.0"
    web_port: int = 8000

    model_config = {"env_file": ".env", "env_prefix": ""}


def get_settings() -> Settings:
    return Settings()
  • Step 7: Create tests/test_config.py
import os
from mqtt_home.config import Settings


def test_default_settings():
    s = Settings()
    assert s.mqtt_host == "localhost"
    assert s.mqtt_port == 1883
    assert s.web_port == 8000


def test_settings_from_env(monkeypatch):
    monkeypatch.setenv("MQTT_HOST", "192.168.1.1")
    monkeypatch.setenv("MQTT_PORT", "1884")
    s = Settings()
    assert s.mqtt_host == "192.168.1.1"
    assert s.mqtt_port == 1884
  • Step 8: Install dependencies and run the tests

Run: pip install -e ".[dev]" (from the D:\home\mqtt directory)
Run: pytest tests/test_config.py -v
Expected: 2 passed

  • Step 9: Initialize the git repository and commit

Run:

cd D:\home\mqtt
git init
git add -A
git commit -m "feat: project skeleton with config, dependencies, and test setup"

Task 2: Database and ORM Models

Files:

  • Create: src/mqtt_home/database.py

  • Create: src/mqtt_home/models.py

  • Create: tests/test_models.py

  • Step 1: Create src/mqtt_home/database.py

from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

from mqtt_home.config import get_settings


class Base(DeclarativeBase):
    pass


def get_engine():
    settings = get_settings()
    return create_async_engine(
        settings.database_url,
        echo=False,
    )


def get_session_factory(engine=None):
    _engine = engine or get_engine()
    return async_sessionmaker(_engine, expire_on_commit=False)


async def init_db():
    engine = get_engine()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    await engine.dispose()


async def get_db() -> AsyncIterator[AsyncSession]:
    # Yields one session per call and closes it when the caller is done.
    factory = get_session_factory()
    async with factory() as session:
        yield session
  • Step 2: Create src/mqtt_home/models.py
import uuid
from datetime import datetime, timezone

from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, ForeignKey
from sqlalchemy.orm import relationship

from mqtt_home.database import Base


def _utcnow():
    return datetime.now(timezone.utc)


def _new_id():
    return str(uuid.uuid4())


class Device(Base):
    __tablename__ = "devices"

    id = Column(String(36), primary_key=True, default=_new_id)
    name = Column(String(200), nullable=False)
    type = Column(String(50), nullable=False, default="switch")
    protocol = Column(String(20), nullable=False, default="custom")
    mqtt_topic = Column(String(500), nullable=False)
    command_topic = Column(String(500), nullable=True)
    discovery_topic = Column(String(500), nullable=True)
    discovery_payload = Column(Text, nullable=True)
    attributes = Column(Text, nullable=True, default="{}")
    state = Column(String(500), nullable=True, default=None)
    is_online = Column(Boolean, nullable=False, default=False)
    last_seen = Column(DateTime(timezone=True), nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow)
    updated_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow, onupdate=_utcnow)

    logs = relationship("DeviceLog", back_populates="device", lazy="dynamic")


class DeviceLog(Base):
    __tablename__ = "device_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    device_id = Column(String(36), ForeignKey("devices.id", ondelete="CASCADE"), nullable=False)
    direction = Column(String(10), nullable=False)  # "rx" or "tx"
    topic = Column(String(500), nullable=False)
    payload = Column(Text, nullable=False)
    timestamp = Column(DateTime(timezone=True), nullable=False, default=_utcnow)

    device = relationship("Device", back_populates="logs")
  • Step 3: Create tests/test_models.py
import pytest
from mqtt_home.models import Device, DeviceLog


async def test_create_device(db_session):
    device = Device(
        name="客厅灯",
        type="light",
        protocol="custom",
        mqtt_topic="home/living/light",
        command_topic="home/living/light/set",
    )
    db_session.add(device)
    await db_session.commit()
    await db_session.refresh(device)

    assert device.id is not None
    assert len(device.id) == 36
    assert device.name == "客厅灯"
    assert device.state is None
    assert device.is_online is False


async def test_create_device_log(db_session):
    device = Device(
        name="温度传感器",
        type="sensor",
        protocol="custom",
        mqtt_topic="home/temperature",
    )
    db_session.add(device)
    await db_session.flush()

    log = DeviceLog(
        device_id=device.id,
        direction="rx",
        topic="home/temperature",
        payload='{"temperature": 25.5}',
    )
    db_session.add(log)
    await db_session.commit()

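    # lazy="dynamic" collections cannot be queried through an AsyncSession,
    # so count the log rows with an explicit SELECT instead.
    from sqlalchemy import func, select

    result = await db_session.execute(
        select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id)
    )
    assert result.scalar_one() == 1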
  • Step 4: Run the tests

Run: pytest tests/test_models.py -v
Expected: 2 passed

  • Step 5: Commit
git add -A
git commit -m "feat: database setup with SQLAlchemy ORM models for Device and DeviceLog"
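
DeviceLog.device_id declares ondelete="CASCADE", but SQLite only enforces foreign keys on connections where PRAGMA foreign_keys is switched on. The sketch below uses the standard-library sqlite3 module rather than SQLAlchemy, with a reduced two-table schema that mirrors only the key columns, to show the difference. The function name and sample IDs are illustrative assumptions; wiring the pragma into the async engine is not covered here.

```python
import sqlite3

# Reduced schema: only the columns needed to demonstrate the cascade.
SCHEMA = """
CREATE TABLE devices (id TEXT PRIMARY KEY);
CREATE TABLE device_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    device_id TEXT NOT NULL REFERENCES devices(id) ON DELETE CASCADE
);
"""


def logs_left_after_delete(enforce_foreign_keys: bool) -> int:
    """Delete a device and return how many of its log rows survive."""
    conn = sqlite3.connect(":memory:")
    try:
        # The pragma is per connection and must be set outside a transaction.
        state = "ON" if enforce_foreign_keys else "OFF"
        conn.execute(f"PRAGMA foreign_keys = {state}")
        conn.executescript(SCHEMA)
        conn.execute("INSERT INTO devices (id) VALUES ('dev-1')")
        conn.execute("INSERT INTO device_logs (device_id) VALUES ('dev-1')")
        conn.execute("DELETE FROM devices WHERE id = 'dev-1'")
        return conn.execute("SELECT COUNT(*) FROM device_logs").fetchone()[0]
    finally:
        conn.close()


print(logs_left_after_delete(True))   # cascade removes the log row
print(logs_left_after_delete(False))  # orphaned log row remains
```

With enforcement off, deleting a device silently leaves its log rows behind, which is worth keeping in mind when testing delete_device later in the plan.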

Task 3: Pydantic Schemas

Files:

  • Create: src/mqtt_home/schemas.py

  • Step 1: Create src/mqtt_home/schemas.py

from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field


class DeviceCreate(BaseModel):
    name: str
    type: str = "switch"
    protocol: str = "custom"
    mqtt_topic: str
    command_topic: Optional[str] = None


class DeviceUpdate(BaseModel):
    name: Optional[str] = None
    type: Optional[str] = None
    command_topic: Optional[str] = None


class DeviceCommand(BaseModel):
    payload: str


class DeviceResponse(BaseModel):
    id: str
    name: str
    type: str
    protocol: str
    mqtt_topic: str
    command_topic: Optional[str] = None
    state: Optional[str] = None
    is_online: bool
    last_seen: Optional[datetime] = None
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class DeviceLogResponse(BaseModel):
    id: int
    device_id: str
    direction: str
    topic: str
    payload: str
    timestamp: datetime

    model_config = {"from_attributes": True}


class BrokerClient(BaseModel):
    clientid: str
    username: Optional[str] = None
    ip_address: Optional[str] = None
    connected: bool
    keepalive: int
    proto_ver: Optional[int] = None
    clean_start: Optional[bool] = None


class BrokerTopic(BaseModel):
    topic: str
    node: Optional[str] = None


class DashboardStats(BaseModel):
    total_devices: int
    online_devices: int
    offline_devices: int
    recent_logs: list[DeviceLogResponse]
  • Step 2: Commit
git add -A
git commit -m "feat: Pydantic schemas for API request/response models"

Task 4: EMQX REST API Client

Files:

  • Create: src/mqtt_home/emqx_api.py

  • Create: tests/test_emqx_api.py

  • Step 1: Create src/mqtt_home/emqx_api.py

import httpx
from typing import Any

from mqtt_home.config import Settings


class EmqxApiClient:
    def __init__(self, settings: Settings):
        self._client = httpx.AsyncClient(
            base_url=settings.emqx_api_url,
            auth=(settings.emqx_api_key, settings.emqx_api_secret),
            timeout=10.0,
        )

    async def close(self):
        await self._client.aclose()

    async def get_broker_status(self) -> dict[str, Any]:
        resp = await self._client.get("/status")
        resp.raise_for_status()
        return resp.json().get("data", {})

    async def get_metrics(self) -> dict[str, Any]:
        resp = await self._client.get("/metrics")
        resp.raise_for_status()
        return resp.json().get("data", {})

    async def get_clients(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/clients", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])

    async def get_subscriptions(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/subscriptions", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])

    async def get_topics(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/topics", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])
  • Step 2: Create tests/test_emqx_api.py
import pytest
import httpx
from unittest.mock import AsyncMock, patch

from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.config import Settings


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )


@pytest.fixture
def emqx_client(settings):
    return EmqxApiClient(settings)


async def test_get_broker_status(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
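        mock_get.return_value = httpx.Response(
            200,
            json={"data": {"uptime": 12345, "version": "5.0.0"}},
            # raise_for_status() needs a request attached to the response.
            request=httpx.Request("GET", "http://localhost:18083/api/v5/status"),
        )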
        result = await emqx_client.get_broker_status()
        assert result["uptime"] == 12345


async def test_get_clients(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
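        mock_get.return_value = httpx.Response(
            200,
            json={"data": [{"clientid": "dev1", "connected": True}]},
            # raise_for_status() needs a request attached to the response.
            request=httpx.Request("GET", "http://localhost:18083/api/v5/clients"),
        )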
        result = await emqx_client.get_clients()
        assert len(result) == 1
        assert result[0]["clientid"] == "dev1"


async def test_get_topics(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = httpx.Response(
            200, json={"data": [{"topic": "home/light"}]}
        )
        result = await emqx_client.get_topics()
        assert len(result) == 1
  • Step 3: Run the tests

Run: pytest tests/test_emqx_api.py -v
Expected: 3 passed

  • Step 4: Commit
git add -A
git commit -m "feat: EMQX REST API client for broker status, clients, and topics"
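
EmqxApiClient passes the API key and secret as an httpx auth tuple, which httpx sends as HTTP Basic authentication. When checking the same credentials with another tool, the equivalent header can be built by hand. The sketch below uses only the standard library; basic_auth_header is a hypothetical helper name and is not part of emqx_api.py.

```python
import base64


def basic_auth_header(api_key: str, api_secret: str) -> dict[str, str]:
    """Build the Authorization header for HTTP Basic auth (hypothetical helper)."""
    # Basic auth is base64("<user>:<password>"); here the API key acts as the user.
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials, matching the values used in the test fixtures.
print(basic_auth_header("test-key", "test-secret"))
```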

Task 5: MQTT Client

Files:

  • Create: src/mqtt_home/mqtt_client.py

  • Create: tests/test_mqtt_client.py

  • Step 1: Create src/mqtt_home/mqtt_client.py

import asyncio
import uuid
import logging
from typing import Callable, Awaitable

import aiomqtt

from mqtt_home.config import Settings

logger = logging.getLogger(__name__)

MessageCallback = Callable[[str, str], Awaitable[None]]


class MqttClient:
    def __init__(self, settings: Settings):
        self._settings = settings
        self._client: aiomqtt.Client | None = None
        self._callbacks: dict[str, list[MessageCallback]] = {}
        self._connected = False
        self._task: asyncio.Task | None = None
        self._client_id = f"mqtt-home-{uuid.uuid4().hex[:8]}"

    @property
    def is_connected(self) -> bool:
        return self._connected

    def on_message(self, topic_filter: str, callback: MessageCallback):
        self._callbacks.setdefault(topic_filter, []).append(callback)

    async def start(self):
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def publish(self, topic: str, payload: str, qos: int = 1, retain: bool = False):
        if not self._client:
            raise RuntimeError("MQTT client not connected")
        await self._client.publish(topic, payload, qos=qos, retain=retain)

    async def subscribe(self, topic: str, qos: int = 1):
        if not self._client:
            raise RuntimeError("MQTT client not connected")
        await self._client.subscribe(topic, qos=qos)

    async def _run(self):
        retry_delay = 1
        max_delay = 60
        while True:
            try:
                async with aiomqtt.Client(
                    hostname=self._settings.mqtt_host,
                    port=self._settings.mqtt_port,
                    username=self._settings.mqtt_username or None,
                    password=self._settings.mqtt_password or None,
                    identifier=self._client_id,
                    clean_session=True,
                ) as client:
                    self._client = client
                    self._connected = True
                    logger.info("MQTT connected to %s:%d", self._settings.mqtt_host, self._settings.mqtt_port)
                    retry_delay = 1

                    # Re-subscribe to every registered topic filter
                    for topic_filter in self._callbacks:
                        await client.subscribe(topic_filter, qos=1)

                    async for message in client.messages:
                        topic = str(message.topic)
                        payload = message.payload.decode("utf-8", errors="replace")
                        await self._dispatch(topic, payload)

            except asyncio.CancelledError:
                self._client = None
                self._connected = False
                raise
            except Exception as e:
                # Drop the stale client so publish()/subscribe() fail fast while disconnected.
                self._client = None
                self._connected = False
                logger.warning("MQTT connection failed: %s, retrying in %ds", e, retry_delay)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_delay)

    async def _dispatch(self, topic: str, payload: str):
        matched = False
        for topic_filter, callbacks in self._callbacks.items():
            if self._topic_matches(topic, topic_filter):
                matched = True
                for cb in callbacks:
                    try:
                        await cb(topic, payload)
                    except Exception as e:
                        logger.error("MQTT callback error for %s: %s", topic, e)
        if not matched:
            logger.debug("No callback for topic: %s", topic)

    @staticmethod
    def _topic_matches(topic: str, pattern: str) -> bool:
        """Simple MQTT wildcard matching (supports + and #)."""
        topic_parts = topic.split("/")
        pattern_parts = pattern.split("/")
        for i, part in enumerate(pattern_parts):
            if part == "#":
                # Multi-level wildcard: matches the rest of the topic.
                return True
            if i >= len(topic_parts):
                return False
            if part != "+" and part != topic_parts[i]:
                return False
        return len(topic_parts) == len(pattern_parts)
  • Step 2: Create tests/test_mqtt_client.py
import pytest
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.config import Settings


@pytest.fixture
def mqtt_client():
    settings = Settings(
        mqtt_host="localhost",
        mqtt_port=1883,
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )
    return MqttClient(settings)


def test_topic_matches_exact(mqtt_client):
    assert MqttClient._topic_matches("home/light", "home/light") is True
    assert MqttClient._topic_matches("home/light", "home/switch") is False


def test_topic_matches_single_level_wildcard(mqtt_client):
    assert MqttClient._topic_matches("home/light/status", "home/+/status") is True
    assert MqttClient._topic_matches("home/switch/status", "home/+/status") is True
    assert MqttClient._topic_matches("home/light", "home/+") is True
    assert MqttClient._topic_matches("home/light/extra", "home/+") is False


def test_topic_matches_multi_level_wildcard(mqtt_client):
    assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/#") is True
    assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/light/#") is True
    assert MqttClient._topic_matches("other/topic", "homeassistant/#") is False


def test_register_callback(mqtt_client):
    async def cb(topic, payload):
        pass

    mqtt_client.on_message("home/#", cb)
    assert "home/#" in mqtt_client._callbacks
    assert len(mqtt_client._callbacks["home/#"]) == 1


async def test_publish_not_connected(mqtt_client):
    # asyncio_mode = "auto" lets pytest-asyncio run this coroutine directly.
    with pytest.raises(RuntimeError, match="not connected"):
        await mqtt_client.publish("test", "payload")
  • Step 3: Run the tests

Run: pytest tests/test_mqtt_client.py -v
Expected: 5 passed

  • Step 4: Commit
git add -A
git commit -m "feat: MQTT client with reconnection, topic matching, and callback dispatch"
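
The reconnect loop in MqttClient._run doubles its delay after each failed attempt, caps it at 60 seconds, and resets it to 1 second after a successful connection. As a sketch of the resulting schedule, the generator below reproduces only the delay arithmetic; backoff_delays is an illustrative name and does not exist in mqtt_client.py.

```python
from collections.abc import Iterator
from itertools import islice


def backoff_delays(initial: int = 1, max_delay: int = 60) -> Iterator[int]:
    """Yield successive retry delays: doubling each time, capped at max_delay."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, max_delay)


# Delays for eight consecutive failed connection attempts.
first_eight = list(islice(backoff_delays(), 8))
print(first_eight)  # [1, 2, 4, 8, 16, 32, 60, 60]
```

With these defaults the client settles into one retry per minute after roughly a minute of outage.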

Task 6: Device Registry (Core Business Logic)

Files:

  • Create: src/mqtt_home/device_registry.py

  • Create: tests/test_device_registry.py

  • Step 1: Create src/mqtt_home/device_registry.py

import json
import logging
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import select, func, desc
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.models import Device, DeviceLog
from mqtt_home.schemas import DeviceCreate, DeviceUpdate

logger = logging.getLogger(__name__)

MAX_LOGS_PER_DEVICE = 100


async def list_devices(db: AsyncSession) -> list[Device]:
    result = await db.execute(select(Device).order_by(Device.created_at.desc()))
    return list(result.scalars().all())


async def get_device(db: AsyncSession, device_id: str) -> Optional[Device]:
    return await db.get(Device, device_id)


async def create_device(db: AsyncSession, data: DeviceCreate) -> Device:
    device = Device(
        name=data.name,
        type=data.type,
        protocol=data.protocol,
        mqtt_topic=data.mqtt_topic,
        command_topic=data.command_topic,
    )
    db.add(device)
    await db.commit()
    await db.refresh(device)
    logger.info("Device created: %s (%s)", device.name, device.id)
    return device


async def update_device(db: AsyncSession, device_id: str, data: DeviceUpdate) -> Optional[Device]:
    device = await db.get(Device, device_id)
    if not device:
        return None
    update_data = data.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(device, key, value)
    await db.commit()
    await db.refresh(device)
    return device


async def delete_device(db: AsyncSession, device_id: str) -> bool:
    device = await db.get(Device, device_id)
    if not device:
        return False
    await db.delete(device)
    await db.commit()
    logger.info("Device deleted: %s", device_id)
    return True


async def send_command(db: AsyncSession, device_id: str, payload: str, publish_fn=None) -> Optional[DeviceLog]:
    device = await db.get(Device, device_id)
    if not device:
        return None
    if not device.command_topic:
        raise ValueError(f"Device {device_id} has no command_topic configured")

    log = DeviceLog(
        device_id=device_id,
        direction="tx",
        topic=device.command_topic,
        payload=payload,
    )
    db.add(log)

    if publish_fn:
        await publish_fn(device.command_topic, payload)

    await db.commit()
    await db.refresh(log)
    logger.info("Command sent to %s: %s", device.command_topic, payload)
    return log


async def handle_state_update(db: AsyncSession, topic: str, payload: str) -> Optional[Device]:
    """Handle a state update received over MQTT."""
    result = await db.execute(select(Device).where(Device.mqtt_topic == topic))
    device = result.scalar_one_or_none()
    if not device:
        return None

    old_state = device.state
    device.state = payload
    device.last_seen = datetime.now(timezone.utc)
    device.is_online = True

    log = DeviceLog(
        device_id=device.id,
        direction="rx",
        topic=topic,
        payload=payload,
    )
    db.add(log)

    # Prune old logs, keeping the most recent MAX_LOGS_PER_DEVICE entries.
    # Flush first so the count includes the log added above.
    await db.flush()
    count_result = await db.execute(
        select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id)
    )
    count = count_result.scalar() or 0
    if count > MAX_LOGS_PER_DEVICE:
        oldest = await db.execute(
            select(DeviceLog).where(DeviceLog.device_id == device.id)
            .order_by(DeviceLog.timestamp.asc())
            .limit(count - MAX_LOGS_PER_DEVICE)
        )
        for old_log in oldest.scalars().all():
            await db.delete(old_log)

    await db.commit()
    await db.refresh(device)
    return device


async def get_device_logs(db: AsyncSession, device_id: str, limit: int = 20) -> list[DeviceLog]:
    result = await db.execute(
        select(DeviceLog)
        .where(DeviceLog.device_id == device_id)
        .order_by(desc(DeviceLog.timestamp))
        .limit(limit)
    )
    return list(result.scalars().all())


async def update_device_online_status(db: AsyncSession, topic: str, payload: str) -> Optional[Device]:
    """Handle a message on an HA availability topic."""
    result = await db.execute(select(Device).where(Device.mqtt_topic == topic))
    device = result.scalar_one_or_none()
    if not device:
        return None
    device.is_online = payload.lower() == "online"
    device.last_seen = datetime.now(timezone.utc)
    await db.commit()
    await db.refresh(device)
    return device


async def get_dashboard_stats(db: AsyncSession) -> dict:
    total_result = await db.execute(select(func.count()).select_from(Device))
    online_result = await db.execute(
        select(func.count()).select_from(Device).where(Device.is_online.is_(True))
    )
    recent_logs_result = await db.execute(
        select(DeviceLog).order_by(desc(DeviceLog.timestamp)).limit(10)
    )
    # Read each scalar exactly once: scalar() closes the result it is called on.
    total = total_result.scalar() or 0
    online = online_result.scalar() or 0
    return {
        "total_devices": total,
        "online_devices": online,
        "offline_devices": total - online,
        "recent_logs": list(recent_logs_result.scalars().all()),
    }
  • Step 2: Create tests/test_device_registry.py
import pytest
from mqtt_home.device_registry import (
    create_device, get_device, list_devices, delete_device,
    update_device, send_command, handle_state_update, get_device_logs,
    get_dashboard_stats,
)
from mqtt_home.schemas import DeviceCreate, DeviceUpdate


async def test_create_and_get_device(db_session):
    data = DeviceCreate(name="客厅灯", type="light", mqtt_topic="home/light")
    device = await create_device(db_session, data)
    assert device.name == "客厅灯"

    fetched = await get_device(db_session, device.id)
    assert fetched is not None
    assert fetched.id == device.id


async def test_list_devices(db_session):
    await create_device(db_session, DeviceCreate(name="设备1", type="switch", mqtt_topic="t1"))
    await create_device(db_session, DeviceCreate(name="设备2", type="sensor", mqtt_topic="t2"))
    devices = await list_devices(db_session)
    assert len(devices) == 2


async def test_update_device(db_session):
    device = await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t"))
    updated = await update_device(db_session, device.id, DeviceUpdate(name="新名字"))
    assert updated.name == "新名字"


async def test_delete_device(db_session):
    device = await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t"))
    assert await delete_device(db_session, device.id) is True
    assert await get_device(db_session, device.id) is None


async def test_delete_nonexistent_device(db_session):
    assert await delete_device(db_session, "nonexistent") is False


async def test_send_command(db_session):
    device = await create_device(
        db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t", command_topic="t/set")
    )
    published = {}

    async def mock_publish(topic, payload):
        published[topic] = payload

    log = await send_command(db_session, device.id, '{"state":"on"}', mock_publish)
    assert log is not None
    assert log.direction == "tx"
    assert published["t/set"] == '{"state":"on"}'


async def test_send_command_no_command_topic(db_session):
    device = await create_device(db_session, DeviceCreate(name="传感器", type="sensor", mqtt_topic="t"))
    with pytest.raises(ValueError, match="no command_topic"):
        await send_command(db_session, device.id, '{"value":1}')


async def test_handle_state_update(db_session):
    device = await create_device(db_session, DeviceCreate(name="灯", type="light", mqtt_topic="home/light"))
    updated = await handle_state_update(db_session, "home/light", '{"state":"on"}')
    assert updated is not None
    assert updated.state == '{"state":"on"}'
    assert updated.is_online is True


async def test_handle_state_update_unknown_topic(db_session):
    result = await handle_state_update(db_session, "unknown/topic", "on")
    assert result is None


async def test_get_device_logs(db_session):
    device = await create_device(
        db_session, DeviceCreate(name="灯", type="light", mqtt_topic="t", command_topic="t/set")
    )

    # send_command awaits publish_fn, so the stub must be a coroutine function.
    async def noop_publish(topic, payload):
        pass

    await send_command(db_session, device.id, '{"state":"on"}', noop_publish)
    await send_command(db_session, device.id, '{"state":"off"}', noop_publish)

    logs = await get_device_logs(db_session, device.id, limit=10)
    assert len(logs) == 2


async def test_dashboard_stats(db_session):
    await create_device(db_session, DeviceCreate(name="在线设备", type="switch", mqtt_topic="t1"))
    device2 = await create_device(db_session, DeviceCreate(name="离线设备", type="sensor", mqtt_topic="t2"))
    await handle_state_update(db_session, "t1", "online")

    stats = await get_dashboard_stats(db_session)
    assert stats["total_devices"] == 2
    assert stats["online_devices"] == 1
    assert stats["offline_devices"] == 1
  • Step 3: Run the tests

Run: pytest tests/test_device_registry.py -v
Expected: 11 passed

  • Step 4: Commit
git add -A
git commit -m "feat: device registry with CRUD, state tracking, command sending, and log management"
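
handle_state_update prunes logs by counting rows and then deleting the oldest ones individually. As an alternative sketch, the same retention rule can be written as a single DELETE with a subquery. The example uses the standard-library sqlite3 module and a reduced device_logs table with only the columns needed here; prune_logs, remaining_payloads, and the sample rows are illustrative assumptions, not part of device_registry.py.

```python
import sqlite3


def prune_logs(conn: sqlite3.Connection, device_id: str, keep: int) -> None:
    """Delete all but the newest `keep` log rows of one device."""
    conn.execute(
        """
        DELETE FROM device_logs
        WHERE device_id = ?
          AND id NOT IN (
              SELECT id FROM device_logs
              WHERE device_id = ?
              ORDER BY timestamp DESC, id DESC
              LIMIT ?
          )
        """,
        (device_id, device_id, keep),
    )


def remaining_payloads(total: int, keep: int) -> list[str]:
    """Insert `total` logs for dev-1, prune to `keep`, return what is left."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE device_logs ("
            "id INTEGER PRIMARY KEY AUTOINCREMENT, device_id TEXT NOT NULL, "
            "payload TEXT NOT NULL, timestamp TEXT NOT NULL)"
        )
        rows = [("dev-1", f"msg-{i}", f"2026-03-29T00:00:{i:02d}Z") for i in range(total)]
        # A row for another device, which pruning dev-1 must not touch.
        rows.append(("dev-2", "other", "2026-03-29T00:00:00Z"))
        conn.executemany(
            "INSERT INTO device_logs (device_id, payload, timestamp) VALUES (?, ?, ?)", rows
        )
        prune_logs(conn, "dev-1", keep)
        result = conn.execute("SELECT payload FROM device_logs ORDER BY id").fetchall()
        return [payload for (payload,) in result]
    finally:
        conn.close()


print(remaining_payloads(total=5, keep=3))
```

The single statement avoids loading rows into the session just to delete them; whether that matters at 100 logs per device is a judgment call.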

Task 7: HA Discovery Protocol Handler

Files:

  • Create: src/mqtt_home/discovery.py

  • Create: tests/test_discovery.py

  • Step 1: Create src/mqtt_home/discovery.py

import json
import logging
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.models import Device
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.device_registry import create_device, handle_state_update

logger = logging.getLogger(__name__)

# HA Discovery config topic format: homeassistant/<component>/[<node_id>/]<object_id>/config
DISCOVERY_TOPIC_PREFIX = "homeassistant/"


def parse_discovery_topic(topic: str) -> dict[str, str] | None:
    """Parse an HA Discovery topic and return its component and node_id."""
    parts = topic.split("/")
    if len(parts) < 4 or parts[0] != DISCOVERY_TOPIC_PREFIX.strip("/"):
        return None
    if parts[-1] != "config":
        return None
    return {
        "component": parts[1],
        "node_id": "/".join(parts[2:-1]),
    }


async def handle_discovery_message(
    topic: str, payload: str, db: AsyncSession, mqtt_client: MqttClient
) -> Device | None:
    """Handle an HA Discovery config message: register the device and subscribe to its state topic."""
    parsed = parse_discovery_topic(topic)
    if not parsed:
        return None

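    try:
        config = json.loads(payload)
    except json.JSONDecodeError:
        logger.warning("Invalid JSON in discovery payload for %s", topic)
        return None
    if not isinstance(config, dict):
        # Valid JSON that is not an object (e.g. a list) cannot be a discovery config
        logger.warning("Discovery payload for %s is not a JSON object", topic)
        return None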

    state_topic = config.get("state_topic")
    command_topic = config.get("command_topic")
    device_name = config.get("name", config.get("device", {}).get("name", parsed["node_id"]))
    device_class = config.get("device_class", "")

    if not state_topic:
        logger.warning("Discovery config for %s has no state_topic", topic)
        return None

    # Skip registration if a device with this discovery_topic already exists
    from sqlalchemy import select
    result = await db.execute(
        select(Device).where(Device.discovery_topic == topic)
    )
    existing = result.scalar_one_or_none()
    if existing:
        logger.debug("Device already registered: %s", topic)
        return existing

    from mqtt_home.schemas import DeviceCreate
    device = await create_device(db, DeviceCreate(
        name=device_name,
        type=parsed["component"],
        protocol="ha_discovery",
        mqtt_topic=state_topic,
        command_topic=command_topic,
    ))

    # Store the discovery-related fields
    device.discovery_topic = topic
    device.discovery_payload = payload
    device.attributes = json.dumps({
        "device_class": device_class,
        "unit_of_measurement": config.get("unit_of_measurement"),
        "icon": config.get("icon"),
    })
    await db.commit()
    await db.refresh(device)

    # Subscribe to the state topic
    await mqtt_client.subscribe(state_topic, qos=1)
    logger.info("HA Discovery device registered: %s -> %s", device.name, state_topic)

    return device
  • Step 2: Create tests/test_discovery.py
import json
import pytest
from unittest.mock import AsyncMock

from mqtt_home.discovery import parse_discovery_topic, handle_discovery_message
from mqtt_home.config import Settings
from mqtt_home.mqtt_client import MqttClient


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )


def test_parse_discovery_topic_valid():
    result = parse_discovery_topic("homeassistant/light/abc123/config")
    assert result is not None
    assert result["component"] == "light"
    assert result["node_id"] == "abc123"


def test_parse_discovery_topic_nested_node():
    result = parse_discovery_topic("homeassistant/sensor/room/temperature/config")
    assert result is not None
    assert result["component"] == "sensor"
    assert result["node_id"] == "room/temperature"


def test_parse_discovery_topic_invalid():
    assert parse_discovery_topic("other/topic/config") is None
    assert parse_discovery_topic("homeassistant/light/abc") is None
    assert parse_discovery_topic("homeassistant/light/abc/status") is None


async def test_handle_discovery_creates_device(db_session, settings):
    mqtt_client = MqttClient(settings)
    mqtt_client.subscribe = AsyncMock()

    payload = json.dumps({
        "name": "客厅灯",
        "state_topic": "home/living/light/status",
        "command_topic": "home/living/light/set",
        "device_class": "light",
    })

    device = await handle_discovery_message(
        "homeassistant/light/living_room/config",
        payload,
        db_session,
        mqtt_client,
    )
    assert device is not None
    assert device.name == "客厅灯"
    assert device.protocol == "ha_discovery"
    assert device.mqtt_topic == "home/living/light/status"
    mqtt_client.subscribe.assert_called_once_with("home/living/light/status", qos=1)


async def test_handle_discovery_duplicate(db_session, settings):
    mqtt_client = MqttClient(settings)
    mqtt_client.subscribe = AsyncMock()

    payload = json.dumps({
        "name": "灯",
        "state_topic": "home/light",
    })

    await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client)
    device2 = await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client)
    # The same discovery_topic must not create a duplicate device
    assert device2 is not None
    mqtt_client.subscribe.assert_called_once()  # subscribed only on the first call
  • Step 3: Run the tests

Run: pytest tests/test_discovery.py -v Expected: 5 passed

  • Step 4: Commit
git add -A
git commit -m "feat: HA Discovery protocol handler for auto-registering devices"

Task 8: FastAPI application and REST API routes

Files:

  • Create: src/mqtt_home/main.py

  • Create: src/mqtt_home/api/__init__.py

  • Create: src/mqtt_home/api/devices.py

  • Create: src/mqtt_home/api/broker.py

  • Create: src/mqtt_home/api/dashboard.py

  • Create: tests/test_api_devices.py

  • Create: tests/test_api_broker.py

  • Step 1: Create src/mqtt_home/api/__init__.py

from fastapi import APIRouter
from mqtt_home.api.devices import router as devices_router
from mqtt_home.api.broker import router as broker_router
from mqtt_home.api.dashboard import router as dashboard_router

api_router = APIRouter(prefix="/api")
api_router.include_router(devices_router, prefix="/devices", tags=["devices"])
api_router.include_router(broker_router, prefix="/broker", tags=["broker"])
api_router.include_router(dashboard_router, prefix="/dashboard", tags=["dashboard"])
  • Step 2: Create src/mqtt_home/api/devices.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.database import get_db
from mqtt_home.device_registry import (
    list_devices, get_device, create_device, update_device,
    delete_device, send_command, get_device_logs,
)
from mqtt_home.schemas import (
    DeviceCreate, DeviceUpdate, DeviceCommand,
    DeviceResponse, DeviceLogResponse,
)

router = APIRouter()


@router.get("", response_model=list[DeviceResponse])
async def get_devices(db: AsyncSession = Depends(get_db)):
    return await list_devices(db)


@router.post("", response_model=DeviceResponse, status_code=201)
async def add_device(data: DeviceCreate, db: AsyncSession = Depends(get_db)):
    return await create_device(db, data)


@router.get("/{device_id}", response_model=DeviceResponse)
async def get_device_detail(device_id: str, db: AsyncSession = Depends(get_db)):
    device = await get_device(db, device_id)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device


@router.put("/{device_id}", response_model=DeviceResponse)
async def patch_device(device_id: str, data: DeviceUpdate, db: AsyncSession = Depends(get_db)):
    device = await update_device(db, device_id, data)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device


@router.delete("/{device_id}", status_code=204)
async def remove_device(device_id: str, db: AsyncSession = Depends(get_db)):
    if not await delete_device(db, device_id):
        raise HTTPException(status_code=404, detail="Device not found")


@router.post("/{device_id}/command", response_model=DeviceLogResponse)
async def command_device(
    device_id: str, data: DeviceCommand,
    db: AsyncSession = Depends(get_db),
):
    # Fetch the MQTT client from application state; the import is deferred to avoid a circular import with main.py
    from mqtt_home.main import app
    mqtt_client = getattr(app.state, "mqtt_client", None)
    if not mqtt_client or not mqtt_client.is_connected:
        raise HTTPException(status_code=503, detail="MQTT not connected")

    try:
        log = await send_command(
            db, device_id, data.payload,
            publish_fn=mqtt_client.publish,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    if not log:
        raise HTTPException(status_code=404, detail="Device not found")
    return log


@router.get("/{device_id}/logs", response_model=list[DeviceLogResponse])
async def read_device_logs(device_id: str, limit: int = 20, db: AsyncSession = Depends(get_db)):
    return await get_device_logs(db, device_id, limit)
  • Step 3: Create src/mqtt_home/api/broker.py
from fastapi import APIRouter, HTTPException

from mqtt_home.schemas import BrokerClient, BrokerTopic

router = APIRouter()


@router.get("/status")
async def broker_status():
    from mqtt_home.main import app
    emqx = getattr(app.state, "emqx_client", None)
    if not emqx:
        raise HTTPException(status_code=503, detail="EMQX API client not configured")
    try:
        status = await emqx.get_broker_status()
        metrics = await emqx.get_metrics()
        return {"status": status, "metrics": metrics}
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}")


@router.get("/clients", response_model=list[BrokerClient])
async def broker_clients(limit: int = 100):
    from mqtt_home.main import app
    emqx = getattr(app.state, "emqx_client", None)
    if not emqx:
        raise HTTPException(status_code=503, detail="EMQX API client not configured")
    try:
        return await emqx.get_clients(limit)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}")


@router.get("/topics", response_model=list[BrokerTopic])
async def broker_topics(limit: int = 100):
    from mqtt_home.main import app
    emqx = getattr(app.state, "emqx_client", None)
    if not emqx:
        raise HTTPException(status_code=503, detail="EMQX API client not configured")
    try:
        return await emqx.get_topics(limit)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}")
  • Step 4: Create src/mqtt_home/api/dashboard.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.database import get_db
from mqtt_home.device_registry import get_dashboard_stats
from mqtt_home.schemas import DashboardStats

router = APIRouter()


@router.get("", response_model=DashboardStats)
async def dashboard(db: AsyncSession = Depends(get_db)):
    return await get_dashboard_stats(db)
  • Step 5: Create src/mqtt_home/main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.discovery import handle_discovery_message
from mqtt_home.device_registry import handle_state_update
from mqtt_home.api import api_router

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()

    # Initialize the database
    await init_db()
    logger.info("Database initialized")

    # Initialize the EMQX API client
    emqx = EmqxApiClient(settings)
    app.state.emqx_client = emqx
    logger.info("EMQX API client initialized")

    # Initialize the MQTT client
    mqtt = MqttClient(settings)
    app.state.mqtt_client = mqtt

    session_factory = get_session_factory()

    # Callback for HA Discovery config messages
    async def on_discovery(topic: str, payload: str):
        async with session_factory() as db:
            await handle_discovery_message(topic, payload, db, mqtt)

    # Callback for device state updates
    async def on_state(topic: str, payload: str):
        async with session_factory() as db:
            device = await handle_state_update(db, topic, payload)
            # WebSocket push will be added in a later task

    mqtt.on_message("homeassistant/#", on_discovery)
    mqtt.on_message("home/#", on_state)

    await mqtt.start()
    logger.info("MQTT client started")

    yield

    await mqtt.stop()
    await emqx.close()
    logger.info("Shutdown complete")


app = FastAPI(title="MQTT Home", version="0.1.0", lifespan=lifespan)

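# Wide-open CORS is convenient for local development; restrict allow_origins before exposing the API
app.add_middleware(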
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(api_router)


@app.get("/health")
async def health():
    mqtt = getattr(app.state, "mqtt_client", None)
    return {
        "status": "ok",
        "mqtt_connected": mqtt.is_connected if mqtt else False,
    }
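
The two on_message registrations in main.py rely on MQTT wildcard filters: `#` matches any number of trailing levels and `+` matches exactly one level. How MqttClient dispatches messages internally is defined in Task 5; the stand-alone sketch below only illustrates the matching rule itself. `topic_matches` is a hypothetical name, and the sketch ignores the special handling of `$`-prefixed topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" accepts any single level
            return False
    # Without a trailing "#", both must have the same number of levels
    return len(filter_levels) == len(topic_levels)


print(topic_matches("homeassistant/#", "homeassistant/light/abc/config"))
print(topic_matches("home/#", "homeassistant/light/abc/config"))
```

One consequence worth keeping in mind: a discovered device whose state_topic lies outside home/# does not match the second filter, so whether its updates reach on_state depends on how MqttClient routes messages for topics subscribed later.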
  • Step 6: Create tests/test_api_devices.py
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.pool import StaticPool

from mqtt_home.main import app
from mqtt_home.database import Base, get_db


TEST_DB = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
async def client():
    # StaticPool keeps a single connection so every session sees the same in-memory database
    engine = create_async_engine(TEST_DB, connect_args={"check_same_thread": False}, poolclass=StaticPool)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    test_factory = async_sessionmaker(engine, expire_on_commit=False)

    async def override_get_db():
        async with test_factory() as session:
            yield session

    # Route every request's get_db dependency to the in-memory test database
    app.dependency_overrides[get_db] = override_get_db

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

    app.dependency_overrides.clear()
    await engine.dispose()


async def test_get_devices_empty(client):
    resp = await client.get("/api/devices")
    assert resp.status_code == 200
    assert resp.json() == []


async def test_create_device(client):
    resp = await client.post("/api/devices", json={
        "name": "客厅灯",
        "type": "light",
        "mqtt_topic": "home/light",
        "command_topic": "home/light/set",
    })
    assert resp.status_code == 201
    data = resp.json()
    assert data["name"] == "客厅灯"
    assert data["type"] == "light"


async def test_get_device_detail(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]

    resp = await client.get(f"/api/devices/{device_id}")
    assert resp.status_code == 200
    assert resp.json()["name"] == "灯"


async def test_get_device_not_found(client):
    resp = await client.get("/api/devices/nonexistent")
    assert resp.status_code == 404


async def test_update_device(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]

    resp = await client.put(f"/api/devices/{device_id}", json={"name": "新名字"})
    assert resp.status_code == 200
    assert resp.json()["name"] == "新名字"


async def test_delete_device(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]

    resp = await client.delete(f"/api/devices/{device_id}")
    assert resp.status_code == 204


async def test_delete_device_not_found(client):
    resp = await client.delete("/api/devices/nonexistent")
    assert resp.status_code == 404
  • Step 7: Create tests/test_api_broker.py
import pytest
from httpx import AsyncClient, ASGITransport

from mqtt_home.main import app


@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


async def test_broker_status_no_client(client):
    # The lifespan is not run here, so app.state has no emqx_client and the route returns 503
    resp = await client.get("/api/broker/status")
    assert resp.status_code == 503


async def test_broker_clients_no_client(client):
    resp = await client.get("/api/broker/clients")
    assert resp.status_code == 503


async def test_health_endpoint(client):
    resp = await client.get("/health")
    assert resp.status_code == 200
    data = resp.json()
    assert "status" in data
    assert "mqtt_connected" in data
  • Step 8: Run all tests

Run: pytest tests/ -v Expected: All tests pass

  • Step 9: Commit
git add -A
git commit -m "feat: FastAPI app with device, broker, dashboard REST API routes"

Task 9: WebSocket real-time push

Files:

  • Modify: src/mqtt_home/main.py

  • Create: src/mqtt_home/ws.py

  • Step 1: Create src/mqtt_home/ws.py

import logging
from fastapi import WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)


class ConnectionManager:
    def __init__(self):
        self._connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self._connections.append(ws)
        logger.info("WebSocket connected, total: %d", len(self._connections))

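    def disconnect(self, ws: WebSocket):
        # Safe to call twice: broadcast() may already have dropped this connection
        if ws in self._connections:
            self._connections.remove(ws)
        logger.info("WebSocket disconnected, total: %d", len(self._connections))

    async def broadcast(self, message: dict):
        dead = []
        # Iterate over a copy; the list can change while send_json awaits
        for ws in list(self._connections):
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)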


ws_manager = ConnectionManager()


async def websocket_endpoint(ws: WebSocket):
    await ws_manager.connect(ws)
    try:
        while True:
            await ws.receive_text()  # keep the connection open; client messages are ignored
    except WebSocketDisconnect:
        ws_manager.disconnect(ws)


async def broadcast_device_update(device_id: str, data: dict):
    await ws_manager.broadcast({
        "type": "device_update",
        "device_id": device_id,
        **data,
    })
  • Step 2: Modify src/mqtt_home/main.py to add the WebSocket route and broadcast

After from mqtt_home.api import api_router, add:

from mqtt_home.ws import websocket_endpoint, broadcast_device_update

After app.include_router(api_router), add:

app.websocket("/ws/devices")(websocket_endpoint)

Modify the on_state callback to broadcast over WebSocket:

    async def on_state(topic: str, payload: str):
        async with session_factory() as db:
            device = await handle_state_update(db, topic, payload)
            if device:
                await broadcast_device_update(device.id, {
                    "state": device.state,
                    "is_online": device.is_online,
                    "last_seen": device.last_seen.isoformat() if device.last_seen else None,
                })
  • Step 3: Commit
git add -A
git commit -m "feat: WebSocket endpoint for real-time device state updates"
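
For a sense of what a subscriber on /ws/devices receives, the sketch below replays the broadcast logic against in-memory stand-ins rather than real WebSocket connections. `FakeSocket` and `broadcast` are hypothetical names used only for illustration; the message fields mirror broadcast_device_update, and the sample values are made up.

```python
import asyncio


class FakeSocket:
    """In-memory stand-in for a WebSocket; records what it was sent."""

    def __init__(self, alive: bool = True):
        self.alive = alive
        self.received: list[dict] = []

    async def send_json(self, message: dict) -> None:
        if not self.alive:
            raise ConnectionError("socket closed")
        self.received.append(message)


async def broadcast(connections: list[FakeSocket], message: dict) -> None:
    """Send to every connection and drop the ones that fail."""
    # Iterate over a copy so removals do not disturb the loop
    for ws in list(connections):
        try:
            await ws.send_json(message)
        except Exception:
            connections.remove(ws)


async def demo() -> tuple[list[dict], int]:
    good, dead = FakeSocket(), FakeSocket(alive=False)
    connections = [good, dead]
    await broadcast(connections, {
        "type": "device_update",
        "device_id": "example-id",  # illustrative value
        "state": "ON",
        "is_online": True,
        "last_seen": None,
    })
    return good.received, len(connections)


received, remaining = asyncio.run(demo())
print(received[0]["type"], remaining)
```

The live connection gets the full device_update message, while the failed one is pruned so later broadcasts do not retry it.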

Task 10: CLI commands

Files:

  • Create: src/mqtt_home/cli.py

  • Create: tests/test_cli.py

  • Step 1: Create src/mqtt_home/cli.py

import asyncio
import click

from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.device_registry import (
    list_devices, get_device, create_device, delete_device,
    send_command, get_device_logs,
)
from mqtt_home.schemas import DeviceCreate
from mqtt_home.mqtt_client import MqttClient


def run_async(coro):
    # asyncio.run creates and closes a fresh event loop per call; calling
    # get_event_loop() without a running loop is deprecated in recent Python versions
    return asyncio.run(coro)


@click.group()
def cli():
    """MQTT 智能家居管理工具"""
    pass


@cli.group()
def device():
    """设备管理"""
    pass


@device.command("list")
def device_list():
    """列出所有设备"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            devices = await list_devices(db)
            if not devices:
                click.echo("暂无设备")
                return
            for d in devices:
                status = "🟢" if d.is_online else "🔴"
                state = d.state or "-"
                click.echo(f"{status} [{d.id[:8]}] {d.name} ({d.type}) state={state}")

    run_async(_run())


@device.command("add")
@click.option("--name", required=True, help="设备名称")
@click.option("--type", "device_type", default="switch", help="设备类型")
@click.option("--state-topic", required=True, help="状态主题")
@click.option("--command-topic", default=None, help="命令主题")
def device_add(name, device_type, state_topic, command_topic):
    """手动添加设备"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            d = await create_device(db, DeviceCreate(
                name=name,
                type=device_type,
                mqtt_topic=state_topic,
                command_topic=command_topic,
            ))
            click.echo(f"设备已创建: {d.id} - {d.name}")

    run_async(_run())


@device.command("info")
@click.argument("device_id")
def device_info(device_id):
    """查看设备详情"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            d = await get_device(db, device_id)
            if not d:
                click.echo(f"设备不存在: {device_id}", err=True)
                return
            click.echo(f"ID: {d.id}")
            click.echo(f"名称: {d.name}")
            click.echo(f"类型: {d.type}")
            click.echo(f"协议: {d.protocol}")
            click.echo(f"状态主题: {d.mqtt_topic}")
            click.echo(f"命令主题: {d.command_topic or '无'}")
            click.echo(f"当前状态: {d.state or '无'}")
            click.echo(f"在线: {'是' if d.is_online else '否'}")
            click.echo(f"最后活跃: {d.last_seen or '从未'}")

    run_async(_run())


@device.command("remove")
@click.argument("device_id")
def device_remove(device_id):
    """删除设备"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            if await delete_device(db, device_id):
                click.echo(f"设备已删除: {device_id}")
            else:
                click.echo(f"设备不存在: {device_id}", err=True)

    run_async(_run())


@device.command("command")
@click.argument("device_id")
@click.option("--payload", required=True, help="命令内容JSON")
def device_command(device_id, payload):
    """向设备发送命令"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()

        # Publish through a short-lived aiomqtt connection instead of MqttClient,
        # so the long-running message loop is never started
        import aiomqtt
        async with aiomqtt.Client(
            hostname=settings.mqtt_host,
            port=settings.mqtt_port,
            username=settings.mqtt_username or None,
            password=settings.mqtt_password or None,
        ) as client:
            async with factory() as db:
                try:
                    log = await send_command(
                        db, device_id, payload,
                        publish_fn=lambda topic, p: client.publish(topic, p, qos=1),
                    )
                    if log:
                        click.echo(f"命令已发送到 {log.topic}: {log.payload}")
                    else:
                        click.echo(f"设备不存在: {device_id}", err=True)
                except ValueError as e:
                    click.echo(str(e), err=True)

    run_async(_run())


@device.command("logs")
@click.argument("device_id")
@click.option("--limit", default=20, help="日志条数")
def device_logs(device_id, limit):
    """查看设备消息日志"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            logs = await get_device_logs(db, device_id, limit)
            if not logs:
                click.echo("暂无日志")
                return
            for log in logs:
                direction = "RX" if log.direction == "rx" else "TX"
                click.echo(f"[{log.timestamp}] {direction} {log.topic} | {log.payload}")

    run_async(_run())


@cli.group()
def broker():
    """Broker 管理"""
    pass


@broker.command("status")
def broker_status():
    """查看 Broker 状态"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            status = await emqx.get_broker_status()
            click.echo(f"版本: {status.get('version', 'unknown')}")
            click.echo(f"运行时间: {status.get('uptime', 0)}s")
            metrics = await emqx.get_metrics()
            click.echo(f"连接数: {metrics.get('connections.count', 0)}")
            click.echo(f"订阅数: {metrics.get('subscriptions.count', 0)}")
            click.echo(f"主题数: {metrics.get('topics.count', 0)}")
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()

    run_async(_run())


@broker.command("clients")
def broker_clients():
    """列出已连接客户端"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            clients = await emqx.get_clients()
            if not clients:
                click.echo("暂无客户端连接")
                return
            for c in clients:
                status = "在线" if c.get("connected") else "离线"
                click.echo(f"[{c.get('clientid')}] {status} ip={c.get('ip_address')}")
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()

    run_async(_run())


@broker.command("topics")
def broker_topics():
    """列出活跃主题"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            topics = await emqx.get_topics()
            if not topics:
                click.echo("暂无活跃主题")
                return
            for t in topics:
                click.echo(t.get("topic", ""))
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()

    run_async(_run())
  • Step 2: Create tests/test_cli.py
import pytest
from click.testing import CliRunner
from mqtt_home.cli import cli


def test_device_list_empty(monkeypatch):
    """The list command when no devices are registered"""
    runner = CliRunner()

    async def mock_init():
        pass

    async def mock_list(db):
        return []

    monkeypatch.setattr("mqtt_home.cli.init_db", mock_init)
    monkeypatch.setattr("mqtt_home.cli.list_devices", mock_list)

    result = runner.invoke(cli, ["device", "list"])
    assert result.exit_code == 0
    assert "暂无设备" in result.output


def test_cli_groups():
    runner = CliRunner()
    result = runner.invoke(cli, ["--help"])
    assert "device" in result.output
    assert "broker" in result.output
  • Step 3: Run the tests

Run: pytest tests/test_cli.py -v Expected: 2 passed

  • Step 4: Commit
git add -A
git commit -m "feat: CLI commands for device management and broker monitoring"
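
Every command in cli.py follows the same shape: a synchronous click callback wraps an inner coroutine and drives it to completion. The sketch below shows that bridge in isolation using `asyncio.run`. Here `fetch_devices` is a hypothetical stand-in for the database work, and click is left out so the example needs only the standard library.

```python
import asyncio


def run_async(coro):
    """Run a coroutine to completion from synchronous code."""
    return asyncio.run(coro)


async def fetch_devices() -> list[dict]:
    # Hypothetical stand-in for `list_devices(db)`
    await asyncio.sleep(0)
    return [{"id": "a1b2c3d4e5", "name": "lamp", "is_online": True}]


def device_list() -> list[str]:
    """Synchronous entry point, shaped like a click command body."""
    async def _run() -> list[str]:
        devices = await fetch_devices()
        return [
            f"[{d['id'][:8]}] {d['name']} online={d['is_online']}"
            for d in devices
        ]

    return run_async(_run())


print(device_list())
```

Because each call gets its own event loop, commands stay independent of one another, at the cost of re-initializing resources such as the database engine on every invocation.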

Task 11: Create .env and verify end to end

Files:

  • Create: .env

  • Step 1: Create .env (copy from .env.example and fill in the real values)

MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=ed74cf28e117c5f6
EMQX_API_SECRET=fED31TaybOUXx9CtkabICmT0wEZuUPXSU0vPwIekSbbC
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
  • Step 2: Start the service and verify

Run: uvicorn mqtt_home.main:app --host 0.0.0.0 --port 8000 Expected: the service starts and the log shows "MQTT connected" and "Database initialized"

  • Step 3: Verify the broker connection with the CLI

Run: mqtt-home broker status Expected: the EMQX version and statistics are printed

  • Step 4: Verify with the API

Run: curl http://localhost:8000/health Expected: {"status":"ok","mqtt_connected":true}

Run: curl http://localhost:8000/api/broker/clients Expected: a JSON list of clients
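
The health check can also be scripted instead of run by hand. This is a minimal sketch using only the standard library; it assumes the service listens on localhost:8000 as configured in .env, and `health_ok` and `fetch_health` are hypothetical helper names.

```python
import json
import urllib.request


def health_ok(body: bytes) -> bool:
    """Return True if a /health response reports a live MQTT connection."""
    data = json.loads(body)
    return data.get("status") == "ok" and data.get("mqtt_connected") is True


def fetch_health(base_url: str = "http://localhost:8000") -> bool:
    """Fetch /health from a running instance (assumed address) and evaluate it."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=5) as resp:
        return health_ok(resp.read())
```

Calling `fetch_health()` against a running backend should return True once the MQTT connection is up; a False result with a reachable service points at the broker connection rather than the web server.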

  • Step 5: Commit (.env is already listed in .gitignore)
git add -A
git commit -m "feat: verify end-to-end backend with EMQX broker connection"

Task 12: Full test-suite confirmation

  • Step 1: Run all tests

Run: pytest tests/ -v Expected: all tests pass

  • Step 2: Fix any failing tests

Fix problems based on the actual output.

  • Step 3: Final commit
git add -A
git commit -m "test: ensure all backend tests pass"

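Self-check results

| Design document section | Corresponding task |
| --- | --- |
| Project structure | Task 1 |
| Data models (devices, device_logs) | Task 2 |
| Pydantic schemas | Task 3 |
| EMQX REST API wrapper | Task 4 |
| MQTT client + reconnect + wildcards | Task 5 |
| Device registry (CRUD + state + commands) | Task 6 |
| HA Discovery protocol | Task 7 |
| REST API routes (devices, broker, dashboard) | Task 8 |
| WebSocket real-time push | Task 9 |
| CLI commands | Task 10 |
| Configuration / .env | Task 11 |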