| | |
| | | const net = require('net'); // 引入 Node.js 的 net 模块用于创建网络服务器和客户端 |
| | | const logger = require('./logger'); // 引入上面创建的logger |
| | | const EventEmitter = require('events'); // 引入 Node.js 的 events 模块,用于事件驱动编程 |
| | | const net = require('net'); |
| | | const logger = require('./logger'); |
| | | const EventEmitter = require('events'); |
| | | const aliyunIot = require('aliyun-iot-device-sdk'); |
| | | const { getAliyunDeviceSecret } = require('./api'); |
| | | const toModel = require('./Strholp'); |
| | | const { publishMessage } = require('./mqttClient'); |
| | | // 定义 DeviceManager 类来管理设备连接 |
| | | |
| | | // ========== 自定义错误类(可选)========== |
| | | class CustomError extends Error { |
| | | constructor(message, cause) { |
| | | super(message); |
| | | this.cause = cause; |
| | | } |
| | | } |
| | | |
| | | // ========== 常量配置 ========== |
| | | const MAX_BUFFER_SIZE = 500; // 缓冲区最大长度 |
| | | const RETRY_INTERVAL_MS = 10000; // 重试间隔 10s |
| | | const KEEP_ALIVE_INTERVAL_MS = 60000; // 保活间隔 60s |
| | | const DEVICE_TIMEOUT_MS = 120000; // 设备无响应超时 2分钟 |
| | | |
| | | // ========== DeviceManager 类 ========== |
| | | class DeviceManager extends EventEmitter { |
| | | constructor() { |
| | | super(); // 调用父类构造函数 |
| | | this.devices = new Map(); // 使用 Map 存储设备连接及其状态 |
| | | } |
| | | // 处理新设备连接 |
| | | handleDevice(socket) { |
| | | const deviceId = socket.remoteAddress + ':' + socket.remotePort; // 根据 IP 和端口生成唯一设备ID |
| | | try{ |
| | | logger.info(`建立新连接${deviceId}`) |
| | | this.devices.set(deviceId, { |
| | | socket, |
| | | lastAck: Date.now(), // 记录最后一次收到 ACK 的时间 |
| | | status: 'pending', // 初始状态为 pending(等待确认) |
| | | retryInterval: null, // 重试机制定时器ID |
| | | keepAliveInterval: null, // 定时发送保持连接信号的定时器ID |
| | | lastSignal: 'K', // 最后发送的信号,默认为 'K' |
| | | iotDevice:null, |
| | | masData:'', |
| | | iotDeviceNo:'' // SOCKET连接的设备号 |
| | | }); |
| | | // 发送初始 'K' |
| | | this.sendKeepAliveToDevice(deviceId); |
| | | super(); |
| | | this.devices = new Map(); |
| | | |
| | | // 启动重试机制 |
| | | this.startRetryMechanism(deviceId); |
| | | |
| | | // 每个客户端连接都有一个独立的缓冲区 buffer。 |
| | | let buffer = ''; |
| | | // 监听客户端发送的数据 |
| | | socket.on('data', (chunk) => { |
| | | buffer += chunk.toString('ascii'); // 将新的数据片段添加到缓冲区 |
| | | console.log(buffer.length,'-------------------') |
| | | while (true) { |
| | | console.log(buffer) |
| | | const messageBengIndex=buffer.indexOf('K1'); |
| | | const messageEndIndex = buffer.indexOf('\r\n'); // 假设消息以 '\r\n' 结束 |
| | | //如果没有K1说明没有开始符号 直接放弃之前接受的数据 如果拼接字符串太长还没有结束符号也丢弃 |
| | | if(messageBengIndex===-1||buffer.length>300){ |
| | | buffer='' |
| | | break |
| | | } |
| | | if (messageBengIndex===-1||messageEndIndex === -1) break; // 如果没有找到结束符,跳出循环等待更多数据 |
| | | |
| | | const completeMessage = buffer.substring(messageBengIndex, messageEndIndex).trim(); // 提取完整消息 |
| | | buffer = ''; // 移除已处理的部分 |
| | | logger.info(`${deviceId}接收到的完整消息`) |
| | | // 解析数据 |
| | | this.handleData(deviceId, completeMessage.toString()); // 处理接收到的数据 |
| | | // 启动全局超时清理定时器 |
| | | setInterval(() => { |
| | | const now = Date.now(); |
| | | for (const [deviceId, info] of this.devices.entries()) { |
| | | if (now - info.lastAck > DEVICE_TIMEOUT_MS) { |
| | | logger.warn(`设备 ${deviceId} 超时(${DEVICE_TIMEOUT_MS / 1000}s 无响应),自动断开`); |
| | | this.removeDevice(deviceId); |
| | | } |
| | | |
| | | }); |
| | | }catch(err){ |
| | | logger.error(`${deviceId}建立连接异常出错:${err}`) |
| | | } |
| | | } |
| | | }, 30000); // 每30秒检查一次 |
| | | } |
| | | // 启动针对每个设备的重试机制 |
| | | |
| | | handleDevice(socket) { |
| | | const deviceId = socket.remoteAddress + ':' + socket.remotePort; |
| | | logger.info(`建立新连接: ${deviceId}`); |
| | | |
| | | const deviceInfo = { |
| | | socket, |
| | | lastAck: Date.now(), |
| | | status: 'pending', // pending → valid → registered |
| | | retryInterval: null, |
| | | keepAliveInterval: null, |
| | | lastSignal: 'K', // 初始握手信号 |
| | | iotDevice: null, |
| | | masData: '', |
| | | iotDeviceNo: '' |
| | | }; |
| | | |
| | | this.devices.set(deviceId, deviceInfo); |
| | | |
| | | // 发送初始握手信号 |
| | | this.sendKeepAliveToDevice(deviceId); |
| | | this.startRetryMechanism(deviceId); |
| | | |
| | | let buffer = ''; |
| | | |
| | | socket.on('data', (chunk) => { |
| | | buffer += chunk.toString('ascii'); |
| | | |
| | | while (true) { |
| | | const startIdx = buffer.indexOf('K1'); |
| | | if (startIdx === -1) { |
| | | // 没有起始标志,但保留数据(防分包) |
| | | if (buffer.length > MAX_BUFFER_SIZE) buffer = ''; |
| | | break; |
| | | } |
| | | |
| | | const endIdx = buffer.indexOf('\r\n', startIdx); |
| | | if (endIdx === -1) { |
| | | // 结束符未到,保留从 K1 开始的数据 |
| | | if (buffer.length > MAX_BUFFER_SIZE) { |
| | | buffer = buffer.substring(startIdx); |
| | | } |
| | | break; |
| | | } |
| | | |
| | | const message = buffer.substring(startIdx, endIdx).trim(); |
| | | buffer = buffer.substring(endIdx + 2); // 移除已处理部分(含 \r\n) |
| | | |
| | | logger.info(`${deviceId} 接收到完整消息: ${message}`); |
| | | this.handleData(deviceId, message); |
| | | } |
| | | }); |
| | | |
| | | socket.on('error', (err) => { |
| | | logger.error(`设备 ${deviceId} 套接字错误:`, err.message); |
| | | this.removeDevice(deviceId); |
| | | }); |
| | | |
| | | socket.on('end', () => { |
| | | logger.info(`设备断开连接: ${deviceId}`); |
| | | this.removeDevice(deviceId); |
| | | }); |
| | | } |
| | | |
| | | startRetryMechanism(deviceId) { |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | try{ |
| | | if (deviceInfo.retryInterval) clearInterval(deviceInfo.retryInterval); // 清除之前的重试定时器 |
| | | deviceInfo.retryInterval = setInterval(() => { |
| | | if (deviceInfo.status === 'pending'||deviceInfo.status==='invalid') { |
| | | // 来回切换发送,命令 |
| | | if(deviceInfo.lastSignal==='K'){ |
| | | deviceInfo.lastSignal='K0000' |
| | | }else{ |
| | | deviceInfo.lastSignal='K' |
| | | } |
| | | logger.info(`尝试重新发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`); |
| | | this.sendKeepAliveToDevice(deviceId); // 如果设备状态是待确认,则重发上次的信号 |
| | | } |
| | | }, 10000); // 每2秒重试一次 |
| | | }catch(err){ |
| | | logger.error(`${deviceId}设备重试机制出错:${err}`) |
| | | } |
| | | if (!deviceInfo) return; |
| | | |
| | | this.stopRetryMechanism(deviceId); |
| | | deviceInfo.retryInterval = setInterval(() => { |
| | | if (deviceInfo.status === 'pending' || deviceInfo.status === 'invalid') { |
| | | // 握手阶段:在 'K' 和 'K0000' 之间切换(你的原始逻辑) |
| | | deviceInfo.lastSignal = deviceInfo.lastSignal === 'K' ? 'K0000' : 'K'; |
| | | logger.info(`重试发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`); |
| | | this.sendKeepAliveToDevice(deviceId); |
| | | } |
| | | }, RETRY_INTERVAL_MS); |
| | | } |
| | | |
| | | // 停止针对每个设备的重试机制 |
| | | stopRetryMechanism(deviceId) { |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (deviceInfo.retryInterval) { |
| | | clearInterval(deviceInfo.retryInterval); // 清除重试定时器 |
| | | if (deviceInfo?.retryInterval) { |
| | | clearInterval(deviceInfo.retryInterval); |
| | | deviceInfo.retryInterval = null; |
| | | } |
| | | } |
| | | |
| | | // 启动针对每个设备的定时发送保持连接信号的机制 |
| | | startKeepAlive(deviceId, signal) { |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (deviceInfo.keepAliveInterval) clearInterval(deviceInfo.keepAliveInterval); // 清除之前的定时器 |
| | | deviceInfo.lastSignal = signal; // 更新最后发送的信号 |
| | | if (!deviceInfo) return; |
| | | |
| | | this.stopKeepAlive(deviceId); |
| | | deviceInfo.lastSignal = signal; |
| | | deviceInfo.keepAliveInterval = setInterval(() => { |
| | | this.sendKeepAliveToDevice(deviceId); // 每60秒发送一次保持连接信号 |
| | | }, 60000); // 修改为每60秒发送一次 |
| | | this.sendKeepAliveToDevice(deviceId); |
| | | }, KEEP_ALIVE_INTERVAL_MS); |
| | | } |
| | | |
| | | // 停止针对每个设备的定时发送保持连接信号的机制 |
| | | stopKeepAlive(deviceId) { |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (deviceInfo.keepAliveInterval) { |
| | | clearInterval(deviceInfo.keepAliveInterval); // 清除定时器 |
| | | if (deviceInfo?.keepAliveInterval) { |
| | | clearInterval(deviceInfo.keepAliveInterval); |
| | | deviceInfo.keepAliveInterval = null; |
| | | } |
| | | } |
| | | |
| | | // 向特定设备发送保持连接信号 |
| | | sendKeepAliveToDevice(deviceId) { |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (!deviceInfo) return; |
| | | deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`); // 根据上次成功信号发送 'K' 或 'K0000' |
| | | logger.info(`发送保持连接信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`); |
| | | } |
| | | if (!deviceInfo || !deviceInfo.socket.writable) return; |
| | | |
| | | // 处理来自客户端的数据 |
| | | handleData(deviceId, message) { |
| | | try{ |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (!deviceInfo) return; |
| | | logger.info(`接收到来自 ${deviceId} 的消息: ${message}`); |
| | | deviceInfo.lastAck = Date.now(); // 更新最后收到 ACK 的时间 |
| | | const masData= toModel(message) |
| | | deviceInfo.iotDeviceNo=masData.n |
| | | deviceInfo.masData= masData |
| | | // 如果已经是有效的接受数据了 就不用在启用定时发送了 直接改状体 |
| | | if(deviceInfo.status!=='valid'){ |
| | | this.emit('validResponse', deviceId); // 触发 validResponse 事件 |
| | | deviceInfo.status = 'valid'; // 设备状态更新为有效 |
| | | console.log(`停止重试${deviceId}`) |
| | | this.stopRetryMechanism(deviceId); // 停止重试机制 |
| | | // 根据上次发送的信号启动相应的定时发送机制 |
| | | this.startKeepAlive(deviceId, deviceInfo.lastSignal); // 启动定时发送 'K0000' 的机制 |
| | | logger.info(`${deviceId}启动定时发送 ${deviceInfo.lastSignal} 的机制60秒一次`); |
| | | this.registerDevice(deviceId) |
| | | }else{ |
| | | logger.info('注册成功后第二次就发送数据到阿里云') |
| | | this.postPropsToDevice(deviceId) |
| | | |
| | | } |
| | | }catch(err){ |
| | | logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`) |
| | | try { |
| | | deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`); |
| | | logger.info(`发送信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`); |
| | | } catch (err) { |
| | | logger.error(`发送信号失败 ${deviceId}:`, err.message); |
| | | this.removeDevice(deviceId); |
| | | } |
| | | |
| | | |
| | | } |
| | | // 处理设备上阿里云 |
| | | |
| | | async handleData(deviceId, message) { |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (!deviceInfo) return; |
| | | |
| | | deviceInfo.lastAck = Date.now(); |
| | | |
| | | try { |
| | | const masData = toModel(message); |
| | | deviceInfo.iotDeviceNo = masData.n; |
| | | deviceInfo.masData = masData; |
| | | |
| | | if (deviceInfo.status !== 'valid') { |
| | | deviceInfo.status = 'valid'; |
| | | this.emit('validResponse', deviceId); |
| | | logger.info(`${deviceId} 首次有效响应,停止重试`); |
| | | this.stopRetryMechanism(deviceId); |
| | | // 成功后固定使用 'K0000' 保活(建议) |
| | | this.startKeepAlive(deviceId, deviceInfo.lastSignal); |
| | | await this.registerDevice(deviceId); |
| | | } else { |
| | | logger.info(`${deviceId} 已注册,直接上报数据到阿里云`); |
| | | this.postPropsToDevice(deviceId); |
| | | // this.onDeviceDataReceived(deviceId, masData); |
| | | } |
| | | } catch (err) { |
| | | logger.error(`${deviceId} 处理消息出错:`, err.message); |
| | | } |
| | | } |
| | | |
| | | async registerDevice(deviceId) { |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | logger.info(`${deviceId}注册阿里云iot设备`); |
| | | try{ |
| | | if (!deviceInfo.iotDevice) { |
| | | // 请求三元组信息 |
| | | logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`); |
| | | const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo) |
| | | logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data)); |
| | | if(data&&data.productKey&&data.deviceName&&data.deviceSecret){ |
| | | // 设备配置信息 |
| | | const productKey = data.productKey; // 替换为你的产品密钥 |
| | | const deviceName = data.deviceName; // 替换为你的设备名称 |
| | | const deviceSecret = data.deviceSecret; // 替换为你的设备密钥 |
| | | // 创建设备实例aliyunIot.device(devConfig); |
| | | deviceInfo.iotDevice = aliyunIot.device({ |
| | | ProductKey: productKey, |
| | | DeviceName: deviceName, |
| | | DeviceSecret: deviceSecret |
| | | }); |
| | | // 监听设备连接状态变化 |
| | | deviceInfo.iotDevice.on('connect', () => { |
| | | // logger.info(`${deviceId} 连接到阿里云IoT平台成功`); |
| | | }); |
| | | |
| | | deviceInfo.iotDevice.on('error', (err) => { |
| | | logger.info(`${deviceId} 设备连接到阿里云IoT平台错误:`, err); |
| | | }); |
| | | } |
| | | |
| | | if (!deviceInfo || deviceInfo.iotDevice) return; |
| | | |
| | | try { |
| | | logger.info(`${deviceId} 请求三元组,设备号: ${deviceInfo.iotDeviceNo}`); |
| | | const { data } = await getDeviceSYZ(deviceInfo.iotDeviceNo); |
| | | logger.info(`${deviceId} 三元组返回: ${JSON.stringify(data)}`); |
| | | const model=data.data |
| | | if (model?.productKey && model?.deviceName && model?.deviceSecret) { |
| | | deviceInfo.iotDevice = aliyunIot.device({ |
| | | ProductKey: model.productKey, |
| | | DeviceName: model.deviceName, |
| | | DeviceSecret: model.deviceSecret |
| | | }); |
| | | |
| | | deviceInfo.iotDevice.on('connect', () => { |
| | | logger.info(`${deviceId} 连接阿里云 IoT 成功`); |
| | | }); |
| | | |
| | | deviceInfo.iotDevice.on('error', (err) => { |
| | | logger.error(`${deviceId} 阿里云 IoT 错误:`, err.message); |
| | | }); |
| | | } |
| | | } |
| | | catch(err){ |
| | | logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`) |
| | | } catch (err) { |
| | | logger.error(`${deviceId} 阿里云注册失败:`, err.message); |
| | | } |
| | | } |
| | | //发送消息到iot云端 |
| | | |
| | | postPropsToDevice(deviceId) { |
| | | try{ |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (deviceInfo.iotDevice) { |
| | | // 上报属性数据 |
| | | const props = deviceInfo.masData |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (!deviceInfo?.iotDevice || !deviceInfo.masData) return; |
| | | |
| | | onDeviceDataReceived(deviceInfo.masData); // 调用函数处理接收到的数据 |
| | | deviceInfo.iotDevice.postProps(props, (res) => { |
| | | if (res.message==='success') { |
| | | logger.info(`${deviceId} 上报属性成功:`, res); |
| | | |
| | | } else { |
| | | logger.error(`${deviceId} 上报属性失败:`, res); |
| | | } |
| | | }); |
| | | const props = deviceInfo.masData; |
| | | deviceInfo.iotDevice.postProps(props, (res) => { |
| | | if (res?.message === 'success') { |
| | | logger.info(`${deviceId} 上报属性成功`); |
| | | } else { |
| | | logger.error(`${deviceId} 上报属性失败:`, res?.message || 'unknown'); |
| | | } |
| | | }catch(err){ |
| | | logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err); |
| | | } |
| | | |
| | | }); |
| | | } |
| | | |
| | | |
| | | // 移除设备连接 |
| | | onDeviceDataReceived(deviceId, data) { |
| | | const topic = `touxiji/${data.n}`; |
| | | const payload = JSON.stringify({ |
| | | ...data, |
| | | timestamp: new Date().toISOString() |
| | | }); |
| | | |
| | | try { |
| | | logger.info(`发布 MQTT 消息到主题 ${topic}: ${payload}`); |
| | | publishMessage(topic, payload); |
| | | } catch (error) { |
| | | logger.error(`MQTT 发布失败 ${topic}:`, error.message); |
| | | } |
| | | } |
| | | |
| | | removeDevice(deviceId) { |
| | | if (this.devices.has(deviceId)) { |
| | | this.stopRetryMechanism(deviceId); // 停止重试机制 |
| | | this.stopKeepAlive(deviceId); // 停止定时发送保持连接信号的机制 |
| | | this.devices.delete(deviceId); // 从设备列表中移除 |
| | | logger.info(`移除设备: ${deviceId}`); |
| | | const deviceInfo = this.devices.get(deviceId); |
| | | if (!deviceInfo) return; |
| | | |
| | | this.stopRetryMechanism(deviceId); |
| | | this.stopKeepAlive(deviceId); |
| | | |
| | | if (deviceInfo.socket && !deviceInfo.socket.destroyed) { |
| | | deviceInfo.socket.destroy(); |
| | | } |
| | | |
| | | if (deviceInfo.iotDevice) { |
| | | deviceInfo.iotDevice.end(); // 安全关闭 IoT 连接 |
| | | } |
| | | |
| | | this.devices.delete(deviceId); |
| | | logger.info(`设备已移除: ${deviceId}`); |
| | | } |
| | | } |
| | | |
| | | // 创建服务器实例 |
| | | const server = net.createServer((socket) => { |
| | | const manager = new DeviceManager(); |
| | | manager.handleDevice(socket); |
| | | manager.on('validResponse', (deviceId) => { |
| | | logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`); |
| | | // 可以在这里添加更多处理逻辑 |
| | | }); |
| | | manager.on('invalidResponse', (deviceId) => { |
| | | logger.info(`设备 ${deviceId} 响应了无效或没有响应`); |
| | | // 可以在这里添加更多处理逻辑 |
| | | }); |
| | | |
| | | // 处理错误 |
| | | socket.on('error', (err) => { |
| | | const deviceId = socket.remoteAddress + ':' + socket.remotePort; |
| | | logger.error(`与 ${deviceId} 的套接字发生错误:`, err); |
| | | manager.removeDevice(deviceId); // 移除设备连接 |
| | | }); |
| | | |
| | | // 客户端断开时触发 |
| | | socket.on('end', () => { |
| | | const deviceId = socket.remoteAddress + ':' + socket.remotePort; |
| | | logger.info(`设备断开连接: ${deviceId}`); |
| | | manager.removeDevice(deviceId); // 移除设备连接 |
| | | }); |
| | | }); |
| | | |
| | | // 绑定端口并开始监听 |
| | | server.listen(961, () => { |
| | | logger.info('socket服务已经启动端口号:961'); |
| | | |
| | | }); |
| | | const getDeviceSYZ=async (devcieNo)=>{ |
| | | try{ |
| | | const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo) |
| | | if(data){ |
| | | return data |
| | | }else{ |
| | | return '' |
| | | } |
| | | |
| | | }catch(err){ |
| | | logger.error(` ${devcieNo} 获取三元组信息错误:`, err); |
| | | throw new CustomError("新错误", error); |
| | | |
| | | } |
| | | } |
| | | // 接收到的数据 |
| | | const onDeviceDataReceived=(data)=> { |
| | | const topic = `touxiji/${data.n}`; |
| | | const payload = JSON.stringify({ |
| | | ...data, |
| | | timestamp: new Date().toISOString() |
| | | }); |
| | | // ========== 辅助函数 ========== |
| | | async function getDeviceSYZ(deviceNo) { |
| | | try { |
| | | logger.info(`发布消息到主题 ${topic}: ${payload}`); |
| | | publishMessage(topic, payload); |
| | | } catch (error) { |
| | | logger.error(`发布消息到主题 ${topic} 失败:`, error); |
| | | throw new CustomError("发布消息失败", error); |
| | | const { data, message } = await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret', deviceNo); |
| | | return { data }; |
| | | } catch (err) { |
| | | logger.error(`${deviceNo} 获取三元组失败:`, err.message); |
| | | throw new CustomError("获取三元组失败", err); |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | // ========== 启动服务器 ========== |
| | | const manager = new DeviceManager(); // ✅ 单例! |
| | | |
| | | manager.on('validResponse', (deviceId) => { |
| | | logger.info(`设备 ${deviceId} 成功完成握手`); |
| | | }); |
| | | |
| | | manager.on('invalidResponse', (deviceId) => { |
| | | logger.info(`设备 ${deviceId} 无效响应`); |
| | | }); |
| | | |
| | | const server = net.createServer((socket) => { |
| | | manager.handleDevice(socket); |
| | | }); |
| | | |
| | | const PORT = process.env.PORT || 10961; |
| | | server.listen(PORT, () => { |
| | | logger.info(`Socket 服务已启动,监听端口: ${PORT}`); |
| | | }); |