东丽网口版透析机 socket- server 通讯
chenyc
22 小时以前 55c8181bbe4c198f9bda5520ea0d8ba148933f9e
mqttClient.js
@@ -1,108 +1,62 @@
const { info, warn, error } = require('./logger'); // 引入自定义的日志模块
// mqttClient.js
const { info, warn, error } = require('./logger');
const mqtt = require('mqtt');
const brokerUrl = 'mqtt-test.ihemodialysis.com';
const port = 62183;
const options = {
  host: brokerUrl,
  port: port,
  reconnectPeriod: 5000, // 自动重连间隔时间
};
let client = null;
let publishEnabled = false;
let config = null;
// 初始化客户端连接
function connect() {
  try {
    client = mqtt.connect(options);
    client.on('connect', () => {
      console.log('Connected to MQTT Broker');
      info(`成功完成连接到 MQTT Broker: ${brokerUrl}:${port}`);
      publishEnabled = true;
      // 可选:启动定时发布任务
      // startPeriodicPublish();
    });
    client.on('close', () => {
      console.log('MQTT connection closed, will reconnect...');
      publishEnabled = false;
      stopPeriodicPublish();
    });
    client.on('error', (err) => {
      console.error('MQTT Error:', err);
      error(`MQTT 错误: ${err.message}`);
    });
    client.on('reconnect', () => {
      console.log('MQTT client reconnecting...');
    });
  } catch (ex) {
    console.error('Error during connecting to MQTT Broker:', ex);
    error(`尝试连接 MQTT Broker 时发生错误: ${ex.message}`);
  }
}
let intervalId = null;
function startPeriodicPublish() {
  if (intervalId) return;
  intervalId = setInterval(() => {
    const topic = 'system/heartbeat';
    let message;
    try {
      message = JSON.stringify({
        timestamp: new Date().toISOString(),
        status: 'alive'
      });
    } catch (ex) {
      console.error('Failed to create heartbeat message:', ex);
      error('创建心跳消息失败');
      return;
    }
    publishMessage(topic, message);
  }, 5000); // 每5秒发一次心跳
}
function stopPeriodicPublish() {
  if (intervalId) {
    clearInterval(intervalId);
    intervalId = null;
  }
}
// 外部可调用的发布函数
function publishMessage(topic, message) {
  if (!client || !publishEnabled) {
    console.warn('MQTT client not connected. Message not sent:', message);
    warn('MQTT 客户端未连接。消息未发送');
function initMqtt(mqttConfig) {
  if (!mqttConfig.enabled) {
    info('MQTT 已禁用');
    return;
  }
  console.log(`Publishing to ${topic}:`, message);
  config = mqttConfig;
  const options = {
    host: config.brokerUrl,
    port: config.port,
    reconnectPeriod: config.reconnectPeriod || 5000,
    // 👇 添加认证
    username: config.username,
    password: config.password,
    // 可选:设置 Client ID(避免重复)
    clientId: 'fresenius_gateway_' + Math.random().toString(16).substr(2, 8),
  };
  try {
    client.publish(topic, message, { qos: 1 }, (err) => {
      if (err) {
        console.error('Failed to publish message:', err);
        error(`消息发布失败: ${err.message}`);
      }
    client = mqtt.connect(options);
    client.on('connect', () => {
      info(`✅ MQTT 连接成功: ${config.brokerUrl}:${config.port} (用户: ${config.username})`);
      publishEnabled = true;
    });
    client.on('close', () => {
      publishEnabled = false;
      info('🔌 MQTT 连接断开');
    });
    client.on('error', (err) => {
      error(`❌ MQTT 错误: ${err.message}`);
    });
    client.on('reconnect', () => {
      info('🔄 MQTT 正在重连...');
    });
  } catch (ex) {
    console.error('Unexpected error while publishing message:', ex);
    error(`发布消息时发生意外错误: ${ex.message}`);
    error(`💥 MQTT 初始化失败: ${ex.message}`);
  }
}
// 初始化连接
connect();
function publishMessage(topic, message) {
  if (!config?.enabled || !client || !publishEnabled) {
    warn('MQTT 未启用或未连接,消息丢弃');
    return;
  }
  client.publish(topic, message, { qos: 1 }, (err) => {
    if (err) {
      error(`📤 MQTT 发布失败 (${topic}): ${err.message}`);
    } else {
      info(`📤 MQTT 已发布到 ${topic}`);
    }
  });
}
// 暴出发布函数
module.exports = {
  publishMessage,
};
module.exports = { initMqtt, publishMessage };