const net = require('net'); // 引入 Node.js 的 net 模块用于创建网络服务器和客户端 const logger = require('./logger'); // 引入上面创建的logger const EventEmitter = require('events'); // 引入 Node.js 的 events 模块,用于事件驱动编程 const aliyunIot = require('aliyun-iot-device-sdk'); const { getAliyunDeviceSecret } = require('./api'); const toModel = require('./Strholp'); // 定义 DeviceManager 类来管理设备连接 class DeviceManager extends EventEmitter { constructor() { super(); // 调用父类构造函数 this.devices = new Map(); // 使用 Map 存储设备连接及其状态 } // 处理新设备连接 handleDevice(socket) { const deviceId = socket.remoteAddress + ':' + socket.remotePort; // 根据 IP 和端口生成唯一设备ID try{ logger.info(`建立新连接${deviceId}`) this.devices.set(deviceId, { socket, lastAck: Date.now(), // 记录最后一次收到 ACK 的时间 status: 'pending', // 初始状态为 pending(等待确认) retryInterval: null, // 重试机制定时器ID keepAliveInterval: null, // 定时发送保持连接信号的定时器ID lastSignal: 'K', // 最后发送的信号,默认为 'K' iotDevice:null, masData:'', iotDeviceNo:'' // SOCKET连接的设备号 }); // 发送初始 'K' this.sendKeepAliveToDevice(deviceId); // 启动重试机制 this.startRetryMechanism(deviceId); // 每个客户端连接都有一个独立的缓冲区 buffer。 let buffer = ''; // 监听客户端发送的数据 socket.on('data', (chunk) => { buffer += chunk.toString('ascii'); // 将新的数据片段添加到缓冲区 console.log(buffer.length,'-------------------') while (true) { console.log(buffer) const messageBengIndex=buffer.indexOf('K1'); const messageEndIndex = buffer.indexOf('\r\n'); // 假设消息以 '\r\n' 结束 //如果没有K1说明没有开始符号 直接放弃之前接受的数据 如果拼接字符串太长还没有结束符号也丢弃 if(messageBengIndex===-1||buffer.length>300){ buffer='' break } if (messageBengIndex===-1||messageEndIndex === -1) break; // 如果没有找到结束符,跳出循环等待更多数据 const completeMessage = buffer.substring(messageBengIndex, messageEndIndex).trim(); // 提取完整消息 buffer = ''; // 移除已处理的部分 logger.info(`${deviceId}接收到的完整消息`) // 解析数据 this.handleData(deviceId, completeMessage.toString()); // 处理接收到的数据 } }); }catch(err){ logger.error(`${deviceId}建立连接异常出错:${err}`) } } // 启动针对每个设备的重试机制 startRetryMechanism(deviceId) { const deviceInfo = this.devices.get(deviceId); try{ if (deviceInfo.retryInterval) clearInterval(deviceInfo.retryInterval); // 清除之前的重试定时器 deviceInfo.retryInterval = setInterval(() => { if (deviceInfo.status === 'pending'||deviceInfo.status==='invalid') { // 来回切换发送,命令 if(deviceInfo.lastSignal==='K'){ deviceInfo.lastSignal='K0000' }else{ deviceInfo.lastSignal='K' } logger.info(`尝试重新发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`); this.sendKeepAliveToDevice(deviceId); // 如果设备状态是待确认,则重发上次的信号 } }, 5000); // 每2秒重试一次 }catch(err){ logger.error(`${deviceId}设备重试机制出错:${err}`) } } // 停止针对每个设备的重试机制 stopRetryMechanism(deviceId) { const deviceInfo = this.devices.get(deviceId); if (deviceInfo.retryInterval) { clearInterval(deviceInfo.retryInterval); // 清除重试定时器 deviceInfo.retryInterval = null; } } // 启动针对每个设备的定时发送保持连接信号的机制 startKeepAlive(deviceId, signal) { const deviceInfo = this.devices.get(deviceId); if (deviceInfo.keepAliveInterval) clearInterval(deviceInfo.keepAliveInterval); // 清除之前的定时器 deviceInfo.lastSignal = signal; // 更新最后发送的信号 deviceInfo.keepAliveInterval = setInterval(() => { this.sendKeepAliveToDevice(deviceId); // 每60秒发送一次保持连接信号 }, 60000); // 修改为每60秒发送一次 } // 停止针对每个设备的定时发送保持连接信号的机制 stopKeepAlive(deviceId) { const deviceInfo = this.devices.get(deviceId); if (deviceInfo.keepAliveInterval) { clearInterval(deviceInfo.keepAliveInterval); // 清除定时器 deviceInfo.keepAliveInterval = null; } } // 向特定设备发送保持连接信号 sendKeepAliveToDevice(deviceId) { const deviceInfo = this.devices.get(deviceId); if (!deviceInfo) return; deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`); // 根据上次成功信号发送 'K' 或 'K0000' logger.info(`发送保持连接信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`); } // 处理来自客户端的数据 handleData(deviceId, message) { try{ const deviceInfo = this.devices.get(deviceId); if (!deviceInfo) return; logger.info(`接收到来自 ${deviceId} 的消息: ${message}`); deviceInfo.lastAck = Date.now(); // 更新最后收到 ACK 的时间 const masData= toModel(message) deviceInfo.iotDeviceNo=masData.n deviceInfo.masData= masData // 如果已经是有效的接受数据了 就不用在启用定时发送了 直接改状体 if(deviceInfo.status!=='valid'){ this.emit('validResponse', deviceId); // 触发 validResponse 事件 deviceInfo.status = 'valid'; // 设备状态更新为有效 console.log(`停止重试${deviceId}`) this.stopRetryMechanism(deviceId); // 停止重试机制 // 根据上次发送的信号启动相应的定时发送机制 this.startKeepAlive(deviceId, deviceInfo.lastSignal); // 启动定时发送 'K0000' 的机制 logger.info(`${deviceId}启动定时发送 'K' 的机制60秒一次`); this.registerDevice(deviceId) }else{ logger.info('注册成功后第二次就发送数据到阿里云') this.postPropsToDevice(deviceId) } }catch(err){ logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`) } } // 处理设备上阿里云 async registerDevice(deviceId) { const deviceInfo = this.devices.get(deviceId); logger.info(`${deviceId}注册阿里云iot设备`); try{ if (!deviceInfo.iotDevice) { // 请求三元组信息 logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`); const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo) logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data)); if(data&&data.productKey&&data.deviceName&&data.deviceSecret){ // 设备配置信息 const productKey = data.productKey; // 替换为你的产品密钥 const deviceName = data.deviceName; // 替换为你的设备名称 const deviceSecret = data.deviceSecret; // 替换为你的设备密钥 // 创建设备实例aliyunIot.device(devConfig); deviceInfo.iotDevice = aliyunIot.device({ ProductKey: productKey, DeviceName: deviceName, DeviceSecret: deviceSecret }); // 监听设备连接状态变化 deviceInfo.iotDevice.on('connect', () => { logger.info(`${deviceId} 连接到阿里云IoT平台成功`); }); deviceInfo.iotDevice.on('error', (err) => { logger.info(`${deviceId} 设备连接错误:`, err); }); } } } catch(err){ logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`) } } //发送消息到iot云端 postPropsToDevice(deviceId) { try{ const deviceInfo = this.devices.get(deviceId); if (deviceInfo.iotDevice) { // 上报属性数据 const props = deviceInfo.masData deviceInfo.iotDevice.postProps(props, (res) => { if (res.message==='success') { logger.info(`${deviceId} 上报属性成功:`, res); } else { logger.error(`${deviceId} 上报属性失败:`, res); } }); } }catch(err){ logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err); } } // 移除设备连接 removeDevice(deviceId) { if (this.devices.has(deviceId)) { this.stopRetryMechanism(deviceId); // 停止重试机制 this.stopKeepAlive(deviceId); // 停止定时发送保持连接信号的机制 this.devices.delete(deviceId); // 从设备列表中移除 logger.info(`移除设备: ${deviceId}`); } } } // 创建服务器实例 const server = net.createServer((socket) => { const manager = new DeviceManager(); manager.handleDevice(socket); manager.on('validResponse', (deviceId) => { logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`); // 可以在这里添加更多处理逻辑 }); manager.on('invalidResponse', (deviceId) => { logger.info(`设备 ${deviceId} 响应了无效或没有响应`); // 可以在这里添加更多处理逻辑 }); // 处理错误 socket.on('error', (err) => { const deviceId = socket.remoteAddress + ':' + socket.remotePort; logger.error(`与 ${deviceId} 的套接字发生错误:`, err); manager.removeDevice(deviceId); // 移除设备连接 }); // 客户端断开时触发 socket.on('end', () => { const deviceId = socket.remoteAddress + ':' + socket.remotePort; logger.info(`设备断开连接: ${deviceId}`); manager.removeDevice(deviceId); // 移除设备连接 }); }); // 绑定端口并开始监听 server.listen(961, () => { logger.info('socket服务已经启动端口号:961'); }); const getDeviceSYZ=async (devcieNo)=>{ try{ const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo) if(data){ return data }else{ return '' } }catch(err){ logger.error(` ${devcieNo} 获取三元组信息错误:`, err); throw new CustomError("新错误", error); } }