const net = require('net'); // 引入 Node.js 的 net 模块用于创建网络服务器和客户端
|
const logger = require('./logger'); // 引入上面创建的logger
|
const EventEmitter = require('events'); // 引入 Node.js 的 events 模块,用于事件驱动编程
|
const aliyunIot = require('aliyun-iot-device-sdk');
|
const { getAliyunDeviceSecret } = require('./api');
|
const toModel = require('./Strholp');
|
// 定义 DeviceManager 类来管理设备连接
|
class DeviceManager extends EventEmitter {
|
constructor() {
|
super(); // 调用父类构造函数
|
this.devices = new Map(); // 使用 Map 存储设备连接及其状态
|
}
|
// 处理新设备连接
|
handleDevice(socket) {
|
const deviceId = socket.remoteAddress + ':' + socket.remotePort; // 根据 IP 和端口生成唯一设备ID
|
try{
|
logger.info(`建立新连接${deviceId}`)
|
this.devices.set(deviceId, {
|
socket,
|
lastAck: Date.now(), // 记录最后一次收到 ACK 的时间
|
status: 'pending', // 初始状态为 pending(等待确认)
|
retryInterval: null, // 重试机制定时器ID
|
keepAliveInterval: null, // 定时发送保持连接信号的定时器ID
|
lastSignal: 'K', // 最后发送的信号,默认为 'K'
|
iotDevice:null,
|
masData:'',
|
iotDeviceNo:'' // SOCKET连接的设备号
|
});
|
// 发送初始 'K'
|
this.sendKeepAliveToDevice(deviceId);
|
|
// 启动重试机制
|
this.startRetryMechanism(deviceId);
|
|
// 每个客户端连接都有一个独立的缓冲区 buffer。
|
let buffer = '';
|
// 监听客户端发送的数据
|
socket.on('data', (chunk) => {
|
buffer += chunk.toString('ascii'); // 将新的数据片段添加到缓冲区
|
console.log(buffer.length,'-------------------')
|
while (true) {
|
console.log(buffer)
|
const messageBengIndex=buffer.indexOf('K1');
|
const messageEndIndex = buffer.indexOf('\r\n'); // 假设消息以 '\r\n' 结束
|
//如果没有K1说明没有开始符号 直接放弃之前接受的数据 如果拼接字符串太长还没有结束符号也丢弃
|
if(messageBengIndex===-1||buffer.length>300){
|
buffer=''
|
break
|
}
|
if (messageBengIndex===-1||messageEndIndex === -1) break; // 如果没有找到结束符,跳出循环等待更多数据
|
|
const completeMessage = buffer.substring(messageBengIndex, messageEndIndex).trim(); // 提取完整消息
|
buffer = ''; // 移除已处理的部分
|
logger.info(`${deviceId}接收到的完整消息`)
|
// 解析数据
|
this.handleData(deviceId, completeMessage.toString()); // 处理接收到的数据
|
}
|
|
});
|
}catch(err){
|
logger.error(`${deviceId}建立连接异常出错:${err}`)
|
}
|
}
|
// 启动针对每个设备的重试机制
|
startRetryMechanism(deviceId) {
|
const deviceInfo = this.devices.get(deviceId);
|
try{
|
if (deviceInfo.retryInterval) clearInterval(deviceInfo.retryInterval); // 清除之前的重试定时器
|
deviceInfo.retryInterval = setInterval(() => {
|
if (deviceInfo.status === 'pending'||deviceInfo.status==='invalid') {
|
// 来回切换发送,命令
|
if(deviceInfo.lastSignal==='K'){
|
deviceInfo.lastSignal='K0000'
|
}else{
|
deviceInfo.lastSignal='K'
|
}
|
logger.info(`尝试重新发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
|
this.sendKeepAliveToDevice(deviceId); // 如果设备状态是待确认,则重发上次的信号
|
}
|
}, 5000); // 每2秒重试一次
|
}catch(err){
|
logger.error(`${deviceId}设备重试机制出错:${err}`)
|
}
|
}
|
|
// 停止针对每个设备的重试机制
|
stopRetryMechanism(deviceId) {
|
const deviceInfo = this.devices.get(deviceId);
|
if (deviceInfo.retryInterval) {
|
clearInterval(deviceInfo.retryInterval); // 清除重试定时器
|
deviceInfo.retryInterval = null;
|
}
|
}
|
|
// 启动针对每个设备的定时发送保持连接信号的机制
|
startKeepAlive(deviceId, signal) {
|
const deviceInfo = this.devices.get(deviceId);
|
if (deviceInfo.keepAliveInterval) clearInterval(deviceInfo.keepAliveInterval); // 清除之前的定时器
|
deviceInfo.lastSignal = signal; // 更新最后发送的信号
|
deviceInfo.keepAliveInterval = setInterval(() => {
|
this.sendKeepAliveToDevice(deviceId); // 每60秒发送一次保持连接信号
|
}, 60000); // 修改为每60秒发送一次
|
}
|
|
// 停止针对每个设备的定时发送保持连接信号的机制
|
stopKeepAlive(deviceId) {
|
const deviceInfo = this.devices.get(deviceId);
|
if (deviceInfo.keepAliveInterval) {
|
clearInterval(deviceInfo.keepAliveInterval); // 清除定时器
|
deviceInfo.keepAliveInterval = null;
|
}
|
}
|
|
// 向特定设备发送保持连接信号
|
sendKeepAliveToDevice(deviceId) {
|
const deviceInfo = this.devices.get(deviceId);
|
if (!deviceInfo) return;
|
deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`); // 根据上次成功信号发送 'K' 或 'K0000'
|
logger.info(`发送保持连接信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
|
}
|
|
// 处理来自客户端的数据
|
handleData(deviceId, message) {
|
try{
|
const deviceInfo = this.devices.get(deviceId);
|
if (!deviceInfo) return;
|
logger.info(`接收到来自 ${deviceId} 的消息: ${message}`);
|
deviceInfo.lastAck = Date.now(); // 更新最后收到 ACK 的时间
|
const masData= toModel(message)
|
deviceInfo.iotDeviceNo=masData.n
|
deviceInfo.masData= masData
|
// 如果已经是有效的接受数据了 就不用在启用定时发送了 直接改状体
|
if(deviceInfo.status!=='valid'){
|
this.emit('validResponse', deviceId); // 触发 validResponse 事件
|
deviceInfo.status = 'valid'; // 设备状态更新为有效
|
console.log(`停止重试${deviceId}`)
|
this.stopRetryMechanism(deviceId); // 停止重试机制
|
// 根据上次发送的信号启动相应的定时发送机制
|
this.startKeepAlive(deviceId, deviceInfo.lastSignal); // 启动定时发送 'K0000' 的机制
|
logger.info(`${deviceId}启动定时发送 'K' 的机制60秒一次`);
|
this.registerDevice(deviceId)
|
}else{
|
logger.info('注册成功后第二次就发送数据到阿里云')
|
this.postPropsToDevice(deviceId)
|
|
}
|
}catch(err){
|
logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`)
|
}
|
|
|
}
|
// 处理设备上阿里云
|
async registerDevice(deviceId) {
|
const deviceInfo = this.devices.get(deviceId);
|
logger.info(`${deviceId}注册阿里云iot设备`);
|
try{
|
if (!deviceInfo.iotDevice) {
|
// 请求三元组信息
|
logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`);
|
const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo)
|
logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data));
|
if(data&&data.productKey&&data.deviceName&&data.deviceSecret){
|
// 设备配置信息
|
const productKey = data.productKey; // 替换为你的产品密钥
|
const deviceName = data.deviceName; // 替换为你的设备名称
|
const deviceSecret = data.deviceSecret; // 替换为你的设备密钥
|
// 创建设备实例aliyunIot.device(devConfig);
|
deviceInfo.iotDevice = aliyunIot.device({
|
ProductKey: productKey,
|
DeviceName: deviceName,
|
DeviceSecret: deviceSecret
|
});
|
// 监听设备连接状态变化
|
deviceInfo.iotDevice.on('connect', () => {
|
logger.info(`${deviceId} 连接到阿里云IoT平台成功`);
|
});
|
|
deviceInfo.iotDevice.on('error', (err) => {
|
logger.info(`${deviceId} 设备连接错误:`, err);
|
});
|
}
|
|
}
|
}
|
catch(err){
|
logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`)
|
}
|
}
|
//发送消息到iot云端
|
postPropsToDevice(deviceId) {
|
try{
|
const deviceInfo = this.devices.get(deviceId);
|
if (deviceInfo.iotDevice) {
|
// 上报属性数据
|
const props = deviceInfo.masData
|
deviceInfo.iotDevice.postProps(props, (res) => {
|
if (res.message==='success') {
|
logger.info(`${deviceId} 上报属性成功:`, res);
|
|
} else {
|
logger.error(`${deviceId} 上报属性失败:`, res);
|
}
|
});
|
}
|
}catch(err){
|
logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err);
|
}
|
|
}
|
|
|
// 移除设备连接
|
removeDevice(deviceId) {
|
if (this.devices.has(deviceId)) {
|
this.stopRetryMechanism(deviceId); // 停止重试机制
|
this.stopKeepAlive(deviceId); // 停止定时发送保持连接信号的机制
|
this.devices.delete(deviceId); // 从设备列表中移除
|
logger.info(`移除设备: ${deviceId}`);
|
}
|
}
|
}
|
|
// 创建服务器实例
|
const server = net.createServer((socket) => {
|
const manager = new DeviceManager();
|
manager.handleDevice(socket);
|
manager.on('validResponse', (deviceId) => {
|
logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`);
|
// 可以在这里添加更多处理逻辑
|
});
|
manager.on('invalidResponse', (deviceId) => {
|
logger.info(`设备 ${deviceId} 响应了无效或没有响应`);
|
// 可以在这里添加更多处理逻辑
|
});
|
|
// 处理错误
|
socket.on('error', (err) => {
|
const deviceId = socket.remoteAddress + ':' + socket.remotePort;
|
logger.error(`与 ${deviceId} 的套接字发生错误:`, err);
|
manager.removeDevice(deviceId); // 移除设备连接
|
});
|
|
// 客户端断开时触发
|
socket.on('end', () => {
|
const deviceId = socket.remoteAddress + ':' + socket.remotePort;
|
logger.info(`设备断开连接: ${deviceId}`);
|
manager.removeDevice(deviceId); // 移除设备连接
|
});
|
});
|
|
// 绑定端口并开始监听
|
server.listen(961, () => {
|
logger.info('socket服务已经启动端口号:961');
|
|
});
|
const getDeviceSYZ=async (devcieNo)=>{
|
try{
|
const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo)
|
if(data){
|
return data
|
}else{
|
return ''
|
}
|
|
}catch(err){
|
logger.error(` ${devcieNo} 获取三元组信息错误:`, err);
|
throw new CustomError("新错误", error);
|
|
}
|
}
|