费森4008s 网口通讯 ,原生串口透传网口
chenyc
2026-03-22 106a1256ccec6feef931d57923474180fa1a5ade
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// mqtt-client.js
const mqtt = require('mqtt');
const logger = require('./logger');
 
let client = null;
let mqttCfg = null;
let isConnected = false;
 
// 格式化时间为 yyyy-MM-dd HH:mm:ss
function formatDateTime(date = new Date()) {
  const pad = (n) => String(n).padStart(2, '0');
  return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())} ${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`;
}
 
/**
 * 初始化 MQTT 连接
 * @param {Object} config - MQTT 配置
 */
function initMqtt(config) {
  mqttCfg = config;
  
  if (!config || !config.enabled) {
    logger.info('mqtt.init', { status: 'disabled', message: 'MQTT 未启用' });
    return;
  }
 
  try {
    const brokerUrl = `mqtt://${config.brokerUrl}:${config.port}`;
    const clientOptions = {
      username: config.username,
      password: config.password,
      reconnectPeriod: config.reconnectPeriod || 5000,
      clientId: `fresenius-4008s-${Date.now()}`,
      clean: true,
      keepalive: 60
    };
 
    console.log(`🔌 正在连接 MQTT Broker: ${brokerUrl}`);
    client = mqtt.connect(brokerUrl, clientOptions);
 
    client.on('connect', () => {
      isConnected = true;
      console.log('✅ MQTT 已连接');
      logger.info('mqtt.connect', { brokerUrl, status: 'connected' });
    });
 
    client.on('reconnect', () => {
      console.log('🔁 MQTT 正在重连...');
      logger.warn('mqtt.reconnect', { brokerUrl });
    });
 
    client.on('error', (err) => {
      console.error('❌ MQTT 连接错误:', err.message);
      logger.error('mqtt.error', { error: err.message, brokerUrl });
    });
 
    client.on('offline', () => {
      isConnected = false;
      console.log('⚠️ MQTT 已离线');
      logger.warn('mqtt.offline', { brokerUrl });
    });
 
    client.on('message', (topic, message) => {
      logger.debug('mqtt.message.received', { topic, messageLength: message.length });
    });
  } catch (err) {
    console.error('❌ MQTT 初始化失败:', err.message);
    logger.error('mqtt.init.error', { error: err.message });
  }
}
 
/**
 * 发布数据到 MQTT
 * @param {string} deviceSerial - 设备序列号
 * @param {Object} data - 设备数据(物模型格式)
 * @param {string} clientId - 客户端ID
 */
async function publishToMqtt(deviceSerial, data, clientId) {
  if (!client || !isConnected || !mqttCfg || !mqttCfg.enabled) {
    return;
  }
 
  try {
    const topic = `${mqttCfg.defaultTopicPrefix}/${deviceSerial}`;
    // 直接使用物模型数据,无需额外包装
    const payload = {
      ...data,
      deviceId: clientId
    };
    console.log(`📤 发布到 MQTT [${topic}]:`, JSON.stringify(payload));
    client.publish(topic, JSON.stringify(payload), { qos: 1 }, (err) => {
      if (err) {
        console.error(`❌ MQTT 发布失败 [${topic}]:`, err.message);
        logger.error('mqtt.publish.error', { topic, deviceSerial, error: err.message });
      } else {
        console.log(`✅ MQTT 已发布 [${topic}]`);
        logger.info('mqtt.publish', { topic, deviceSerial, clientId });
      }
    });
  } catch (err) {
    console.error('❌ MQTT 发布异常:', err.message);
    logger.error('mqtt.publish.exception', { deviceSerial, error: err.message });
  }
}
 
/**
 * 关闭 MQTT 连接
 */
function closeMqtt() {
  if (client) {
    client.end(() => {
      console.log('🔌 MQTT 已断开连接');
      logger.info('mqtt.close');
    });
    client = null;
    isConnected = false;
  }
}
 
module.exports = {
  initMqtt,
  publishToMqtt,
  closeMqtt,
  isConnected: () => isConnected
};