# MQTT Smart Home Management System - Backend Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
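**Goal:** Implement the core backend: a FastAPI REST API, an MQTT client, an EMQX API wrapper, CLI commands, and a SQLite database.
**Architecture:** A monolithic FastAPI application that connects to the EMQX broker through aiomqtt and stores device data in SQLite via SQLAlchemy. The CLI and the REST API share the `device_registry` business-logic layer.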
**Tech Stack:** Python 3.11+, FastAPI, uvicorn, aiomqtt, SQLAlchemy 2.0 + aiosqlite, pydantic-settings, click, httpx
---
## File Structure Overview
```
mqtt-home/
├── pyproject.toml
├── .env.example
├── .gitignore
├── src/
│   └── mqtt_home/
│       ├── __init__.py
│       ├── config.py            # Settings (pydantic-settings)
│       ├── database.py          # SQLite engine, async session
│       ├── models.py            # SQLAlchemy ORM (Device, DeviceLog)
│       ├── schemas.py           # Pydantic schemas (request/response)
│       ├── emqx_api.py          # EMQX REST API client (httpx)
│       ├── mqtt_client.py       # MQTT connection lifecycle (aiomqtt)
│       ├── discovery.py         # HA Discovery protocol handler
│       ├── device_registry.py   # Business logic (CRUD + control)
│       ├── main.py              # FastAPI app + lifespan
│       ├── cli.py               # click CLI commands
│       └── api/
│           ├── __init__.py      # Router aggregation
│           ├── devices.py       # Device REST routes
│           ├── broker.py        # Broker REST routes
│           └── dashboard.py     # Dashboard REST routes
└── tests/
    ├── conftest.py
    ├── test_config.py
    ├── test_models.py
    ├── test_device_registry.py
    ├── test_emqx_api.py
    ├── test_mqtt_client.py
    ├── test_discovery.py
    ├── test_api_devices.py
    ├── test_api_broker.py
    └── test_cli.py
```
---
### Task 1: Project Skeleton and Configuration
**Files:**
- Create: `pyproject.toml`
- Create: `.env.example`
- Create: `.gitignore`
- Create: `src/mqtt_home/__init__.py`
- Create: `src/mqtt_home/config.py`
- Create: `tests/conftest.py`
- Create: `tests/test_config.py`
- [ ] **Step 1: Create pyproject.toml**
```toml
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "mqtt-home"
version = "0.1.0"
description = "Lightweight MQTT device management system for smart homes"
requires-python = ">=3.11"
dependencies = [
    "fastapi>=0.115.0",
    "uvicorn[standard]>=0.30.0",
    "aiomqtt>=2.0.0",
    "sqlalchemy[asyncio]>=2.0.0",
    "aiosqlite>=0.20.0",
    "pydantic-settings>=2.0.0",
    "click>=8.1.0",
    "httpx>=0.27.0",
    "websockets>=12.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "pytest-httpx>=0.30.0",
    "httpx>=0.27.0",
]

[project.scripts]
mqtt-home = "mqtt_home.cli:cli"

[tool.setuptools.packages.find]
where = ["src"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
```
- [ ] **Step 2: Create .env.example**
```
MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=your-api-key
EMQX_API_SECRET=your-secret
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
```
- [ ] **Step 3: Create .gitignore**
```
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.venv/
.env
data/
*.db
frontend/node_modules/
frontend/dist/
```
- [ ] **Step 4: Create src/mqtt_home/__init__.py**
```python
```
- [ ] **Step 5: Create tests/conftest.py**
```python
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.pool import StaticPool

from mqtt_home.config import Settings

TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        mqtt_port=1883,
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url=TEST_DATABASE_URL,
    )


@pytest.fixture
async def engine():
    # Imported lazily: database.py is only created in Task 2, and a
    # module-level import would break test collection before then.
    from mqtt_home.database import Base

    engine = create_async_engine(
        TEST_DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()


@pytest.fixture
async def db_session(engine):
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with async_session() as session:
        yield session
```
- [ ] **Step 6: Create src/mqtt_home/config.py**
```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    mqtt_host: str = "localhost"
    mqtt_port: int = 1883
    mqtt_username: str = ""
    mqtt_password: str = ""

    emqx_api_url: str = "http://localhost:18083/api/v5"
    emqx_api_key: str = ""
    emqx_api_secret: str = ""

    database_url: str = "sqlite+aiosqlite:///./data/mqtt_home.db"

    web_host: str = "0.0.0.0"
    web_port: int = 8000

    # Values are read from the environment and from .env, with no prefix.
    model_config = {"env_file": ".env", "env_prefix": ""}


def get_settings() -> Settings:
    return Settings()
```
- [ ] **Step 7: Create tests/test_config.py**
```python
from mqtt_home.config import Settings


def test_default_settings():
    # _env_file=None keeps a local .env from overriding the defaults.
    s = Settings(_env_file=None)
    assert s.mqtt_host == "localhost"
    assert s.mqtt_port == 1883
    assert s.web_port == 8000


def test_settings_from_env(monkeypatch):
    monkeypatch.setenv("MQTT_HOST", "192.168.1.1")
    monkeypatch.setenv("MQTT_PORT", "1884")
    s = Settings()
    assert s.mqtt_host == "192.168.1.1"
    assert s.mqtt_port == 1884
```
- [ ] **Step 8: Install dependencies and run the tests**
Run: `pip install -e ".[dev]"` (from the `D:\home\mqtt` directory)
Run: `pytest tests/test_config.py -v`
Expected: 2 passed
- [ ] **Step 9: Initialize the git repository and commit**
Run:
```bash
cd D:\home\mqtt
git init
git add -A
git commit -m "feat: project skeleton with config, dependencies, and test setup"
```
---
### Task 2: Database and ORM Models
**Files:**
- Create: `src/mqtt_home/database.py`
- Create: `src/mqtt_home/models.py`
- Create: `tests/test_models.py`
- [ ] **Step 1: Create src/mqtt_home/database.py**
```python
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

from mqtt_home.config import get_settings


class Base(DeclarativeBase):
    pass


def get_engine():
    settings = get_settings()
    return create_async_engine(
        settings.database_url,
        echo=False,
    )


def get_session_factory(engine=None):
    _engine = engine or get_engine()
    return async_sessionmaker(_engine, expire_on_commit=False)


async def init_db():
    engine = get_engine()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    await engine.dispose()


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    # FastAPI dependency: yields one session per request.
    factory = get_session_factory()
    async with factory() as session:
        yield session
```
- [ ] **Step 2: Create src/mqtt_home/models.py**
```python
import uuid
from datetime import datetime, timezone

from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, ForeignKey
from sqlalchemy.orm import relationship

from mqtt_home.database import Base


def _utcnow():
    return datetime.now(timezone.utc)


def _new_id():
    return str(uuid.uuid4())


class Device(Base):
    __tablename__ = "devices"

    id = Column(String(36), primary_key=True, default=_new_id)
    name = Column(String(200), nullable=False)
    type = Column(String(50), nullable=False, default="switch")
    protocol = Column(String(20), nullable=False, default="custom")
    mqtt_topic = Column(String(500), nullable=False)
    command_topic = Column(String(500), nullable=True)
    discovery_topic = Column(String(500), nullable=True)
    discovery_payload = Column(Text, nullable=True)
    attributes = Column(Text, nullable=True, default="{}")
    state = Column(String(500), nullable=True, default=None)
    is_online = Column(Boolean, nullable=False, default=False)
    last_seen = Column(DateTime(timezone=True), nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow)
    updated_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow, onupdate=_utcnow)

    # lazy="dynamic" collections issue synchronous queries; under AsyncSession,
    # count or list logs with an explicit select() instead of device.logs.count().
    logs = relationship("DeviceLog", back_populates="device", lazy="dynamic")


class DeviceLog(Base):
    __tablename__ = "device_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    device_id = Column(String(36), ForeignKey("devices.id", ondelete="CASCADE"), nullable=False)
    direction = Column(String(10), nullable=False)  # "rx" or "tx"
    topic = Column(String(500), nullable=False)
    payload = Column(Text, nullable=False)
    timestamp = Column(DateTime(timezone=True), nullable=False, default=_utcnow)

    device = relationship("Device", back_populates="logs")
```
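The `ondelete="CASCADE"` on `device_logs.device_id` only has an effect when SQLite enforces foreign keys, and typical SQLite builds leave enforcement off on each new connection. The sketch below uses the standard-library `sqlite3` module with a simplified, hypothetical two-table schema (not the plan's actual tables) to show what `PRAGMA foreign_keys = ON` changes. How to apply the pragma through the async SQLAlchemy engine is not covered here.

```python
import sqlite3


def remaining_logs_after_delete(enforce_foreign_keys: bool) -> int:
    """Delete a device and report how many of its log rows survive."""
    conn = sqlite3.connect(":memory:")
    # The pragma is per connection and must run outside a transaction.
    state = "ON" if enforce_foreign_keys else "OFF"
    conn.execute(f"PRAGMA foreign_keys = {state}")
    conn.execute("CREATE TABLE devices (id TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE device_logs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " device_id TEXT NOT NULL REFERENCES devices(id) ON DELETE CASCADE)"
    )
    conn.execute("INSERT INTO devices (id) VALUES ('dev-1')")
    conn.execute("INSERT INTO device_logs (device_id) VALUES ('dev-1')")
    conn.execute("DELETE FROM devices WHERE id = 'dev-1'")
    count = conn.execute("SELECT COUNT(*) FROM device_logs").fetchone()[0]
    conn.close()
    return count


print(remaining_logs_after_delete(True))   # cascade removes the log row
print(remaining_logs_after_delete(False))  # the orphaned log row remains
```

With enforcement off, deleting a device leaves its log rows behind, so it is worth deciding early whether cleanup should rely on the database or on explicit deletes in the registry layer.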
- [ ] **Step 3: Create tests/test_models.py**
```python
from sqlalchemy import func, select

from mqtt_home.models import Device, DeviceLog


async def test_create_device(db_session):
    device = Device(
        name="Living room light",
        type="light",
        protocol="custom",
        mqtt_topic="home/living/light",
        command_topic="home/living/light/set",
    )
    db_session.add(device)
    await db_session.commit()
    await db_session.refresh(device)

    assert device.id is not None
    assert len(device.id) == 36
    assert device.name == "Living room light"
    assert device.state is None
    assert device.is_online is False


async def test_create_device_log(db_session):
    device = Device(
        name="Temperature sensor",
        type="sensor",
        protocol="custom",
        mqtt_topic="home/temperature",
    )
    db_session.add(device)
    await db_session.flush()

    log = DeviceLog(
        device_id=device.id,
        direction="rx",
        topic="home/temperature",
        payload='{"temperature": 25.5}',
    )
    db_session.add(log)
    await db_session.commit()

    # Count with an explicit query: device.logs.count() would run a
    # synchronous query, which an AsyncSession cannot execute directly.
    log_count = await db_session.scalar(
        select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id)
    )
    assert log_count == 1
```
- [ ] **Step 4: Run the tests**
Run: `pytest tests/test_models.py -v`
Expected: 2 passed
- [ ] **Step 5: Commit**
```bash
git add -A
git commit -m "feat: database setup with SQLAlchemy ORM models for Device and DeviceLog"
```
---
### Task 3: Pydantic Schemas
**Files:**
- Create: `src/mqtt_home/schemas.py`
- [ ] **Step 1: Create src/mqtt_home/schemas.py**
```python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel


class DeviceCreate(BaseModel):
    name: str
    type: str = "switch"
    protocol: str = "custom"
    mqtt_topic: str
    command_topic: Optional[str] = None


class DeviceUpdate(BaseModel):
    name: Optional[str] = None
    type: Optional[str] = None
    command_topic: Optional[str] = None


class DeviceCommand(BaseModel):
    payload: str


class DeviceResponse(BaseModel):
    id: str
    name: str
    type: str
    protocol: str
    mqtt_topic: str
    command_topic: Optional[str] = None
    state: Optional[str] = None
    is_online: bool
    last_seen: Optional[datetime] = None
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class DeviceLogResponse(BaseModel):
    id: int
    device_id: str
    direction: str
    topic: str
    payload: str
    timestamp: datetime

    model_config = {"from_attributes": True}


class BrokerClient(BaseModel):
    clientid: str
    username: Optional[str] = None
    ip_address: Optional[str] = None
    connected: bool
    keepalive: int
    proto_ver: Optional[int] = None
    clean_start: Optional[bool] = None


class BrokerTopic(BaseModel):
    topic: str
    node: Optional[str] = None


class DashboardStats(BaseModel):
    total_devices: int
    online_devices: int
    offline_devices: int
    recent_logs: list[DeviceLogResponse]
```
- [ ] **Step 2: Commit**
```bash
git add -A
git commit -m "feat: Pydantic schemas for API request/response models"
```
---
### Task 4: EMQX REST API Client
**Files:**
- Create: `src/mqtt_home/emqx_api.py`
- Create: `tests/test_emqx_api.py`
- [ ] **Step 1: Create src/mqtt_home/emqx_api.py**
```python
import httpx
from typing import Any

from mqtt_home.config import Settings


class EmqxApiClient:
    # Every method assumes the endpoint wraps its result in a top-level
    # "data" key; check the response shape against your EMQX version.

    def __init__(self, settings: Settings):
        self._client = httpx.AsyncClient(
            base_url=settings.emqx_api_url,
            # The API key and secret are sent as HTTP Basic credentials.
            auth=(settings.emqx_api_key, settings.emqx_api_secret),
            timeout=10.0,
        )

    async def close(self):
        await self._client.aclose()

    async def get_broker_status(self) -> dict[str, Any]:
        resp = await self._client.get("/status")
        resp.raise_for_status()
        return resp.json().get("data", {})

    async def get_metrics(self) -> dict[str, Any]:
        resp = await self._client.get("/metrics")
        resp.raise_for_status()
        return resp.json().get("data", {})

    async def get_clients(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/clients", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])

    async def get_subscriptions(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/subscriptions", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])

    async def get_topics(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/topics", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])
```
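Passing `auth=(key, secret)` to `httpx.AsyncClient` sends the pair as HTTP Basic credentials on every request. The standard-library sketch below shows how that header value is built; `basic_auth_header` is a hypothetical helper for illustration only, and the key and secret are the placeholder values used in the tests.

```python
import base64


def basic_auth_header(api_key: str, api_secret: str) -> str:
    """Build the Authorization header value for HTTP Basic auth."""
    # Basic auth joins the two parts with a colon and base64-encodes the result.
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"


header = basic_auth_header("test-key", "test-secret")
scheme, encoded = header.split(" ", 1)
# Decoding the token recovers the original "key:secret" pair.
decoded = base64.b64decode(encoded).decode("utf-8")
print(scheme, decoded)
```

Because the encoding is reversible, the credentials are only as private as the transport, which matters if the EMQX API is reached over plain HTTP.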
- [ ] **Step 2: Create tests/test_emqx_api.py**
```python
import pytest
import httpx
from unittest.mock import AsyncMock, patch

from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.config import Settings


def _json_response(payload):
    # raise_for_status() requires a request to be attached to the response.
    return httpx.Response(200, json=payload, request=httpx.Request("GET", "http://test"))


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )


@pytest.fixture
def emqx_client(settings):
    return EmqxApiClient(settings)


async def test_get_broker_status(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": {"uptime": 12345, "version": "5.0.0"}})
        result = await emqx_client.get_broker_status()
        assert result["uptime"] == 12345


async def test_get_clients(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": [{"clientid": "dev1", "connected": True}]})
        result = await emqx_client.get_clients()
        assert len(result) == 1
        assert result[0]["clientid"] == "dev1"


async def test_get_topics(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": [{"topic": "home/light"}]})
        result = await emqx_client.get_topics()
        assert len(result) == 1
```
- [ ] **Step 3: Run the tests**
Run: `pytest tests/test_emqx_api.py -v`
Expected: 3 passed
- [ ] **Step 4: Commit**
```bash
git add -A
git commit -m "feat: EMQX REST API client for broker status, clients, and topics"
```
---
### Task 5: MQTT Client
**Files:**
- Create: `src/mqtt_home/mqtt_client.py`
- Create: `tests/test_mqtt_client.py`
- [ ] **Step 1: Create src/mqtt_home/mqtt_client.py**
```python
import asyncio
import uuid
import logging
from typing import Callable, Awaitable

import aiomqtt

from mqtt_home.config import Settings

logger = logging.getLogger(__name__)

MessageCallback = Callable[[str, str], Awaitable[None]]


class MqttClient:
    def __init__(self, settings: Settings):
        self._settings = settings
        self._client: aiomqtt.Client | None = None
        self._callbacks: dict[str, list[MessageCallback]] = {}
        self._connected = False
        self._task: asyncio.Task | None = None
        self._client_id = f"mqtt-home-{uuid.uuid4().hex[:8]}"

    @property
    def is_connected(self) -> bool:
        return self._connected

    def on_message(self, topic_filter: str, callback: MessageCallback):
        self._callbacks.setdefault(topic_filter, []).append(callback)

    async def start(self):
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def publish(self, topic: str, payload: str, qos: int = 1, retain: bool = False):
        if not self._client:
            raise RuntimeError("MQTT client not connected")
        await self._client.publish(topic, payload, qos=qos, retain=retain)

    async def subscribe(self, topic: str, qos: int = 1):
        if not self._client:
            raise RuntimeError("MQTT client not connected")
        await self._client.subscribe(topic, qos=qos)

    async def _run(self):
        retry_delay = 1
        max_delay = 60
        while True:
            try:
                async with aiomqtt.Client(
                    hostname=self._settings.mqtt_host,
                    port=self._settings.mqtt_port,
                    username=self._settings.mqtt_username or None,
                    password=self._settings.mqtt_password or None,
                    identifier=self._client_id,
                    clean_session=True,
                ) as client:
                    self._client = client
                    self._connected = True
                    logger.info("MQTT connected to %s:%d", self._settings.mqtt_host, self._settings.mqtt_port)
                    retry_delay = 1

                    # Re-subscribe to every registered topic filter.
                    for topic_filter in self._callbacks:
                        await client.subscribe(topic_filter, qos=1)

                    async for message in client.messages:
                        topic = str(message.topic)
                        raw = message.payload
                        if isinstance(raw, (bytes, bytearray)):
                            payload = raw.decode("utf-8", errors="replace")
                        else:
                            payload = "" if raw is None else str(raw)
                        await self._dispatch(topic, payload)
            except asyncio.CancelledError:
                # Drop the stale client so publish()/subscribe() fail fast.
                self._client = None
                self._connected = False
                raise
            except Exception as e:
                self._client = None
                self._connected = False
                logger.warning("MQTT connection failed: %s, retrying in %ds", e, retry_delay)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_delay)

    async def _dispatch(self, topic: str, payload: str):
        matched = False
        for topic_filter, callbacks in self._callbacks.items():
            if self._topic_matches(topic, topic_filter):
                matched = True
                for cb in callbacks:
                    try:
                        await cb(topic, payload)
                    except Exception as e:
                        logger.error("MQTT callback error for %s: %s", topic, e)
        if not matched:
            logger.debug("No callback for topic: %s", topic)

    @staticmethod
    def _topic_matches(topic: str, pattern: str) -> bool:
        """Simple MQTT wildcard matching (supports + and #)."""
        topic_parts = topic.split("/")
        pattern_parts = pattern.split("/")
        for i, part in enumerate(pattern_parts):
            if part == "#":
                return True
            if i >= len(topic_parts):
                return False
            if part != "+" and part != topic_parts[i]:
                return False
        return len(topic_parts) == len(pattern_parts)
```
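The reconnect loop in `_run` doubles `retry_delay` after each failed attempt, caps it at `max_delay`, and resets it to 1 once a connection succeeds. The generator below is a standalone sketch of just that delay schedule; `backoff_delays` is a hypothetical helper, not part of `mqtt_client.py`.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(initial: int = 1, max_delay: int = 60) -> Iterator[int]:
    """Yield reconnect delays that double after each failure, capped at max_delay."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, max_delay)


# Delays (in seconds) for eight consecutive failed connection attempts.
first_eight = list(islice(backoff_delays(), 8))
print(first_eight)  # [1, 2, 4, 8, 16, 32, 60, 60]
```

With these defaults a broker outage is retried quickly at first and then settles at one attempt per minute.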
- [ ] **Step 2: Create tests/test_mqtt_client.py**
```python
import pytest

from mqtt_home.mqtt_client import MqttClient
from mqtt_home.config import Settings


@pytest.fixture
def mqtt_client():
    settings = Settings(
        mqtt_host="localhost",
        mqtt_port=1883,
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )
    return MqttClient(settings)


def test_topic_matches_exact(mqtt_client):
    assert MqttClient._topic_matches("home/light", "home/light") is True
    assert MqttClient._topic_matches("home/light", "home/switch") is False


def test_topic_matches_single_level_wildcard(mqtt_client):
    assert MqttClient._topic_matches("home/light/status", "home/+/status") is True
    assert MqttClient._topic_matches("home/switch/status", "home/+/status") is True
    assert MqttClient._topic_matches("home/light", "home/+") is True
    assert MqttClient._topic_matches("home/light/extra", "home/+") is False


def test_topic_matches_multi_level_wildcard(mqtt_client):
    assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/#") is True
    assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/light/#") is True
    assert MqttClient._topic_matches("other/topic", "homeassistant/#") is False


def test_register_callback(mqtt_client):
    async def cb(topic, payload):
        pass

    mqtt_client.on_message("home/#", cb)
    assert "home/#" in mqtt_client._callbacks
    assert len(mqtt_client._callbacks["home/#"]) == 1


async def test_publish_not_connected(mqtt_client):
    # An async test avoids managing an event loop by hand.
    with pytest.raises(RuntimeError, match="not connected"):
        await mqtt_client.publish("test", "payload")
```
- [ ] **Step 3: Run the tests**
Run: `pytest tests/test_mqtt_client.py -v`
Expected: 5 passed
- [ ] **Step 4: Commit**
```bash
git add -A
git commit -m "feat: MQTT client with reconnection, topic matching, and callback dispatch"
```
---
### Task 6: Device Registry (Core Business Logic)
**Files:**
- Create: `src/mqtt_home/device_registry.py`
- Create: `tests/test_device_registry.py`
- [ ] **Step 1: Create src/mqtt_home/device_registry.py**
```python
import logging
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import select, func, desc
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.models import Device, DeviceLog
from mqtt_home.schemas import DeviceCreate, DeviceUpdate

logger = logging.getLogger(__name__)

MAX_LOGS_PER_DEVICE = 100


async def list_devices(db: AsyncSession) -> list[Device]:
    result = await db.execute(select(Device).order_by(Device.created_at.desc()))
    return list(result.scalars().all())


async def get_device(db: AsyncSession, device_id: str) -> Optional[Device]:
    return await db.get(Device, device_id)


async def create_device(db: AsyncSession, data: DeviceCreate) -> Device:
    device = Device(
        name=data.name,
        type=data.type,
        protocol=data.protocol,
        mqtt_topic=data.mqtt_topic,
        command_topic=data.command_topic,
    )
    db.add(device)
    await db.commit()
    await db.refresh(device)
    logger.info("Device created: %s (%s)", device.name, device.id)
    return device


async def update_device(db: AsyncSession, device_id: str, data: DeviceUpdate) -> Optional[Device]:
    device = await db.get(Device, device_id)
    if not device:
        return None
    # Only fields the caller actually sent are applied.
    update_data = data.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(device, key, value)
    await db.commit()
    await db.refresh(device)
    return device


async def delete_device(db: AsyncSession, device_id: str) -> bool:
    device = await db.get(Device, device_id)
    if not device:
        return False
    await db.delete(device)
    await db.commit()
    logger.info("Device deleted: %s", device_id)
    return True


async def send_command(db: AsyncSession, device_id: str, payload: str, publish_fn=None) -> Optional[DeviceLog]:
    device = await db.get(Device, device_id)
    if not device:
        return None
    if not device.command_topic:
        raise ValueError(f"Device {device_id} has no command_topic configured")

    log = DeviceLog(
        device_id=device_id,
        direction="tx",
        topic=device.command_topic,
        payload=payload,
    )
    db.add(log)

    # publish_fn must be an async callable taking (topic, payload).
    if publish_fn:
        await publish_fn(device.command_topic, payload)

    await db.commit()
    await db.refresh(log)
    logger.info("Command sent to %s: %s", device.command_topic, payload)
    return log


async def handle_state_update(db: AsyncSession, topic: str, payload: str) -> Optional[Device]:
    """Handle a state update received over MQTT."""
    result = await db.execute(select(Device).where(Device.mqtt_topic == topic))
    device = result.scalar_one_or_none()
    if not device:
        return None

    device.state = payload
    device.last_seen = datetime.now(timezone.utc)
    device.is_online = True

    log = DeviceLog(
        device_id=device.id,
        direction="rx",
        topic=topic,
        payload=payload,
    )
    db.add(log)
    # Flush so the count below includes the log that was just added.
    await db.flush()

    # Prune old logs, keeping the newest MAX_LOGS_PER_DEVICE entries.
    count = (
        await db.scalar(
            select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id)
        )
    ) or 0
    if count > MAX_LOGS_PER_DEVICE:
        oldest = await db.execute(
            select(DeviceLog)
            .where(DeviceLog.device_id == device.id)
            .order_by(DeviceLog.timestamp.asc(), DeviceLog.id.asc())
            .limit(count - MAX_LOGS_PER_DEVICE)
        )
        for old_log in oldest.scalars().all():
            await db.delete(old_log)

    await db.commit()
    await db.refresh(device)
    return device


async def get_device_logs(db: AsyncSession, device_id: str, limit: int = 20) -> list[DeviceLog]:
    result = await db.execute(
        select(DeviceLog)
        .where(DeviceLog.device_id == device_id)
        .order_by(desc(DeviceLog.timestamp))
        .limit(limit)
    )
    return list(result.scalars().all())


async def update_device_online_status(db: AsyncSession, topic: str, payload: str) -> Optional[Device]:
    """Handle a message on an HA availability topic."""
    result = await db.execute(select(Device).where(Device.mqtt_topic == topic))
    device = result.scalar_one_or_none()
    if not device:
        return None
    device.is_online = payload.lower() == "online"
    device.last_seen = datetime.now(timezone.utc)
    await db.commit()
    await db.refresh(device)
    return device


async def get_dashboard_stats(db: AsyncSession) -> dict:
    # Read each count once: a Result cannot be consumed a second time.
    total = (await db.scalar(select(func.count()).select_from(Device))) or 0
    online = (
        await db.scalar(
            select(func.count()).select_from(Device).where(Device.is_online.is_(True))
        )
    ) or 0
    recent_logs_result = await db.execute(
        select(DeviceLog).order_by(desc(DeviceLog.timestamp)).limit(10)
    )
    return {
        "total_devices": total,
        "online_devices": online,
        "offline_devices": total - online,
        "recent_logs": list(recent_logs_result.scalars().all()),
    }
```
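`handle_state_update` limits each device's history to about `MAX_LOGS_PER_DEVICE` rows by deleting the oldest entries. The same retention rule can be sketched as a single SQL statement with the standard-library `sqlite3` module. The table layout and the `prune_logs` helper below are simplified, hypothetical stand-ins rather than the plan's ORM code, and the sketch assumes the autoincrement `id` is an acceptable proxy for recency, whereas the registry orders by `timestamp`.

```python
import sqlite3


def prune_logs(conn: sqlite3.Connection, device_id: str, keep: int) -> None:
    """Delete all but the newest `keep` log rows for one device."""
    conn.execute(
        "DELETE FROM device_logs"
        " WHERE device_id = ?"
        " AND id NOT IN ("
        "   SELECT id FROM device_logs"
        "   WHERE device_id = ?"
        "   ORDER BY id DESC"
        "   LIMIT ?)",
        (device_id, device_id, keep),
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE device_logs ("
    " id INTEGER PRIMARY KEY AUTOINCREMENT,"
    " device_id TEXT NOT NULL,"
    " payload TEXT NOT NULL)"
)
# Five logs for dev-1 and one for dev-2, which must not be touched.
for i in range(5):
    conn.execute(
        "INSERT INTO device_logs (device_id, payload) VALUES (?, ?)",
        ("dev-1", f"state-{i}"),
    )
conn.execute(
    "INSERT INTO device_logs (device_id, payload) VALUES (?, ?)",
    ("dev-2", "other"),
)

prune_logs(conn, "dev-1", keep=3)

kept = [
    row[0]
    for row in conn.execute(
        "SELECT payload FROM device_logs WHERE device_id = 'dev-1' ORDER BY id"
    )
]
other_count = conn.execute(
    "SELECT COUNT(*) FROM device_logs WHERE device_id = 'dev-2'"
).fetchone()[0]
print(kept, other_count)
```

A set-based delete like this avoids loading the old rows into memory, at the cost of bypassing any ORM-level events attached to `DeviceLog`.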
- [ ] **Step 2: Create tests/test_device_registry.py**
```python
import pytest

from mqtt_home.device_registry import (
    create_device, get_device, list_devices, delete_device,
    update_device, send_command, handle_state_update, get_device_logs,
    get_dashboard_stats,
)
from mqtt_home.schemas import DeviceCreate, DeviceUpdate


async def _noop_publish(topic, payload):
    # send_command awaits publish_fn, so the stub must be a coroutine function.
    return None


async def test_create_and_get_device(db_session):
    data = DeviceCreate(name="Living room light", type="light", mqtt_topic="home/light")
    device = await create_device(db_session, data)
    assert device.name == "Living room light"

    fetched = await get_device(db_session, device.id)
    assert fetched is not None
    assert fetched.id == device.id


async def test_list_devices(db_session):
    await create_device(db_session, DeviceCreate(name="Device 1", type="switch", mqtt_topic="t1"))
    await create_device(db_session, DeviceCreate(name="Device 2", type="sensor", mqtt_topic="t2"))
    devices = await list_devices(db_session)
    assert len(devices) == 2


async def test_update_device(db_session):
    device = await create_device(db_session, DeviceCreate(name="", type="light", mqtt_topic="t"))
    updated = await update_device(db_session, device.id, DeviceUpdate(name="New name"))
    assert updated.name == "New name"


async def test_delete_device(db_session):
    device = await create_device(db_session, DeviceCreate(name="", type="light", mqtt_topic="t"))
    assert await delete_device(db_session, device.id) is True
    assert await get_device(db_session, device.id) is None


async def test_delete_nonexistent_device(db_session):
    assert await delete_device(db_session, "nonexistent") is False


async def test_send_command(db_session):
    device = await create_device(
        db_session, DeviceCreate(name="", type="light", mqtt_topic="t", command_topic="t/set")
    )
    published = {}

    async def mock_publish(topic, payload):
        published[topic] = payload

    log = await send_command(db_session, device.id, '{"state":"on"}', mock_publish)
    assert log is not None
    assert log.direction == "tx"
    assert published["t/set"] == '{"state":"on"}'


async def test_send_command_no_command_topic(db_session):
    device = await create_device(db_session, DeviceCreate(name="Sensor", type="sensor", mqtt_topic="t"))
    with pytest.raises(ValueError, match="no command_topic"):
        await send_command(db_session, device.id, '{"value":1}')


async def test_handle_state_update(db_session):
    device = await create_device(db_session, DeviceCreate(name="", type="light", mqtt_topic="home/light"))
    updated = await handle_state_update(db_session, "home/light", '{"state":"on"}')
    assert updated is not None
    assert updated.state == '{"state":"on"}'
    assert updated.is_online is True


async def test_handle_state_update_unknown_topic(db_session):
    result = await handle_state_update(db_session, "unknown/topic", "on")
    assert result is None


async def test_get_device_logs(db_session):
    device = await create_device(
        db_session, DeviceCreate(name="", type="light", mqtt_topic="t", command_topic="t/set")
    )
    await send_command(db_session, device.id, '{"state":"on"}', _noop_publish)
    await send_command(db_session, device.id, '{"state":"off"}', _noop_publish)
    logs = await get_device_logs(db_session, device.id, limit=10)
    assert len(logs) == 2


async def test_dashboard_stats(db_session):
    await create_device(db_session, DeviceCreate(name="Online device", type="switch", mqtt_topic="t1"))
    await create_device(db_session, DeviceCreate(name="Offline device", type="sensor", mqtt_topic="t2"))
    await handle_state_update(db_session, "t1", "online")

    stats = await get_dashboard_stats(db_session)
    assert stats["total_devices"] == 2
    assert stats["online_devices"] == 1
    assert stats["offline_devices"] == 1
```
- [ ] **Step 3: Run the tests**
Run: `pytest tests/test_device_registry.py -v`
Expected: 11 passed
- [ ] **Step 4: Commit**
```bash
git add -A
git commit -m "feat: device registry with CRUD, state tracking, command sending, and log management"
```
---
### Task 7: HA Discovery Protocol Handler
**Files:**
- Create: `src/mqtt_home/discovery.py`
- Create: `tests/test_discovery.py`
- [ ] **Step 1: Create src/mqtt_home/discovery.py**
```python
import json
import logging

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.models import Device
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.device_registry import create_device
from mqtt_home.schemas import DeviceCreate

logger = logging.getLogger(__name__)

# HA Discovery config topic format: homeassistant/<component>/<node_id>/config
DISCOVERY_TOPIC_PREFIX = "homeassistant/"


def parse_discovery_topic(topic: str) -> dict[str, str] | None:
    """Parse an HA Discovery topic and return its component and node_id."""
    parts = topic.split("/")
    if len(parts) < 4 or parts[0] != DISCOVERY_TOPIC_PREFIX.strip("/"):
        return None
    if parts[-1] != "config":
        return None
    return {
        "component": parts[1],
        # Everything between the component and "config" is treated as the node id.
        "node_id": "/".join(parts[2:-1]),
    }


async def handle_discovery_message(
    topic: str, payload: str, db: AsyncSession, mqtt_client: MqttClient
) -> Device | None:
    """Handle an HA Discovery config message: register the device and subscribe to its state topic."""
    parsed = parse_discovery_topic(topic)
    if not parsed:
        return None

    try:
        config = json.loads(payload)
    except json.JSONDecodeError:
        logger.warning("Invalid JSON in discovery payload for %s", topic)
        return None

    state_topic = config.get("state_topic")
    command_topic = config.get("command_topic")
    device_name = config.get("name", config.get("device", {}).get("name", parsed["node_id"]))
    device_class = config.get("device_class", "")

    if not state_topic:
        logger.warning("Discovery config for %s has no state_topic", topic)
        return None

    # Skip devices that already exist (deduplicated by discovery_topic).
    result = await db.execute(
        select(Device).where(Device.discovery_topic == topic)
    )
    existing = result.scalar_one_or_none()
    if existing:
        logger.debug("Device already registered: %s", topic)
        return existing

    device = await create_device(db, DeviceCreate(
        name=device_name,
        type=parsed["component"],
        protocol="ha_discovery",
        mqtt_topic=state_topic,
        command_topic=command_topic,
    ))

    # Store the discovery-related fields.
    device.discovery_topic = topic
    device.discovery_payload = payload
    device.attributes = json.dumps({
        "device_class": device_class,
        "unit_of_measurement": config.get("unit_of_measurement"),
        "icon": config.get("icon"),
    })
    await db.commit()
    await db.refresh(device)

    # Subscribe to the state topic.
    await mqtt_client.subscribe(state_topic, qos=1)

    logger.info("HA Discovery device registered: %s -> %s", device.name, state_topic)
    return device
```
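To make the handler's field extraction concrete, the sketch below runs the same lookups on one sample discovery message using only the standard library. The `extract_device_fields` helper, the sample topic, and the sample payload are invented for illustration; real devices may publish additional or different keys.

```python
import json


def extract_device_fields(topic: str, payload: str) -> dict:
    """Pull the fields the handler needs from one discovery config message."""
    parts = topic.split("/")
    config = json.loads(payload)
    node_id = "/".join(parts[2:-1])
    # Name fallback order: top-level "name", then device.name, then the node id.
    name = config.get("name", config.get("device", {}).get("name", node_id))
    return {
        "component": parts[1],
        "name": name,
        "state_topic": config.get("state_topic"),
        "command_topic": config.get("command_topic"),
    }


sample_topic = "homeassistant/sensor/room/temperature/config"
sample_payload = json.dumps({
    "device": {"name": "Room sensor"},
    "state_topic": "home/room/temperature",
    "unit_of_measurement": "°C",
})

fields = extract_device_fields(sample_topic, sample_payload)
print(fields)
```

A read-only sensor like this one has no `command_topic`, which is why the registry raises `ValueError` when a command is sent to such a device.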
- [ ] **Step 2: Create tests/test_discovery.py**
```python
import json
import pytest
from unittest.mock import AsyncMock

from mqtt_home.discovery import parse_discovery_topic, handle_discovery_message
from mqtt_home.config import Settings
from mqtt_home.mqtt_client import MqttClient


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )


def test_parse_discovery_topic_valid():
    result = parse_discovery_topic("homeassistant/light/abc123/config")
    assert result is not None
    assert result["component"] == "light"
    assert result["node_id"] == "abc123"


def test_parse_discovery_topic_nested_node():
    result = parse_discovery_topic("homeassistant/sensor/room/temperature/config")
    assert result is not None
    assert result["component"] == "sensor"
    assert result["node_id"] == "room/temperature"


def test_parse_discovery_topic_invalid():
    assert parse_discovery_topic("other/topic/config") is None
    assert parse_discovery_topic("homeassistant/light/abc") is None
    assert parse_discovery_topic("homeassistant/light/abc/status") is None


async def test_handle_discovery_creates_device(db_session, settings):
    mqtt_client = MqttClient(settings)
    mqtt_client.subscribe = AsyncMock()

    payload = json.dumps({
        "name": "Living room light",
        "state_topic": "home/living/light/status",
        "command_topic": "home/living/light/set",
        "device_class": "light",
    })

    device = await handle_discovery_message(
        "homeassistant/light/living_room/config",
        payload,
        db_session,
        mqtt_client,
    )

    assert device is not None
    assert device.name == "Living room light"
    assert device.protocol == "ha_discovery"
    assert device.mqtt_topic == "home/living/light/status"
    mqtt_client.subscribe.assert_called_once_with("home/living/light/status", qos=1)


async def test_handle_discovery_duplicate(db_session, settings):
    mqtt_client = MqttClient(settings)
    mqtt_client.subscribe = AsyncMock()

    payload = json.dumps({
        "name": "",
        "state_topic": "home/light",
    })

    await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client)
    device2 = await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client)

    # The same discovery_topic must not create a duplicate device.
    assert device2 is not None
    mqtt_client.subscribe.assert_called_once()  # subscribed only once
```
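The topic rules that these tests pin down can be sketched in a few lines. This is only an illustration consistent with the assertions above, not necessarily how `mqtt_home.discovery` implements it; the function name `parse_discovery_topic_sketch` and the constant `PREFIX` are assumptions made for the example.

```python
from typing import Optional

PREFIX = "homeassistant"  # assumed discovery prefix, matching the test topics


def parse_discovery_topic_sketch(topic: str) -> Optional[dict]:
    """Split 'homeassistant/<component>/<node_id...>/config' into its parts."""
    parts = topic.split("/")
    # Need at least: prefix, component, one node level, and the 'config' suffix.
    if len(parts) < 4 or parts[0] != PREFIX or parts[-1] != "config":
        return None
    # Everything between the component and 'config' is treated as the node id,
    # so nested ids such as 'room/temperature' survive intact.
    return {"component": parts[1], "node_id": "/".join(parts[2:-1])}
```

Joining the middle levels back together is what lets a nested node such as `room/temperature` pass the second test.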
- [ ] **Step 3: Run the tests**
Run: `pytest tests/test_discovery.py -v`
Expected: 5 passed
- [ ] **Step 4: Commit**
```bash
git add -A
git commit -m "feat: HA Discovery protocol handler for auto-registering devices"
```
---
### Task 8: FastAPI application and REST API routes
**Files:**
- Create: `src/mqtt_home/main.py`
- Create: `src/mqtt_home/api/__init__.py`
- Create: `src/mqtt_home/api/devices.py`
- Create: `src/mqtt_home/api/broker.py`
- Create: `src/mqtt_home/api/dashboard.py`
- Create: `tests/test_api_devices.py`
- Create: `tests/test_api_broker.py`
- [ ] **Step 1: Create src/mqtt_home/api/__init__.py**
```python
from fastapi import APIRouter
from mqtt_home.api.devices import router as devices_router
from mqtt_home.api.broker import router as broker_router
from mqtt_home.api.dashboard import router as dashboard_router
api_router = APIRouter(prefix="/api")
api_router.include_router(devices_router, prefix="/devices", tags=["devices"])
api_router.include_router(broker_router, prefix="/broker", tags=["broker"])
api_router.include_router(dashboard_router, prefix="/dashboard", tags=["dashboard"])
```
- [ ] **Step 2: Create src/mqtt_home/api/devices.py**
```python
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.database import get_db
from mqtt_home.device_registry import (
    list_devices, get_device, create_device, update_device,
    delete_device, send_command, get_device_logs,
)
from mqtt_home.schemas import (
    DeviceCreate, DeviceUpdate, DeviceCommand,
    DeviceResponse, DeviceLogResponse,
)

router = APIRouter()


@router.get("", response_model=list[DeviceResponse])
async def get_devices(db: AsyncSession = Depends(get_db)):
    return await list_devices(db)


@router.post("", response_model=DeviceResponse, status_code=201)
async def add_device(data: DeviceCreate, db: AsyncSession = Depends(get_db)):
    return await create_device(db, data)


@router.get("/{device_id}", response_model=DeviceResponse)
async def get_device_detail(device_id: str, db: AsyncSession = Depends(get_db)):
    device = await get_device(db, device_id)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device


@router.put("/{device_id}", response_model=DeviceResponse)
async def patch_device(device_id: str, data: DeviceUpdate, db: AsyncSession = Depends(get_db)):
    device = await update_device(db, device_id, data)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device


@router.delete("/{device_id}", status_code=204)
async def remove_device(device_id: str, db: AsyncSession = Depends(get_db)):
    if not await delete_device(db, device_id):
        raise HTTPException(status_code=404, detail="Device not found")


@router.post("/{device_id}/command", response_model=DeviceLogResponse)
async def command_device(
    device_id: str, data: DeviceCommand, request: Request,
    db: AsyncSession = Depends(get_db),
):
    # Read the MQTT client from application state (via the request, which avoids
    # importing mqtt_home.main here); its publish method serves as publish_fn
    mqtt_client = getattr(request.app.state, "mqtt_client", None)
    if not mqtt_client or not mqtt_client.is_connected:
        raise HTTPException(status_code=503, detail="MQTT not connected")
    try:
        log = await send_command(
            db, device_id, data.payload,
            publish_fn=mqtt_client.publish,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not log:
        raise HTTPException(status_code=404, detail="Device not found")
    return log


@router.get("/{device_id}/logs", response_model=list[DeviceLogResponse])
async def read_device_logs(device_id: str, limit: int = 20, db: AsyncSession = Depends(get_db)):
    return await get_device_logs(db, device_id, limit)
```
- [ ] **Step 3: Create src/mqtt_home/api/broker.py**
```python
from fastapi import APIRouter, HTTPException, Request

from mqtt_home.schemas import BrokerClient, BrokerTopic

router = APIRouter()


def _require_emqx(request: Request):
    """Return the EMQX API client from application state, or raise 503."""
    emqx = getattr(request.app.state, "emqx_client", None)
    if not emqx:
        raise HTTPException(status_code=503, detail="EMQX API client not configured")
    return emqx


@router.get("/status")
async def broker_status(request: Request):
    emqx = _require_emqx(request)
    try:
        status = await emqx.get_broker_status()
        metrics = await emqx.get_metrics()
        return {"status": status, "metrics": metrics}
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e


@router.get("/clients", response_model=list[BrokerClient])
async def broker_clients(request: Request, limit: int = 100):
    emqx = _require_emqx(request)
    try:
        return await emqx.get_clients(limit)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e


@router.get("/topics", response_model=list[BrokerTopic])
async def broker_topics(request: Request, limit: int = 100):
    emqx = _require_emqx(request)
    try:
        return await emqx.get_topics(limit)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e
```
- [ ] **Step 4: Create src/mqtt_home/api/dashboard.py**
```python
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.database import get_db
from mqtt_home.device_registry import get_dashboard_stats
from mqtt_home.schemas import DashboardStats

router = APIRouter()


@router.get("", response_model=DashboardStats)
async def dashboard(db: AsyncSession = Depends(get_db)):
    return await get_dashboard_stats(db)
```
- [ ] **Step 5: Create src/mqtt_home/main.py**
```python
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.discovery import handle_discovery_message
from mqtt_home.device_registry import handle_state_update
from mqtt_home.api import api_router

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()

    # Initialize the database
    await init_db()
    logger.info("Database initialized")

    # Initialize the EMQX API client
    emqx = EmqxApiClient(settings)
    app.state.emqx_client = emqx
    logger.info("EMQX API client initialized")

    # Initialize the MQTT client
    mqtt = MqttClient(settings)
    app.state.mqtt_client = mqtt
    session_factory = get_session_factory()

    # Callback for HA Discovery messages
    async def on_discovery(topic: str, payload: str):
        async with session_factory() as db:
            await handle_discovery_message(topic, payload, db, mqtt)

    # Callback for device state updates
    async def on_state(topic: str, payload: str):
        async with session_factory() as db:
            device = await handle_state_update(db, topic, payload)
            # WebSocket broadcasting is added in Task 9

    mqtt.on_message("homeassistant/#", on_discovery)
    mqtt.on_message("home/#", on_state)
    await mqtt.start()
    logger.info("MQTT client started")

    yield

    await mqtt.stop()
    await emqx.close()
    logger.info("Shutdown complete")


app = FastAPI(title="MQTT Home", version="0.1.0", lifespan=lifespan)

# Permissive CORS for local development; restrict allow_origins before exposing the API
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.include_router(api_router)


@app.get("/health")
async def health():
    mqtt = getattr(app.state, "mqtt_client", None)
    return {
        "status": "ok",
        "mqtt_connected": mqtt.is_connected if mqtt else False,
    }
```
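`lifespan` registers one handler for `homeassistant/#` and another for `home/#`. How `MqttClient.on_message` dispatches messages is defined by the MQTT client task, not here; the sketch below only illustrates standard MQTT wildcard semantics (`+` matches exactly one level, `#` matches the parent level and everything below it) and shows that the two filters do not overlap. The function name `topic_matches` is hypothetical, and the sketch ignores the special rule for topics that begin with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and all levels below it.
            return True
        if i >= len(topic_levels):
            # The filter is longer than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # No '#' seen: the filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

Because matching works on whole levels, `home/#` does not capture `homeassistant/light/living_room/config`: the first level `homeassistant` is not equal to `home`.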
- [ ] **Step 6: Create tests/test_api_devices.py**
```python
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.pool import StaticPool

from mqtt_home.main import app
from mqtt_home.database import Base, get_db

TEST_DB = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
async def client():
    engine = create_async_engine(TEST_DB, connect_args={"check_same_thread": False}, poolclass=StaticPool)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    test_factory = async_sessionmaker(engine, expire_on_commit=False)

    async def override_get_db():
        async with test_factory() as session:
            yield session

    # Route every request to the in-memory test database
    app.dependency_overrides[get_db] = override_get_db
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()
    await engine.dispose()


async def test_get_devices_empty(client):
    resp = await client.get("/api/devices")
    assert resp.status_code == 200
    assert resp.json() == []


async def test_create_device(client):
    resp = await client.post("/api/devices", json={
        "name": "Living room light",
        "type": "light",
        "mqtt_topic": "home/light",
        "command_topic": "home/light/set",
    })
    assert resp.status_code == 201
    data = resp.json()
    assert data["name"] == "Living room light"
    assert data["type"] == "light"


async def test_get_device_detail(client):
    create_resp = await client.post("/api/devices", json={
        "name": "", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]
    resp = await client.get(f"/api/devices/{device_id}")
    assert resp.status_code == 200
    assert resp.json()["name"] == ""


async def test_get_device_not_found(client):
    resp = await client.get("/api/devices/nonexistent")
    assert resp.status_code == 404


async def test_update_device(client):
    create_resp = await client.post("/api/devices", json={
        "name": "", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]
    resp = await client.put(f"/api/devices/{device_id}", json={"name": "New name"})
    assert resp.status_code == 200
    assert resp.json()["name"] == "New name"


async def test_delete_device(client):
    create_resp = await client.post("/api/devices", json={
        "name": "", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]
    resp = await client.delete(f"/api/devices/{device_id}")
    assert resp.status_code == 204


async def test_delete_device_not_found(client):
    resp = await client.delete("/api/devices/nonexistent")
    assert resp.status_code == 404
```
- [ ] **Step 7: Create tests/test_api_broker.py**
```python
import pytest
from httpx import AsyncClient, ASGITransport

from mqtt_home.main import app


@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


async def test_broker_status_no_client(client):
    # Without an emqx_client in app.state the endpoint should return 503
    resp = await client.get("/api/broker/status")
    assert resp.status_code == 503


async def test_broker_clients_no_client(client):
    resp = await client.get("/api/broker/clients")
    assert resp.status_code == 503


async def test_health_endpoint(client):
    resp = await client.get("/health")
    assert resp.status_code == 200
    data = resp.json()
    assert "status" in data
    assert "mqtt_connected" in data
```
- [ ] **Step 8: Run all tests**
Run: `pytest tests/ -v`
Expected: All tests pass
- [ ] **Step 9: Commit**
```bash
git add -A
git commit -m "feat: FastAPI app with device, broker, dashboard REST API routes"
```
---
### Task 9: WebSocket real-time push
**Files:**
- Modify: `src/mqtt_home/main.py`
- Create: `src/mqtt_home/ws.py`
- [ ] **Step 1: Create src/mqtt_home/ws.py**
```python
import logging

from fastapi import WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)


class ConnectionManager:
    def __init__(self):
        self._connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self._connections.append(ws)
        logger.info("WebSocket connected, total: %d", len(self._connections))

    def disconnect(self, ws: WebSocket):
        # Guard against double removal: broadcast() may already have pruned this socket
        if ws in self._connections:
            self._connections.remove(ws)
        logger.info("WebSocket disconnected, total: %d", len(self._connections))

    async def broadcast(self, message: dict):
        dead = []
        # Iterate over a copy so connects/disconnects during an await do not disturb the loop
        for ws in list(self._connections):
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)


ws_manager = ConnectionManager()


async def websocket_endpoint(ws: WebSocket):
    await ws_manager.connect(ws)
    try:
        while True:
            await ws.receive_text()  # keep the connection open; client messages are ignored
    except WebSocketDisconnect:
        ws_manager.disconnect(ws)


async def broadcast_device_update(device_id: str, data: dict):
    await ws_manager.broadcast({
        "type": "device_update",
        "device_id": device_id,
        **data,
    })
```
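The way `broadcast` prunes dead connections can be exercised without a running server by substituting stand-in sockets. The sketch below reimplements the pattern in isolation; `FakeSocket` and `MiniManager` are hypothetical names that exist only for this example and are not part of `ws.py`.

```python
import asyncio


class FakeSocket:
    """Stand-in for a WebSocket: records messages, or fails on send."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.sent: list = []

    async def send_json(self, message: dict) -> None:
        if self.fail:
            raise RuntimeError("connection closed")
        self.sent.append(message)


class MiniManager:
    """Minimal broadcast-and-prune loop, mirroring the pattern in ws.py."""

    def __init__(self):
        self.connections: list = []

    async def broadcast(self, message: dict) -> None:
        # Iterate over a copy so removals do not disturb the loop.
        for ws in list(self.connections):
            try:
                await ws.send_json(message)
            except Exception:
                self.connections.remove(ws)


async def demo() -> tuple:
    manager = MiniManager()
    alive, dead = FakeSocket(), FakeSocket(fail=True)
    manager.connections += [alive, dead]
    await manager.broadcast({"type": "device_update", "device_id": "abc"})
    return alive.sent, len(manager.connections)


sent, remaining = asyncio.run(demo())
```

After the call, the healthy socket has received the message and the failing one has been dropped from the list, so one slow or closed client does not block updates to the others.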
- [ ] **Step 2: Modify src/mqtt_home/main.py to add the WebSocket route and broadcast**
After `from mqtt_home.api import api_router`, add:
```python
from mqtt_home.ws import websocket_endpoint, broadcast_device_update
```
After `app.include_router(api_router)`, add:
```python
app.websocket("/ws/devices")(websocket_endpoint)
```
Modify the `on_state` callback inside `lifespan` so that it broadcasts over WebSocket:
```python
    async def on_state(topic: str, payload: str):
        async with session_factory() as db:
            device = await handle_state_update(db, topic, payload)
            if device:
                await broadcast_device_update(device.id, {
                    "state": device.state,
                    "is_online": device.is_online,
                    "last_seen": device.last_seen.isoformat() if device.last_seen else None,
                })
```
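For front-end work it helps to know the exact shape of the frame a client receives on `/ws/devices`. The following sketch rebuilds the message that `broadcast_device_update` assembles, using plain values; the helper name `build_device_update` and the sample values are illustrative assumptions.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def build_device_update(device_id: str, state: Optional[str], is_online: bool,
                        last_seen: Optional[datetime]) -> dict:
    """Mirror the dict that broadcast_device_update sends to every client."""
    return {
        "type": "device_update",
        "device_id": device_id,
        "state": state,
        "is_online": is_online,
        # Datetimes are not JSON-serializable, so they travel as ISO 8601 strings.
        "last_seen": last_seen.isoformat() if last_seen else None,
    }


message = build_device_update(
    "abc123", "ON", True, datetime(2026, 3, 29, 12, 0, tzinfo=timezone.utc)
)
print(json.dumps(message))
```

A client can switch on the `type` field and then look up the device by `device_id` to apply the new `state`, `is_online`, and `last_seen` values.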
- [ ] **Step 3: Commit**
```bash
git add -A
git commit -m "feat: WebSocket endpoint for real-time device state updates"
```
---
### Task 10: CLI commands
**Files:**
- Create: `src/mqtt_home/cli.py`
- Create: `tests/test_cli.py`
- [ ] **Step 1: Create src/mqtt_home/cli.py**
```python
import asyncio

import aiomqtt
import click

from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.device_registry import (
    list_devices, get_device, create_device, delete_device,
    send_command, get_device_logs,
)
from mqtt_home.schemas import DeviceCreate


def run_async(coro):
    # asyncio.run creates a fresh event loop and closes it when the coroutine finishes
    return asyncio.run(coro)


@click.group()
def cli():
    """MQTT smart home management tool"""
    pass


@cli.group()
def device():
    """Device management"""
    pass


@device.command("list")
def device_list():
    """List all devices"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            devices = await list_devices(db)
            if not devices:
                click.echo("No devices found")
                return
            for d in devices:
                status = "🟢" if d.is_online else "🔴"
                state = d.state or "-"
                click.echo(f"{status} [{d.id[:8]}] {d.name} ({d.type}) state={state}")
    run_async(_run())


@device.command("add")
@click.option("--name", required=True, help="Device name")
@click.option("--type", "device_type", default="switch", help="Device type")
@click.option("--state-topic", required=True, help="State topic")
@click.option("--command-topic", default=None, help="Command topic")
def device_add(name, device_type, state_topic, command_topic):
    """Add a device manually"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            d = await create_device(db, DeviceCreate(
                name=name,
                type=device_type,
                mqtt_topic=state_topic,
                command_topic=command_topic,
            ))
            click.echo(f"Device created: {d.id} - {d.name}")
    run_async(_run())


@device.command("info")
@click.argument("device_id")
def device_info(device_id):
    """Show device details"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            d = await get_device(db, device_id)
            if not d:
                click.echo(f"Device not found: {device_id}", err=True)
                return
            click.echo(f"ID: {d.id}")
            click.echo(f"Name: {d.name}")
            click.echo(f"Type: {d.type}")
            click.echo(f"Protocol: {d.protocol}")
            click.echo(f"State topic: {d.mqtt_topic}")
            click.echo(f"Command topic: {d.command_topic or '-'}")
            click.echo(f"Current state: {d.state or '-'}")
            click.echo(f"Online: {'yes' if d.is_online else 'no'}")
            click.echo(f"Last seen: {d.last_seen or 'never'}")
    run_async(_run())


@device.command("remove")
@click.argument("device_id")
def device_remove(device_id):
    """Delete a device"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            if await delete_device(db, device_id):
                click.echo(f"Device deleted: {device_id}")
            else:
                click.echo(f"Device not found: {device_id}", err=True)
    run_async(_run())


@device.command("command")
@click.argument("device_id")
@click.option("--payload", required=True, help="Command payload (JSON)")
def device_command(device_id, payload):
    """Send a command to a device"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()
        # Publish through a short-lived aiomqtt connection; no message loop is started
        async with aiomqtt.Client(
            hostname=settings.mqtt_host,
            port=settings.mqtt_port,
            username=settings.mqtt_username or None,
            password=settings.mqtt_password or None,
        ) as client:
            async with factory() as db:
                try:
                    log = await send_command(
                        db, device_id, payload,
                        publish_fn=lambda topic, p: client.publish(topic, p, qos=1),
                    )
                    if log:
                        click.echo(f"Command sent to {log.topic}: {log.payload}")
                    else:
                        click.echo(f"Device not found: {device_id}", err=True)
                except ValueError as e:
                    click.echo(str(e), err=True)
    run_async(_run())


@device.command("logs")
@click.argument("device_id")
@click.option("--limit", default=20, help="Number of log entries")
def device_logs(device_id, limit):
    """Show a device's message log"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            logs = await get_device_logs(db, device_id, limit)
            if not logs:
                click.echo("No logs found")
                return
            for log in logs:
                direction = "RX" if log.direction == "rx" else "TX"
                click.echo(f"[{log.timestamp}] {direction} {log.topic} | {log.payload}")
    run_async(_run())


@cli.group()
def broker():
    """Broker management"""
    pass


@broker.command("status")
def broker_status():
    """Show broker status"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            status = await emqx.get_broker_status()
            click.echo(f"Version: {status.get('version', 'unknown')}")
            click.echo(f"Uptime: {status.get('uptime', 0)}s")
            metrics = await emqx.get_metrics()
            click.echo(f"Connections: {metrics.get('connections.count', 0)}")
            click.echo(f"Subscriptions: {metrics.get('subscriptions.count', 0)}")
            click.echo(f"Topics: {metrics.get('topics.count', 0)}")
        except Exception as e:
            click.echo(f"Failed to reach EMQX: {e}", err=True)
        finally:
            await emqx.close()
    run_async(_run())


@broker.command("clients")
def broker_clients():
    """List connected clients"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            clients = await emqx.get_clients()
            if not clients:
                click.echo("No clients connected")
                return
            for c in clients:
                status = "online" if c.get("connected") else "offline"
                click.echo(f"[{c.get('clientid')}] {status} ip={c.get('ip_address')}")
        except Exception as e:
            click.echo(f"Failed to reach EMQX: {e}", err=True)
        finally:
            await emqx.close()
    run_async(_run())


@broker.command("topics")
def broker_topics():
    """List active topics"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            topics = await emqx.get_topics()
            if not topics:
                click.echo("No active topics")
                return
            for t in topics:
                click.echo(t.get("topic", ""))
        except Exception as e:
            click.echo(f"Failed to reach EMQX: {e}", err=True)
        finally:
            await emqx.close()
    run_async(_run())
```
- [ ] **Step 2: Create tests/test_cli.py**
```python
import pytest
from click.testing import CliRunner

from mqtt_home.cli import cli


def test_device_list_empty(monkeypatch):
    """The list command reports when there are no devices"""
    runner = CliRunner()

    async def mock_init():
        pass

    async def mock_list(db):
        return []

    monkeypatch.setattr("mqtt_home.cli.init_db", mock_init)
    monkeypatch.setattr("mqtt_home.cli.list_devices", mock_list)
    result = runner.invoke(cli, ["device", "list"])
    assert result.exit_code == 0
    assert "No devices found" in result.output


def test_cli_groups():
    runner = CliRunner()
    result = runner.invoke(cli, ["--help"])
    assert "device" in result.output
    assert "broker" in result.output
```
- [ ] **Step 3: Run the tests**
Run: `pytest tests/test_cli.py -v`
Expected: 2 passed
- [ ] **Step 4: Commit**
```bash
git add -A
git commit -m "feat: CLI commands for device management and broker monitoring"
```
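Every command in `cli.py` wraps its body in an inner `_run()` coroutine and hands it to `run_async`. One possible way to cut that boilerplate is a small decorator that runs a coroutine function to completion. This is a sketch only: `sync_command` is a hypothetical name, and the plan's `cli.py` does not depend on it.

```python
import asyncio
import functools


def sync_command(func):
    """Wrap a coroutine function so it can be called like a normal function."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # asyncio.run creates a fresh event loop and closes it afterwards.
        return asyncio.run(func(*args, **kwargs))

    return wrapper


@sync_command
async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # stands in for real async work such as a database query
    return a + b


result = add(2, 3)
```

If this were adopted with click, the decorator would presumably sit directly above the `async def`, below the `@device.command(...)` and option decorators, so that click registers the synchronous wrapper.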
---
### Task 11: Create .env and verify end to end
**Files:**
- Create: `.env`
- [ ] **Step 1: Create .env (copy .env.example and fill in real values)**
```
MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=<your-emqx-api-key>
EMQX_API_SECRET=<your-emqx-api-secret>
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
```
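Before starting the service it can be useful to confirm that `.env` defines the keys listed above. The application itself loads settings through pydantic-settings; the sketch below is a separate, standard-library-only check. `parse_env`, `missing_keys`, and the choice of which keys count as required are assumptions made for this example (optional keys such as `MQTT_USERNAME` may stay empty).

```python
# Keys treated as required here; an assumption based on the example file above.
REQUIRED_KEYS = ["MQTT_HOST", "MQTT_PORT", "EMQX_API_URL",
                 "EMQX_API_KEY", "EMQX_API_SECRET", "DATABASE_URL"]


def parse_env(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping blank lines and '#' comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_keys(values: dict) -> list:
    """Return the required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not values.get(key)]


sample = "MQTT_HOST=192.168.0.31\nMQTT_PORT=1883\n# comment\nMQTT_USERNAME=\n"
print(missing_keys(parse_env(sample)))
```

To check a real file, the same helpers can be applied to its contents, for example `missing_keys(parse_env(open(".env").read()))`; an empty list means every required key has a value.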
- [ ] **Step 2: Start the service and verify**
Run: `uvicorn mqtt_home.main:app --host 0.0.0.0 --port 8000`
Expected: the service starts successfully and the log shows "MQTT connected" and "Database initialized"
- [ ] **Step 3: Verify the broker connection with the CLI**
Run: `mqtt-home broker status`
Expected: the EMQX version and statistics are displayed
- [ ] **Step 4: Verify with the API**
Run: `curl http://localhost:8000/health`
Expected: `{"status":"ok","mqtt_connected":true}`
Run: `curl http://localhost:8000/api/broker/clients`
Expected: a JSON list of clients
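The health check above can also be scripted. The sketch below uses only the standard library; the URL matches the `curl` command, while `is_healthy` and `fetch_health` are hypothetical helper names introduced for this example.

```python
import json
import urllib.request


def is_healthy(payload: dict) -> bool:
    """True when the service reports ok and the MQTT client is connected."""
    return payload.get("status") == "ok" and payload.get("mqtt_connected") is True


def fetch_health(url: str = "http://localhost:8000/health") -> dict:
    """GET the health endpoint and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With the service running, `is_healthy(fetch_health())` should return `True` once the MQTT connection is up; a `False` result with `"status": "ok"` points at the broker connection rather than the web server.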
- [ ] **Step 5: Commit (.env is already listed in .gitignore)**
```bash
git add -A
git commit -m "feat: verify end-to-end backend with EMQX broker connection"
```
---
### Task 12: Full test run
- [ ] **Step 1: Run all tests**
Run: `pytest tests/ -v`
Expected: all tests pass
- [ ] **Step 2: Fix any failing tests**
Fix problems based on the actual test output.
- [ ] **Step 3: Final commit**
```bash
git add -A
git commit -m "test: ensure all backend tests pass"
```
---
## Self-check results
| Design document section | Task |
|---|---|
| Project structure | Task 1 |
| Data models (devices, device_logs) | Task 2 |
| Pydantic schemas | Task 3 |
| EMQX REST API wrapper | Task 4 |
| MQTT client + reconnect + wildcards | Task 5 |
| Device registry (CRUD + state + commands) | Task 6 |
| HA Discovery protocol | Task 7 |
| REST API routes (devices, broker, dashboard) | Task 8 |
| WebSocket real-time push | Task 9 |
| CLI commands | Task 10 |
| Configuration / .env | Task 11 |