From 7885cede659f3255be56f77c1eef2ada7387d6f1 Mon Sep 17 00:00:00 2001
From: chenyc <501753378@qq.com>
Date: 星期日, 22 三月 2026 16:23:21 +0800
Subject: [PATCH] 初始化项目
---
src/mqttClient.js | 73 ++++++++++++++++++++++++++++++++++++
1 files changed, 73 insertions(+), 0 deletions(-)
diff --git a/src/mqttClient.js b/src/mqttClient.js
new file mode 100644
index 0000000..07f7d30
--- /dev/null
+++ b/src/mqttClient.js
@@ -0,0 +1,73 @@
+const mqtt = require("mqtt");
+const config = require("./config");
+const logger = require("./logger");
+
+function createMqttClient() {
+ if (!config.mqtt || !config.mqtt.enabled) {
+ logger.info("MQTT is disabled by config");
+ return {
+ client: null,
+ publishDeviceData: () => {}
+ };
+ }
+
+ // 兼容当前配置结构:url 为主机名,port 为端口
+ const host = config.mqtt.url || "127.0.0.1";
+ const port = config.mqtt.port || 1883;
+ const brokerUrl = `mqtt://${host}:${port}`;
+
+ const client = mqtt.connect(brokerUrl, {
+ username: config.mqtt.username || undefined,
+ password: config.mqtt.password || undefined,
+ clientId: config.mqtt.clientId,
+ reconnectPeriod: config.mqtt.reconnectPeriod
+ });
+
+ client.on("connect", () => {
+ logger.info("MQTT connected", { url: brokerUrl });
+ });
+
+ client.on("reconnect", () => {
+ logger.info("MQTT reconnecting");
+ });
+
+ client.on("error", (err) => {
+ logger.error("MQTT error", err.message || err);
+ });
+
+ client.on("close", () => {
+ logger.warn("MQTT connection closed");
+ });
+
+ function publishDeviceData(deviceNumber, data) {
+ if (!client.connected) {
+ logger.warn("MQTT not connected, skip publish");
+ return;
+ }
+ // Topic 前缀优先取配置的 defaultTopicPrefix,否则退回 "dialysis"
+ const prefix = config.mqtt.defaultTopicPrefix || config.mqtt.topicPrefix || "dialysis";
+ const topic = `${prefix}/${deviceNumber}`;
+ const payload = JSON.stringify({
+ deviceId: data.deviceNumber,
+ ts: Date.now(),
+ ...data
+ });
+ // 记录发送到 MQTT 的物模型/数据内容,便于对比阿里云或下游收到的报文
+ logger.info("MQTT publish", {
+ topic,
+ deviceNumber,
+ ...data
+ });
+ client.publish(topic, payload, { qos: 0, retain: !!config.mqtt.retain }, (err) => {
+ if (err) {
+ logger.error("MQTT publish error", err.message || err);
+ } else {
+ logger.info("MQTT publish success", { topic, deviceNumber });
+ }
+ });
+ }
+
+ return { client, publishDeviceData };
+}
+
+module.exports = createMqttClient;
--
Gitblit v1.8.0