本文档说明如何将透析通讯服务与第三方 MQTT 服务集成,实现透析机数据的实时上传和推送。
透析机设备
↓ (Socket 连接)
Socket 服务器 (端口 10961)
↓ (处理数据)
数据转换模块
↓ (发布)
MQTT Broker
↓ (订阅)
第三方服务/应用
配置文件 mqtt.json 必须与程序在同一目录。
创建或编辑 mqtt.json:
{
"enabled": true,
"brokerUrl": "mqtt.example.com",
"port": 1883,
"username": "your_username",
"password": "your_password",
"reconnectPeriod": 5000,
"defaultTopicPrefix": "your_topic_prefix"
}
| 参数 | 类型 | 说明 | 示例 |
|---|---|---|---|
enabled |
boolean | 是否启用 MQTT | true / false |
brokerUrl |
string | MQTT Broker 地址 | mqtt.ihemodialysis.com |
port |
number | MQTT Broker 端口 | 1883 (不加密) 或 8883 (TLS) |
username |
string | 连接用户名 | data |
password |
string | 连接密码 | data#2018 |
reconnectPeriod |
number | 断线重连间隔(毫秒) | 5000 |
defaultTopicPrefix |
string | MQTT 主题前缀 | touxiji |
{defaultTopicPrefix}/{deviceNumber}
示例:
- 前缀:touxiji
- 设备号:D001
- 完整主题:touxiji/D001
每次设备上报数据时,服务器向 MQTT 发布以下格式的消息(JSON):
{
"n": "D001",
"parameter1": "value1",
"parameter2": "value2",
"...": "...",
"deviceId": "192.168.1.100:54321",
"timestamp": "2025-12-03T10:30:45.123Z"
}
| 字段 | 类型 | 说明 |
|---|---|---|
n |
string | 透析机设备号 |
deviceId |
string | 设备的 Socket 连接地址和端口 |
timestamp |
string | 数据上报时间戳 (ISO 8601 格式) |
| 其他字段 | mixed | 来自设备的具体透析参数 |
mqtt.json 配置enabled: true,则自动连接到 MQTT BrokerreconnectPeriod 参数控制(默认 5秒)程序会记录以下信息:
✅ MQTT 连接成功: mqtt.ihemodialysis.com:62283 (用户: data)
📡 已通过 MQTT 发送数据到 touxiji/D001
📤 MQTT 已发布到 touxiji/D001
🔌 MQTT 连接断开
🔄 MQTT 正在重连...
❌ MQTT 错误: ...
logs/ 目录第三方服务/应用需要:
连接到同一个 MQTT Broker
地址:mqtt.ihemodialysis.com 端口:62283 用户名:data 密码:data#2018
订阅相关主题
订阅模式:touxiji/+ (订阅所有设备) 或 订阅模式:touxiji/D001 (订阅特定设备)
处理接收的消息
json { "n": "D001", "parameter1": "value1", "deviceId": "192.168.1.100:54321", "timestamp": "2025-12-03T10:30:45.123Z" }
import paho.mqtt.client as mqtt
import json
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("连接成功")
# 订阅所有透析机数据
client.subscribe("touxiji/+")
else:
print(f"连接失败: {rc}")
def on_message(client, userdata, msg):
try:
data = json.loads(msg.payload.decode())
print(f"收到数据来自 {data['n']}: {data}")
# 处理数据逻辑
except Exception as e:
print(f"解析错误: {e}")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set("data", "data#2018")
client.connect("mqtt.ihemodialysis.com", 62283, keepalive=60)
client.loop_forever()
const mqtt = require('mqtt');
const options = {
host: 'mqtt.ihemodialysis.com',
port: 62283,
username: 'data',
password: 'data#2018'
};
const client = mqtt.connect(options);
client.on('connect', () => {
console.log('连接成功');
client.subscribe('touxiji/+', (err) => {
if (!err) {
console.log('订阅成功');
}
});
});
client.on('message', (topic, message) => {
try {
const data = JSON.parse(message.toString());
console.log(`收到数据来自 ${data.n}:`, data);
// 处理数据逻辑
} catch (e) {
console.error('解析错误:', e);
}
});
client.on('error', (err) => {
console.error('连接错误:', err);
});
import org.eclipse.paho.client.mqttv3.*;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
public class MqttClient {
public static void main(String[] args) {
String brokerUrl = "tcp://mqtt.ihemodialysis.com:62283";
String clientId = "third_party_service_" + System.currentTimeMillis();
try {
MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("data");
options.setPassword("data#2018".toCharArray());
options.setAutomaticReconnect(true);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("连接断开");
}
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String payload = new String(message.getPayload());
Gson gson = new Gson();
JsonObject data = gson.fromJson(payload, JsonObject.class);
System.out.println("收到数据: " + data);
// 处理数据逻辑
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
client.connect(options);
client.subscribe("touxiji/+");
} catch (MqttException e) {
e.printStackTrace();
}
}
}
问题: 连接失败 或 连接超时
排查步骤:
1. 检查 Broker 地址和端口是否正确
2. 检查网络连接
3. 检查防火墙规则
4. 验证用户名和密码
日志示例: ❌ MQTT 错误: getaddrinfo ENOTFOUND mqtt.example.com
问题: 认证错误 或 密码错误
排查步骤:
1. 验证 mqtt.json 中的用户名和密码
2. 检查密码中是否有特殊字符(需要正确转义)
3. 重新启动程序
日志示例: ❌ MQTT 错误: Not authorized
问题: 设备已连接,但 MQTT 中看不到数据
排查步骤:
1. 检查 mqtt.json 中 enabled 是否为 true
2. 确认设备确实在发送数据(查看设备连接日志)
3. 验证 MQTT 主题前缀是否正确
4. 查看 MQTT Broker 的订阅情况
日志示例: 📡 已通过 MQTT 发送数据到 touxiji/D001
问题: MQTT 连接不稳定,频繁断开
排查步骤:
1. 检查网络稳定性
2. 增加 reconnectPeriod 值
3. 检查 Broker 服务器状态
4. 查看 Broker 日志
| 指标 | 值 |
|---|---|
| 最大设备连接数 | 无限制(取决于硬件) |
| 单个消息大小 | ≤ 1MB |
| 发送延迟 | < 100ms(通常) |
| 重连间隔 | 可配置(默认 5秒) |
| 心跳间隔 | 60秒 |
A: 可以。编辑 mqtt.json 中的 defaultTopicPrefix 字段。
{
"defaultTopicPrefix": "your_custom_prefix"
}
主题将变为:your_custom_prefix/{deviceNumber}
A: 两个都会工作。MQTT 用于实时数据推送,阿里云用于备份存储。
A: 不会。数据是实时发送的,掉线数据会丢失。
A: 不支持。服务器固定使用 QoS 1 以确保可靠性。
A: MQTT Broker 会自动向所有订阅者发送消息,无需额外配置。
如有技术问题,请查看程序日志或联系系统管理员。
日志位置:logs/ 目录
文档版本: 1.0
更新时间: 2025年12月
适用程序版本: 1.0+