const mqtt = require('mqtt'); function formatUploadTime(date = new Date()) { const year = date.getFullYear(); const month = String(date.getMonth() + 1).padStart(2, '0'); const day = String(date.getDate()).padStart(2, '0'); const hour = String(date.getHours()).padStart(2, '0'); const minute = String(date.getMinutes()).padStart(2, '0'); const second = String(date.getSeconds()).padStart(2, '0'); return `${year}-${month}-${day} ${hour}:${minute}:${second}`; } class MqttService { constructor(config = {}, logger = console) { this.config = config; this.logger = logger; this.client = null; this.connected = false; } getBrokerUrl() { if (this.config.brokerUrl) { return this.config.brokerUrl; } return `${this.config.protocol || 'mqtt'}://${this.config.host}:${this.config.port}`; } getClientId() { if (this.config.clientId) { return this.config.clientId; } const hostName = String(this.config.host || 'broker').replace(/[^a-zA-Z0-9]/g, '_'); return `jh2028_gateway_${hostName}`; } buildTopic(device) { if (this.config.topicTemplate) { return this.config.topicTemplate .replace('{deviceId}', device.deviceId) .replace('{ip}', device.ip) .replace('{name}', device.name || device.deviceId); } return `${this.config.defaultTopicPrefix || 'device'}/${device.deviceId}`; } buildPayload(message) { return { ...(message || {}), suedtime: formatUploadTime(), }; } start() { const brokerUrl = this.getBrokerUrl(); const clientId = this.getClientId(); this.logger.info(`[MQTT] 正在连接 服务地址=${brokerUrl} 客户端ID=${clientId}`); this.client = mqtt.connect(brokerUrl, { clientId, username: this.config.username || undefined, password: this.config.password || undefined, reconnectPeriod: this.config.reconnectPeriod || 5000, connectTimeout: this.config.connectTimeoutMs || 30000, }); this.client.on('connect', () => { this.connected = true; this.logger.info(`[MQTT] 已连接 服务地址=${brokerUrl}`); }); this.client.on('reconnect', () => { this.connected = false; this.logger.warn('[MQTT] 正在重连'); }); this.client.on('close', () => { this.connected = false; this.logger.warn('[MQTT] 连接已关闭'); }); this.client.on('error', (error) => { this.logger.error(`[MQTT] 连接异常: ${error.message}`); }); } publish(device, message) { if (!this.client) { return Promise.reject(new Error('MQTT service is not started')); } const topic = this.buildTopic(device); const payloadObject = this.buildPayload(message); const payload = JSON.stringify(payloadObject); if (!this.connected) { this.logger.warn(`[MQTT] 当前未连接,消息将进入客户端队列 主题=${topic}`); } return new Promise((resolve, reject) => { this.client.publish(topic, payload, { qos: this.config.qos || 0, retain: Boolean(this.config.retain), }, (error) => { if (error) { this.logger.error(`[MQTT] 发布失败 主题=${topic}: ${error.message}`); reject(error); return; } this.logger.info(`[MQTT] 发布成功 主题=${topic} 数据=${payload}`); resolve({ ok: true, topic, payload: payloadObject }); }); }); } async stop() { if (!this.client) { return; } this.logger.info('[MQTT] 正在停止'); await new Promise((resolve) => { this.client.end(false, resolve); }); this.connected = false; this.client = null; this.logger.info('[MQTT] 已停止'); } } module.exports = { MqttService, formatUploadTime, };