"use strict"; const mqtt = require("mqtt"); class MqttUploader { constructor({ config, logger }) { this.config = config; this.logger = logger; this.client = null; this._connected = false; } start() { if (!this.config.enabled) { this.logger.upload("mqtt", "init", "", "MQTT 未启用,跳过"); return; } const brokerUrl = `mqtt://${this.config.brokerUrl}:${this.config.port}`; const clientId = `dialysis_gateway_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; this.logger.upload("mqtt", "init", "", `连接 MQTT Broker: ${brokerUrl}`); this.client = mqtt.connect(brokerUrl, { clientId, username: this.config.username, password: this.config.password, reconnectPeriod: this.config.reconnectPeriod || 5000, connectTimeout: 10000, }); this.client.on("connect", () => { this._connected = true; this.logger.upload("mqtt", "connect", "", "MQTT 连接成功"); }); this.client.on("close", () => { this._connected = false; this.logger.upload("mqtt", "close", "", "MQTT 连接断开"); }); this.client.on("reconnect", () => { this.logger.upload("mqtt", "reconnect", "", "MQTT 正在重连"); }); this.client.on("error", (err) => { this.logger.upload("mqtt", "error", "", `MQTT 错误: ${err.message}`); }); } getStatus() { return { enabled: !!this.config.enabled, connected: this._connected, }; } stop() { if (this.client) { this.client.end(true); this.client = null; this._connected = false; } } publish(deviceNo, payload) { const PUBLISH_TIMEOUT_MS = 10000; return new Promise((resolve) => { if (!this.config.enabled) { resolve({ enabled: false, ok: true, code: "MQTT_DISABLED", reason: "MQTT disabled" }); return; } if (!this.client || !this._connected) { resolve({ enabled: true, ok: false, code: "MQTT_NOT_CONNECTED", reason: "MQTT 未连接" }); return; } const prefix = this.config.defaultTopicPrefix || "touxiji"; const topic = `${prefix}/${sanitizeTopic(deviceNo)}`; let payloadStr; try { payloadStr = JSON.stringify(payload); } catch (err) { resolve({ enabled: true, ok: false, code: "MQTT_INVALID_PAYLOAD", reason: err.message }); return; } let done = false; const timer = setTimeout(() => { if (done) return; done = true; this.logger.upload("mqtt", "publish", deviceNo, `发布超时 topic=${topic}`); resolve({ enabled: true, ok: false, code: "MQTT_PUBLISH_TIMEOUT", reason: "publish timeout", topic }); }, PUBLISH_TIMEOUT_MS); this.client.publish(topic, payloadStr, { qos: this.config.qos || 1 }, (err) => { if (done) return; done = true; clearTimeout(timer); if (err) { this.logger.upload("mqtt", "publish", deviceNo, `发布失败 topic=${topic}: ${err.message}`); resolve({ enabled: true, ok: false, code: "MQTT_PUBLISH_FAIL", reason: err.message, topic }); } else { this.logger.upload("mqtt", "publish", deviceNo, `发布成功 topic=${topic}`); resolve({ enabled: true, ok: true, code: "MQTT_PUBLISH_OK", reason: "success", topic }); } }); }); } } function sanitizeTopic(deviceNo) { return String(deviceNo).replace(/[ #+]/g, "_"); } module.exports = { MqttUploader };