const mqtt = require("mqtt");
const config = require("./config");
const logger = require("./logger");

/**
 * Build the broker URL handed to `mqtt.connect`.
 *
 * The current config layout stores a bare host name in `url` and the port in
 * `port`. A value that already carries a scheme (contains "://") is returned
 * unchanged so it is not wrapped in a second "mqtt://" prefix.
 *
 * @param {object} mqttConfig - The `config.mqtt` section.
 * @returns {string} Broker URL.
 */
function buildBrokerUrl(mqttConfig) {
  const host = mqttConfig.url || "127.0.0.1";
  if (typeof host === "string" && host.includes("://")) {
    return host;
  }
  const port = mqttConfig.port || 1883;
  return `mqtt://${host}:${port}`;
}

/**
 * Create the MQTT client described by `config.mqtt` and a helper that
 * publishes device data through it.
 *
 * When `config.mqtt` is missing or `config.mqtt.enabled` is falsy, no
 * connection is attempted and the returned `publishDeviceData` is a no-op.
 *
 * @returns {{ client: object|null, publishDeviceData: function }} The client
 *   (or `null` when MQTT is disabled) and the publish helper.
 */
function createMqttClient() {
  if (!config.mqtt || !config.mqtt.enabled) {
    logger.info("MQTT is disabled by config");
    return { client: null, publishDeviceData: () => {} };
  }

  const brokerUrl = buildBrokerUrl(config.mqtt);

  const client = mqtt.connect(brokerUrl, {
    // Empty credentials are passed as undefined rather than as empty strings.
    username: config.mqtt.username || undefined,
    password: config.mqtt.password || undefined,
    clientId: config.mqtt.clientId,
    reconnectPeriod: config.mqtt.reconnectPeriod,
  });

  client.on("connect", () => {
    logger.info("MQTT connected", { url: brokerUrl });
  });

  client.on("reconnect", () => {
    logger.info("MQTT reconnecting");
  });

  client.on("error", (err) => {
    logger.error("MQTT error", err.message || err);
  });

  client.on("close", () => {
    logger.warn("MQTT connection closed");
  });

  /**
   * Publish one device's data to `<prefix>/<deviceNumber>` as JSON.
   *
   * The publish is skipped (with a warning) while the client is not
   * connected. Publish failures are logged, not thrown.
   *
   * @param {string|number} deviceNumber - Device identifier used in the topic.
   * @param {object} [data] - Fields merged into the payload; they override
   *   the generated `deviceId` and `ts` when they use the same keys.
   * @returns {void}
   */
  function publishDeviceData(deviceNumber, data) {
    if (!client.connected) {
      logger.warn("MQTT not connected, skip publish");
      return;
    }

    // Topic prefix: prefer the configured defaultTopicPrefix, then
    // topicPrefix, otherwise fall back to "dialysis".
    const prefix =
      config.mqtt.defaultTopicPrefix || config.mqtt.topicPrefix || "dialysis";
    const topic = `${prefix}/${deviceNumber}`;

    // Tolerate a missing data object instead of throwing on property access.
    const fields = data == null ? {} : data;
    // Fall back to the deviceNumber argument so the payload always carries a
    // deviceId that matches the topic.
    const deviceId =
      fields.deviceNumber != null ? fields.deviceNumber : deviceNumber;

    const payload = JSON.stringify({
      deviceId,
      ts: Date.now(),
      ...fields,
    });

    // Log the data sent to MQTT so it can be compared with what Alibaba Cloud
    // or other downstream consumers receive.
    logger.info("MQTT publish", { topic, deviceNumber, ...fields });

    client.publish(
      topic,
      payload,
      { qos: 0, retain: !!config.mqtt.retain },
      (err) => {
        if (err) {
          logger.error("MQTT publish error", err.message || err);
        } else {
          logger.info("MQTT publish success", { topic, deviceNumber });
        }
      }
    );
  }

  return { client, publishDeviceData };
}

module.exports = createMqttClient;