const mqtt = require('mqtt'); // MQTT 服务负责建立连接、处理重连,并按设备编号发布消息。 class MqttService { constructor(config, logger = console) { this.config = config; this.logger = logger; this.client = null; this.connected = false; } // 把时间格式化为 yyyy-mm-dd hh:MM:ss,供上报字段 suedtime 使用。 formatSuedtime(date = new Date()) { const year = date.getFullYear(); const month = String(date.getMonth() + 1).padStart(2, '0'); const day = String(date.getDate()).padStart(2, '0'); const hour = String(date.getHours()).padStart(2, '0'); const minute = String(date.getMinutes()).padStart(2, '0'); const second = String(date.getSeconds()).padStart(2, '0'); return `${year}-${month}-${day} ${hour}:${minute}:${second}`; } // 在原始指标对象上追加上传时间字段,保证每次发送都有 suedtime。 buildPayload(message) { return { ...(message || {}), suedtime: this.formatSuedtime(), }; } // 根据配置拼接 MQTT 连接地址,兼容 brokerUrl 和 protocol/host/port 两种写法。 getBrokerUrl() { if (this.config.brokerUrl) { return this.config.brokerUrl; } const protocol = this.config.protocol || 'mqtt'; const host = this.config.host; const port = this.config.port; return `${protocol}://${host}:${port}`; } // 为当前实例生成一个客户端标识,未配置时自动生成默认值。 getClientId() { if (this.config.clientId) { return this.config.clientId; } const hostName = (this.config.host || 'broker').replace(/[^a-zA-Z0-9]/g, '_'); return `jhm_tcp_gateway_${hostName}`; } // 启动 MQTT 客户端连接。 start() { const brokerUrl = this.getBrokerUrl(); const clientId = this.getClientId(); this.logger.info(`[MQTT] 准备连接 broker=${brokerUrl} clientId=${clientId} qos=${this.config.qos || 0} retain=${Boolean(this.config.retain)}`); this.client = mqtt.connect(brokerUrl, { clientId, username: this.config.username || undefined, password: this.config.password || undefined, reconnectPeriod: this.config.reconnectPeriod || 5000, connectTimeout: this.config.connectTimeoutMs || 30000, }); // 连接成功后标记状态,便于日志和故障判断。 this.client.on('connect', () => { this.connected = true; this.logger.info(`[MQTT] 已连接到 ${brokerUrl}`); }); // 重连时输出提示,方便现场排查网络波动问题。 this.client.on('reconnect', () => { this.connected = false; this.logger.warn('[MQTT] 正在重连...'); }); // 断开连接时更新状态。 this.client.on('close', () => { this.connected = false; this.logger.warn('[MQTT] 连接已关闭'); }); // 出现错误时只记录日志,不让主进程直接崩溃。 this.client.on('error', (error) => { this.logger.error(`[MQTT] 错误: ${error.message}`); }); } // 根据配置模板生成设备专属 topic。 buildTopic(device) { // 优先使用完整模板;如果只配置了默认前缀,则按 前缀/设备编号 组织 topic。 if (this.config.topicTemplate) { return this.config.topicTemplate .replace('{deviceId}', device.deviceId) .replace('{ip}', device.ip) .replace('{name}', device.name || device.deviceId); } const topicPrefix = this.config.defaultTopicPrefix || 'device'; return `${topicPrefix}/${device.deviceId}`; } // 发布一条最小指标对象,例如 { F: 37.5 }。 publish(device, message) { if (!this.client) { throw new Error('MQTT 服务尚未启动'); } const topic = this.buildTopic(device); const payloadObject = this.buildPayload(message); const payload = JSON.stringify(payloadObject); // 如果 MQTT 当前还未连上,则消息会由 mqtt 客户端排队,日志中做提示。 if (!this.connected) { this.logger.warn(`[MQTT] 当前未连接,消息将等待发送: ${topic}`); } this.client.publish(topic, payload, { qos: this.config.qos || 0, retain: Boolean(this.config.retain), }, (error) => { if (error) { this.logger.error(`[MQTT] 发布失败 ${topic}: ${error.message}`); return; } this.logger.info(`[MQTT] 发布成功 ${topic} ${payload}`); }); } // 关闭 MQTT 连接,便于优雅停机。 async stop() { if (!this.client) { return; } this.logger.info('[MQTT] 开始关闭连接'); await new Promise((resolve) => { this.client.end(false, resolve); }); this.connected = false; this.client = null; this.logger.info('[MQTT] MQTT 服务已停止'); } } module.exports = { MqttService, };