| | |
| | | const { info, warn, error } = require('./logger'); // 引入自定义的日志模块 |
| | | // mqttClient.js |
| | | const { info, warn, error } = require('./logger'); |
| | | const mqtt = require('mqtt'); |
| | | |
| | | const brokerUrl = 'mqtt-test.ihemodialysis.com'; |
| | | const port = 62183; |
| | | |
| | | const options = { |
| | | host: brokerUrl, |
| | | port: port, |
| | | reconnectPeriod: 5000, // 自动重连间隔时间 |
| | | }; |
| | | |
| | | let client = null; |
| | | let publishEnabled = false; |
| | | let config = null; |
| | | |
| | | // 初始化客户端连接 |
| | | function connect() { |
| | | try { |
| | | client = mqtt.connect(options); |
| | | |
| | | client.on('connect', () => { |
| | | console.log('Connected to MQTT Broker'); |
| | | info(`成功完成连接到 MQTT Broker: ${brokerUrl}:${port}`); |
| | | publishEnabled = true; |
| | | |
| | | // 可选:启动定时发布任务 |
| | | // startPeriodicPublish(); |
| | | }); |
| | | |
| | | client.on('close', () => { |
| | | console.log('MQTT connection closed, will reconnect...'); |
| | | publishEnabled = false; |
| | | stopPeriodicPublish(); |
| | | }); |
| | | |
| | | client.on('error', (err) => { |
| | | console.error('MQTT Error:', err); |
| | | error(`MQTT 错误: ${err.message}`); |
| | | }); |
| | | |
| | | client.on('reconnect', () => { |
| | | console.log('MQTT client reconnecting...'); |
| | | }); |
| | | } catch (ex) { |
| | | console.error('Error during connecting to MQTT Broker:', ex); |
| | | error(`尝试连接 MQTT Broker 时发生错误: ${ex.message}`); |
| | | } |
| | | } |
| | | |
| | | let intervalId = null; |
| | | |
| | | function startPeriodicPublish() { |
| | | if (intervalId) return; |
| | | |
| | | intervalId = setInterval(() => { |
| | | const topic = 'system/heartbeat'; |
| | | let message; |
| | | try { |
| | | message = JSON.stringify({ |
| | | timestamp: new Date().toISOString(), |
| | | status: 'alive' |
| | | }); |
| | | } catch (ex) { |
| | | console.error('Failed to create heartbeat message:', ex); |
| | | error('创建心跳消息失败'); |
| | | return; |
| | | } |
| | | |
| | | publishMessage(topic, message); |
| | | }, 5000); // 每5秒发一次心跳 |
| | | } |
| | | |
| | | function stopPeriodicPublish() { |
| | | if (intervalId) { |
| | | clearInterval(intervalId); |
| | | intervalId = null; |
| | | } |
| | | } |
| | | |
| | | // 外部可调用的发布函数 |
| | | function publishMessage(topic, message) { |
| | | if (!client || !publishEnabled) { |
| | | console.warn('MQTT client not connected. Message not sent:', message); |
| | | warn('MQTT 客户端未连接。消息未发送'); |
| | | function initMqtt(mqttConfig) { |
| | | if (!mqttConfig.enabled) { |
| | | info('MQTT 已禁用'); |
| | | return; |
| | | } |
| | | |
| | | console.log(`Publishing to ${topic}:`, message); |
| | | config = mqttConfig; |
| | | const options = { |
| | | host: config.brokerUrl, |
| | | port: config.port, |
| | | reconnectPeriod: config.reconnectPeriod || 5000, |
| | | // 👇 添加认证 |
| | | username: config.username, |
| | | password: config.password, |
| | | // 可选:设置 Client ID(避免重复) |
| | | clientId: 'fresenius_gateway_' + Math.random().toString(16).substr(2, 8), |
| | | }; |
| | | |
| | | try { |
| | | client.publish(topic, message, { qos: 1 }, (err) => { |
| | | if (err) { |
| | | console.error('Failed to publish message:', err); |
| | | error(`消息发布失败: ${err.message}`); |
| | | } |
| | | client = mqtt.connect(options); |
| | | client.on('connect', () => { |
| | | info(`✅ MQTT 连接成功: ${config.brokerUrl}:${config.port} (用户: ${config.username})`); |
| | | publishEnabled = true; |
| | | }); |
| | | client.on('close', () => { |
| | | publishEnabled = false; |
| | | info('🔌 MQTT 连接断开'); |
| | | }); |
| | | client.on('error', (err) => { |
| | | error(`❌ MQTT 错误: ${err.message}`); |
| | | }); |
| | | client.on('reconnect', () => { |
| | | info('🔄 MQTT 正在重连...'); |
| | | }); |
| | | } catch (ex) { |
| | | console.error('Unexpected error while publishing message:', ex); |
| | | error(`发布消息时发生意外错误: ${ex.message}`); |
| | | error(`💥 MQTT 初始化失败: ${ex.message}`); |
| | | } |
| | | } |
| | | |
| | | // 初始化连接 |
| | | connect(); |
| | | function publishMessage(topic, message) { |
| | | if (!config?.enabled || !client || !publishEnabled) { |
| | | warn('MQTT 未启用或未连接,消息丢弃'); |
| | | return; |
| | | } |
| | | client.publish(topic, message, { qos: 1 }, (err) => { |
| | | if (err) { |
| | | error(`📤 MQTT 发布失败 (${topic}): ${err.message}`); |
| | | } else { |
| | | info(`📤 MQTT 已发布到 ${topic}`); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | // 暴出发布函数 |
| | | module.exports = { |
| | | publishMessage, |
| | | }; |
| | | module.exports = { initMqtt, publishMessage }; |