实现设备自动发现规则管理系统,包含以下主要功能: 1. 新增规则管理页面和API,支持创建、编辑和删除主题匹配规则 2. 添加规则匹配引擎,支持+和#通配符匹配设备主题 3. 实现Broker设备注册表,自动发现并管理符合规则的设备 4. 扩展仪表盘显示Broker信息和活跃主题 5. 修改设备卡片和详情页以区分规则发现的设备 6. 添加相关测试用例确保功能稳定性
75 lines
2.2 KiB
Python
75 lines
2.2 KiB
Python
from mqtt_home.topic_matcher import match_topic, extract_device_id, build_command_topic, extract_state_value
|
|
|
|
|
|
def test_match_topic_single_plus():
    """One ``+`` wildcard: the matched segment is reported under key ``"1"``."""
    topic, pattern = "home/fire/state", "home/+/state"
    expected = {"1": "fire"}
    assert match_topic(topic, pattern) == expected
|
|
|
|
|
|
def test_match_topic_no_match():
    """A literal pattern segment that differs from the topic yields ``None``."""
    outcome = match_topic("home/fire/state", "home/living/state")
    assert outcome is None
|
|
|
|
|
|
def test_match_topic_hash():
    """A trailing ``#`` in the pattern matches the remaining topic segments."""
    captured = match_topic("home/fire/brightness", "home/fire/#")
    assert captured is not None
    # The ``#`` sits at segment index 2, so its capture is keyed by "2".
    assert "2" in captured
|
|
|
|
|
|
def test_match_topic_multiple_plus():
    """Two ``+`` wildcards: each capture is keyed by its own segment index."""
    captures = match_topic("home/fire/living/state", "home/+/+/state")
    assert captures == {"1": "fire", "2": "living"}
|
|
|
|
|
|
def test_match_topic_hash_matches_empty():
    """A trailing ``#`` still matches when the topic ends in an empty segment."""
    topic_with_trailing_slash = "home/fire/"
    assert match_topic(topic_with_trailing_slash, "home/fire/#") is not None
|
|
|
|
|
|
def test_extract_device_id_single_plus():
    """With one ``+`` in the pattern, the device id is the segment it matched."""
    device_id = extract_device_id("home/fire/state", "home/+/state")
    assert device_id == "fire"
|
|
|
|
|
|
def test_extract_device_id_no_match():
    """When the topic does not fit the pattern, no device id is returned."""
    device_id = extract_device_id("home/fire/state", "home/living/state")
    assert device_id is None
|
|
|
|
|
|
def test_extract_device_id_hash():
    """With a trailing ``#`` pattern, the segment it covers is the device id."""
    topic, pattern = "home/fire/brightness", "home/fire/#"
    assert extract_device_id(topic, pattern) == "brightness"
|
|
|
|
|
|
def test_extract_device_id_multiple_plus():
    """With several ``+`` wildcards, the last wildcard's match is the device id."""
    device_id = extract_device_id("home/fire/living/state", "home/+/+/state")
    # "living" is matched by the second (last) ``+``; "fire" by the first.
    assert device_id == "living"
|
|
|
|
|
|
def test_build_command_topic_basic():
    """The ``{device_id}`` placeholder in the template is replaced by the id."""
    template = "home/{device_id}/set"
    command_topic = build_command_topic(template, "fire")
    assert command_topic == "home/fire/set"
|
|
|
|
|
|
def test_build_command_topic_none():
    """A missing (``None``) or empty template produces no command topic."""
    for absent_template in (None, ""):
        assert build_command_topic(absent_template, "fire") is None
|
|
|
|
|
|
def test_extract_state_value_json_path():
    """A JSON payload plus a key path yields the value stored under that key."""
    payload = '{"state":"on"}'
    assert extract_state_value(payload, "state") == "on"
|
|
|
|
|
|
def test_extract_state_value_no_path():
    """With no path (``None``), the payload comes back unchanged."""
    payload = '{"state":"on"}'
    extracted = extract_state_value(payload, None)
    assert extracted == payload
|
|
|
|
|
|
def test_extract_state_value_plain_text():
    """A non-JSON payload is returned as-is even when a path is supplied."""
    payload = "plain text"
    extracted = extract_state_value(payload, "state")
    assert extracted == payload
|
|
|
|
|
|
def test_extract_state_value_nested_path():
    """Looking up ``brightness`` in a flat JSON payload yields the string "255".

    NOTE(review): despite the name, the payload is not nested and the
    assertion matches the numeric-value test exactly -- confirm whether a
    genuinely nested payload/path was intended here.
    """
    payload = '{"brightness":255}'
    extracted = extract_state_value(payload, "brightness")
    assert extracted == "255"
|
|
|
|
|
|
def test_extract_state_value_missing_key():
    """When the requested key is absent, the raw payload is returned."""
    payload = '{"temperature":22}'
    extracted = extract_state_value(payload, "humidity")
    assert extracted == payload
|
|
|
|
|
|
def test_extract_state_value_numeric_value():
    """A numeric JSON value is returned in string form (``255`` -> ``"255"``)."""
    payload = '{"brightness":255}'
    expected = "255"
    assert extract_state_value(payload, "brightness") == expected
|