东丽网口版透析机 socket- server 通讯
编辑 | blame | 历史 | 原始文档

透析通讯服务 - MQTT 集成文档

概述

本文档说明如何将透析通讯服务与第三方 MQTT 服务集成,实现透析机数据的实时上传和推送。


1. 系统架构

透析机设备 
    ↓ (Socket 连接)
Socket 服务器 (端口 10961)
    ↓ (处理数据)
数据转换模块
    ↓ (发布)
MQTT Broker
    ↓ (订阅)
第三方服务/应用

2. MQTT 配置

2.1 配置文件位置

配置文件 mqtt.json 必须与程序在同一目录。

2.2 配置参数

创建或编辑 mqtt.json

{
    "enabled": true,
    "brokerUrl": "mqtt.example.com",
    "port": 1883,
    "username": "your_username",
    "password": "your_password",
    "reconnectPeriod": 5000,
    "defaultTopicPrefix": "your_topic_prefix"
}
参数 类型 说明 示例
enabled boolean 是否启用 MQTT true / false
brokerUrl string MQTT Broker 地址 mqtt.ihemodialysis.com
port number MQTT Broker 端口 1883 (不加密) 或 8883 (TLS)
username string 连接用户名 data
password string 连接密码 data#2018
reconnectPeriod number 断线重连间隔(毫秒) 5000
defaultTopicPrefix string MQTT 主题前缀 touxiji

3. 数据发布

3.1 发布主题格式

{defaultTopicPrefix}/{deviceNumber}

示例:
- 前缀:touxiji
- 设备号:D001
- 完整主题:touxiji/D001

3.2 发布消息格式

每次设备上报数据时,服务器向 MQTT 发布以下格式的消息(JSON):

{
    "n": "D001",
    "parameter1": "value1",
    "parameter2": "value2",
    "...": "...",
    "deviceId": "192.168.1.100:54321",
    "timestamp": "2025-12-03T10:30:45.123Z"
}
字段 类型 说明
n string 透析机设备号
deviceId string 设备的 Socket 连接地址和端口
timestamp string 数据上报时间戳 (ISO 8601 格式)
其他字段 mixed 来自设备的具体透析参数

3.3 发布质量等级 (QoS)

  • 默认 QoS 级别:**1** (至少一次投递)
  • 确保消息不会丢失,适合关键数据

3.4 发布频率

  • 触发方式:**事件驱动** (设备发送数据时立即发布)
  • 无固定发布间隔
  • 取决于设备的数据上报频率

4. 连接管理

4.1 连接过程

  1. 程序启动时读取 mqtt.json 配置
  2. enabled: true,则自动连接到 MQTT Broker
  3. 连接成功后开始接收设备数据
  4. 每接收到一条完整设备消息,立即发送到 MQTT

4.2 断线重连

  • 自动重连:是的
  • 重连间隔:由 reconnectPeriod 参数控制(默认 5秒)
  • 重连策略:指数退避

4.3 心跳保活

  • MQTT 连接采用标准 MQTT 心跳机制
  • 确保连接持续稳定

4.4 断开连接

  • 关闭程序时自动断开 MQTT 连接
  • 设备断开时不影响 MQTT 连接

5. 日志和监控

5.1 日志输出

程序会记录以下信息:

✅ MQTT 连接成功: mqtt.ihemodialysis.com:62283 (用户: data)
📡 已通过 MQTT 发送数据到 touxiji/D001
📤 MQTT 已发布到 touxiji/D001
🔌 MQTT 连接断开
🔄 MQTT 正在重连...
❌ MQTT 错误: ...

5.2 日志位置

  • 日志存储在 logs/ 目录
  • 用于调试和问题排查

6. 第三方服务接入

6.1 订阅数据

第三方服务/应用需要:

  1. 连接到同一个 MQTT Broker
    地址:mqtt.ihemodialysis.com 端口:62283 用户名:data 密码:data#2018

  2. 订阅相关主题
    订阅模式:touxiji/+ (订阅所有设备) 或 订阅模式:touxiji/D001 (订阅特定设备)

  3. 处理接收的消息
    json { "n": "D001", "parameter1": "value1", "deviceId": "192.168.1.100:54321", "timestamp": "2025-12-03T10:30:45.123Z" }

6.2 Python 示例

import paho.mqtt.client as mqtt
import json

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("连接成功")
        # 订阅所有透析机数据
        client.subscribe("touxiji/+")
    else:
        print(f"连接失败: {rc}")

def on_message(client, userdata, msg):
    try:
        data = json.loads(msg.payload.decode())
        print(f"收到数据来自 {data['n']}: {data}")
        # 处理数据逻辑
    except Exception as e:
        print(f"解析错误: {e}")

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.username_pw_set("data", "data#2018")
client.connect("mqtt.ihemodialysis.com", 62283, keepalive=60)

