/**
|
* MQTT 客户端管理模块
|
* 负责设备数据上传到 MQTT 服务
|
*/
|
|
const mqtt = require('mqtt');
|
const config = require('./config.json');
|
const appLogger = require('./loggerConfig');
|
|
// 获取 MQTT 配置
|
const mqttConfig = config.mqtt;
|
|
/**
|
* 从配置文件构建MQTT代理URL
|
*/
|
function buildBrokerUrl(brokerUrl, port) {
|
// 如果URL已包含协议,直接使用
|
if (brokerUrl.startsWith('mqtt://') || brokerUrl.startsWith('mqtts://')) {
|
return brokerUrl;
|
}
|
// 否则添加mqtt://前缀和端口
|
return `mqtt://${brokerUrl}:${port}`;
|
}
|
|
class MQTTClient {
|
constructor(deviceSerialNumber) {
|
// 检查MQTT是否启用
|
if (!mqttConfig.enabled) {
|
this.enabled = false;
|
this.deviceSerialNumber = deviceSerialNumber;
|
return;
|
}
|
|
this.enabled = true;
|
this.deviceSerialNumber = deviceSerialNumber;
|
this.client = null;
|
this.isConnected = false;
|
this.topicPrefix = `${mqttConfig.defaultTopicPrefix}/${deviceSerialNumber}`;
|
}
|
|
/**
|
* 连接到 MQTT 服务
|
*/
|
connect() {
|
if (!this.enabled) {
|
return Promise.resolve();
|
}
|
|
return new Promise((resolve, reject) => {
|
const brokerUrl = buildBrokerUrl(mqttConfig.brokerUrl, mqttConfig.port);
|
|
this.client = mqtt.connect(brokerUrl, {
|
username: mqttConfig.username,
|
password: mqttConfig.password,
|
clientId: `device_${this.deviceSerialNumber}`,
|
reconnectPeriod: mqttConfig.reconnectPeriod,
|
connectTimeout: 10000,
|
clean: true
|
});
|
|
this.client.on('connect', () => {
|
this.isConnected = true;
|
appLogger.logMqttConnected(this.deviceSerialNumber, `${buildBrokerUrl(mqttConfig.brokerUrl, mqttConfig.port)}`);
|
console.log(`✓ MQTT 连接成功 [设备: ${this.deviceSerialNumber}]`);
|
|
// 订阅设备相关主题
|
this.subscribe();
|
resolve();
|
});
|
|
this.client.on('error', (err) => {
|
appLogger.logMqttConnectionError(this.deviceSerialNumber, err);
|
console.error(`✗ MQTT 连接错误 [设备: ${this.deviceSerialNumber}]:`, err.message);
|
this.isConnected = false;
|
reject(err);
|
});
|
|
this.client.on('offline', () => {
|
this.isConnected = false;
|
appLogger.logMqttOffline(this.deviceSerialNumber);
|
console.warn(`⚠ MQTT 离线 [设备: ${this.deviceSerialNumber}]`);
|
});
|
|
this.client.on('reconnect', () => {
|
appLogger.logMqttReconnecting(this.deviceSerialNumber);
|
console.log(`⟳ MQTT 重新连接中 [设备: ${this.deviceSerialNumber}]`);
|
});
|
});
|
}
|
|
/**
|
* 订阅设备相关主题
|
*/
|
subscribe() {
|
if (!this.enabled || !this.client) return;
|
|
const topics = [
|
`${this.topicPrefix}/command`, // 命令主题
|
`${this.topicPrefix}/config` // 配置主题
|
];
|
|
topics.forEach(topic => {
|
this.client.subscribe(topic, (err) => {
|
if (err) {
|
console.error(`订阅主题失败 [${topic}]:`, err.message);
|
} else {
|
console.log(`✓ 已订阅主题: ${topic}`);
|
}
|
});
|
});
|
|
// 监听消息
|
this.client.on('message', (topic, message) => {
|
console.log(`\n📩 收到 MQTT 消息 [${topic}]:`, message.toString());
|
});
|
}
|
|
/**
|
* 发布关键医疗数据
|
* @param {object} keyData - 关键数据对象
|
* @param {string} clientIp - 设备 IP 地址
|
*/
|
publishKeyData(keyData, clientIp) {
|
if (!this.enabled || !this.isConnected) {
|
if (this.enabled && !this.isConnected) {
|
console.warn(`⚠ 设备 ${this.deviceSerialNumber} 未连接到 MQTT,无法发布数据`);
|
appLogger.logMqttPublishError(this.deviceSerialNumber, `${this.topicPrefix}`,
|
new Error('Device not connected to MQTT'), '');
|
}
|
return;
|
}
|
|
const topic = `${this.topicPrefix}`;
|
const payload = JSON.stringify({
|
timestamp: new Date().toISOString(),
|
deviceSN: this.deviceSerialNumber,
|
IPAddress: clientIp,
|
data: {
|
// 基本信息
|
deviceModel: keyData.deviceModel,
|
softwareVersion: keyData.softwareVersion,
|
|
// 超滤参数
|
ultraFiltrateTarget: keyData.ultraFiltrateTarget,
|
ultraFiltrateTotal: keyData.ultraFiltrateTotal,
|
ultrafiltrateRateSet: keyData.ultrafiltrateRateSet,
|
|
// 透析液参数
|
dialysisFluidFlow: keyData.dialysisFluidFlow,
|
dialysisFluidActual: keyData.dialysisFluidActual,
|
dialysisFluidTemp: keyData.dialysisFluidTemp,
|
sodiumConc: keyData.sodiumConc,
|
conductivity: keyData.conductivity,
|
|
// 血液/血流参数
|
bloodFlow: keyData.bloodFlow,
|
effectiveBloodFlow: keyData.effectiveBloodFlow,
|
dialysisBloodVolume: keyData.dialysisBloodVolume,
|
returnedBloodVolume: keyData.returnedBloodVolume,
|
plasmaNA: keyData.plasmaNA,
|
clearanceRate: keyData.clearanceRate,
|
|
// 质量指标
|
instantKT: keyData.instantKT,
|
ktvTarget: keyData.ktvTarget,
|
|
// 电解质参数
|
bicarbonate: keyData.bicarbonate,
|
|
// 压力参数
|
arterialPressure: keyData.arterialPressure,
|
venousPressure: keyData.venousPressure,
|
transmembranePressure: keyData.transmembranePressure,
|
|
// 膜型参数
|
dialysisMode: keyData.dialysisMode,
|
|
// 状态参数
|
runStatus: keyData.runStatus,
|
alarmStatus: keyData.alarmStatus,
|
treatmentDuration: keyData.treatmentDuration,
|
|
// 置换参数
|
replacementFluidTarget: keyData.replacementFluidTarget,
|
replacementRate: keyData.replacementRate,
|
replacementVolume: keyData.replacementVolume
|
}
|
});
|
|
// 记录发送前的信息
|
appLogger.logMqttDataSending(this.deviceSerialNumber, topic, 'properties', payload);
|
|
this.client.publish(topic, payload, { qos: 1 }, (err) => {
|
if (err) {
|
appLogger.logMqttPublishError(this.deviceSerialNumber, topic, err, payload);
|
console.error(`发布失败 [${topic}]:`, err.message);
|
} else {
|
appLogger.logMqttPublishSuccess(this.deviceSerialNumber, topic, 'properties', payload);
|
}
|
});
|
}
|
|
/**
|
* 发布状态事件
|
* @param {object} event - 事件对象
|
*/
|
publishEvent(event) {
|
if (!this.enabled || !this.isConnected) {
|
if (this.enabled && !this.isConnected) {
|
console.warn(`⚠ 设备 ${this.deviceSerialNumber} 未连接到 MQTT,无法发布事件`);
|
appLogger.logMqttPublishError(this.deviceSerialNumber, `${this.topicPrefix}/event`,
|
new Error('Device not connected to MQTT'), '');
|
}
|
return;
|
}
|
|
const topic = `${this.topicPrefix}/event`;
|
const payload = JSON.stringify({
|
timestamp: new Date().toISOString(),
|
deviceSN: this.deviceSerialNumber,
|
eventType: event.type,
|
description: event.description,
|
severity: event.severity
|
});
|
|
// 记录发送前的信息
|
appLogger.logMqttDataSending(this.deviceSerialNumber, topic, 'event', payload);
|
|
this.client.publish(topic, payload, { qos: 1 }, (err) => {
|
if (err) {
|
appLogger.logMqttPublishError(this.deviceSerialNumber, topic, err, payload);
|
console.error(`发布事件失败:`, err.message);
|
} else {
|
appLogger.logMqttPublishSuccess(this.deviceSerialNumber, topic, 'event', payload);
|
}
|
});
|
}
|
|
/**
|
* 断开连接
|
*/
|
disconnect() {
|
if (!this.enabled || !this.client) return;
|
|
this.client.end(true, () => {
|
this.isConnected = false;
|
console.log(`✓ MQTT 连接已关闭 [设备: ${this.deviceSerialNumber}]`);
|
});
|
}
|
|
/**
|
* 获取连接状态
|
*/
|
getConnectionStatus() {
|
return {
|
deviceSerialNumber: this.deviceSerialNumber,
|
mqttEnabled: this.enabled,
|
isConnected: this.isConnected,
|
topicPrefix: this.topicPrefix
|
};
|
}
|
}
|
|
module.exports = MQTTClient;
|