const net = require('net'); // 引入 Node.js 的 net 模块用于创建网络服务器和客户端 const { crc16 } = require('./crc16'); // 引入上面的文件 const logger = require('./logger'); // 引入上面创建的logger const { toModel,format } = require('./modeTool'); // 引入数据解析工具 const EventEmitter = require('events'); // 引入 Node.js 的 events 模块,用于事件驱动编程 const aliyunIot = require('aliyun-iot-device-sdk'); const { getAliyunDeviceSecret } = require('./api'); const { publishMessage } = require('./mqttClient'); // 定义 DeviceManager 类来管理设备连接 class DeviceManager extends EventEmitter { constructor() { super(); // 调用父类构造函数 this.devices = new Map(); // 使用 Map 存储设备连接及其状态 } // 处理新设备连接 handleDevice(socket) { const deviceId = socket.remoteAddress + ':' + socket.remotePort; // 根据 IP 和端口生成唯一设备ID try{ logger.info(`建立新连接${deviceId}`) this.devices.set(deviceId, { socket, lastAck: Date.now(), // 记录最后一次收到 ACK 的时间 status: 'pending', // 初始状态为 pending(等待确认) retryInterval: null, // 重试机制定时器ID keepAliveInterval: null, // 定时发送保持连接信号的定时器ID lastSignal: 'K', // 最后发送的信号,默认为 'K' iotDevice:null, masData:'', iotDeviceNo:'' // SOCKET连接的设备号 }); // 每个客户端连接都有一个独立的缓冲区 buffer。 let buffer = ''; // 监听客户端发送的数据 socket.on('data', (data) => { const buffer = Buffer.from(data); // 检查数据长度是否符合预期 if (buffer.length < 4) { console.error('Invalid packet length'); return; } // 提取头信息和 CRC16 const header = buffer.slice(0, 2).toString('hex'); const receivedCrc = buffer.readUInt16LE(2); // 假设 CRC 在第3-4字节 const payload = buffer.slice(4); // 计算 CRC16 const calculatedCrc = crc16(0x0000, payload); if (receivedCrc === calculatedCrc&&header=='aabb') { logger.info(`来自 ${deviceId} 的数据 CRC 校验通过`); // CRC 校验通过,处理数据 this.handleData(deviceId, buffer); // 处理数据 }else{ logger.error(`来自 ${deviceId} 的数据 CRC 校验失败`); } }); }catch(err){ logger.error(`${deviceId}建立连接异常出错:${err}`) } } // 处理来自客户端的数据 handleData(deviceId, message) { try{ const deviceInfo = this.devices.get(deviceId); if (!deviceInfo) return; logger.info(`接收到来自 ${deviceId} 的消息: ${message.toString('hex')}`); const masData= toModel(message) deviceInfo.iotDeviceNo=masData.设备编号前16位+masData.设备编号后16位.toString() // 设备号 deviceInfo.masData = { deviceType:'宝特莱D800', suedtime:format(new Date()), // 使用当前时间作为使用时间 xycsML: masData.血氧参数脉率, // 血氧参数脉率 xycsSOP2: masData.血氧参数Spo2值, // 血氧参数 Spo2 值 xyjpjxy: masData.血压计平均血压, // 血压计平均血压 xdxrl: masData.相对血容量, // 相对血容量 n: deviceInfo.iotDeviceNo, // 设备号 qcl: masData.清除率, // 清除率 KTV: masData.KtV, // Kt/V sbzt:masData.模式, // 设备状态 mb:masData.血压计脉搏, // 血压计脉搏 szy: masData.血压计舒张血压, // 血压计舒张血压 ssy: masData.血压计收缩血压, // 血压计收缩血压 hybyl: masData.后已补液量, // 后已补液量 hbyzl: masData.后补液总量, // 后补液总量 qybyl: masData.前已补液量, // 前已补液量 qbyzl: masData.前补液总量, // 前补液总量 gszjl: masData.追加量, // 追加量 gsdzsh: masData.肝素停止时间, // 肝素停止时间 E:masData.肝素速率, // 肝素速率 F: masData.透析液实际温度, // 透析液实际温度 L: masData.透析液实际流量, // 透析液实际流量 G:masData.电导度,// 电导度 J:masData.跨膜压, // 跨膜压 o:masData.动脉压, // 动脉压 H:masData.静脉压, // 静脉压 D:masData.实际血流量, // 实际血流量 SDXYLS:masData.设定血流量, // 设定血流量 A:masData.超滤目标, // 超滤目标 B:masData.当前已超滤量, // 当前已超滤量 C:masData.超滤率, // 超滤率 sysj:masData.治疗剩余时间, // 治疗剩余时间 }; // 保存解析后的数据 console.log(deviceInfo.masData) if(deviceInfo.status!=='valid'){ deviceInfo.status = 'valid'; // 设备状态更新为有效 this.registerDevice(deviceId) }else{ logger.info('注册成功后第二次就发送数据到阿里云') this.postPropsToDevice(deviceId) } }catch(err){ logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`) } } // 处理设备上阿里云 /** * 用设备编号换阿里云注册信息上云 * @param {*} deviceId */ async registerDevice(deviceId) { const deviceInfo = this.devices.get(deviceId); logger.info(`${deviceId}注册阿里云iot设备`); try{ if (!deviceInfo.iotDevice) { // 请求三元组信息 logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`); const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo) logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data)); if(data&&data.productKey&&data.deviceName&&data.deviceSecret){ // 设备配置信息 const productKey = data.productKey; // 替换为你的产品密钥 const deviceName = data.deviceName; // 替换为你的设备名称 const deviceSecret = data.deviceSecret; // 替换为你的设备密钥 // 创建设备实例aliyunIot.device(devConfig); deviceInfo.iotDevice = aliyunIot.device({ ProductKey: productKey, DeviceName: deviceName, DeviceSecret: deviceSecret }); // 监听设备连接状态变化 deviceInfo.iotDevice.on('connect', () => { }); deviceInfo.iotDevice.on('error', (err) => { logger.info(`${deviceId} 设备连接到阿里云IoT平台错误:`, err); }); } } } catch(err){ logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`) } } //发送消息到iot云端 postPropsToDevice(deviceId) { try{ const deviceInfo = this.devices.get(deviceId); if (deviceInfo.iotDevice) { // 上报属性数据 const props = deviceInfo.masData onDeviceDataReceived(deviceInfo.masData); // 调用函数处理接收到的数据 deviceInfo.iotDevice.postProps(props, (res) => { if (res.message==='success') { logger.info(`${deviceId} 上报属性成功:`, res); } else { logger.error(`${deviceId} 上报属性失败:`, res); } }); } }catch(err){ logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err); } } // 移除设备连接 removeDevice(deviceId) { if (this.devices.has(deviceId)) { this.devices.delete(deviceId); // 从设备列表中移除 logger.info(`移除设备: ${deviceId}`); } } } // 创建服务器实例 const server = net.createServer((socket) => { const manager = new DeviceManager(); manager.handleDevice(socket); manager.on('validResponse', (deviceId) => { logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`); // 可以在这里添加更多处理逻辑 }); manager.on('invalidResponse', (deviceId) => { logger.info(`设备 ${deviceId} 响应了无效或没有响应`); // 可以在这里添加更多处理逻辑 }); // 处理错误 socket.on('error', (err) => { const deviceId = socket.remoteAddress + ':' + socket.remotePort; logger.error(`与 ${deviceId} 的套接字发生错误:`, err); manager.removeDevice(deviceId); // 移除设备连接 }); // 客户端断开时触发 socket.on('end', () => { const deviceId = socket.remoteAddress + ':' + socket.remotePort; logger.info(`设备断开连接: ${deviceId}`); manager.removeDevice(deviceId); // 移除设备连接 }); }); // 绑定端口并开始监听 server.listen(961, () => { logger.info('socket服务已经启动端口号:961'); }); const getDeviceSYZ=async (devcieNo)=>{ try{ const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo) if(data){ return data }else{ return '' } }catch(err){ logger.error(` ${devcieNo} 获取三元组信息错误:`, err); throw new CustomError("新错误", error); } } // 接收到的数据 const onDeviceDataReceived=(data)=> { const topic = `touxiji/${data.n}`; const payload = JSON.stringify({ ...data, timestamp: new Date().toISOString() }); try { logger.info(`发布消息到主题 ${topic}: ${payload}`); publishMessage(topic, payload); } catch (error) { logger.error(`发布消息到主题 ${topic} 失败:`, error); throw new CustomError("发布消息失败", error); } }