const mqtt = require('mqtt');
|
|
/**
 * Render a date as a `YYYY-MM-DD HH:mm:ss` string using the date's
 * local-time fields.
 *
 * @param {Date} [date=new Date()] - Moment to format; defaults to the current time.
 * @returns {string} Timestamp with month, day, hour, minute and second zero-padded to two digits.
 */
function formatUploadTime(date = new Date()) {
  // Every component except the year is left-padded to two digits.
  const twoDigits = (value) => String(value).padStart(2, '0');

  const datePart = [
    date.getFullYear(),
    twoDigits(date.getMonth() + 1), // getMonth() is zero-based
    twoDigits(date.getDate()),
  ].join('-');

  const timePart = [
    twoDigits(date.getHours()),
    twoDigits(date.getMinutes()),
    twoDigits(date.getSeconds()),
  ].join(':');

  return `${datePart} ${timePart}`;
}
|
|
/**
 * Wrapper around an mqtt.js client that publishes per-device messages as JSON.
 *
 * Lifecycle: `start()` opens the connection, `publish()` sends one message,
 * `stop()` closes the connection and clears the client.
 */
class MqttService {
  /**
   * @param {object} [config={}] - Connection and publishing options. Keys read by
   *   this class: `brokerUrl`, `protocol`, `host`, `port`, `clientId`, `username`,
   *   `password`, `reconnectPeriod`, `connectTimeoutMs`, `topicTemplate`,
   *   `defaultTopicPrefix`, `qos`, `retain`.
   * @param {object} [logger=console] - Object exposing `info`, `warn` and `error`.
   */
  constructor(config = {}, logger = console) {
    this.config = config;
    this.logger = logger;
    this.client = null;
    this.connected = false;
  }

  /**
   * Resolve the broker URL to connect to.
   *
   * @returns {string} `config.brokerUrl` when set; otherwise a URL assembled from
   *   `protocol` (default `mqtt`), `host` and, when configured, `port`.
   */
  getBrokerUrl() {
    const { brokerUrl, protocol, host, port } = this.config;
    if (brokerUrl) {
      return brokerUrl;
    }

    const base = `${protocol || 'mqtt'}://${host}`;
    // Leave the port out when it is not configured instead of emitting
    // "...:undefined", so the scheme's default port can apply.
    const hasPort = port !== undefined && port !== null && port !== '';
    return hasPort ? `${base}:${port}` : base;
  }

  /**
   * Resolve the MQTT client id.
   *
   * @returns {string} `config.clientId` when set; otherwise `jh2028_gateway_<host>`
   *   where every non-alphanumeric character of the host is replaced by `_`
   *   (the host falls back to `broker` when unset).
   */
  getClientId() {
    if (this.config.clientId) {
      return this.config.clientId;
    }

    const hostName = String(this.config.host || 'broker').replace(/[^a-zA-Z0-9]/g, '_');
    return `jh2028_gateway_${hostName}`;
  }

  /**
   * Build the topic a device's messages are published to.
   *
   * With `config.topicTemplate`, every `{deviceId}`, `{ip}` and `{name}`
   * placeholder is substituted (`{name}` falls back to the device id).
   * Without a template the topic is `<defaultTopicPrefix || 'device'>/<deviceId>`.
   *
   * @param {{deviceId: *, ip?: *, name?: *}} device - Device the topic is built for.
   * @returns {string} Topic name.
   */
  buildTopic(device) {
    const { topicTemplate, defaultTopicPrefix } = this.config;
    if (!topicTemplate) {
      return `${defaultTopicPrefix || 'device'}/${device.deviceId}`;
    }

    const values = {
      deviceId: device.deviceId,
      ip: device.ip,
      name: device.name || device.deviceId,
    };

    // One global pass with a function replacer:
    //  - repeated placeholders are all substituted, not just the first one;
    //  - values are inserted literally ("$&", "$1" ... are not treated as patterns);
    //  - a value that itself contains "{ip}" or similar is not expanded again.
    return topicTemplate.replace(
      /\{(deviceId|ip|name)\}/g,
      (_placeholder, key) => String(values[key]),
    );
  }

