/**
 * MQTT client management module.
 * Responsible for uploading device data to an MQTT service.
 */
const mqtt = require('mqtt');
const config = require('./config.json');
const appLogger = require('./loggerConfig');

// MQTT settings from the config file. A missing "mqtt" section is treated as
// "MQTT disabled" instead of crashing on the first property access.
const mqttConfig = config.mqtt || {};

// Matches any leading URL scheme such as mqtt://, mqtts://, ws://, wss://, tcp://.
const URL_SCHEME_PATTERN = /^[a-z][a-z0-9+.-]*:\/\//i;

// Fields copied from the key-data object into the published payload, in
// payload order. Fields whose value is undefined are dropped by JSON.stringify.
const KEY_DATA_FIELDS = [
  // Basic information
  'deviceModel',
  'softwareVersion',
  // Ultrafiltration parameters
  'ultraFiltrateTarget',
  'ultraFiltrateTotal',
  'ultrafiltrateRateSet',
  // Dialysis fluid parameters
  'dialysisFluidFlow',
  'dialysisFluidActual',
  'dialysisFluidTemp',
  'sodiumConc',
  'conductivity',
  // Blood / blood-flow parameters
  'bloodFlow',
  'effectiveBloodFlow',
  'dialysisBloodVolume',
  'returnedBloodVolume',
  'plasmaNA',
  'clearanceRate',
  // Quality indicators
  'instantKT',
  'ktvTarget',
  // Electrolyte parameters
  'bicarbonate',
  // Pressure parameters
  'arterialPressure',
  'venousPressure',
  'transmembranePressure',
  // Mode parameters
  'dialysisMode',
  // Status parameters
  'runStatus',
  'alarmStatus',
  'treatmentDuration',
  // Replacement (substitution) parameters
  'replacementFluidTarget',
  'replacementRate',
  'replacementVolume',
];

/**
 * Build the MQTT broker URL from the configured host and port.
 *
 * @param {string} brokerUrl - Host name or a complete URL including a scheme.
 * @param {number|string} [port] - Port appended when brokerUrl has no scheme.
 * @returns {string} brokerUrl unchanged when it already carries a scheme,
 *   otherwise `mqtt://<brokerUrl>[:<port>]`.
 */
function buildBrokerUrl(brokerUrl, port) {
  // Any explicit scheme is passed through untouched so that a configured
  // ws:// or wss:// URL is not turned into "mqtt://ws://...".
  if (URL_SCHEME_PATTERN.test(brokerUrl)) {
    return brokerUrl;
  }

  // Without a configured port, emit no port at all rather than ":undefined".
  if (port === undefined || port === null || port === '') {
    return `mqtt://${brokerUrl}`;
  }

  return `mqtt://${brokerUrl}:${port}`;
}

/**
 * Log an incoming MQTT message.
 * Kept as one shared function so it can be attached to a client idempotently.
 *
 * @param {string} topic - Topic the message arrived on.
 * @param {Buffer} message - Raw message payload.
 */
function logIncomingMessage(topic, message) {
  console.log(`\n📩 收到 MQTT 消息 [${topic}]:`, message.toString());
}

/**
 * Copy the known key-data fields into a fresh object, in payload order.
 *
 * @param {object} keyData - Source object holding the device readings.
 * @returns {object} Object containing only the fields in KEY_DATA_FIELDS.
 */
function pickKeyDataFields(keyData) {
  const data = {};
  KEY_DATA_FIELDS.forEach((field) => {
    data[field] = keyData[field];
  });
  return data;
}

/**
 * Per-device MQTT client. When MQTT is disabled in the configuration every
 * method is a no-op.
 */
class MQTTClient {
  /**
   * @param {string} deviceSerialNumber - Serial number used in the client id
   *   and in the topic prefix.
   */
  constructor(deviceSerialNumber) {
    this.deviceSerialNumber = deviceSerialNumber;
    this.enabled = Boolean(mqttConfig.enabled);
    // Always initialised so the connection status is well-defined even when
    // MQTT is disabled.
    this.client = null;
    this.isConnected = false;

    if (!this.enabled) {
      return;
    }

    this.topicPrefix = `${mqttConfig.defaultTopicPrefix}/${deviceSerialNumber}`;
  }

  /**
   * Connect to the MQTT service.
   *
   * @returns {Promise<void>} Resolves on the first successful connection (or
   *   immediately when MQTT is disabled); rejects if an error is emitted
   *   before that.
   */
  connect() {
    if (!this.enabled) {
      return Promise.resolve();
    }

    return new Promise((resolve, reject) => {
      // A repeated connect() must not leave the previous client running and
      // reconnecting in the background under the same client id.
      if (this.client) {
        this.client.end(true);
        this.isConnected = false;
      }

      const brokerUrl = buildBrokerUrl(mqttConfig.brokerUrl, mqttConfig.port);
      const client = mqtt.connect(brokerUrl, {
        username: mqttConfig.username,
        password: mqttConfig.password,
        clientId: `device_${this.deviceSerialNumber}`,
        reconnectPeriod: mqttConfig.reconnectPeriod,
        connectTimeout: 10000,
        clean: true,
      });
      this.client = client;

      // Fires on the initial connection and again after every reconnect.
      client.on('connect', () => {
        this.isConnected = true;
        appLogger.logMqttConnected(this.deviceSerialNumber, brokerUrl);
        console.log(`✓ MQTT 连接成功 [设备: ${this.deviceSerialNumber}]`);

        // Subscribe to the device-related topics.
        this.subscribe();
        // Resolving an already settled promise is a no-op on reconnects.
        resolve();
      });

      client.on('error', (err) => {
        appLogger.logMqttConnectionError(this.deviceSerialNumber, err);
        console.error(`✗ MQTT 连接错误 [设备: ${this.deviceSerialNumber}]:`, err.message);
        this.isConnected = false;
        reject(err);
      });

      client.on('offline', () => {
        this.isConnected = false;
        appLogger.logMqttOffline(this.deviceSerialNumber);
        console.warn(`⚠ MQTT 离线 [设备: ${this.deviceSerialNumber}]`);
      });

      client.on('reconnect', () => {
        appLogger.logMqttReconnecting(this.deviceSerialNumber);
        console.log(`⟳ MQTT 重新连接中 [设备: ${this.deviceSerialNumber}]`);
      });
    });
  }

