// mqtt-client.js const mqtt = require('mqtt'); const logger = require('./logger'); let client = null; let mqttCfg = null; let isConnected = false; // 格式化时间为 yyyy-MM-dd HH:mm:ss function formatDateTime(date = new Date()) { const pad = (n) => String(n).padStart(2, '0'); return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())} ${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`; } /** * 初始化 MQTT 连接 * @param {Object} config - MQTT 配置 */ function initMqtt(config) { mqttCfg = config; if (!config || !config.enabled) { logger.info('mqtt.init', { status: 'disabled', message: 'MQTT 未启用' }); return; } try { const brokerUrl = `mqtt://${config.brokerUrl}:${config.port}`; const clientOptions = { username: config.username, password: config.password, reconnectPeriod: config.reconnectPeriod || 5000, clientId: `fresenius-4008s-${Date.now()}`, clean: true, keepalive: 60 }; console.log(`🔌 正在连接 MQTT Broker: ${brokerUrl}`); client = mqtt.connect(brokerUrl, clientOptions); client.on('connect', () => { isConnected = true; console.log('✅ MQTT 已连接'); logger.info('mqtt.connect', { brokerUrl, status: 'connected' }); }); client.on('reconnect', () => { console.log('🔁 MQTT 正在重连...'); logger.warn('mqtt.reconnect', { brokerUrl }); }); client.on('error', (err) => { console.error('❌ MQTT 连接错误:', err.message); logger.error('mqtt.error', { error: err.message, brokerUrl }); }); client.on('offline', () => { isConnected = false; console.log('⚠️ MQTT 已离线'); logger.warn('mqtt.offline', { brokerUrl }); }); client.on('message', (topic, message) => { logger.debug('mqtt.message.received', { topic, messageLength: message.length }); }); } catch (err) { console.error('❌ MQTT 初始化失败:', err.message); logger.error('mqtt.init.error', { error: err.message }); } } /** * 发布数据到 MQTT * @param {string} deviceSerial - 设备序列号 * @param {Object} data - 设备数据(物模型格式) * @param {string} clientId - 客户端ID */ async function publishToMqtt(deviceSerial, data, clientId) { if (!client || !isConnected || !mqttCfg || !mqttCfg.enabled) { return; } try { const topic = `${mqttCfg.defaultTopicPrefix}/${deviceSerial}`; // 直接使用物模型数据,无需额外包装 const payload = { ...data, deviceId: clientId }; console.log(`📤 发布到 MQTT [${topic}]:`, JSON.stringify(payload)); client.publish(topic, JSON.stringify(payload), { qos: 1 }, (err) => { if (err) { console.error(`❌ MQTT 发布失败 [${topic}]:`, err.message); logger.error('mqtt.publish.error', { topic, deviceSerial, error: err.message }); } else { console.log(`✅ MQTT 已发布 [${topic}]`); logger.info('mqtt.publish', { topic, deviceSerial, clientId }); } }); } catch (err) { console.error('❌ MQTT 发布异常:', err.message); logger.error('mqtt.publish.exception', { deviceSerial, error: err.message }); } } /** * 关闭 MQTT 连接 */ function closeMqtt() { if (client) { client.end(() => { console.log('🔌 MQTT 已断开连接'); logger.info('mqtt.close'); }); client = null; isConnected = false; } } module.exports = { initMqtt, publishToMqtt, closeMqtt, isConnected: () => isConnected };