chenyc
2026-04-21 8632fbd73fdb15f22fae9cd36b9ed3e0635360f1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
const mqtt = require('mqtt');
 
// MQTT 服务负责建立连接、处理重连,并按设备编号发布消息。
class MqttService {
  constructor(config, logger = console) {
    this.config = config;
    this.logger = logger;
    this.client = null;
    this.connected = false;
  }
 
  // 把时间格式化为 yyyy-mm-dd hh:MM:ss,供上报字段 suedtime 使用。
  formatSuedtime(date = new Date()) {
    const year = date.getFullYear();
    const month = String(date.getMonth() + 1).padStart(2, '0');
    const day = String(date.getDate()).padStart(2, '0');
    const hour = String(date.getHours()).padStart(2, '0');
    const minute = String(date.getMinutes()).padStart(2, '0');
    const second = String(date.getSeconds()).padStart(2, '0');
 
    return `${year}-${month}-${day} ${hour}:${minute}:${second}`;
  }
 
  // 在原始指标对象上追加上传时间字段,保证每次发送都有 suedtime。
  buildPayload(message) {
    return {
      ...(message || {}),
      suedtime: this.formatSuedtime(),
    };
  }
 
  // 根据配置拼接 MQTT 连接地址,兼容 brokerUrl 和 protocol/host/port 两种写法。
  getBrokerUrl() {
    if (this.config.brokerUrl) {
      return this.config.brokerUrl;
    }
 
    const protocol = this.config.protocol || 'mqtt';
    const host = this.config.host;
    const port = this.config.port;
 
    return `${protocol}://${host}:${port}`;
  }
 
  // 为当前实例生成一个客户端标识,未配置时自动生成默认值。
  getClientId() {
    if (this.config.clientId) {
      return this.config.clientId;
    }
 
    const hostName = (this.config.host || 'broker').replace(/[^a-zA-Z0-9]/g, '_');
    return `jhm_tcp_gateway_${hostName}`;
  }
 
  // 启动 MQTT 客户端连接。
  start() {
    const brokerUrl = this.getBrokerUrl();
    const clientId = this.getClientId();
 
    this.logger.info(`[MQTT] 准备连接 broker=${brokerUrl} clientId=${clientId} qos=${this.config.qos || 0} retain=${Boolean(this.config.retain)}`);
 
    this.client = mqtt.connect(brokerUrl, {
      clientId,
      username: this.config.username || undefined,
      password: this.config.password || undefined,
      reconnectPeriod: this.config.reconnectPeriod || 5000,
      connectTimeout: this.config.connectTimeoutMs || 30000,
    });
 
    // 连接成功后标记状态,便于日志和故障判断。
    this.client.on('connect', () => {
      this.connected = true;
      this.logger.info(`[MQTT] 已连接到 ${brokerUrl}`);
    });
 
    // 重连时输出提示,方便现场排查网络波动问题。
    this.client.on('reconnect', () => {
      this.connected = false;
      this.logger.warn('[MQTT] 正在重连...');
    });
 
    // 断开连接时更新状态。
    this.client.on('close', () => {
      this.connected = false;
      this.logger.warn('[MQTT] 连接已关闭');
    });
 
    // 出现错误时只记录日志,不让主进程直接崩溃。
    this.client.on('error', (error) => {
      this.logger.error(`[MQTT] 错误: ${error.message}`);
    });
  }
 
  // 根据配置模板生成设备专属 topic。
  buildTopic(device) {
    // 优先使用完整模板;如果只配置了默认前缀,则按 前缀/设备编号 组织 topic。
    if (this.config.topicTemplate) {
      return this.config.topicTemplate
        .replace('{deviceId}', device.deviceId)
        .replace('{ip}', device.ip)
        .replace('{name}', device.name || device.deviceId);
    }
 
    const topicPrefix = this.config.defaultTopicPrefix || 'device';
    return `${topicPrefix}/${device.deviceId}`;
  }
 
  // 发布一条最小指标对象,例如 { F: 37.5 }。
  publish(device, message) {
    if (!this.client) {
      throw new Error('MQTT 服务尚未启动');
    }
 
    const topic = this.buildTopic(device);
    const payloadObject = this.buildPayload(message);
    const payload = JSON.stringify(payloadObject);
 
    // 如果 MQTT 当前还未连上,则消息会由 mqtt 客户端排队,日志中做提示。
    if (!this.connected) {
      this.logger.warn(`[MQTT] 当前未连接,消息将等待发送: ${topic}`);
    }
 
    this.client.publish(topic, payload, {
      qos: this.config.qos || 0,
      retain: Boolean(this.config.retain),
    }, (error) => {
      if (error) {
        this.logger.error(`[MQTT] 发布失败 ${topic}: ${error.message}`);
        return;
      }
 
      this.logger.info(`[MQTT] 发布成功 ${topic} ${payload}`);
    });
  }
 
  // 关闭 MQTT 连接,便于优雅停机。
  async stop() {
    if (!this.client) {
      return;
    }
 
    this.logger.info('[MQTT] 开始关闭连接');
 
    await new Promise((resolve) => {
      this.client.end(false, resolve);
    });
 
    this.connected = false;
    this.client = null;
    this.logger.info('[MQTT] MQTT 服务已停止');
  }
}
 
module.exports = {
  MqttService,
};