const mqtt = require('mqtt');
|
|
// MQTT 服务负责建立连接、处理重连,并按设备编号发布消息。
|
class MqttService {
|
constructor(config, logger = console) {
|
this.config = config;
|
this.logger = logger;
|
this.client = null;
|
this.connected = false;
|
}
|
|
// 把时间格式化为 yyyy-mm-dd hh:MM:ss,供上报字段 suedtime 使用。
|
formatSuedtime(date = new Date()) {
|
const year = date.getFullYear();
|
const month = String(date.getMonth() + 1).padStart(2, '0');
|
const day = String(date.getDate()).padStart(2, '0');
|
const hour = String(date.getHours()).padStart(2, '0');
|
const minute = String(date.getMinutes()).padStart(2, '0');
|
const second = String(date.getSeconds()).padStart(2, '0');
|
|
return `${year}-${month}-${day} ${hour}:${minute}:${second}`;
|
}
|
|
// 在原始指标对象上追加上传时间字段,保证每次发送都有 suedtime。
|
buildPayload(message) {
|
return {
|
...(message || {}),
|
suedtime: this.formatSuedtime(),
|
};
|
}
|
|
// 根据配置拼接 MQTT 连接地址,兼容 brokerUrl 和 protocol/host/port 两种写法。
|
getBrokerUrl() {
|
if (this.config.brokerUrl) {
|
return this.config.brokerUrl;
|
}
|
|
const protocol = this.config.protocol || 'mqtt';
|
const host = this.config.host;
|
const port = this.config.port;
|
|
return `${protocol}://${host}:${port}`;
|
}
|
|
// 为当前实例生成一个客户端标识,未配置时自动生成默认值。
|
getClientId() {
|
if (this.config.clientId) {
|
return this.config.clientId;
|
}
|
|
const hostName = (this.config.host || 'broker').replace(/[^a-zA-Z0-9]/g, '_');
|
return `jhm_tcp_gateway_${hostName}`;
|
}
|
|
// 启动 MQTT 客户端连接。
|
start() {
|
const brokerUrl = this.getBrokerUrl();
|
const clientId = this.getClientId();
|
|
this.logger.info(`[MQTT] 准备连接 broker=${brokerUrl} clientId=${clientId} qos=${this.config.qos || 0} retain=${Boolean(this.config.retain)}`);
|
|
this.client = mqtt.connect(brokerUrl, {
|
clientId,
|
username: this.config.username || undefined,
|
password: this.config.password || undefined,
|
reconnectPeriod: this.config.reconnectPeriod || 5000,
|
connectTimeout: this.config.connectTimeoutMs || 30000,
|
});
|
|
// 连接成功后标记状态,便于日志和故障判断。
|
this.client.on('connect', () => {
|
this.connected = true;
|
this.logger.info(`[MQTT] 已连接到 ${brokerUrl}`);
|
});
|
|
// 重连时输出提示,方便现场排查网络波动问题。
|
this.client.on('reconnect', () => {
|
this.connected = false;
|
this.logger.warn('[MQTT] 正在重连...');
|
});
|
|
// 断开连接时更新状态。
|
this.client.on('close', () => {
|
this.connected = false;
|
this.logger.warn('[MQTT] 连接已关闭');
|
});
|
|
// 出现错误时只记录日志,不让主进程直接崩溃。
|
this.client.on('error', (error) => {
|
this.logger.error(`[MQTT] 错误: ${error.message}`);
|
});
|
}
|
|
// 根据配置模板生成设备专属 topic。
|
buildTopic(device) {
|
// 优先使用完整模板;如果只配置了默认前缀,则按 前缀/设备编号 组织 topic。
|
if (this.config.topicTemplate) {
|
return this.config.topicTemplate
|
.replace('{deviceId}', device.deviceId)
|
.replace('{ip}', device.ip)
|
.replace('{name}', device.name || device.deviceId);
|
}
|
|
const topicPrefix = this.config.defaultTopicPrefix || 'device';
|
return `${topicPrefix}/${device.deviceId}`;
|
}
|
|
// 发布一条最小指标对象,例如 { F: 37.5 }。
|
publish(device, message) {
|
if (!this.client) {
|
throw new Error('MQTT 服务尚未启动');
|
}
|
|
const topic = this.buildTopic(device);
|
const payloadObject = this.buildPayload(message);
|
const payload = JSON.stringify(payloadObject);
|
|
// 如果 MQTT 当前还未连上,则消息会由 mqtt 客户端排队,日志中做提示。
|
if (!this.connected) {
|
this.logger.warn(`[MQTT] 当前未连接,消息将等待发送: ${topic}`);
|
}
|
|
this.client.publish(topic, payload, {
|
qos: this.config.qos || 0,
|
retain: Boolean(this.config.retain),
|
}, (error) => {
|
if (error) {
|
this.logger.error(`[MQTT] 发布失败 ${topic}: ${error.message}`);
|
return;
|
}
|
|
this.logger.info(`[MQTT] 发布成功 ${topic} ${payload}`);
|
});
|
}
|
|
// 关闭 MQTT 连接,便于优雅停机。
|
async stop() {
|
if (!this.client) {
|
return;
|
}
|
|
this.logger.info('[MQTT] 开始关闭连接');
|
|
await new Promise((resolve) => {
|
this.client.end(false, resolve);
|
});
|
|
this.connected = false;
|
this.client = null;
|
this.logger.info('[MQTT] MQTT 服务已停止');
|
}
|
}
|
|
module.exports = {
|
MqttService,
|
};
|