// mqtt-client.js
|
const mqtt = require('mqtt');
|
const logger = require('./logger');
|
|
let client = null;
|
let mqttCfg = null;
|
let isConnected = false;
|
|
// 格式化时间为 yyyy-MM-dd HH:mm:ss
|
function formatDateTime(date = new Date()) {
|
const pad = (n) => String(n).padStart(2, '0');
|
return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())} ${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`;
|
}
|
|
/**
|
* 初始化 MQTT 连接
|
* @param {Object} config - MQTT 配置
|
*/
|
function initMqtt(config) {
|
mqttCfg = config;
|
|
if (!config || !config.enabled) {
|
logger.info('mqtt.init', { status: 'disabled', message: 'MQTT 未启用' });
|
return;
|
}
|
|
try {
|
const brokerUrl = `mqtt://${config.brokerUrl}:${config.port}`;
|
const clientOptions = {
|
username: config.username,
|
password: config.password,
|
reconnectPeriod: config.reconnectPeriod || 5000,
|
clientId: `fresenius-4008s-${Date.now()}`,
|
clean: true,
|
keepalive: 60
|
};
|
|
console.log(`🔌 正在连接 MQTT Broker: ${brokerUrl}`);
|
client = mqtt.connect(brokerUrl, clientOptions);
|
|
client.on('connect', () => {
|
isConnected = true;
|
console.log('✅ MQTT 已连接');
|
logger.info('mqtt.connect', { brokerUrl, status: 'connected' });
|
});
|
|
client.on('reconnect', () => {
|
console.log('🔁 MQTT 正在重连...');
|
logger.warn('mqtt.reconnect', { brokerUrl });
|
});
|
|
client.on('error', (err) => {
|
console.error('❌ MQTT 连接错误:', err.message);
|
logger.error('mqtt.error', { error: err.message, brokerUrl });
|
});
|
|
client.on('offline', () => {
|
isConnected = false;
|
console.log('⚠️ MQTT 已离线');
|
logger.warn('mqtt.offline', { brokerUrl });
|
});
|
|
client.on('message', (topic, message) => {
|
logger.debug('mqtt.message.received', { topic, messageLength: message.length });
|
});
|
} catch (err) {
|
console.error('❌ MQTT 初始化失败:', err.message);
|
logger.error('mqtt.init.error', { error: err.message });
|
}
|
}
|
|
/**
|
* 发布数据到 MQTT
|
* @param {string} deviceSerial - 设备序列号
|
* @param {Object} data - 设备数据(物模型格式)
|
* @param {string} clientId - 客户端ID
|
*/
|
async function publishToMqtt(deviceSerial, data, clientId) {
|
if (!client || !isConnected || !mqttCfg || !mqttCfg.enabled) {
|
return;
|
}
|
|
try {
|
const topic = `${mqttCfg.defaultTopicPrefix}/${deviceSerial}`;
|
// 直接使用物模型数据,无需额外包装
|
const payload = {
|
...data,
|
deviceId: clientId
|
};
|
console.log(`📤 发布到 MQTT [${topic}]:`, JSON.stringify(payload));
|
client.publish(topic, JSON.stringify(payload), { qos: 1 }, (err) => {
|
if (err) {
|
console.error(`❌ MQTT 发布失败 [${topic}]:`, err.message);
|
logger.error('mqtt.publish.error', { topic, deviceSerial, error: err.message });
|
} else {
|
console.log(`✅ MQTT 已发布 [${topic}]`);
|
logger.info('mqtt.publish', { topic, deviceSerial, clientId });
|
}
|
});
|
} catch (err) {
|
console.error('❌ MQTT 发布异常:', err.message);
|
logger.error('mqtt.publish.exception', { deviceSerial, error: err.message });
|
}
|
}
|
|
/**
|
* 关闭 MQTT 连接
|
*/
|
function closeMqtt() {
|
if (client) {
|
client.end(() => {
|
console.log('🔌 MQTT 已断开连接');
|
logger.info('mqtt.close');
|
});
|
client = null;
|
isConnected = false;
|
}
|
}
|
|
module.exports = {
|
initMqtt,
|
publishToMqtt,
|
closeMqtt,
|
isConnected: () => isConnected
|
};
|