chenyc
2026-05-09 c7d690bc224fb84e88d3033bf324876e4a64b008
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"use strict";
 
const mqtt = require("mqtt");
 
class MqttUploader {
  constructor({ config, logger }) {
    this.config = config;
    this.logger = logger;
    this.client = null;
    this._connected = false;
  }
 
  start() {
    if (!this.config.enabled) {
      this.logger.upload("mqtt", "init", "", "MQTT 未启用,跳过");
      return;
    }
 
    const brokerUrl = `mqtt://${this.config.brokerUrl}:${this.config.port}`;
    const clientId = `dialysis_gateway_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
 
    this.logger.upload("mqtt", "init", "", `连接 MQTT Broker: ${brokerUrl}`);
 
    this.client = mqtt.connect(brokerUrl, {
      clientId,
      username: this.config.username,
      password: this.config.password,
      reconnectPeriod: this.config.reconnectPeriod || 5000,
      connectTimeout: 10000,
    });
 
    this.client.on("connect", () => {
      this._connected = true;
      this.logger.upload("mqtt", "connect", "", "MQTT 连接成功");
    });
 
    this.client.on("close", () => {
      this._connected = false;
      this.logger.upload("mqtt", "close", "", "MQTT 连接断开");
    });
 
    this.client.on("reconnect", () => {
      this.logger.upload("mqtt", "reconnect", "", "MQTT 正在重连");
    });
 
    this.client.on("error", (err) => {
      this.logger.upload("mqtt", "error", "", `MQTT 错误: ${err.message}`);
    });
  }
 
  getStatus() {
    return {
      enabled: !!this.config.enabled,
      connected: this._connected,
    };
  }
 
  stop() {
    if (this.client) {
      this.client.end(true);
      this.client = null;
      this._connected = false;
    }
  }
 
  publish(deviceNo, payload) {
    const PUBLISH_TIMEOUT_MS = 10000;
    return new Promise((resolve) => {
      if (!this.config.enabled) {
        resolve({ enabled: false, ok: true, code: "MQTT_DISABLED", reason: "MQTT disabled" });
        return;
      }
 
      if (!this.client || !this._connected) {
        resolve({ enabled: true, ok: false, code: "MQTT_NOT_CONNECTED", reason: "MQTT 未连接" });
        return;
      }
 
      const prefix = this.config.defaultTopicPrefix || "touxiji";
      const topic = `${prefix}/${sanitizeTopic(deviceNo)}`;
 
      let payloadStr;
      try {
        payloadStr = JSON.stringify(payload);
      } catch (err) {
        resolve({ enabled: true, ok: false, code: "MQTT_INVALID_PAYLOAD", reason: err.message });
        return;
      }
 
      let done = false;
      const timer = setTimeout(() => {
        if (done) return;
        done = true;
        this.logger.upload("mqtt", "publish", deviceNo, `发布超时 topic=${topic}`);
        resolve({ enabled: true, ok: false, code: "MQTT_PUBLISH_TIMEOUT", reason: "publish timeout", topic });
      }, PUBLISH_TIMEOUT_MS);
 
      this.client.publish(topic, payloadStr, { qos: this.config.qos || 1 }, (err) => {
        if (done) return;
        done = true;
        clearTimeout(timer);
        if (err) {
          this.logger.upload("mqtt", "publish", deviceNo, `发布失败 topic=${topic}: ${err.message}`);
          resolve({ enabled: true, ok: false, code: "MQTT_PUBLISH_FAIL", reason: err.message, topic });
        } else {
          this.logger.upload("mqtt", "publish", deviceNo, `发布成功 topic=${topic}`);
          resolve({ enabled: true, ok: true, code: "MQTT_PUBLISH_OK", reason: "success", topic });
        }
      });
    });
  }
}
 
function sanitizeTopic(deviceNo) {
  return String(deviceNo).replace(/[ #+]/g, "_");
}
 
module.exports = { MqttUploader };