# MQTT Smart Home Management System - Backend Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Implement the core backend: FastAPI REST API, MQTT client, EMQX API wrapper, CLI commands, and SQLite database.

**Architecture:** A monolithic FastAPI application that connects to the EMQX broker through aiomqtt and stores device data in SQLite via SQLAlchemy. The CLI and the REST API share the `device_registry` business-logic layer.

**Tech Stack:** Python 3.11+, FastAPI, uvicorn, aiomqtt, SQLAlchemy 2.0 + aiosqlite, pydantic-settings, click, httpx
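
As a rough illustration of the shared-logic idea in the architecture note, the sketch below routes a CLI-style and a REST-style entry point through one async function. The in-memory `_DEVICES` dict and the names `api_create_device` and `cli_create_device` are hypothetical stand-ins, not the plan's actual modules; only the standard library is used.

```python
import asyncio

# Hypothetical in-memory stand-in for the database used by the real registry.
_DEVICES: dict[str, dict] = {}


async def create_device(name: str, mqtt_topic: str) -> dict:
    """Single owner of the business logic (plays the role of device_registry)."""
    device = {"id": str(len(_DEVICES) + 1), "name": name, "mqtt_topic": mqtt_topic}
    _DEVICES[device["id"]] = device
    return device


async def api_create_device(body: dict) -> dict:
    """REST-style entry point: already async, so it simply awaits the shared function."""
    return await create_device(body["name"], body["mqtt_topic"])


def cli_create_device(name: str, mqtt_topic: str) -> dict:
    """CLI-style entry point: synchronous, so it drives the same coroutine itself."""
    return asyncio.run(create_device(name, mqtt_topic))
```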

---

## File structure overview

```
mqtt-home/
├── pyproject.toml
├── .env.example
├── .gitignore
├── src/
│   └── mqtt_home/
│       ├── __init__.py
│       ├── config.py              # Settings (pydantic-settings)
│       ├── database.py            # SQLite engine, async session
│       ├── models.py              # SQLAlchemy ORM (Device, DeviceLog)
│       ├── schemas.py             # Pydantic schemas (request/response)
│       ├── emqx_api.py            # EMQX REST API client (httpx)
│       ├── mqtt_client.py         # MQTT connection lifecycle (aiomqtt)
│       ├── discovery.py           # HA Discovery protocol handler
│       ├── device_registry.py     # Business logic (CRUD + control)
│       ├── main.py                # FastAPI app + lifespan
│       ├── cli.py                 # click CLI commands
│       └── api/
│           ├── __init__.py        # Router aggregation
│           ├── devices.py         # Device REST routes
│           ├── broker.py          # Broker REST routes
│           └── dashboard.py       # Dashboard REST routes
└── tests/
    ├── conftest.py
    ├── test_config.py
    ├── test_models.py
    ├── test_device_registry.py
    ├── test_emqx_api.py
    ├── test_mqtt_client.py
    ├── test_discovery.py
    ├── test_api_devices.py
    ├── test_api_broker.py
    └── test_cli.py
```

---

### Task 1: Project skeleton and configuration

**Files:**
- Create: `pyproject.toml`
- Create: `.env.example`
- Create: `.gitignore`
- Create: `src/mqtt_home/__init__.py`
- Create: `src/mqtt_home/config.py`
- Create: `tests/conftest.py`
- Create: `tests/test_config.py`

- [ ] **Step 1: Create pyproject.toml**

```toml
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "mqtt-home"
version = "0.1.0"
description = "Lightweight smart-home MQTT device management system"
requires-python = ">=3.11"
dependencies = [
    "fastapi>=0.115.0",
    "uvicorn[standard]>=0.30.0",
    "aiomqtt>=2.0.0",
    "sqlalchemy[asyncio]>=2.0.0",
    "aiosqlite>=0.20.0",
    "pydantic-settings>=2.0.0",
    "click>=8.1.0",
    "httpx>=0.27.0",
    "websockets>=12.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "pytest-httpx>=0.30.0",
    "httpx>=0.27.0",
]

[project.scripts]
mqtt-home = "mqtt_home.cli:cli"

[tool.setuptools.packages.find]
where = ["src"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
```

- [ ] **Step 2: Create .env.example**

```
MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=your-api-key
EMQX_API_SECRET=your-secret
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
```

- [ ] **Step 3: Create .gitignore**

```
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.venv/
.env
data/
*.db
frontend/node_modules/
frontend/dist/
```

- [ ] **Step 4: Create src/mqtt_home/__init__.py**

```python
```

- [ ] **Step 5: Create tests/conftest.py**

```python
import pytest
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import StaticPool

from mqtt_home.config import Settings

TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        mqtt_port=1883,
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url=TEST_DATABASE_URL,
    )


@pytest.fixture
async def engine():
    # Imported lazily: mqtt_home.database and mqtt_home.models are only
    # created in Task 2, and test_config.py must be collectable before that.
    from mqtt_home.database import Base
    from mqtt_home import models  # noqa: F401  (registers tables on Base.metadata)

    engine = create_async_engine(
        TEST_DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,  # one shared connection keeps the in-memory DB alive
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()


@pytest.fixture
async def db_session(engine):
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with async_session() as session:
        yield session
```

- [ ] **Step 6: Create src/mqtt_home/config.py**

```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    mqtt_host: str = "localhost"
    mqtt_port: int = 1883
    mqtt_username: str = ""
    mqtt_password: str = ""
    emqx_api_url: str = "http://localhost:18083/api/v5"
    emqx_api_key: str = ""
    emqx_api_secret: str = ""
    database_url: str = "sqlite+aiosqlite:///./data/mqtt_home.db"
    web_host: str = "0.0.0.0"
    web_port: int = 8000

    # Values come from the environment first, then from .env, then the defaults above.
    model_config = {"env_file": ".env", "env_prefix": ""}


def get_settings() -> Settings:
    return Settings()
```
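
To make the lookup behaviour concrete, here is a standard-library-only sketch of how an environment variable such as `MQTT_HOST` ends up in a field named `mqtt_host`. It assumes the pydantic-settings defaults (case-insensitive names, empty prefix) and is not how the library is implemented; `SketchSettings` and `load_from_env` are hypothetical names used only for this illustration.

```python
import os
from dataclasses import dataclass, fields


@dataclass
class SketchSettings:
    """Hypothetical two-field mirror of the Settings class above."""
    mqtt_host: str = "localhost"
    mqtt_port: int = 1883


def load_from_env(environ=None) -> SketchSettings:
    environ = os.environ if environ is None else environ
    # Case-insensitive match: MQTT_HOST and mqtt_host both target `mqtt_host`.
    lowered = {key.lower(): value for key, value in environ.items()}
    kwargs = {}
    for field in fields(SketchSettings):
        if field.name in lowered:
            # Coerce the raw string to the type of the field's default value.
            kwargs[field.name] = type(field.default)(lowered[field.name])
    return SketchSettings(**kwargs)


overridden = load_from_env({"MQTT_HOST": "192.168.1.1", "MQTT_PORT": "1884"})
defaults = load_from_env({})
```

Fields without a matching variable keep their defaults, which is the behaviour the two tests in the next step rely on.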

- [ ] **Step 7: Create tests/test_config.py**

```python
from mqtt_home.config import Settings


def test_default_settings():
    # _env_file=None keeps a local .env from overriding the defaults under test.
    s = Settings(_env_file=None)
    assert s.mqtt_host == "localhost"
    assert s.mqtt_port == 1883
    assert s.web_port == 8000


def test_settings_from_env(monkeypatch):
    monkeypatch.setenv("MQTT_HOST", "192.168.1.1")
    monkeypatch.setenv("MQTT_PORT", "1884")
    s = Settings()
    assert s.mqtt_host == "192.168.1.1"
    assert s.mqtt_port == 1884
```

- [ ] **Step 8: Install dependencies and run the tests**

Run: `pip install -e ".[dev]"` (from the `D:\home\mqtt` directory)
Run: `pytest tests/test_config.py -v`
Expected: 2 passed

- [ ] **Step 9: Initialize the git repository and commit**

Run:
```bash
cd "D:\home\mqtt"
git init
git add -A
git commit -m "feat: project skeleton with config, dependencies, and test setup"
```

---

### Task 2: Database and ORM models

**Files:**
- Create: `src/mqtt_home/database.py`
- Create: `src/mqtt_home/models.py`
- Create: `tests/test_models.py`

- [ ] **Step 1: Create src/mqtt_home/database.py**

```python
from collections.abc import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

from mqtt_home.config import get_settings


class Base(DeclarativeBase):
    pass


def get_engine():
    settings = get_settings()
    return create_async_engine(
        settings.database_url,
        echo=False,
    )


def get_session_factory(engine=None):
    _engine = engine or get_engine()
    return async_sessionmaker(_engine, expire_on_commit=False)


async def init_db():
    engine = get_engine()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    await engine.dispose()


async def get_db() -> AsyncIterator[AsyncSession]:
    # Async generator: yields one session and closes it when the caller is done.
    factory = get_session_factory()
    async with factory() as session:
        yield session
```

- [ ] **Step 2: Create src/mqtt_home/models.py**

```python
import uuid
from datetime import datetime, timezone

from sqlalchemy import Column, String, Boolean, DateTime, Text, Integer, ForeignKey
from sqlalchemy.orm import relationship

from mqtt_home.database import Base


def _utcnow():
    return datetime.now(timezone.utc)


def _new_id():
    return str(uuid.uuid4())


class Device(Base):
    __tablename__ = "devices"

    id = Column(String(36), primary_key=True, default=_new_id)
    name = Column(String(200), nullable=False)
    type = Column(String(50), nullable=False, default="switch")
    protocol = Column(String(20), nullable=False, default="custom")
    mqtt_topic = Column(String(500), nullable=False)
    command_topic = Column(String(500), nullable=True)
    discovery_topic = Column(String(500), nullable=True)
    discovery_payload = Column(Text, nullable=True)
    attributes = Column(Text, nullable=True, default="{}")
    state = Column(String(500), nullable=True, default=None)
    is_online = Column(Boolean, nullable=False, default=False)
    last_seen = Column(DateTime(timezone=True), nullable=True)
    created_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow)
    updated_at = Column(DateTime(timezone=True), nullable=False, default=_utcnow, onupdate=_utcnow)

    # lazy="dynamic" is not usable with AsyncSession, so the default loader is
    # kept and logs are queried explicitly. The ORM-level cascade removes a
    # device's logs on delete; SQLite does not enforce ON DELETE CASCADE unless
    # foreign keys are switched on per connection.
    logs = relationship("DeviceLog", back_populates="device", cascade="all, delete-orphan")


class DeviceLog(Base):
    __tablename__ = "device_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    device_id = Column(String(36), ForeignKey("devices.id", ondelete="CASCADE"), nullable=False)
    direction = Column(String(10), nullable=False)  # "rx" or "tx"
    topic = Column(String(500), nullable=False)
    payload = Column(Text, nullable=False)
    timestamp = Column(DateTime(timezone=True), nullable=False, default=_utcnow)

    device = relationship("Device", back_populates="logs")
```

- [ ] **Step 3: Create tests/test_models.py**

```python
from sqlalchemy import func, select

from mqtt_home.models import Device, DeviceLog


async def test_create_device(db_session):
    device = Device(
        name="Living room light",
        type="light",
        protocol="custom",
        mqtt_topic="home/living/light",
        command_topic="home/living/light/set",
    )
    db_session.add(device)
    await db_session.commit()
    await db_session.refresh(device)

    assert device.id is not None
    assert len(device.id) == 36
    assert device.name == "Living room light"
    assert device.state is None
    assert device.is_online is False


async def test_create_device_log(db_session):
    device = Device(
        name="Temperature sensor",
        type="sensor",
        protocol="custom",
        mqtt_topic="home/temperature",
    )
    db_session.add(device)
    await db_session.flush()

    log = DeviceLog(
        device_id=device.id,
        direction="rx",
        topic="home/temperature",
        payload='{"temperature": 25.5}',
    )
    db_session.add(log)
    await db_session.commit()

    # Count with an explicit query; relationship attributes cannot be
    # lazy-loaded from async test code.
    result = await db_session.execute(
        select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id)
    )
    assert result.scalar_one() == 1
```

- [ ] **Step 4: Run the tests**

Run: `pytest tests/test_models.py -v`
Expected: 2 passed

- [ ] **Step 5: Commit**

```bash
git add -A
git commit -m "feat: database setup with SQLAlchemy ORM models for Device and DeviceLog"
```

---

### Task 3: Pydantic Schemas

**Files:**
- Create: `src/mqtt_home/schemas.py`

- [ ] **Step 1: Create src/mqtt_home/schemas.py**

```python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel


class DeviceCreate(BaseModel):
    name: str
    type: str = "switch"
    protocol: str = "custom"
    mqtt_topic: str
    command_topic: Optional[str] = None


class DeviceUpdate(BaseModel):
    name: Optional[str] = None
    type: Optional[str] = None
    command_topic: Optional[str] = None


class DeviceCommand(BaseModel):
    payload: str


class DeviceResponse(BaseModel):
    id: str
    name: str
    type: str
    protocol: str
    mqtt_topic: str
    command_topic: Optional[str] = None
    state: Optional[str] = None
    is_online: bool
    last_seen: Optional[datetime] = None
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class DeviceLogResponse(BaseModel):
    id: int
    device_id: str
    direction: str
    topic: str
    payload: str
    timestamp: datetime

    model_config = {"from_attributes": True}


class BrokerClient(BaseModel):
    clientid: str
    username: Optional[str] = None
    ip_address: Optional[str] = None
    connected: bool
    keepalive: int
    proto_ver: Optional[int] = None
    clean_start: Optional[bool] = None


class BrokerTopic(BaseModel):
    topic: str
    node: Optional[str] = None


class DashboardStats(BaseModel):
    total_devices: int
    online_devices: int
    offline_devices: int
    recent_logs: list[DeviceLogResponse]
```

- [ ] **Step 2: Commit**

```bash
git add -A
git commit -m "feat: Pydantic schemas for API request/response models"
```

---

### Task 4: EMQX REST API client

**Files:**
- Create: `src/mqtt_home/emqx_api.py`
- Create: `tests/test_emqx_api.py`

- [ ] **Step 1: Create src/mqtt_home/emqx_api.py**

```python
import httpx
from typing import Any

from mqtt_home.config import Settings


class EmqxApiClient:
    def __init__(self, settings: Settings):
        # The API key/secret pair is sent as HTTP basic auth.
        self._client = httpx.AsyncClient(
            base_url=settings.emqx_api_url,
            auth=(settings.emqx_api_key, settings.emqx_api_secret),
            timeout=10.0,
        )

    async def close(self):
        await self._client.aclose()

    async def get_broker_status(self) -> dict[str, Any]:
        resp = await self._client.get("/status")
        resp.raise_for_status()
        return resp.json().get("data", {})

    async def get_metrics(self) -> dict[str, Any]:
        resp = await self._client.get("/metrics")
        resp.raise_for_status()
        return resp.json().get("data", {})

    async def get_clients(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/clients", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])

    async def get_subscriptions(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/subscriptions", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])

    async def get_topics(self, limit: int = 100) -> list[dict[str, Any]]:
        resp = await self._client.get("/topics", params={"limit": limit})
        resp.raise_for_status()
        return resp.json().get("data", [])
```

- [ ] **Step 2: Create tests/test_emqx_api.py**

```python
import pytest
import httpx
from unittest.mock import AsyncMock, patch

from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.config import Settings


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )


@pytest.fixture
def emqx_client(settings):
    return EmqxApiClient(settings)


def _json_response(payload) -> httpx.Response:
    # raise_for_status() refuses to run on a response with no request attached,
    # so the fake response carries a dummy one.
    return httpx.Response(200, json=payload, request=httpx.Request("GET", "http://test"))


async def test_get_broker_status(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": {"uptime": 12345, "version": "5.0.0"}})
        result = await emqx_client.get_broker_status()
        assert result["uptime"] == 12345


async def test_get_clients(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": [{"clientid": "dev1", "connected": True}]})
        result = await emqx_client.get_clients()
        assert len(result) == 1
        assert result[0]["clientid"] == "dev1"


async def test_get_topics(emqx_client):
    with patch.object(emqx_client._client, "get", new_callable=AsyncMock) as mock_get:
        mock_get.return_value = _json_response({"data": [{"topic": "home/light"}]})
        result = await emqx_client.get_topics()
        assert len(result) == 1
```

- [ ] **Step 3: Run the tests**

Run: `pytest tests/test_emqx_api.py -v`
Expected: 3 passed

- [ ] **Step 4: Commit**

```bash
git add -A
git commit -m "feat: EMQX REST API client for broker status, clients, and topics"
```

---

### Task 5: MQTT client

**Files:**
- Create: `src/mqtt_home/mqtt_client.py`
- Create: `tests/test_mqtt_client.py`

- [ ] **Step 1: Create src/mqtt_home/mqtt_client.py**

```python
import asyncio
import uuid
import logging
from typing import Callable, Awaitable

import aiomqtt

from mqtt_home.config import Settings

logger = logging.getLogger(__name__)

MessageCallback = Callable[[str, str], Awaitable[None]]


class MqttClient:
    def __init__(self, settings: Settings):
        self._settings = settings
        self._client: aiomqtt.Client | None = None
        self._callbacks: dict[str, list[MessageCallback]] = {}
        self._connected = False
        self._task: asyncio.Task | None = None
        self._client_id = f"mqtt-home-{uuid.uuid4().hex[:8]}"

    @property
    def is_connected(self) -> bool:
        return self._connected

    def on_message(self, topic_filter: str, callback: MessageCallback):
        self._callbacks.setdefault(topic_filter, []).append(callback)

    async def start(self):
        self._task = asyncio.create_task(self._run())

    async def stop(self):
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    async def publish(self, topic: str, payload: str, qos: int = 1, retain: bool = False):
        if not self._client:
            raise RuntimeError("MQTT client not connected")
        await self._client.publish(topic, payload, qos=qos, retain=retain)

    async def subscribe(self, topic: str, qos: int = 1):
        if not self._client:
            raise RuntimeError("MQTT client not connected")
        await self._client.subscribe(topic, qos=qos)

    async def _run(self):
        retry_delay = 1
        max_delay = 60
        while True:
            try:
                async with aiomqtt.Client(
                    hostname=self._settings.mqtt_host,
                    port=self._settings.mqtt_port,
                    username=self._settings.mqtt_username or None,
                    password=self._settings.mqtt_password or None,
                    identifier=self._client_id,
                    clean_session=True,
                ) as client:
                    self._client = client
                    self._connected = True
                    logger.info("MQTT connected to %s:%d", self._settings.mqtt_host, self._settings.mqtt_port)
                    retry_delay = 1

                    # Re-subscribe to every registered topic filter
                    for topic_filter in self._callbacks:
                        await client.subscribe(topic_filter, qos=1)

                    async for message in client.messages:
                        topic = str(message.topic)
                        payload = message.payload.decode("utf-8", errors="replace")
                        await self._dispatch(topic, payload)

            except asyncio.CancelledError:
                # Drop the stale client so publish()/subscribe() fail fast.
                self._client = None
                self._connected = False
                raise
            except Exception as e:
                self._client = None
                self._connected = False
                logger.warning("MQTT connection failed: %s, retrying in %ds", e, retry_delay)
                await asyncio.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, max_delay)

    async def _dispatch(self, topic: str, payload: str):
        matched = False
        for topic_filter, callbacks in self._callbacks.items():
            if self._topic_matches(topic, topic_filter):
                matched = True
                for cb in callbacks:
                    try:
                        await cb(topic, payload)
                    except Exception as e:
                        logger.error("MQTT callback error for %s: %s", topic, e)
        if not matched:
            logger.debug("No callback for topic: %s", topic)

    @staticmethod
    def _topic_matches(topic: str, pattern: str) -> bool:
        """Simple MQTT wildcard matching (supports + and #)."""
        topic_parts = topic.split("/")
        pattern_parts = pattern.split("/")
        for i, part in enumerate(pattern_parts):
            if part == "#":
                return True
            if i >= len(topic_parts):
                return False
            if part != "+" and part != topic_parts[i]:
                return False
        return len(topic_parts) == len(pattern_parts)
```
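
The reconnect loop in `_run` doubles its wait after each failure and caps it at `max_delay`. As a small standalone sketch of that schedule (the `backoff_delays` generator is a hypothetical helper, not part of the plan's code), the sequence of delays can be produced like this:

```python
from itertools import islice
from typing import Iterator


def backoff_delays(initial: int = 1, maximum: int = 60) -> Iterator[int]:
    """Yield the wait before each retry: double every time, never above `maximum`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, maximum)


first_eight = list(islice(backoff_delays(), 8))
print(first_eight)  # [1, 2, 4, 8, 16, 32, 60, 60]
```

With the values used above, a broker outage is retried quickly at first and then at most once a minute; a successful connection resets the delay to 1 second.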

- [ ] **Step 2: Create tests/test_mqtt_client.py**

```python
import pytest
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.config import Settings


@pytest.fixture
def mqtt_client():
    settings = Settings(
        mqtt_host="localhost",
        mqtt_port=1883,
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )
    return MqttClient(settings)


def test_topic_matches_exact(mqtt_client):
    assert MqttClient._topic_matches("home/light", "home/light") is True
    assert MqttClient._topic_matches("home/light", "home/switch") is False


def test_topic_matches_single_level_wildcard(mqtt_client):
    assert MqttClient._topic_matches("home/light/status", "home/+/status") is True
    assert MqttClient._topic_matches("home/switch/status", "home/+/status") is True
    assert MqttClient._topic_matches("home/light", "home/+") is True
    assert MqttClient._topic_matches("home/light/extra", "home/+") is False


def test_topic_matches_multi_level_wildcard(mqtt_client):
    assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/#") is True
    assert MqttClient._topic_matches("homeassistant/light/abc/config", "homeassistant/light/#") is True
    assert MqttClient._topic_matches("other/topic", "homeassistant/#") is False


def test_register_callback(mqtt_client):
    async def cb(topic, payload):
        pass

    mqtt_client.on_message("home/#", cb)
    assert "home/#" in mqtt_client._callbacks
    assert len(mqtt_client._callbacks["home/#"]) == 1


async def test_publish_not_connected(mqtt_client):
    # Async test (asyncio_mode = "auto"), so no manual event loop handling is needed.
    with pytest.raises(RuntimeError, match="not connected"):
        await mqtt_client.publish("test", "payload")
```

- [ ] **Step 3: Run the tests**

Run: `pytest tests/test_mqtt_client.py -v`
Expected: 5 passed

- [ ] **Step 4: Commit**

```bash
git add -A
git commit -m "feat: MQTT client with reconnection, topic matching, and callback dispatch"
```

---

### Task 6: Device registry (core business logic)

**Files:**
- Create: `src/mqtt_home/device_registry.py`
- Create: `tests/test_device_registry.py`

- [ ] **Step 1: Create src/mqtt_home/device_registry.py**

```python
import logging
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import select, func, desc
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.models import Device, DeviceLog
from mqtt_home.schemas import DeviceCreate, DeviceUpdate

logger = logging.getLogger(__name__)

MAX_LOGS_PER_DEVICE = 100


async def list_devices(db: AsyncSession) -> list[Device]:
    result = await db.execute(select(Device).order_by(Device.created_at.desc()))
    return list(result.scalars().all())


async def get_device(db: AsyncSession, device_id: str) -> Optional[Device]:
    return await db.get(Device, device_id)


async def create_device(db: AsyncSession, data: DeviceCreate) -> Device:
    device = Device(
        name=data.name,
        type=data.type,
        protocol=data.protocol,
        mqtt_topic=data.mqtt_topic,
        command_topic=data.command_topic,
    )
    db.add(device)
    await db.commit()
    await db.refresh(device)
    logger.info("Device created: %s (%s)", device.name, device.id)
    return device


async def update_device(db: AsyncSession, device_id: str, data: DeviceUpdate) -> Optional[Device]:
    device = await db.get(Device, device_id)
    if not device:
        return None
    # Only fields the caller actually sent are applied.
    update_data = data.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        setattr(device, key, value)
    await db.commit()
    await db.refresh(device)
    return device


async def delete_device(db: AsyncSession, device_id: str) -> bool:
    device = await db.get(Device, device_id)
    if not device:
        return False
    await db.delete(device)
    await db.commit()
    logger.info("Device deleted: %s", device_id)
    return True


async def send_command(db: AsyncSession, device_id: str, payload: str, publish_fn=None) -> Optional[DeviceLog]:
    device = await db.get(Device, device_id)
    if not device:
        return None
    if not device.command_topic:
        raise ValueError(f"Device {device_id} has no command_topic configured")

    log = DeviceLog(
        device_id=device_id,
        direction="tx",
        topic=device.command_topic,
        payload=payload,
    )
    db.add(log)

    # publish_fn must be an async callable taking (topic, payload).
    if publish_fn:
        await publish_fn(device.command_topic, payload)

    await db.commit()
    await db.refresh(log)
    logger.info("Command sent to %s: %s", device.command_topic, payload)
    return log


async def handle_state_update(db: AsyncSession, topic: str, payload: str) -> Optional[Device]:
    """Handle a state update received over MQTT."""
    result = await db.execute(select(Device).where(Device.mqtt_topic == topic))
    device = result.scalar_one_or_none()
    if not device:
        return None

    device.state = payload
    device.last_seen = datetime.now(timezone.utc)
    device.is_online = True

    log = DeviceLog(
        device_id=device.id,
        direction="rx",
        topic=topic,
        payload=payload,
    )
    db.add(log)

    # Flush so the new row is counted, then trim to the newest MAX_LOGS_PER_DEVICE rows.
    await db.flush()
    count_result = await db.execute(
        select(func.count()).select_from(DeviceLog).where(DeviceLog.device_id == device.id)
    )
    count = count_result.scalar() or 0
    excess = count - MAX_LOGS_PER_DEVICE
    if excess > 0:
        oldest = await db.execute(
            select(DeviceLog)
            .where(DeviceLog.device_id == device.id)
            .order_by(DeviceLog.timestamp.asc(), DeviceLog.id.asc())
            .limit(excess)
        )
        for old_log in oldest.scalars().all():
            await db.delete(old_log)

    await db.commit()
    await db.refresh(device)
    return device


async def get_device_logs(db: AsyncSession, device_id: str, limit: int = 20) -> list[DeviceLog]:
    result = await db.execute(
        select(DeviceLog)
        .where(DeviceLog.device_id == device_id)
        .order_by(desc(DeviceLog.timestamp))
        .limit(limit)
    )
    return list(result.scalars().all())


async def update_device_online_status(db: AsyncSession, topic: str, payload: str) -> Optional[Device]:
    """Handle a message on an HA availability topic."""
    result = await db.execute(select(Device).where(Device.mqtt_topic == topic))
    device = result.scalar_one_or_none()
    if not device:
        return None
    device.is_online = payload.lower() == "online"
    device.last_seen = datetime.now(timezone.utc)
    await db.commit()
    await db.refresh(device)
    return device


async def get_dashboard_stats(db: AsyncSession) -> dict:
    # Read each scalar exactly once; a result cannot be consumed twice.
    total = (await db.execute(select(func.count()).select_from(Device))).scalar() or 0
    online = (
        await db.execute(select(func.count()).select_from(Device).where(Device.is_online.is_(True)))
    ).scalar() or 0
    recent_logs_result = await db.execute(
        select(DeviceLog).order_by(desc(DeviceLog.timestamp)).limit(10)
    )
    return {
        "total_devices": total,
        "online_devices": online,
        "offline_devices": total - online,
        "recent_logs": list(recent_logs_result.scalars().all()),
    }
```
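
The log-retention rule in `handle_state_update` (keep only the newest `MAX_LOGS_PER_DEVICE` rows per device) can also be tried out in isolation. The sketch below uses the standard-library `sqlite3` module and a single `DELETE ... NOT IN` statement instead of the ORM loop above, so it is a different technique that reaches the same end state; the simplified table, the `add_log` helper, and the cap of 3 are assumptions made for the demo.

```python
import sqlite3

MAX_LOGS = 3  # small cap for the demo; the plan uses 100


def add_log(conn: sqlite3.Connection, device_id: str, payload: str) -> None:
    """Insert one log row, then drop everything but the newest MAX_LOGS rows for that device."""
    conn.execute(
        "INSERT INTO device_logs (device_id, payload) VALUES (?, ?)",
        (device_id, payload),
    )
    conn.execute(
        """
        DELETE FROM device_logs
        WHERE device_id = ?
          AND id NOT IN (
              SELECT id FROM device_logs
              WHERE device_id = ?
              ORDER BY id DESC
              LIMIT ?
          )
        """,
        (device_id, device_id, MAX_LOGS),
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE device_logs ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "device_id TEXT NOT NULL, "
    "payload TEXT NOT NULL)"
)
for i in range(5):
    add_log(conn, "dev-1", f"msg-{i}")
add_log(conn, "dev-2", "other")

rows = [r[0] for r in conn.execute(
    "SELECT payload FROM device_logs WHERE device_id = 'dev-1' ORDER BY id"
)]
print(rows)  # ['msg-2', 'msg-3', 'msg-4']
```

Trimming is scoped by `device_id`, so a chatty device never evicts another device's history.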

- [ ] **Step 2: Create tests/test_device_registry.py**

```python
import pytest
from mqtt_home.device_registry import (
    create_device, get_device, list_devices, delete_device,
    update_device, send_command, handle_state_update, get_device_logs,
    get_dashboard_stats,
)
from mqtt_home.schemas import DeviceCreate, DeviceUpdate


async def _noop_publish(topic, payload):
    # send_command awaits publish_fn, so the stand-in must be a coroutine function.
    pass


async def test_create_and_get_device(db_session):
    data = DeviceCreate(name="Living room light", type="light", mqtt_topic="home/light")
    device = await create_device(db_session, data)
    assert device.name == "Living room light"

    fetched = await get_device(db_session, device.id)
    assert fetched is not None
    assert fetched.id == device.id


async def test_list_devices(db_session):
    await create_device(db_session, DeviceCreate(name="Device 1", type="switch", mqtt_topic="t1"))
    await create_device(db_session, DeviceCreate(name="Device 2", type="sensor", mqtt_topic="t2"))
    devices = await list_devices(db_session)
    assert len(devices) == 2


async def test_update_device(db_session):
    device = await create_device(db_session, DeviceCreate(name="Light", type="light", mqtt_topic="t"))
    updated = await update_device(db_session, device.id, DeviceUpdate(name="New name"))
    assert updated.name == "New name"


async def test_delete_device(db_session):
    device = await create_device(db_session, DeviceCreate(name="Light", type="light", mqtt_topic="t"))
    assert await delete_device(db_session, device.id) is True
    assert await get_device(db_session, device.id) is None


async def test_delete_nonexistent_device(db_session):
    assert await delete_device(db_session, "nonexistent") is False


async def test_send_command(db_session):
    device = await create_device(
        db_session, DeviceCreate(name="Light", type="light", mqtt_topic="t", command_topic="t/set")
    )
    published = {}

    async def mock_publish(topic, payload):
        published[topic] = payload

    log = await send_command(db_session, device.id, '{"state":"on"}', mock_publish)
    assert log is not None
    assert log.direction == "tx"
    assert published["t/set"] == '{"state":"on"}'


async def test_send_command_no_command_topic(db_session):
    device = await create_device(db_session, DeviceCreate(name="Sensor", type="sensor", mqtt_topic="t"))
    with pytest.raises(ValueError, match="no command_topic"):
        await send_command(db_session, device.id, '{"value":1}')


async def test_handle_state_update(db_session):
    await create_device(db_session, DeviceCreate(name="Light", type="light", mqtt_topic="home/light"))
    updated = await handle_state_update(db_session, "home/light", '{"state":"on"}')
    assert updated is not None
    assert updated.state == '{"state":"on"}'
    assert updated.is_online is True


async def test_handle_state_update_unknown_topic(db_session):
    result = await handle_state_update(db_session, "unknown/topic", "on")
    assert result is None


async def test_get_device_logs(db_session):
    device = await create_device(
        db_session, DeviceCreate(name="Light", type="light", mqtt_topic="t", command_topic="t/set")
    )
    await send_command(db_session, device.id, '{"state":"on"}', _noop_publish)
    await send_command(db_session, device.id, '{"state":"off"}', _noop_publish)

    logs = await get_device_logs(db_session, device.id, limit=10)
    assert len(logs) == 2


async def test_dashboard_stats(db_session):
    await create_device(db_session, DeviceCreate(name="Online device", type="switch", mqtt_topic="t1"))
    await create_device(db_session, DeviceCreate(name="Offline device", type="sensor", mqtt_topic="t2"))
    await handle_state_update(db_session, "t1", "online")

    stats = await get_dashboard_stats(db_session)
    assert stats["total_devices"] == 2
    assert stats["online_devices"] == 1
    assert stats["offline_devices"] == 1
```

- [ ] **Step 3: Run the tests**

Run: `pytest tests/test_device_registry.py -v`
Expected: 11 passed

- [ ] **Step 4: Commit**

```bash
git add -A
git commit -m "feat: device registry with CRUD, state tracking, command sending, and log management"
```

---

### Task 7: HA Discovery protocol handler

**Files:**
- Create: `src/mqtt_home/discovery.py`
- Create: `tests/test_discovery.py`

- [ ] **Step 1: Create src/mqtt_home/discovery.py**

```python
import json
import logging

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.models import Device
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.device_registry import create_device
from mqtt_home.schemas import DeviceCreate

logger = logging.getLogger(__name__)

# HA Discovery config topic format: homeassistant/<component>/<node_id>/config
DISCOVERY_TOPIC_PREFIX = "homeassistant/"


def parse_discovery_topic(topic: str) -> dict[str, str] | None:
    """Parse an HA Discovery topic and return its component and node_id."""
    parts = topic.split("/")
    if len(parts) < 4 or parts[0] != DISCOVERY_TOPIC_PREFIX.strip("/"):
        return None
    if parts[-1] != "config":
        return None
    return {
        "component": parts[1],
        # Everything between the component and "config" is treated as the node_id
        "node_id": "/".join(parts[2:-1]),
    }


async def handle_discovery_message(
    topic: str, payload: str, db: AsyncSession, mqtt_client: MqttClient
) -> Device | None:
    """Handle an HA Discovery config message: register the device and subscribe to its state topic."""
    parsed = parse_discovery_topic(topic)
    if not parsed:
        return None

    try:
        config = json.loads(payload)
    except json.JSONDecodeError:
        logger.warning("Invalid JSON in discovery payload for %s", topic)
        return None

    state_topic = config.get("state_topic")
    command_topic = config.get("command_topic")
    device_name = config.get("name", config.get("device", {}).get("name", parsed["node_id"]))
    device_class = config.get("device_class", "")

    if not state_topic:
        logger.warning("Discovery config for %s has no state_topic", topic)
        return None

    # Skip registration if the device already exists (deduplicated by discovery_topic)
    result = await db.execute(
        select(Device).where(Device.discovery_topic == topic)
    )
    existing = result.scalar_one_or_none()
    if existing:
        logger.debug("Device already registered: %s", topic)
        return existing

    device = await create_device(db, DeviceCreate(
        name=device_name,
        type=parsed["component"],
        protocol="ha_discovery",
        mqtt_topic=state_topic,
        command_topic=command_topic,
    ))

    # Store the discovery-related fields
    device.discovery_topic = topic
    device.discovery_payload = payload
    device.attributes = json.dumps({
        "device_class": device_class,
        "unit_of_measurement": config.get("unit_of_measurement"),
        "icon": config.get("icon"),
    })
    await db.commit()
    await db.refresh(device)

    # Subscribe to the state topic
    await mqtt_client.subscribe(state_topic, qos=1)
    logger.info("HA Discovery device registered: %s -> %s", device.name, state_topic)

    return device
```
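
To make the message format concrete, here is a minimal standalone sketch of the topic and payload a device might publish for the handler above to pick up. `build_discovery_message` is a hypothetical helper that is not part of the plan's modules; the field names (`name`, `state_topic`, `command_topic`) are the ones the handler reads from the payload.

```python
import json


def build_discovery_message(component, node_id, name, state_topic, command_topic=None):
    """Return the (topic, payload) pair for an HA Discovery config message.

    Hypothetical helper for illustration only; it mirrors the fields that
    handle_discovery_message reads from the payload.
    """
    topic = f"homeassistant/{component}/{node_id}/config"
    config = {"name": name, "state_topic": state_topic}
    if command_topic is not None:
        config["command_topic"] = command_topic
    # ensure_ascii=False keeps non-ASCII device names readable in the JSON text
    return topic, json.dumps(config, ensure_ascii=False)


topic, payload = build_discovery_message(
    "light", "living_room", "客厅灯",
    state_topic="home/living/light/status",
    command_topic="home/living/light/set",
)
print(topic)
print(payload)
```

A sensor without a command topic would simply omit the `command_topic` argument, which matches the handler treating that field as optional.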

- [ ] **Step 2: Create tests/test_discovery.py**

```python
import json
import pytest
from unittest.mock import AsyncMock

from mqtt_home.discovery import parse_discovery_topic, handle_discovery_message
from mqtt_home.config import Settings
from mqtt_home.mqtt_client import MqttClient


@pytest.fixture
def settings():
    return Settings(
        mqtt_host="localhost",
        emqx_api_url="http://localhost:18083/api/v5",
        emqx_api_key="test-key",
        emqx_api_secret="test-secret",
        database_url="sqlite+aiosqlite:///:memory:",
    )


def test_parse_discovery_topic_valid():
    result = parse_discovery_topic("homeassistant/light/abc123/config")
    assert result is not None
    assert result["component"] == "light"
    assert result["node_id"] == "abc123"


def test_parse_discovery_topic_nested_node():
    result = parse_discovery_topic("homeassistant/sensor/room/temperature/config")
    assert result is not None
    assert result["component"] == "sensor"
    assert result["node_id"] == "room/temperature"


def test_parse_discovery_topic_invalid():
    assert parse_discovery_topic("other/topic/config") is None
    assert parse_discovery_topic("homeassistant/light/abc") is None
    assert parse_discovery_topic("homeassistant/light/abc/status") is None


async def test_handle_discovery_creates_device(db_session, settings):
    mqtt_client = MqttClient(settings)
    mqtt_client.subscribe = AsyncMock()

    payload = json.dumps({
        "name": "客厅灯",
        "state_topic": "home/living/light/status",
        "command_topic": "home/living/light/set",
        "device_class": "light",
    })

    device = await handle_discovery_message(
        "homeassistant/light/living_room/config",
        payload,
        db_session,
        mqtt_client,
    )
    assert device is not None
    assert device.name == "客厅灯"
    assert device.protocol == "ha_discovery"
    assert device.mqtt_topic == "home/living/light/status"
    mqtt_client.subscribe.assert_called_once_with("home/living/light/status", qos=1)


async def test_handle_discovery_duplicate(db_session, settings):
    mqtt_client = MqttClient(settings)
    mqtt_client.subscribe = AsyncMock()

    payload = json.dumps({
        "name": "灯",
        "state_topic": "home/light",
    })

    device1 = await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client)
    device2 = await handle_discovery_message("homeassistant/light/test/config", payload, db_session, mqtt_client)
    # The same discovery_topic must not create a duplicate device
    assert device2 is not None
    assert device2.id == device1.id
    mqtt_client.subscribe.assert_called_once()  # subscribed only on first registration
```

- [ ] **Step 3: Run the tests**

Run: `pytest tests/test_discovery.py -v`
Expected: 5 passed

- [ ] **Step 4: Commit**

```bash
git add -A
git commit -m "feat: HA Discovery protocol handler for auto-registering devices"
```

---

### Task 8: FastAPI application and REST API routes

**Files:**
- Create: `src/mqtt_home/main.py`
- Create: `src/mqtt_home/api/__init__.py`
- Create: `src/mqtt_home/api/devices.py`
- Create: `src/mqtt_home/api/broker.py`
- Create: `src/mqtt_home/api/dashboard.py`
- Create: `tests/test_api_devices.py`
- Create: `tests/test_api_broker.py`

- [ ] **Step 1: Create src/mqtt_home/api/__init__.py**

```python
from fastapi import APIRouter
from mqtt_home.api.devices import router as devices_router
from mqtt_home.api.broker import router as broker_router
from mqtt_home.api.dashboard import router as dashboard_router

api_router = APIRouter(prefix="/api")
api_router.include_router(devices_router, prefix="/devices", tags=["devices"])
api_router.include_router(broker_router, prefix="/broker", tags=["broker"])
api_router.include_router(dashboard_router, prefix="/dashboard", tags=["dashboard"])
```

- [ ] **Step 2: Create src/mqtt_home/api/devices.py**

```python
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.database import get_db
from mqtt_home.device_registry import (
    list_devices, get_device, create_device, update_device,
    delete_device, send_command, get_device_logs,
)
from mqtt_home.schemas import (
    DeviceCreate, DeviceUpdate, DeviceCommand,
    DeviceResponse, DeviceLogResponse,
)

router = APIRouter()


@router.get("", response_model=list[DeviceResponse])
async def get_devices(db: AsyncSession = Depends(get_db)):
    return await list_devices(db)


@router.post("", response_model=DeviceResponse, status_code=201)
async def add_device(data: DeviceCreate, db: AsyncSession = Depends(get_db)):
    return await create_device(db, data)


@router.get("/{device_id}", response_model=DeviceResponse)
async def get_device_detail(device_id: str, db: AsyncSession = Depends(get_db)):
    device = await get_device(db, device_id)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device


@router.put("/{device_id}", response_model=DeviceResponse)
async def patch_device(device_id: str, data: DeviceUpdate, db: AsyncSession = Depends(get_db)):
    device = await update_device(db, device_id, data)
    if not device:
        raise HTTPException(status_code=404, detail="Device not found")
    return device


@router.delete("/{device_id}", status_code=204)
async def remove_device(device_id: str, db: AsyncSession = Depends(get_db)):
    if not await delete_device(db, device_id):
        raise HTTPException(status_code=404, detail="Device not found")


@router.post("/{device_id}/command", response_model=DeviceLogResponse)
async def command_device(
    device_id: str, data: DeviceCommand, request: Request,
    db: AsyncSession = Depends(get_db),
):
    # Read the MQTT client from application state via the request,
    # which avoids importing mqtt_home.main from inside the router module
    mqtt_client = getattr(request.app.state, "mqtt_client", None)
    if not mqtt_client or not mqtt_client.is_connected:
        raise HTTPException(status_code=503, detail="MQTT not connected")

    try:
        log = await send_command(
            db, device_id, data.payload,
            publish_fn=mqtt_client.publish,
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    if not log:
        raise HTTPException(status_code=404, detail="Device not found")
    return log


@router.get("/{device_id}/logs", response_model=list[DeviceLogResponse])
async def read_device_logs(device_id: str, limit: int = 20, db: AsyncSession = Depends(get_db)):
    return await get_device_logs(db, device_id, limit)
```

- [ ] **Step 3: Create src/mqtt_home/api/broker.py**

```python
from fastapi import APIRouter, HTTPException, Request

from mqtt_home.schemas import BrokerClient, BrokerTopic

router = APIRouter()


def _get_emqx(request: Request):
    """Return the EMQX API client from application state, or raise 503."""
    emqx = getattr(request.app.state, "emqx_client", None)
    if not emqx:
        raise HTTPException(status_code=503, detail="EMQX API client not configured")
    return emqx


@router.get("/status")
async def broker_status(request: Request):
    emqx = _get_emqx(request)
    try:
        status = await emqx.get_broker_status()
        metrics = await emqx.get_metrics()
        return {"status": status, "metrics": metrics}
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e


@router.get("/clients", response_model=list[BrokerClient])
async def broker_clients(request: Request, limit: int = 100):
    emqx = _get_emqx(request)
    try:
        return await emqx.get_clients(limit)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e


@router.get("/topics", response_model=list[BrokerTopic])
async def broker_topics(request: Request, limit: int = 100):
    emqx = _get_emqx(request)
    try:
        return await emqx.get_topics(limit)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"EMQX API error: {e}") from e
```

- [ ] **Step 4: Create src/mqtt_home/api/dashboard.py**

```python
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from mqtt_home.database import get_db
from mqtt_home.device_registry import get_dashboard_stats
from mqtt_home.schemas import DashboardStats

router = APIRouter()


@router.get("", response_model=DashboardStats)
async def dashboard(db: AsyncSession = Depends(get_db)):
    return await get_dashboard_stats(db)
```

- [ ] **Step 5: Create src/mqtt_home/main.py**

```python
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory
from mqtt_home.mqtt_client import MqttClient
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.discovery import handle_discovery_message
from mqtt_home.device_registry import handle_state_update
from mqtt_home.api import api_router

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = get_settings()

    # Initialize the database
    await init_db()
    logger.info("Database initialized")

    # Initialize the EMQX API client
    emqx = EmqxApiClient(settings)
    app.state.emqx_client = emqx
    logger.info("EMQX API client initialized")

    # Initialize the MQTT client
    mqtt = MqttClient(settings)
    app.state.mqtt_client = mqtt

    session_factory = get_session_factory()

    # Callback for HA Discovery config messages
    async def on_discovery(topic: str, payload: str):
        async with session_factory() as db:
            await handle_discovery_message(topic, payload, db, mqtt)

    # Callback for device state updates
    async def on_state(topic: str, payload: str):
        async with session_factory() as db:
            device = await handle_state_update(db, topic, payload)
            # WebSocket push is added later (Task 9)

    mqtt.on_message("homeassistant/#", on_discovery)
    mqtt.on_message("home/#", on_state)

    await mqtt.start()
    logger.info("MQTT client started")

    yield

    await mqtt.stop()
    await emqx.close()
    logger.info("Shutdown complete")


app = FastAPI(title="MQTT Home", version="0.1.0", lifespan=lifespan)

# Wide-open CORS is convenient during development; restrict allow_origins before deploying
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(api_router)


@app.get("/health")
async def health():
    mqtt = getattr(app.state, "mqtt_client", None)
    return {
        "status": "ok",
        "mqtt_connected": mqtt.is_connected if mqtt else False,
    }
```
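
The two `on_message` registrations rely on MQTT topic filters, where `#` matches any number of trailing levels and `+` matches exactly one level. How `MqttClient` dispatches messages is defined in an earlier task; the sketch below is only a standalone illustration of the matching rule, with `topic_matches` as a hypothetical helper name. It ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a filter containing + or # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level; it matches
            # the parent level itself and everything below it
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without a trailing '#', both sides must have the same number of levels
    return len(filter_levels) == len(topic_levels)


for candidate in ("homeassistant/light/living_room/config", "home/living/light/status"):
    print(candidate, topic_matches("homeassistant/#", candidate), topic_matches("home/#", candidate))
```

Because matching is done level by level, `home/#` does not match topics under `homeassistant/`, so the two filters used in `lifespan` do not overlap.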

- [ ] **Step 6: Create tests/test_api_devices.py**

```python
import pytest
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.pool import StaticPool

from mqtt_home.main import app
from mqtt_home.database import Base, get_db

TEST_DB = "sqlite+aiosqlite:///:memory:"


@pytest.fixture
async def client():
    engine = create_async_engine(TEST_DB, connect_args={"check_same_thread": False}, poolclass=StaticPool)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    test_factory = async_sessionmaker(engine, expire_on_commit=False)

    async def override_get_db():
        async with test_factory() as session:
            yield session

    # Route the get_db dependency to the in-memory test database
    app.dependency_overrides[get_db] = override_get_db

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

    app.dependency_overrides.clear()
    await engine.dispose()


async def test_get_devices_empty(client):
    resp = await client.get("/api/devices")
    assert resp.status_code == 200
    assert resp.json() == []


async def test_create_device(client):
    resp = await client.post("/api/devices", json={
        "name": "客厅灯",
        "type": "light",
        "mqtt_topic": "home/light",
        "command_topic": "home/light/set",
    })
    assert resp.status_code == 201
    data = resp.json()
    assert data["name"] == "客厅灯"
    assert data["type"] == "light"


async def test_get_device_detail(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]

    resp = await client.get(f"/api/devices/{device_id}")
    assert resp.status_code == 200
    assert resp.json()["name"] == "灯"


async def test_get_device_not_found(client):
    resp = await client.get("/api/devices/nonexistent")
    assert resp.status_code == 404


async def test_update_device(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]

    resp = await client.put(f"/api/devices/{device_id}", json={"name": "新名字"})
    assert resp.status_code == 200
    assert resp.json()["name"] == "新名字"


async def test_delete_device(client):
    create_resp = await client.post("/api/devices", json={
        "name": "灯", "type": "switch", "mqtt_topic": "t"
    })
    device_id = create_resp.json()["id"]

    resp = await client.delete(f"/api/devices/{device_id}")
    assert resp.status_code == 204


async def test_delete_device_not_found(client):
    resp = await client.delete("/api/devices/nonexistent")
    assert resp.status_code == 404
```

- [ ] **Step 7: Create tests/test_api_broker.py**

```python
import pytest
from httpx import AsyncClient, ASGITransport

from mqtt_home.main import app


@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


async def test_broker_status_no_client(client):
    # ASGITransport does not run the lifespan handler, so app.state has no
    # emqx_client and the endpoint should return 503
    resp = await client.get("/api/broker/status")
    assert resp.status_code == 503


async def test_broker_clients_no_client(client):
    resp = await client.get("/api/broker/clients")
    assert resp.status_code == 503


async def test_health_endpoint(client):
    resp = await client.get("/health")
    assert resp.status_code == 200
    data = resp.json()
    assert "status" in data
    assert "mqtt_connected" in data
```

- [ ] **Step 8: Run all tests**

Run: `pytest tests/ -v`
Expected: All tests pass

- [ ] **Step 9: Commit**

```bash
git add -A
git commit -m "feat: FastAPI app with device, broker, dashboard REST API routes"
```

---

### Task 9: WebSocket real-time push

**Files:**
- Modify: `src/mqtt_home/main.py`
- Create: `src/mqtt_home/ws.py`

- [ ] **Step 1: Create src/mqtt_home/ws.py**

```python
import logging
from fastapi import WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)


class ConnectionManager:
    def __init__(self):
        self._connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self._connections.append(ws)
        logger.info("WebSocket connected, total: %d", len(self._connections))

    def disconnect(self, ws: WebSocket):
        # The socket may already have been removed by a failed broadcast
        if ws in self._connections:
            self._connections.remove(ws)
        logger.info("WebSocket disconnected, total: %d", len(self._connections))

    async def broadcast(self, message: dict):
        dead = []
        # Iterate over a copy: the list can change while send_json is awaited
        for ws in list(self._connections):
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)


ws_manager = ConnectionManager()


async def websocket_endpoint(ws: WebSocket):
    await ws_manager.connect(ws)
    try:
        while True:
            await ws.receive_text()  # keep the connection open; client messages are ignored
    except WebSocketDisconnect:
        ws_manager.disconnect(ws)


async def broadcast_device_update(device_id: str, data: dict):
    await ws_manager.broadcast({
        "type": "device_update",
        "device_id": device_id,
        **data,
    })
```
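
The broadcast logic can be exercised without a running server. The sketch below is a standalone approximation, not the module above: `FakeSocket` is a hypothetical stand-in that only implements `send_json`, and `broadcast` is a free function that takes the connection list as an argument. It shows that a connection whose send fails is pruned while healthy connections still receive the message.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket, for illustration only."""

    def __init__(self, alive=True):
        self.alive = alive
        self.received = []

    async def send_json(self, message):
        if not self.alive:
            raise ConnectionError("peer went away")
        self.received.append(message)


async def broadcast(connections, message):
    """Send message to every connection and drop the ones that fail."""
    dead = []
    for ws in list(connections):  # copy, since the list may change while awaiting
        try:
            await ws.send_json(message)
        except Exception:
            dead.append(ws)
    for ws in dead:
        if ws in connections:
            connections.remove(ws)


good, bad = FakeSocket(), FakeSocket(alive=False)
connections = [good, bad]
asyncio.run(broadcast(connections, {"type": "device_update", "device_id": "abc"}))
print(len(connections), good.received)
```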

- [ ] **Step 2: Modify src/mqtt_home/main.py to add the WebSocket route and broadcast**

Add after `from mqtt_home.api import api_router`:
```python
from mqtt_home.ws import websocket_endpoint, broadcast_device_update
```

Add after `app.include_router(api_router)`:
```python
app.websocket("/ws/devices")(websocket_endpoint)
```

Update the `on_state` callback inside `lifespan` so that it broadcasts over WebSocket:
```python
    async def on_state(topic: str, payload: str):
        async with session_factory() as db:
            device = await handle_state_update(db, topic, payload)
            if device:
                await broadcast_device_update(device.id, {
                    "state": device.state,
                    "is_online": device.is_online,
                    "last_seen": device.last_seen.isoformat() if device.last_seen else None,
                })
```

- [ ] **Step 3: Commit**

```bash
git add -A
git commit -m "feat: WebSocket endpoint for real-time device state updates"
```

---

### Task 10: CLI commands

**Files:**
- Create: `src/mqtt_home/cli.py`
- Create: `tests/test_cli.py`

- [ ] **Step 1: Create src/mqtt_home/cli.py**

```python
import asyncio

import aiomqtt
import click

from mqtt_home.config import get_settings
from mqtt_home.database import init_db, get_session_factory
from mqtt_home.emqx_api import EmqxApiClient
from mqtt_home.device_registry import (
    list_devices, get_device, create_device, delete_device,
    send_command, get_device_logs,
)
from mqtt_home.schemas import DeviceCreate


def run_async(coro):
    # asyncio.run creates and closes a fresh event loop for each CLI invocation
    return asyncio.run(coro)


@click.group()
def cli():
    """MQTT 智能家居管理工具"""
    pass


@cli.group()
def device():
    """设备管理"""
    pass


@device.command("list")
def device_list():
    """列出所有设备"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            devices = await list_devices(db)
            if not devices:
                click.echo("暂无设备")
                return
            for d in devices:
                status = "🟢" if d.is_online else "🔴"
                state = d.state or "-"
                click.echo(f"{status} [{d.id[:8]}] {d.name} ({d.type}) state={state}")

    run_async(_run())


@device.command("add")
@click.option("--name", required=True, help="设备名称")
@click.option("--type", "device_type", default="switch", help="设备类型")
@click.option("--state-topic", required=True, help="状态主题")
@click.option("--command-topic", default=None, help="命令主题")
def device_add(name, device_type, state_topic, command_topic):
    """手动添加设备"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            d = await create_device(db, DeviceCreate(
                name=name,
                type=device_type,
                mqtt_topic=state_topic,
                command_topic=command_topic,
            ))
            click.echo(f"设备已创建: {d.id} - {d.name}")

    run_async(_run())


@device.command("info")
@click.argument("device_id")
def device_info(device_id):
    """查看设备详情"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            d = await get_device(db, device_id)
            if not d:
                click.echo(f"设备不存在: {device_id}", err=True)
                return
            click.echo(f"ID: {d.id}")
            click.echo(f"名称: {d.name}")
            click.echo(f"类型: {d.type}")
            click.echo(f"协议: {d.protocol}")
            click.echo(f"状态主题: {d.mqtt_topic}")
            click.echo(f"命令主题: {d.command_topic or '无'}")
            click.echo(f"当前状态: {d.state or '无'}")
            click.echo(f"在线: {'是' if d.is_online else '否'}")
            click.echo(f"最后活跃: {d.last_seen or '从未'}")

    run_async(_run())


@device.command("remove")
@click.argument("device_id")
def device_remove(device_id):
    """删除设备"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            if await delete_device(db, device_id):
                click.echo(f"设备已删除: {device_id}")
            else:
                click.echo(f"设备不存在: {device_id}", err=True)

    run_async(_run())


@device.command("command")
@click.argument("device_id")
@click.option("--payload", required=True, help="命令内容(JSON)")
def device_command(device_id, payload):
    """向设备发送命令"""
    async def _run():
        settings = get_settings()
        await init_db()
        factory = get_session_factory()

        # Publish over a short-lived aiomqtt connection instead of
        # starting the MqttClient message loop
        async with aiomqtt.Client(
            hostname=settings.mqtt_host,
            port=settings.mqtt_port,
            username=settings.mqtt_username or None,
            password=settings.mqtt_password or None,
        ) as client:
            async with factory() as db:
                try:
                    log = await send_command(
                        db, device_id, payload,
                        publish_fn=lambda topic, p: client.publish(topic, p, qos=1),
                    )
                    if log:
                        click.echo(f"命令已发送到 {log.topic}: {log.payload}")
                    else:
                        click.echo(f"设备不存在: {device_id}", err=True)
                except ValueError as e:
                    click.echo(str(e), err=True)

    run_async(_run())


@device.command("logs")
@click.argument("device_id")
@click.option("--limit", default=20, help="日志条数")
def device_logs(device_id, limit):
    """查看设备消息日志"""
    async def _run():
        await init_db()
        factory = get_session_factory()
        async with factory() as db:
            logs = await get_device_logs(db, device_id, limit)
            if not logs:
                click.echo("暂无日志")
                return
            for log in logs:
                direction = "RX" if log.direction == "rx" else "TX"
                click.echo(f"[{log.timestamp}] {direction} {log.topic} | {log.payload}")

    run_async(_run())


@cli.group()
def broker():
    """Broker 管理"""
    pass


@broker.command("status")
def broker_status():
    """查看 Broker 状态"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            status = await emqx.get_broker_status()
            click.echo(f"版本: {status.get('version', 'unknown')}")
            click.echo(f"运行时间: {status.get('uptime', 0)}s")
            metrics = await emqx.get_metrics()
            click.echo(f"连接数: {metrics.get('connections.count', 0)}")
            click.echo(f"订阅数: {metrics.get('subscriptions.count', 0)}")
            click.echo(f"主题数: {metrics.get('topics.count', 0)}")
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()

    run_async(_run())


@broker.command("clients")
def broker_clients():
    """列出已连接客户端"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            clients = await emqx.get_clients()
            if not clients:
                click.echo("暂无客户端连接")
                return
            for c in clients:
                status = "在线" if c.get("connected") else "离线"
                click.echo(f"[{c.get('clientid')}] {status} ip={c.get('ip_address')}")
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()

    run_async(_run())


@broker.command("topics")
def broker_topics():
    """列出活跃主题"""
    async def _run():
        settings = get_settings()
        emqx = EmqxApiClient(settings)
        try:
            topics = await emqx.get_topics()
            if not topics:
                click.echo("暂无活跃主题")
                return
            for t in topics:
                click.echo(t.get("topic", ""))
        except Exception as e:
            click.echo(f"连接 EMQX 失败: {e}", err=True)
        finally:
            await emqx.close()

    run_async(_run())
```
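
Each command above wraps an inner coroutine and drives it to completion from click's synchronous callback. A minimal stdlib-only sketch of that bridge, written as a decorator, is shown below; `sync_command` and `count_devices` are hypothetical names used for illustration and are not part of the plan's CLI.

```python
import asyncio
import functools


def sync_command(async_fn):
    """Wrap a coroutine function so it can be called from synchronous code."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # asyncio.run creates a fresh event loop, runs the coroutine, then closes the loop
        return asyncio.run(async_fn(*args, **kwargs))

    return wrapper


@sync_command
async def count_devices(names):
    await asyncio.sleep(0)  # stands in for a database or network call
    return len(names)


print(count_devices(["lamp", "sensor"]))
```

If this pattern were adopted with click, the decorator would sit directly above the `async def` and below the click decorators, so that click registers the synchronous wrapper rather than the coroutine function.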

- [ ] **Step 2: Create tests/test_cli.py**

```python
import pytest
from click.testing import CliRunner
from mqtt_home.cli import cli


def test_device_list_empty(monkeypatch):
    """Test the list command when there are no devices."""
    runner = CliRunner()

    async def mock_init():
        pass

    async def mock_list(db):
        return []

    monkeypatch.setattr("mqtt_home.cli.init_db", mock_init)
    monkeypatch.setattr("mqtt_home.cli.list_devices", mock_list)

    result = runner.invoke(cli, ["device", "list"])
    assert result.exit_code == 0
    assert "暂无设备" in result.output


def test_cli_groups():
    runner = CliRunner()
    result = runner.invoke(cli, ["--help"])
    assert "device" in result.output
    assert "broker" in result.output
```

- [ ] **Step 3: Run the tests**

Run: `pytest tests/test_cli.py -v`
Expected: 2 passed

- [ ] **Step 4: Commit**

```bash
git add -A
git commit -m "feat: CLI commands for device management and broker monitoring"
```

---

### Task 11: Create .env and verify end to end

**Files:**
- Create: `.env`

- [ ] **Step 1: Create .env (copy .env.example and fill in the actual values)**

```
MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=ed74cf28e117c5f6
EMQX_API_SECRET=fED31TaybOUXx9CtkabICmT0wEZuUPXSU0vPwIekSbbC
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
```
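
Before starting the service it can help to confirm that the file defines every key the later steps depend on. The following is a minimal stdlib-only sketch, assuming the simple `KEY=value` format shown above. `parse_env`, `missing_keys`, and the contents of `REQUIRED_KEYS` are assumptions made for illustration; the application itself loads its settings through pydantic-settings, not through this code.

```python
# Assumed set of keys that must be non-empty; adjust to match the real Settings class
REQUIRED_KEYS = (
    "MQTT_HOST", "MQTT_PORT", "EMQX_API_URL",
    "EMQX_API_KEY", "EMQX_API_SECRET", "DATABASE_URL",
)


def parse_env(text):
    """Parse KEY=value lines, skipping blank lines and # comments."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_keys(values, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty."""
    return [key for key in required if not values.get(key)]


sample = "MQTT_HOST=localhost\nMQTT_PORT=1883\nMQTT_USERNAME=\n# comment\n"
print(missing_keys(parse_env(sample)))
```

Empty optional values such as `MQTT_USERNAME=` are parsed as empty strings and are only reported if they appear in the required list.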

- [ ] **Step 2: Start the service and verify**

Run: `uvicorn mqtt_home.main:app --host 0.0.0.0 --port 8000`
Expected: the service starts, and the log shows "MQTT connected" and "Database initialized"

- [ ] **Step 3: Verify the broker connection with the CLI**

Run: `mqtt-home broker status`
Expected: the EMQX version and statistics are displayed

- [ ] **Step 4: Verify with the API**

Run: `curl http://localhost:8000/health`
Expected: `{"status":"ok","mqtt_connected":true}`

Run: `curl http://localhost:8000/api/broker/clients`
Expected: a JSON list of clients

- [ ] **Step 5: Commit (`.env` is already in `.gitignore`)**

```bash
git add -A
git commit -m "feat: verify end-to-end backend with EMQX broker connection"
```

---

### Task 12: Full test run

- [ ] **Step 1: Run all tests**

Run: `pytest tests/ -v`
Expected: all tests pass

- [ ] **Step 2: Fix any failing tests**

Fix problems based on the actual output.

- [ ] **Step 3: Final commit**

```bash
git add -A
git commit -m "test: ensure all backend tests pass"
```

---

## Self-check results

| Design document section | Corresponding task |
|---|---|
| Project structure | Task 1 |
| Data models (devices, device_logs) | Task 2 |
| Pydantic schemas | Task 3 |
| EMQX REST API wrapper | Task 4 |
| MQTT client + reconnect + wildcards | Task 5 |
| Device registry (CRUD + state + commands) | Task 6 |
| HA Discovery protocol | Task 7 |
| REST API routes (devices, broker, dashboard) | Task 8 |
| WebSocket real-time push | Task 9 |
| CLI commands | Task 10 |
| Configuration / .env | Task 11 |