From 434f42a9b33927a64611c3f9a06c674a3ca6a013 Mon Sep 17 00:00:00 2001
From: chenyc <501753378@qq.com>
Date: 星期六, 08 十一月 2025 16:50:20 +0800
Subject: [PATCH] 优化代码

---
 index.js |  483 +++++++++++++++++++++++++++--------------------------
 1 files changed, 243 insertions(+), 240 deletions(-)

diff --git a/index.js b/index.js
index 64b43cb..7a64ad4 100644
--- a/index.js
+++ b/index.js
@@ -1,292 +1,295 @@
-const net = require('net');  // 引入 Node.js 的 net 模块用于创建网络服务器和客户端
-const logger = require('./logger'); // 引入上面创建的logger
-const EventEmitter = require('events');  // 引入 Node.js 的 events 模块,用于事件驱动编程
+const net = require('net');
+const logger = require('./logger');
+const EventEmitter = require('events');
 const aliyunIot = require('aliyun-iot-device-sdk');
 const { getAliyunDeviceSecret } = require('./api');
 const toModel = require('./Strholp');
 const { publishMessage } = require('./mqttClient');
-// 定义 DeviceManager 类来管理设备连接
+
+// ========== 自定义错误类(可选)==========
+class CustomError extends Error {
+    constructor(message, cause) {
+        super(message);
+        this.cause = cause;
+    }
+}
+
+// ========== 常量配置 ==========
+const MAX_BUFFER_SIZE = 500;      // 缓冲区最大长度
+const RETRY_INTERVAL_MS = 10000;  // 重试间隔 10s
+const KEEP_ALIVE_INTERVAL_MS = 60000; // 保活间隔 60s
+const DEVICE_TIMEOUT_MS = 120000; // 设备无响应超时 2分钟
+
+// ========== DeviceManager 类 ==========
 class DeviceManager extends EventEmitter {
     constructor() {
-        super();  // 调用父类构造函数
-        this.devices = new Map();  // 使用 Map 存储设备连接及其状态
-    }
-    // 处理新设备连接
-    handleDevice(socket) {
-        const deviceId = socket.remoteAddress + ':' + socket.remotePort;  // 根据 IP 和端口生成唯一设备ID
-        try{
-            logger.info(`建立新连接${deviceId}`)
-            this.devices.set(deviceId, { 
-                socket, 
-                lastAck: Date.now(),  // 记录最后一次收到 ACK 的时间
-                status: 'pending',  // 初始状态为 pending(等待确认)
-                retryInterval: null,  // 重试机制定时器ID
-                keepAliveInterval: null,  // 定时发送保持连接信号的定时器ID
-                lastSignal: 'K',  // 最后发送的信号,默认为 'K'
-                iotDevice:null,
-                masData:'',
-                iotDeviceNo:'' // SOCKET连接的设备号
-            });
-            // 发送初始 'K'
-            this.sendKeepAliveToDevice(deviceId);
+        super();
+        this.devices = new Map();
 
-            // 启动重试机制
-            this.startRetryMechanism(deviceId);
-
-            // 每个客户端连接都有一个独立的缓冲区 buffer。
-            let buffer = '';
-            // 监听客户端发送的数据
-            socket.on('data', (chunk) => {
-                buffer += chunk.toString('ascii');  // 将新的数据片段添加到缓冲区
-                console.log(buffer.length,'-------------------')
-                while (true) {
-                    console.log(buffer)
-                    const messageBengIndex=buffer.indexOf('K1');
-                    const messageEndIndex = buffer.indexOf('\r\n');  // 假设消息以 '\r\n' 结束
-                    //如果没有K1说明没有开始符号  直接放弃之前接受的数据  如果拼接字符串太长还没有结束符号也丢弃
-                    if(messageBengIndex===-1||buffer.length>300){
-                        buffer=''
-                        break
-                    }
-                    if (messageBengIndex===-1||messageEndIndex === -1) break;  // 如果没有找到结束符,跳出循环等待更多数据
-                    
-                    const completeMessage = buffer.substring(messageBengIndex, messageEndIndex).trim();  // 提取完整消息
-                    buffer = ''; // 移除已处理的部分
-                    logger.info(`${deviceId}接收到的完整消息`)
-                    // 解析数据
-                    this.handleData(deviceId, completeMessage.toString());  // 处理接收到的数据
+        // 启动全局超时清理定时器
+        setInterval(() => {
+            const now = Date.now();
+            for (const [deviceId, info] of this.devices.entries()) {
+                if (now - info.lastAck > DEVICE_TIMEOUT_MS) {
+                    logger.warn(`设备 ${deviceId} 超时(${DEVICE_TIMEOUT_MS / 1000}s 无响应),自动断开`);
+                    this.removeDevice(deviceId);
                 }
-                
-            });
-        }catch(err){
-            logger.error(`${deviceId}建立连接异常出错:${err}`)
-        }
+            }
+        }, 30000); // 每30秒检查一次
     }
-    // 启动针对每个设备的重试机制
+
+    handleDevice(socket) {
+        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
+        logger.info(`建立新连接: ${deviceId}`);
+
+        const deviceInfo = {
+            socket,
+            lastAck: Date.now(),
+            status: 'pending', // pending → valid → registered
+            retryInterval: null,
+            keepAliveInterval: null,
+            lastSignal: 'K', // 初始握手信号
+            iotDevice: null,
+            masData: '',
+            iotDeviceNo: ''
+        };
+
+        this.devices.set(deviceId, deviceInfo);
+
+        // 发送初始握手信号
+        this.sendKeepAliveToDevice(deviceId);
+        this.startRetryMechanism(deviceId);
+
+        let buffer = '';
+
+        socket.on('data', (chunk) => {
+            buffer += chunk.toString('ascii');
+
+            while (true) {
+                const startIdx = buffer.indexOf('K1');
+                if (startIdx === -1) {
+                    // 没有起始标志,但保留数据(防分包)
+                    if (buffer.length > MAX_BUFFER_SIZE) buffer = '';
+                    break;
+                }
+
+                const endIdx = buffer.indexOf('\r\n', startIdx);
+                if (endIdx === -1) {
+                    // 结束符未到,保留从 K1 开始的数据
+                    if (buffer.length > MAX_BUFFER_SIZE) {
+                        buffer = buffer.substring(startIdx);
+                    }
+                    break;
+                }
+
+                const message = buffer.substring(startIdx, endIdx).trim();
+                buffer = buffer.substring(endIdx + 2); // 移除已处理部分(含 \r\n)
+
+                logger.info(`${deviceId} 接收到完整消息: ${message}`);
+                this.handleData(deviceId, message);
+            }
+        });
+
+        socket.on('error', (err) => {
+            logger.error(`设备 ${deviceId} 套接字错误:`, err.message);
+            this.removeDevice(deviceId);
+        });
+
+        socket.on('end', () => {
+            logger.info(`设备断开连接: ${deviceId}`);
+            this.removeDevice(deviceId);
+        });
+    }
+
     startRetryMechanism(deviceId) {
         const deviceInfo = this.devices.get(deviceId);
-        try{
-            if (deviceInfo.retryInterval) clearInterval(deviceInfo.retryInterval);  // 清除之前的重试定时器
-            deviceInfo.retryInterval = setInterval(() => {
-                if (deviceInfo.status === 'pending'||deviceInfo.status==='invalid') {
-                    // 来回切换发送,命令
-                    if(deviceInfo.lastSignal==='K'){
-                        deviceInfo.lastSignal='K0000'
-                    }else{
-                        deviceInfo.lastSignal='K'
-                    }
-                    logger.info(`尝试重新发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
-                    this.sendKeepAliveToDevice(deviceId);  // 如果设备状态是待确认,则重发上次的信号
-                }
-            }, 10000);  // 每2秒重试一次
-        }catch(err){
-            logger.error(`${deviceId}设备重试机制出错:${err}`)
-        }
+        if (!deviceInfo) return;
+
+        this.stopRetryMechanism(deviceId);
+        deviceInfo.retryInterval = setInterval(() => {
+            if (deviceInfo.status === 'pending' || deviceInfo.status === 'invalid') {
+                // 握手阶段:在 'K' 和 'K0000' 之间切换(你的原始逻辑)
+                deviceInfo.lastSignal = deviceInfo.lastSignal === 'K' ? 'K0000' : 'K';
+                logger.info(`重试发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
+                this.sendKeepAliveToDevice(deviceId);
+            }
+        }, RETRY_INTERVAL_MS);
     }
 
-    // 停止针对每个设备的重试机制
     stopRetryMechanism(deviceId) {
         const deviceInfo = this.devices.get(deviceId);
-        if (deviceInfo.retryInterval) {
-            clearInterval(deviceInfo.retryInterval);  // 清除重试定时器
+        if (deviceInfo?.retryInterval) {
+            clearInterval(deviceInfo.retryInterval);
             deviceInfo.retryInterval = null;
         }
     }
 
-    // 启动针对每个设备的定时发送保持连接信号的机制
     startKeepAlive(deviceId, signal) {
         const deviceInfo = this.devices.get(deviceId);
-        if (deviceInfo.keepAliveInterval) clearInterval(deviceInfo.keepAliveInterval);  // 清除之前的定时器
-        deviceInfo.lastSignal = signal;  // 更新最后发送的信号
+        if (!deviceInfo) return;
+
+        this.stopKeepAlive(deviceId);
+        deviceInfo.lastSignal = signal;
         deviceInfo.keepAliveInterval = setInterval(() => {
-            this.sendKeepAliveToDevice(deviceId);  // 每60秒发送一次保持连接信号
-        }, 60000);  // 修改为每60秒发送一次
+            this.sendKeepAliveToDevice(deviceId);
+        }, KEEP_ALIVE_INTERVAL_MS);
     }
 
-    // 停止针对每个设备的定时发送保持连接信号的机制
     stopKeepAlive(deviceId) {
         const deviceInfo = this.devices.get(deviceId);
-        if (deviceInfo.keepAliveInterval) {
-            clearInterval(deviceInfo.keepAliveInterval);  // 清除定时器
+        if (deviceInfo?.keepAliveInterval) {
+            clearInterval(deviceInfo.keepAliveInterval);
             deviceInfo.keepAliveInterval = null;
         }
     }
 
-    // 向特定设备发送保持连接信号
     sendKeepAliveToDevice(deviceId) {
         const deviceInfo = this.devices.get(deviceId);
-        if (!deviceInfo) return;
-        deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`);  // 根据上次成功信号发送 'K' 或 'K0000'
-        logger.info(`发送保持连接信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
-    }
+        if (!deviceInfo || !deviceInfo.socket.writable) return;
 
-    // 处理来自客户端的数据
-    handleData(deviceId, message) {
-        try{
-            const deviceInfo = this.devices.get(deviceId);
-            if (!deviceInfo) return;
-            logger.info(`接收到来自 ${deviceId} 的消息: ${message}`);
-            deviceInfo.lastAck = Date.now();  // 更新最后收到 ACK 的时间
-            const masData= toModel(message)
-            deviceInfo.iotDeviceNo=masData.n
-            deviceInfo.masData= masData
-            // 如果已经是有效的接受数据了  就不用在启用定时发送了  直接改状体
-            if(deviceInfo.status!=='valid'){
-                this.emit('validResponse', deviceId);  // 触发 validResponse 事件
-                deviceInfo.status = 'valid';  // 设备状态更新为有效
-                console.log(`停止重试${deviceId}`)
-                this.stopRetryMechanism(deviceId);  // 停止重试机制
-                // 根据上次发送的信号启动相应的定时发送机制
-                this.startKeepAlive(deviceId, deviceInfo.lastSignal);  // 启动定时发送 'K0000' 的机制
-                logger.info(`${deviceId}启动定时发送 ${deviceInfo.lastSignal} 的机制60秒一次`);
-                this.registerDevice(deviceId)
-            }else{
-                logger.info('注册成功后第二次就发送数据到阿里云')
-                this.postPropsToDevice(deviceId)
-
-            }
-        }catch(err){
-            logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`)
+        try {
+            deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`);
+            logger.info(`发送信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
+        } catch (err) {
+            logger.error(`发送信号失败 ${deviceId}:`, err.message);
+            this.removeDevice(deviceId);
         }
-        
-       
     }
-    // 处理设备上阿里云
+
+    async handleData(deviceId, message) {
+        const deviceInfo = this.devices.get(deviceId);
+        if (!deviceInfo) return;
+
+        deviceInfo.lastAck = Date.now();
+
+        try {
+            const masData = toModel(message);
+            deviceInfo.iotDeviceNo = masData.n;
+            deviceInfo.masData = masData;
+
+            if (deviceInfo.status !== 'valid') {
+                deviceInfo.status = 'valid';
+                this.emit('validResponse', deviceId);
+                logger.info(`${deviceId} 首次有效响应,停止重试`);
+                this.stopRetryMechanism(deviceId);
+                // 成功后固定使用 'K0000' 保活(建议)
+                this.startKeepAlive(deviceId, deviceInfo.lastSignal);
+                await this.registerDevice(deviceId);
+            } else {
+                logger.info(`${deviceId} 已注册,直接上报数据到阿里云`);
+                this.postPropsToDevice(deviceId);
+                // this.onDeviceDataReceived(deviceId, masData);
+            }
+        } catch (err) {
+            logger.error(`${deviceId} 处理消息出错:`, err.message);
+        }
+    }
+
     async registerDevice(deviceId) {
         const deviceInfo = this.devices.get(deviceId);
-        logger.info(`${deviceId}注册阿里云iot设备`);
-        try{
-            if (!deviceInfo.iotDevice) {
-                // 请求三元组信息
-                logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`);
-                const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo)
-                logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data));
-                if(data&&data.productKey&&data.deviceName&&data.deviceSecret){
-                    // 设备配置信息
-                    const productKey = data.productKey; // 替换为你的产品密钥
-                    const deviceName = data.deviceName; // 替换为你的设备名称
-                    const deviceSecret = data.deviceSecret; // 替换为你的设备密钥
-                    // 创建设备实例aliyunIot.device(devConfig);
-                    deviceInfo.iotDevice = aliyunIot.device({
-                        ProductKey: productKey,
-                        DeviceName: deviceName,
-                        DeviceSecret: deviceSecret
-                    });
-                    // 监听设备连接状态变化
-                    deviceInfo.iotDevice.on('connect', () => {
-                        // logger.info(`${deviceId} 连接到阿里云IoT平台成功`);
-                    });
-                
-                    deviceInfo.iotDevice.on('error', (err) => {
-                        logger.info(`${deviceId} 设备连接到阿里云IoT平台错误:`, err);
-                    });
-                }
-                
+        if (!deviceInfo || deviceInfo.iotDevice) return;
+
+        try {
+            logger.info(`${deviceId} 请求三元组,设备号: ${deviceInfo.iotDeviceNo}`);
+            const { data } = await getDeviceSYZ(deviceInfo.iotDeviceNo);
+            logger.info(`${deviceId} 三元组返回: ${JSON.stringify(data)}`);
+            const model=data.data
+            if (model?.productKey && model?.deviceName && model?.deviceSecret) {
+                deviceInfo.iotDevice = aliyunIot.device({
+                    ProductKey: model.productKey,
+                    DeviceName: model.deviceName,
+                    DeviceSecret: model.deviceSecret
+                });
+
+                deviceInfo.iotDevice.on('connect', () => {
+                    logger.info(`${deviceId} 连接阿里云 IoT 成功`);
+                });
+
+                deviceInfo.iotDevice.on('error', (err) => {
+                    logger.error(`${deviceId} 阿里云 IoT 错误:`, err.message);
+                });
             }
-        }
-        catch(err){
-            logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`)
+        } catch (err) {
+            logger.error(`${deviceId} 阿里云注册失败:`, err.message);
         }
     }
-    //发送消息到iot云端
+
     postPropsToDevice(deviceId) {
-        try{
-            const deviceInfo = this.devices.get(deviceId);
-            if (deviceInfo.iotDevice) {
-              // 上报属性数据
-              const props = deviceInfo.masData
+        const deviceInfo = this.devices.get(deviceId);
+        if (!deviceInfo?.iotDevice || !deviceInfo.masData) return;
 
-              onDeviceDataReceived(deviceInfo.masData); // 调用函数处理接收到的数据
-              deviceInfo.iotDevice.postProps(props, (res) => {
-                if (res.message==='success') {
-                    logger.info(`${deviceId} 上报属性成功:`, res);
-                  
-                } else {
-                    logger.error(`${deviceId} 上报属性失败:`, res);
-                }
-              });
+        const props = deviceInfo.masData;
+        deviceInfo.iotDevice.postProps(props, (res) => {
+            if (res?.message === 'success') {
+                logger.info(`${deviceId} 上报属性成功`);
+            } else {
+                logger.error(`${deviceId} 上报属性失败:`, res?.message || 'unknown');
             }
-        }catch(err){
-            logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err);
-        }
-       
+        });
     }
-    
 
-    // 移除设备连接
+    onDeviceDataReceived(deviceId, data) {
+        const topic = `touxiji/${data.n}`;
+        const payload = JSON.stringify({
+            ...data,
+            timestamp: new Date().toISOString()
+        });
+
+        try {
+            logger.info(`发布 MQTT 消息到主题 ${topic}: ${payload}`);
+            publishMessage(topic, payload);
+        } catch (error) {
+            logger.error(`MQTT 发布失败 ${topic}:`, error.message);
+        }
+    }
+
     removeDevice(deviceId) {
-        if (this.devices.has(deviceId)) {
-            this.stopRetryMechanism(deviceId);  // 停止重试机制
-            this.stopKeepAlive(deviceId);  // 停止定时发送保持连接信号的机制
-            this.devices.delete(deviceId);  // 从设备列表中移除
-            logger.info(`移除设备: ${deviceId}`);
+        const deviceInfo = this.devices.get(deviceId);
+        if (!deviceInfo) return;
+
+        this.stopRetryMechanism(deviceId);
+        this.stopKeepAlive(deviceId);
+
+        if (deviceInfo.socket && !deviceInfo.socket.destroyed) {
+            deviceInfo.socket.destroy();
         }
+
+        if (deviceInfo.iotDevice) {
+            deviceInfo.iotDevice.end(); // 安全关闭 IoT 连接
+        }
+
+        this.devices.delete(deviceId);
+        logger.info(`设备已移除: ${deviceId}`);
     }
 }
 
-// 创建服务器实例
-const server = net.createServer((socket) => {
-    const manager = new DeviceManager();
-    manager.handleDevice(socket);
-    manager.on('validResponse', (deviceId) => {
-        logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`);
-        // 可以在这里添加更多处理逻辑
-    });
-    manager.on('invalidResponse', (deviceId) => {
-        logger.info(`设备 ${deviceId} 响应了无效或没有响应`);
-        // 可以在这里添加更多处理逻辑
-    });
-   
-    // 处理错误
-    socket.on('error', (err) => {
-        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
-        logger.error(`与 ${deviceId} 的套接字发生错误:`, err);
-        manager.removeDevice(deviceId);  // 移除设备连接
-    });
-
-    // 客户端断开时触发
-    socket.on('end', () => {
-        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
-        logger.info(`设备断开连接: ${deviceId}`);
-        manager.removeDevice(deviceId);  // 移除设备连接
-    });
-});
-
-// 绑定端口并开始监听
-server.listen(961, () => {
-    logger.info('socket服务已经启动端口号:961');
-
-});
-const getDeviceSYZ=async (devcieNo)=>{
-    try{
-        const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo)
-        if(data){
-            return data
-        }else{
-            return ''
-        }
-
-    }catch(err){
-        logger.error(` ${devcieNo} 获取三元组信息错误:`, err);
-        throw new CustomError("新错误", error);
-        
-    }
-}
-// 接收到的数据
-const  onDeviceDataReceived=(data)=> {
-    const topic = `touxiji/${data.n}`;
-    const payload = JSON.stringify({
-      ...data,
-      timestamp: new Date().toISOString()
-    });
+// ========== 辅助函数 ==========
+async function getDeviceSYZ(deviceNo) {
     try {
-        logger.info(`发布消息到主题 ${topic}: ${payload}`);
-        publishMessage(topic, payload);
-    } catch (error) {  
-        logger.error(`发布消息到主题 ${topic} 失败:`, error);
-        throw new CustomError("发布消息失败", error);
+        const { data, message } = await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret', deviceNo);
+        return { data };
+    } catch (err) {
+        logger.error(`${deviceNo} 获取三元组失败:`, err.message);
+        throw new CustomError("获取三元组失败", err);
+    }
+}
 
-     }
-   
-  }
-  
\ No newline at end of file
+// ========== 启动服务器 ==========
+const manager = new DeviceManager(); // ✅ 单例!
+
+manager.on('validResponse', (deviceId) => {
+    logger.info(`设备 ${deviceId} 成功完成握手`);
+});
+
+manager.on('invalidResponse', (deviceId) => {
+    logger.info(`设备 ${deviceId} 无效响应`);
+});
+
+const server = net.createServer((socket) => {
+    manager.handleDevice(socket);
+});
+
+const PORT = process.env.PORT || 10961;
+server.listen(PORT, () => {
+    logger.info(`Socket 服务已启动,监听端口: ${PORT}`);
+});
\ No newline at end of file

--
Gitblit v1.8.0