client.loop_forever()

6.3 Node.js 示例

const mqtt = require('mqtt');

const options = {
    host: 'mqtt.ihemodialysis.com',
    port: 62283,
    username: 'data',
    password: 'data#2018'
};

const client = mqtt.connect(options);

client.on('connect', () => {
    console.log('连接成功');
    client.subscribe('touxiji/+', (err) => {
        if (!err) {
            console.log('订阅成功');
        }
    });
});

client.on('message', (topic, message) => {
    try {
        const data = JSON.parse(message.toString());
        console.log(`收到数据来自 ${data.n}:`, data);
        // 处理数据逻辑
    } catch (e) {
        console.error('解析错误:', e);
    }
});

client.on('error', (err) => {
    console.error('连接错误:', err);
});

6.4 Java 示例

import org.eclipse.paho.client.mqttv3.*;
import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class MqttClient {
    public static void main(String[] args) {
        String brokerUrl = "tcp://mqtt.ihemodialysis.com:62283";
        String clientId = "third_party_service_" + System.currentTimeMillis();
        
        try {
            MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName("data");
            options.setPassword("data#2018".toCharArray());
            options.setAutomaticReconnect(true);
            
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("连接断开");
                }
                
                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    try {
                        String payload = new String(message.getPayload());
                        Gson gson = new Gson();
                        JsonObject data = gson.fromJson(payload, JsonObject.class);
                        System.out.println("收到数据: " + data);
                        // 处理数据逻辑
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                }
            });
            
            client.connect(options);
            client.subscribe("touxiji/+");
            
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

7. 故障排查

7.1 无法连接到 MQTT

问题: 连接失败连接超时

排查步骤:
1. 检查 Broker 地址和端口是否正确
2. 检查网络连接
3. 检查防火墙规则
4. 验证用户名和密码

日志示例:
❌ MQTT 错误: getaddrinfo ENOTFOUND mqtt.example.com

7.2 认证失败

问题: 认证错误密码错误

排查步骤:
1. 验证 mqtt.json 中的用户名和密码
2. 检查密码中是否有特殊字符(需要正确转义)
3. 重新启动程序

日志示例:
❌ MQTT 错误: Not authorized

7.3 数据未发送

问题: 设备已连接,但 MQTT 中看不到数据

排查步骤:
1. 检查 mqtt.jsonenabled 是否为 true
2. 确认设备确实在发送数据(查看设备连接日志)
3. 验证 MQTT 主题前缀是否正确
4. 查看 MQTT Broker 的订阅情况

日志示例:
📡 已通过 MQTT 发送数据到 touxiji/D001

7.4 频繁断线重连

问题: MQTT 连接不稳定,频繁断开

排查步骤:
1. 检查网络稳定性
2. 增加 reconnectPeriod
3. 检查 Broker 服务器状态
4. 查看 Broker 日志


8. 安全建议

8.1 用户名和密码

  • ✅ 使用强密码
  • ❌ 不要在代码中硬编码敏感信息
  • ✅ 使用环境变量管理配置

8.2 网络安全

  • 如果需要加密通信,使用 TLS/SSL(端口 8883)
  • 限制 MQTT Broker 访问 IP 范围

8.3 消息安全

  • QoS 1 确保消息投递
  • 实现消息校验机制(可选)

9. 性能指标

指标
最大设备连接数 无限制(取决于硬件)
单个消息大小 ≤ 1MB
发送延迟 < 100ms(通常)
重连间隔 可配置(默认 5秒)
心跳间隔 60秒

10. 常见问题 (FAQ)

Q1: 能否修改 MQTT 主题格式?

A: 可以。编辑 mqtt.json 中的 defaultTopicPrefix 字段。

{
    "defaultTopicPrefix": "your_custom_prefix"
}

主题将变为:your_custom_prefix/{deviceNumber}

Q2: 如果 MQTT 和阿里云同时启用会怎样?

A: 两个都会工作。MQTT 用于实时数据推送,阿里云用于备份存储。

Q3: 设备掉线后数据会保留吗?

A: 不会。数据是实时发送的,掉线数据会丢失。

Q4: 支持 QoS 0 吗?

A: 不支持。服务器固定使用 QoS 1 以确保可靠性。

Q5: 如何处理同一主题的多个订阅者?

A: MQTT Broker 会自动向所有订阅者发送消息,无需额外配置。


11. 联系方式

如有技术问题,请查看程序日志或联系系统管理员。

日志位置:logs/ 目录


文档版本: 1.0
更新时间: 2025年12月
适用程序版本: 1.0+