"use strict";
|
|
const mqtt = require("mqtt");
|
|
class MqttUploader {
|
constructor({ config, logger }) {
|
this.config = config;
|
this.logger = logger;
|
this.client = null;
|
this._connected = false;
|
}
|
|
start() {
|
if (!this.config.enabled) {
|
this.logger.upload("mqtt", "init", "", "MQTT 未启用,跳过");
|
return;
|
}
|
|
const brokerUrl = `mqtt://${this.config.brokerUrl}:${this.config.port}`;
|
const clientId = `dialysis_gateway_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
|
|
this.logger.upload("mqtt", "init", "", `连接 MQTT Broker: ${brokerUrl}`);
|
|
this.client = mqtt.connect(brokerUrl, {
|
clientId,
|
username: this.config.username,
|
password: this.config.password,
|
reconnectPeriod: this.config.reconnectPeriod || 5000,
|
connectTimeout: 10000,
|
});
|
|
this.client.on("connect", () => {
|
this._connected = true;
|
this.logger.upload("mqtt", "connect", "", "MQTT 连接成功");
|
});
|
|
this.client.on("close", () => {
|
this._connected = false;
|
this.logger.upload("mqtt", "close", "", "MQTT 连接断开");
|
});
|
|
this.client.on("reconnect", () => {
|
this.logger.upload("mqtt", "reconnect", "", "MQTT 正在重连");
|
});
|
|
this.client.on("error", (err) => {
|
this.logger.upload("mqtt", "error", "", `MQTT 错误: ${err.message}`);
|
});
|
}
|
|
getStatus() {
|
return {
|
enabled: !!this.config.enabled,
|
connected: this._connected,
|
};
|
}
|
|
stop() {
|
if (this.client) {
|
this.client.end(true);
|
this.client = null;
|
this._connected = false;
|
}
|
}
|
|
publish(deviceNo, payload) {
|
const PUBLISH_TIMEOUT_MS = 10000;
|
return new Promise((resolve) => {
|
if (!this.config.enabled) {
|
resolve({ enabled: false, ok: true, code: "MQTT_DISABLED", reason: "MQTT disabled" });
|
return;
|
}
|
|
if (!this.client || !this._connected) {
|
resolve({ enabled: true, ok: false, code: "MQTT_NOT_CONNECTED", reason: "MQTT 未连接" });
|
return;
|
}
|
|
const prefix = this.config.defaultTopicPrefix || "touxiji";
|
const topic = `${prefix}/${sanitizeTopic(deviceNo)}`;
|
|
let payloadStr;
|
try {
|
payloadStr = JSON.stringify(payload);
|
} catch (err) {
|
resolve({ enabled: true, ok: false, code: "MQTT_INVALID_PAYLOAD", reason: err.message });
|
return;
|
}
|
|
let done = false;
|
const timer = setTimeout(() => {
|
if (done) return;
|
done = true;
|
this.logger.upload("mqtt", "publish", deviceNo, `发布超时 topic=${topic}`);
|
resolve({ enabled: true, ok: false, code: "MQTT_PUBLISH_TIMEOUT", reason: "publish timeout", topic });
|
}, PUBLISH_TIMEOUT_MS);
|
|
this.client.publish(topic, payloadStr, { qos: this.config.qos || 1 }, (err) => {
|
if (done) return;
|
done = true;
|
clearTimeout(timer);
|
if (err) {
|
this.logger.upload("mqtt", "publish", deviceNo, `发布失败 topic=${topic}: ${err.message}`);
|
resolve({ enabled: true, ok: false, code: "MQTT_PUBLISH_FAIL", reason: err.message, topic });
|
} else {
|
this.logger.upload("mqtt", "publish", deviceNo, `发布成功 topic=${topic}`);
|
resolve({ enabled: true, ok: true, code: "MQTT_PUBLISH_OK", reason: "success", topic });
|
}
|
});
|
});
|
}
|
}
|
|
function sanitizeTopic(deviceNo) {
|
return String(deviceNo).replace(/[ #+]/g, "_");
|
}
|
|
module.exports = { MqttUploader };
|