  /**
   * Subscribe to the device's command and config topics and make sure
   * incoming messages are logged. Safe to call repeatedly.
   */
  subscribe() {
    if (!this.enabled || !this.client) {
      return;
    }

    // This method runs on every (re)connect. Detach before attaching so the
    // client never accumulates duplicate 'message' listeners.
    this.client.removeListener('message', logIncomingMessage);
    this.client.on('message', logIncomingMessage);

    const topics = [
      `${this.topicPrefix}/command`, // command topic
      `${this.topicPrefix}/config`, // configuration topic
    ];

    topics.forEach((topic) => {
      this.client.subscribe(topic, (err) => {
        if (err) {
          console.error(`订阅主题失败 [${topic}]:`, err.message);
        } else {
          console.log(`✓ 已订阅主题: ${topic}`);
        }
      });
    });
  }

  /**
   * Publish key medical data to the device's base topic with QoS 1.
   *
   * @param {object} keyData - Key data object; the fields listed in
   *   KEY_DATA_FIELDS are copied into the payload.
   * @param {string} clientIp - Device IP address.
   */
  publishKeyData(keyData, clientIp) {
    if (!this.enabled) {
      return;
    }

    const topic = this.topicPrefix;

    if (!this.isConnected) {
      console.warn(`⚠ 设备 ${this.deviceSerialNumber} 未连接到 MQTT,无法发布数据`);
      appLogger.logMqttPublishError(
        this.deviceSerialNumber,
        topic,
        new Error('Device not connected to MQTT'),
        ''
      );
      return;
    }

    // Report unusable input through the publish-error log instead of
    // throwing a TypeError on the first property access.
    if (keyData === null || typeof keyData !== 'object') {
      appLogger.logMqttPublishError(
        this.deviceSerialNumber,
        topic,
        new Error('Invalid key data: expected an object'),
        ''
      );
      return;
    }

    const payload = JSON.stringify({
      timestamp: new Date().toISOString(),
      deviceSN: this.deviceSerialNumber,
      IPAddress: clientIp,
      data: pickKeyDataFields(keyData),
    });

    // Record what is about to be sent.
    appLogger.logMqttDataSending(this.deviceSerialNumber, topic, 'properties', payload);

    this.client.publish(topic, payload, { qos: 1 }, (err) => {
      if (err) {
        appLogger.logMqttPublishError(this.deviceSerialNumber, topic, err, payload);
        console.error(`发布失败 [${topic}]:`, err.message);
      } else {
        appLogger.logMqttPublishSuccess(this.deviceSerialNumber, topic, 'properties', payload);
      }
    });
  }

  /**
   * Publish a status event to `<topicPrefix>/event` with QoS 1.
   *
   * @param {object} event - Event object; its `type`, `description` and
   *   `severity` properties are copied into the payload.
   */
  publishEvent(event) {
    if (!this.enabled) {
      return;
    }

    const topic = `${this.topicPrefix}/event`;

    if (!this.isConnected) {
      console.warn(`⚠ 设备 ${this.deviceSerialNumber} 未连接到 MQTT,无法发布事件`);
      appLogger.logMqttPublishError(
        this.deviceSerialNumber,
        topic,
        new Error('Device not connected to MQTT'),
        ''
      );
      return;
    }

    // Report unusable input through the publish-error log instead of
    // throwing a TypeError on the first property access.
    if (event === null || typeof event !== 'object') {
      appLogger.logMqttPublishError(
        this.deviceSerialNumber,
        topic,
        new Error('Invalid event: expected an object'),
        ''
      );
      return;
    }

    const payload = JSON.stringify({
      timestamp: new Date().toISOString(),
      deviceSN: this.deviceSerialNumber,
      eventType: event.type,
      description: event.description,
      severity: event.severity,
    });

    // Record what is about to be sent.
    appLogger.logMqttDataSending(this.deviceSerialNumber, topic, 'event', payload);

    this.client.publish(topic, payload, { qos: 1 }, (err) => {
      if (err) {
        appLogger.logMqttPublishError(this.deviceSerialNumber, topic, err, payload);
        console.error(`发布事件失败:`, err.message);
      } else {
        appLogger.logMqttPublishSuccess(this.deviceSerialNumber, topic, 'event', payload);
      }
    });
  }

  /**
   * Force-close the connection. No-op when MQTT is disabled or no client has
   * been created yet.
   */
  disconnect() {
    if (!this.enabled || !this.client) {
      return;
    }

    this.client.end(true, () => {
      this.isConnected = false;
      console.log(`✓ MQTT 连接已关闭 [设备: ${this.deviceSerialNumber}]`);
    });
  }

  /**
   * Get the current connection status.
   *
   * @returns {{deviceSerialNumber: string, mqttEnabled: boolean,
   *   isConnected: boolean, topicPrefix: (string|undefined)}} Status snapshot;
   *   topicPrefix is undefined when MQTT is disabled.
   */
  getConnectionStatus() {
    return {
      deviceSerialNumber: this.deviceSerialNumber,
      mqttEnabled: this.enabled,
      isConnected: this.isConnected,
      topicPrefix: this.topicPrefix,
    };
  }
}

module.exports = MQTTClient;