8.7 KiB
8.7 KiB
MQTT 智能家居管理系统 - 设计文档
概述
一个轻量级的智能家居管理项目,通过自建 EMQX Broker 管理 MQTT 设备。提供 Web 管理页面和 CLI 命令行工具,支持 Home Assistant Discovery 协议和自定义协议。
技术选型
| 组件 | 选择 | 理由 |
|---|---|---|
| 后端框架 | FastAPI | 原生异步,适合 MQTT 长连接;自带 API 文档 |
| MQTT 客户端 | aiomqtt (paho-mqtt) | 异步兼容 FastAPI |
| 前端 | Vue 3 + Vite + Tailwind CSS + Pinia | 现代单页应用,响应式 |
| 数据库 | SQLite + aiosqlite + SQLAlchemy | 零配置,单文件,适合单进程 |
| CLI | click | 与 Web API 共享业务逻辑 |
| 配置管理 | pydantic-settings | 类型安全的环境变量加载 |
架构方案:单体 FastAPI
采用单体架构,FastAPI 同时处理 REST API 和 MQTT 连接。单进程部署,Web 和 CLI 共享同一套业务逻辑。
EMQX Broker <--MQTT--> FastAPI MQTT Client <---> SQLite
| ^
REST API CLI (click)
|
Vue SPA
优势: 最简部署(一个 Python 进程 + 一个 SQLite 文件),零逻辑重复。 局限: MQTT 客户端和 Web 服务共享进程,不适合多实例水平扩展。
项目结构
mqtt-home/
├── pyproject.toml # 依赖管理、入口点
├── .env # EMQX 凭据(gitignore)
├── .env.example # 配置模板
├── src/
│ └── mqtt_home/
│ ├── __init__.py
│ ├── config.py # 环境变量配置(pydantic-settings)
│ ├── main.py # FastAPI 应用 + 生命周期(启停 MQTT)
│ ├── database.py # SQLite 引擎、会话、Base
│ ├── models.py # SQLAlchemy ORM 模型
│ ├── schemas.py # Pydantic 请求/响应模型
│ ├── mqtt_client.py # aiomqtt 客户端:订阅、发布、回调
│ ├── device_registry.py # 业务逻辑:添加/删除/控制设备
│ ├── discovery.py # HA Discovery 协议处理器
│ ├── emqx_api.py # EMQX REST API 封装
│ ├── cli.py # click CLI 命令
│ └── api/
│ ├── __init__.py
│ ├── devices.py # REST 路由:设备增删改查 + 控制
│ ├── broker.py # REST 路由:Broker 状态、客户端、主题
│ └── dashboard.py # REST 路由:聚合统计
└── frontend/ # Vue SPA
└── ...
模块职责:
device_registry.py— 设备业务逻辑的唯一入口,API 路由和 CLI 都从此导入mqtt_client.py— MQTT 连接生命周期管理discovery.py— 在 mqtt_client 上注册 HA Discovery 主题回调emqx_api.py— EMQX REST API 的薄封装,处理认证和端点映射- 模块间通过直接函数调用通信,无消息总线或事件队列
数据模型
devices 设备表
| 字段 | 类型 | 说明 |
|---|---|---|
| id | TEXT (UUID) | 主键 |
| name | TEXT | 设备名称 |
| type | TEXT | 设备类型:switch, light, sensor, binary_sensor, climate 等 |
| protocol | TEXT | ha_discovery 或 custom |
| mqtt_topic | TEXT | 状态主题(如 homeassistant/light/living_room/status) |
| command_topic | TEXT | 命令主题(发送控制指令) |
| discovery_topic | TEXT | 原始 HA 发现主题(可为空) |
| discovery_payload | TEXT (JSON) | 原始发现配置(可为空) |
| attributes | TEXT (JSON) | 设备属性 |
| state | TEXT | 当前状态(on, off, 温度值等) |
| is_online | BOOLEAN | 在线状态(根据 last_seen 超时判断) |
| last_seen | DATETIME | 最后收到 MQTT 消息的时间 |
| created_at | DATETIME | 创建时间 |
| updated_at | DATETIME | 更新时间 |
device_logs 设备日志表
| 字段 | 类型 | 说明 |
|---|---|---|
| id | INTEGER (自增) | 主键 |
| device_id | TEXT (外键) | 关联设备 |
| direction | TEXT | rx(收到)或 tx(发送命令) |
| topic | TEXT | MQTT 主题 |
| payload | TEXT | 消息内容 |
| timestamp | DATETIME | 时间戳 |
每设备保留最近 100 条日志,超出自动清理。
设备状态流转
- HA Discovery 自动注册 — 收到
homeassistant/<type>/<node_id>/config消息时自动创建设备记录 - 手动添加 — 通过 API/CLI 指定主题、类型、命令主题
- 状态更新 — 设备状态主题收到消息 → 更新 state + last_seen
- 设备控制 — API/CLI 向命令主题发布消息 → 同时记录日志
- 在线判断 — last_seen 超过 60 秒标记为离线
MQTT 集成
HA Discovery 协议(被动,自动)
- 启动时订阅
homeassistant/# - 解析
config主题 → 自动创建设备记录 - 自动订阅每个设备的
state_topic - 处理
availability主题判断设备上下线 - 支持标准 HA Discovery 所有字段:device class, unit of measurement, state class 等
自定义协议(手动,用户定义)
- 用户通过 CLI 或 Web UI 添加设备,指定:
- 状态主题(监听状态更新的位置)
- 命令主题(发送控制指令的位置)
- JSON 负载格式(简单模板,如
{"state": "{{value}}"})
- 系统订阅状态主题并追踪变化
通用 MQTT 行为
- 使用
aiomqtt(基于paho-mqtt的异步封装) - 所有订阅使用 QoS 1(至少一次投递)
- 使用 Retained 消息保持状态持久化(订阅时读取保留消息)
- 连接断开时指数退避重连(初始 1 秒,最大 60 秒)
- 客户端 ID:
mqtt-home-{uuid}
API 设计
REST API(FastAPI 路由)
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/devices |
获取所有设备列表 |
| POST | /api/devices |
手动添加设备 |
| GET | /api/devices/{id} |
获取设备详情 |
| PUT | /api/devices/{id} |
更新设备配置 |
| DELETE | /api/devices/{id} |
删除设备 |
| POST | /api/devices/{id}/command |
向设备发送控制命令 |
| GET | /api/devices/{id}/logs |
获取设备最近消息日志 |
| GET | /api/broker/status |
EMQX Broker 统计信息 |
| GET | /api/broker/clients |
已连接 MQTT 客户端列表 |
| GET | /api/broker/topics |
活跃主题列表 |
| GET | /api/dashboard |
聚合统计(设备在线/离线数、最近活动) |
WebSocket
/ws/devices— 设备状态变化实时推送(state, is_online 变更时推送)
CLI 命令
# 设备管理
mqtt-home device list
mqtt-home device add --name "客厅灯" --type light --state-topic home/living/light --command-topic home/living/light/set
mqtt-home device info <device_id>
mqtt-home device remove <device_id>
mqtt-home device command <device_id> --payload '{"state":"on"}'
mqtt-home device logs <device_id> [--limit 20]
# Broker 管理
mqtt-home broker status
mqtt-home broker clients
mqtt-home broker topics
# 服务启动
python -m mqtt_home serve
前端设计
页面
- 仪表盘 — 设备数量统计、在线/离线状态、最近活动时间线
- 设备列表 — 网格视图,每个设备卡片显示类型图标、名称、当前状态、在线指示灯、快捷切换
- 设备详情 — 状态展示、控制面板(根据类型显示开关/滑块/数值输入)、消息日志
- Broker 管理 — 已连接客户端列表、活跃主题、Broker 健康状态
关键交互
- WebSocket 实时推送设备状态变化
- 设备卡片根据设备类型显示对应图标和颜色
- 控制面板根据设备类型自适应(switch → 切换按钮,light → 开关+亮度滑块,sensor → 只读数值显示)
- 响应式布局,支持移动端访问
配置
环境变量
MQTT_HOST=192.168.0.31
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
EMQX_API_URL=http://192.168.0.31:18083/api/v5
EMQX_API_KEY=<your-api-key>
EMQX_API_SECRET=<your-secret>
DATABASE_URL=sqlite+aiosqlite:///./data/mqtt_home.db
WEB_HOST=0.0.0.0
WEB_PORT=8000
EMQX 连接信息
- MQTT Broker:
192.168.0.31:1883 - EMQX Dashboard:
http://192.168.0.31:18083 - EMQX REST API:
http://192.168.0.31:18083/api/v5 - 认证方式:API Key + Secret Key(HTTP Basic Auth)
错误处理
- MQTT 连接失败:记录日志,自动重连,Web 端显示连接状态
- EMQX API 请求失败:返回错误信息,不阻塞核心功能
- 设备命令发送失败:记录日志,返回错误给用户
- 数据库操作失败:事务回滚,返回 500 错误
部署
- Python 3.11+
pip install -e .安装python -m mqtt_home serve启动(Web + MQTT)- CLI 可独立使用,不需要启动 Web 服务
- 可选 docker-compose 部署