# 透析通讯服务 - MQTT 集成文档 ## 概述 本文档说明如何将透析通讯服务与第三方 MQTT 服务集成,实现透析机数据的实时上传和推送。 --- ## 1. 系统架构 ``` 透析机设备 ↓ (Socket 连接) Socket 服务器 (端口 10961) ↓ (处理数据) 数据转换模块 ↓ (发布) MQTT Broker ↓ (订阅) 第三方服务/应用 ``` --- ## 2. MQTT 配置 ### 2.1 配置文件位置 配置文件 `mqtt.json` 必须与程序在同一目录。 ### 2.2 配置参数 创建或编辑 `mqtt.json`: ```json { "enabled": true, "brokerUrl": "mqtt.example.com", "port": 1883, "username": "your_username", "password": "your_password", "reconnectPeriod": 5000, "defaultTopicPrefix": "your_topic_prefix" } ``` | 参数 | 类型 | 说明 | 示例 | |------|------|------|------| | `enabled` | boolean | 是否启用 MQTT | `true` / `false` | | `brokerUrl` | string | MQTT Broker 地址 | `mqtt.ihemodialysis.com` | | `port` | number | MQTT Broker 端口 | `1883` (不加密) 或 `8883` (TLS) | | `username` | string | 连接用户名 | `data` | | `password` | string | 连接密码 | `data#2018` | | `reconnectPeriod` | number | 断线重连间隔(毫秒) | `5000` | | `defaultTopicPrefix` | string | MQTT 主题前缀 | `touxiji` | --- ## 3. 数据发布 ### 3.1 发布主题格式 ``` {defaultTopicPrefix}/{deviceNumber} ``` **示例:** - 前缀:`touxiji` - 设备号:`D001` - 完整主题:`touxiji/D001` ### 3.2 发布消息格式 每次设备上报数据时,服务器向 MQTT 发布以下格式的消息(JSON): ```json { "n": "D001", "parameter1": "value1", "parameter2": "value2", "...": "...", "deviceId": "192.168.1.100:54321", "timestamp": "2025-12-03T10:30:45.123Z" } ``` | 字段 | 类型 | 说明 | |------|------|------| | `n` | string | 透析机设备号 | | `deviceId` | string | 设备的 Socket 连接地址和端口 | | `timestamp` | string | 数据上报时间戳 (ISO 8601 格式) | | 其他字段 | mixed | 来自设备的具体透析参数 | ### 3.3 发布质量等级 (QoS) - 默认 QoS 级别:**1** (至少一次投递) - 确保消息不会丢失,适合关键数据 ### 3.4 发布频率 - 触发方式:**事件驱动** (设备发送数据时立即发布) - 无固定发布间隔 - 取决于设备的数据上报频率 --- ## 4. 连接管理 ### 4.1 连接过程 1. 程序启动时读取 `mqtt.json` 配置 2. 若 `enabled: true`,则自动连接到 MQTT Broker 3. 连接成功后开始接收设备数据 4. 每接收到一条完整设备消息,立即发送到 MQTT ### 4.2 断线重连 - **自动重连**:是的 - **重连间隔**:由 `reconnectPeriod` 参数控制(默认 5秒) - **重连策略**:指数退避 ### 4.3 心跳保活 - MQTT 连接采用标准 MQTT 心跳机制 - 确保连接持续稳定 ### 4.4 断开连接 - 关闭程序时自动断开 MQTT 连接 - 设备断开时不影响 MQTT 连接 --- ## 5. 日志和监控 ### 5.1 日志输出 程序会记录以下信息: ``` ✅ MQTT 连接成功: mqtt.ihemodialysis.com:62283 (用户: data) 📡 已通过 MQTT 发送数据到 touxiji/D001 📤 MQTT 已发布到 touxiji/D001 🔌 MQTT 连接断开 🔄 MQTT 正在重连... ❌ MQTT 错误: ... ``` ### 5.2 日志位置 - 日志存储在 `logs/` 目录 - 用于调试和问题排查 --- ## 6. 第三方服务接入 ### 6.1 订阅数据 第三方服务/应用需要: 1. **连接到同一个 MQTT Broker** ``` 地址:mqtt.ihemodialysis.com 端口:62283 用户名:data 密码:data#2018 ``` 2. **订阅相关主题** ``` 订阅模式:touxiji/+ (订阅所有设备) 或 订阅模式:touxiji/D001 (订阅特定设备) ``` 3. **处理接收的消息** ```json { "n": "D001", "parameter1": "value1", "deviceId": "192.168.1.100:54321", "timestamp": "2025-12-03T10:30:45.123Z" } ``` ### 6.2 Python 示例 ```python import paho.mqtt.client as mqtt import json def on_connect(client, userdata, flags, rc): if rc == 0: print("连接成功") # 订阅所有透析机数据 client.subscribe("touxiji/+") else: print(f"连接失败: {rc}") def on_message(client, userdata, msg): try: data = json.loads(msg.payload.decode()) print(f"收到数据来自 {data['n']}: {data}") # 处理数据逻辑 except Exception as e: print(f"解析错误: {e}") client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.username_pw_set("data", "data#2018") client.connect("mqtt.ihemodialysis.com", 62283, keepalive=60) client.loop_forever() ``` ### 6.3 Node.js 示例 ```javascript const mqtt = require('mqtt'); const options = { host: 'mqtt.ihemodialysis.com', port: 62283, username: 'data', password: 'data#2018' }; const client = mqtt.connect(options); client.on('connect', () => { console.log('连接成功'); client.subscribe('touxiji/+', (err) => { if (!err) { console.log('订阅成功'); } }); }); client.on('message', (topic, message) => { try { const data = JSON.parse(message.toString()); console.log(`收到数据来自 ${data.n}:`, data); // 处理数据逻辑 } catch (e) { console.error('解析错误:', e); } }); client.on('error', (err) => { console.error('连接错误:', err); }); ``` ### 6.4 Java 示例 ```java import org.eclipse.paho.client.mqttv3.*; import com.google.gson.Gson; import com.google.gson.JsonObject; public class MqttClient { public static void main(String[] args) { String brokerUrl = "tcp://mqtt.ihemodialysis.com:62283"; String clientId = "third_party_service_" + System.currentTimeMillis(); try { MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName("data"); options.setPassword("data#2018".toCharArray()); options.setAutomaticReconnect(true); client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.out.println("连接断开"); } @Override public void messageArrived(String topic, MqttMessage message) { try { String payload = new String(message.getPayload()); Gson gson = new Gson(); JsonObject data = gson.fromJson(payload, JsonObject.class); System.out.println("收到数据: " + data); // 处理数据逻辑 } catch (Exception e) { e.printStackTrace(); } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); client.connect(options); client.subscribe("touxiji/+"); } catch (MqttException e) { e.printStackTrace(); } } } ``` --- ## 7. 故障排查 ### 7.1 无法连接到 MQTT **问题:** `连接失败` 或 `连接超时` **排查步骤:** 1. 检查 Broker 地址和端口是否正确 2. 检查网络连接 3. 检查防火墙规则 4. 验证用户名和密码 **日志示例:** ``` ❌ MQTT 错误: getaddrinfo ENOTFOUND mqtt.example.com ``` ### 7.2 认证失败 **问题:** `认证错误` 或 `密码错误` **排查步骤:** 1. 验证 `mqtt.json` 中的用户名和密码 2. 检查密码中是否有特殊字符(需要正确转义) 3. 重新启动程序 **日志示例:** ``` ❌ MQTT 错误: Not authorized ``` ### 7.3 数据未发送 **问题:** 设备已连接,但 MQTT 中看不到数据 **排查步骤:** 1. 检查 `mqtt.json` 中 `enabled` 是否为 `true` 2. 确认设备确实在发送数据(查看设备连接日志) 3. 验证 MQTT 主题前缀是否正确 4. 查看 MQTT Broker 的订阅情况 **日志示例:** ``` 📡 已通过 MQTT 发送数据到 touxiji/D001 ``` ### 7.4 频繁断线重连 **问题:** MQTT 连接不稳定,频繁断开 **排查步骤:** 1. 检查网络稳定性 2. 增加 `reconnectPeriod` 值 3. 检查 Broker 服务器状态 4. 查看 Broker 日志 --- ## 8. 安全建议 ### 8.1 用户名和密码 - ✅ 使用强密码 - ❌ 不要在代码中硬编码敏感信息 - ✅ 使用环境变量管理配置 ### 8.2 网络安全 - 如果需要加密通信,使用 TLS/SSL(端口 8883) - 限制 MQTT Broker 访问 IP 范围 ### 8.3 消息安全 - QoS 1 确保消息投递 - 实现消息校验机制(可选) --- ## 9. 性能指标 | 指标 | 值 | |------|-----| | 最大设备连接数 | 无限制(取决于硬件) | | 单个消息大小 | ≤ 1MB | | 发送延迟 | < 100ms(通常) | | 重连间隔 | 可配置(默认 5秒) | | 心跳间隔 | 60秒 | --- ## 10. 常见问题 (FAQ) ### Q1: 能否修改 MQTT 主题格式? **A:** 可以。编辑 `mqtt.json` 中的 `defaultTopicPrefix` 字段。 ```json { "defaultTopicPrefix": "your_custom_prefix" } ``` 主题将变为:`your_custom_prefix/{deviceNumber}` ### Q2: 如果 MQTT 和阿里云同时启用会怎样? **A:** 两个都会工作。MQTT 用于实时数据推送,阿里云用于备份存储。 ### Q3: 设备掉线后数据会保留吗? **A:** 不会。数据是实时发送的,掉线数据会丢失。 ### Q4: 支持 QoS 0 吗? **A:** 不支持。服务器固定使用 QoS 1 以确保可靠性。 ### Q5: 如何处理同一主题的多个订阅者? **A:** MQTT Broker 会自动向所有订阅者发送消息,无需额外配置。 --- ## 11. 联系方式 如有技术问题,请查看程序日志或联系系统管理员。 日志位置:`logs/` 目录 --- **文档版本:** 1.0 **更新时间:** 2025年12月 **适用程序版本:** 1.0+