  /**
   * Build the object that is serialized and published.
   *
   * @param {object} [message] - Message fields; a falsy value is treated as `{}`.
   * @returns {object} Shallow copy of `message` with `suedtime` set to the current
   *   formatted time (overriding any `suedtime` already present).
   */
  buildPayload(message) {
    return {
      ...(message || {}),
      // NOTE(review): the key is spelled "suedtime" in the original code; it is
      // runtime data, so it is kept as-is. Confirm with consumers before renaming.
      suedtime: formatUploadTime(),
    };
  }

  /**
   * Connect to the broker and register the connection-state handlers.
   *
   * Calling `start()` while a client already exists is ignored with a warning,
   * so the existing client is never overwritten without being ended.
   */
  start() {
    if (this.client) {
      this.logger.warn('[MQTT] 已启动,忽略重复的启动请求');
      return;
    }

    const brokerUrl = this.getBrokerUrl();
    const clientId = this.getClientId();
    const { reconnectPeriod } = this.config;

    this.logger.info(`[MQTT] 正在连接 服务地址=${brokerUrl} 客户端ID=${clientId}`);
    this.client = mqtt.connect(brokerUrl, {
      clientId,
      username: this.config.username || undefined,
      password: this.config.password || undefined,
      // An explicit 0 must be passed through: mqtt.js treats it as
      // "disable automatic reconnect". Any other falsy value uses 5000 ms.
      reconnectPeriod: reconnectPeriod === 0 ? 0 : (reconnectPeriod || 5000),
      connectTimeout: this.config.connectTimeoutMs || 30000,
    });

    this.client.on('connect', () => {
      this.connected = true;
      this.logger.info(`[MQTT] 已连接 服务地址=${brokerUrl}`);
    });

    this.client.on('reconnect', () => {
      this.connected = false;
      this.logger.warn('[MQTT] 正在重连');
    });

    this.client.on('close', () => {
      this.connected = false;
      this.logger.warn('[MQTT] 连接已关闭');
    });

    this.client.on('error', (error) => {
      this.logger.error(`[MQTT] 连接异常: ${error.message}`);
    });
  }

  /**
   * Publish one message for a device.
   *
   * All failures are reported through the returned promise, including
   * errors raised while building the topic or serializing the payload.
   *
   * @param {object} device - Device used to derive the topic (see `buildTopic`).
   * @param {object} [message] - Message fields (see `buildPayload`).
   * @returns {Promise<{ok: true, topic: string, payload: object}>} Resolves once the
   *   client's publish callback reports success; rejects if the service is not
   *   started, the topic/payload cannot be built, or the client reports an error.
   */
  publish(device, message) {
    if (!this.client) {
      return Promise.reject(new Error('MQTT service is not started'));
    }

    let topic;
    let payloadObject;
    let payload;
    try {
      topic = this.buildTopic(device);
      payloadObject = this.buildPayload(message);
      payload = JSON.stringify(payloadObject);
    } catch (error) {
      // Keep the error channel consistent with the "not started" case above.
      return Promise.reject(error);
    }

    if (!this.connected) {
      // The publish call is still made; per the log text the message is
      // expected to wait in the client's queue until the connection is back.
      this.logger.warn(`[MQTT] 当前未连接,消息将进入客户端队列 主题=${topic}`);
    }

    const options = {
      qos: this.config.qos || 0,
      retain: Boolean(this.config.retain),
    };

    return new Promise((resolve, reject) => {
      this.client.publish(topic, payload, options, (error) => {
        if (error) {
          this.logger.error(`[MQTT] 发布失败 主题=${topic}: ${error.message}`);
          reject(error);
          return;
        }

        this.logger.info(`[MQTT] 发布成功 主题=${topic} 数据=${payload}`);
        resolve({ ok: true, topic, payload: payloadObject });
      });
    });
  }

  /**
   * Close the connection (non-forced) and reset the service state.
   * Does nothing when the service was never started.
   *
   * @returns {Promise<void>} Resolves after the client's `end` callback has fired.
   */
  async stop() {
    if (!this.client) {
      return;
    }

    this.logger.info('[MQTT] 正在停止');
    await new Promise((resolve) => {
      this.client.end(false, resolve);
    });
    this.connected = false;
    this.client = null;
    this.logger.info('[MQTT] 已停止');
  }
}
|
|
// Public API of this module: the MQTT service class and the timestamp formatter.
module.exports = { MqttService, formatUploadTime };
|