东丽网口版透析机 socket- server 通讯
chenyc
2025-11-08 434f42a9b33927a64611c3f9a06c674a3ca6a013
index.js
@@ -1,292 +1,295 @@
const net = require('net');  // 引入 Node.js 的 net 模块用于创建网络服务器和客户端
const logger = require('./logger'); // 引入上面创建的logger
const EventEmitter = require('events');  // 引入 Node.js 的 events 模块,用于事件驱动编程
const net = require('net');
const logger = require('./logger');
const EventEmitter = require('events');
const aliyunIot = require('aliyun-iot-device-sdk');
const { getAliyunDeviceSecret } = require('./api');
const toModel = require('./Strholp');
const { publishMessage } = require('./mqttClient');
// 定义 DeviceManager 类来管理设备连接
// ========== 自定义错误类(可选)==========
class CustomError extends Error {
    constructor(message, cause) {
        super(message);
        this.cause = cause;
    }
}
// ========== 常量配置 ==========
const MAX_BUFFER_SIZE = 500;      // 缓冲区最大长度
const RETRY_INTERVAL_MS = 10000;  // 重试间隔 10s
const KEEP_ALIVE_INTERVAL_MS = 60000; // 保活间隔 60s
const DEVICE_TIMEOUT_MS = 120000; // 设备无响应超时 2分钟
// ========== DeviceManager 类 ==========
class DeviceManager extends EventEmitter {
    constructor() {
        super();  // 调用父类构造函数
        this.devices = new Map();  // 使用 Map 存储设备连接及其状态
    }
    // 处理新设备连接
    handleDevice(socket) {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;  // 根据 IP 和端口生成唯一设备ID
        try{
            logger.info(`建立新连接${deviceId}`)
            this.devices.set(deviceId, {
                socket,
                lastAck: Date.now(),  // 记录最后一次收到 ACK 的时间
                status: 'pending',  // 初始状态为 pending(等待确认)
                retryInterval: null,  // 重试机制定时器ID
                keepAliveInterval: null,  // 定时发送保持连接信号的定时器ID
                lastSignal: 'K',  // 最后发送的信号,默认为 'K'
                iotDevice:null,
                masData:'',
                iotDeviceNo:'' // SOCKET连接的设备号
            });
            // 发送初始 'K'
            this.sendKeepAliveToDevice(deviceId);
        super();
        this.devices = new Map();
            // 启动重试机制
            this.startRetryMechanism(deviceId);
            // 每个客户端连接都有一个独立的缓冲区 buffer。
            let buffer = '';
            // 监听客户端发送的数据
            socket.on('data', (chunk) => {
                buffer += chunk.toString('ascii');  // 将新的数据片段添加到缓冲区
                console.log(buffer.length,'-------------------')
                while (true) {
                    console.log(buffer)
                    const messageBengIndex=buffer.indexOf('K1');
                    const messageEndIndex = buffer.indexOf('\r\n');  // 假设消息以 '\r\n' 结束
                    //如果没有K1说明没有开始符号  直接放弃之前接受的数据  如果拼接字符串太长还没有结束符号也丢弃
                    if(messageBengIndex===-1||buffer.length>300){
                        buffer=''
                        break
                    }
                    if (messageBengIndex===-1||messageEndIndex === -1) break;  // 如果没有找到结束符,跳出循环等待更多数据
                    const completeMessage = buffer.substring(messageBengIndex, messageEndIndex).trim();  // 提取完整消息
                    buffer = ''; // 移除已处理的部分
                    logger.info(`${deviceId}接收到的完整消息`)
                    // 解析数据
                    this.handleData(deviceId, completeMessage.toString());  // 处理接收到的数据
        // 启动全局超时清理定时器
        setInterval(() => {
            const now = Date.now();
            for (const [deviceId, info] of this.devices.entries()) {
                if (now - info.lastAck > DEVICE_TIMEOUT_MS) {
                    logger.warn(`设备 ${deviceId} 超时(${DEVICE_TIMEOUT_MS / 1000}s 无响应),自动断开`);
                    this.removeDevice(deviceId);
                }
            });
        }catch(err){
            logger.error(`${deviceId}建立连接异常出错:${err}`)
        }
            }
        }, 30000); // 每30秒检查一次
    }
    // 启动针对每个设备的重试机制
    handleDevice(socket) {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
        logger.info(`建立新连接: ${deviceId}`);
        const deviceInfo = {
            socket,
            lastAck: Date.now(),
            status: 'pending', // pending → valid → registered
            retryInterval: null,
            keepAliveInterval: null,
            lastSignal: 'K', // 初始握手信号
            iotDevice: null,
            masData: '',
            iotDeviceNo: ''
        };
        this.devices.set(deviceId, deviceInfo);
        // 发送初始握手信号
        this.sendKeepAliveToDevice(deviceId);
        this.startRetryMechanism(deviceId);
        let buffer = '';
        socket.on('data', (chunk) => {
            buffer += chunk.toString('ascii');
            while (true) {
                const startIdx = buffer.indexOf('K1');
                if (startIdx === -1) {
                    // 没有起始标志,但保留数据(防分包)
                    if (buffer.length > MAX_BUFFER_SIZE) buffer = '';
                    break;
                }
                const endIdx = buffer.indexOf('\r\n', startIdx);
                if (endIdx === -1) {
                    // 结束符未到,保留从 K1 开始的数据
                    if (buffer.length > MAX_BUFFER_SIZE) {
                        buffer = buffer.substring(startIdx);
                    }
                    break;
                }
                const message = buffer.substring(startIdx, endIdx).trim();
                buffer = buffer.substring(endIdx + 2); // 移除已处理部分(含 \r\n)
                logger.info(`${deviceId} 接收到完整消息: ${message}`);
                this.handleData(deviceId, message);
            }
        });
        socket.on('error', (err) => {
            logger.error(`设备 ${deviceId} 套接字错误:`, err.message);
            this.removeDevice(deviceId);
        });
        socket.on('end', () => {
            logger.info(`设备断开连接: ${deviceId}`);
            this.removeDevice(deviceId);
        });
    }
    startRetryMechanism(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        try{
            if (deviceInfo.retryInterval) clearInterval(deviceInfo.retryInterval);  // 清除之前的重试定时器
            deviceInfo.retryInterval = setInterval(() => {
                if (deviceInfo.status === 'pending'||deviceInfo.status==='invalid') {
                    // 来回切换发送,命令
                    if(deviceInfo.lastSignal==='K'){
                        deviceInfo.lastSignal='K0000'
                    }else{
                        deviceInfo.lastSignal='K'
                    }
                    logger.info(`尝试重新发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
                    this.sendKeepAliveToDevice(deviceId);  // 如果设备状态是待确认,则重发上次的信号
                }
            }, 10000);  // 每2秒重试一次
        }catch(err){
            logger.error(`${deviceId}设备重试机制出错:${err}`)
        }
        if (!deviceInfo) return;
        this.stopRetryMechanism(deviceId);
        deviceInfo.retryInterval = setInterval(() => {
            if (deviceInfo.status === 'pending' || deviceInfo.status === 'invalid') {
                // 握手阶段:在 'K' 和 'K0000' 之间切换(你的原始逻辑)
                deviceInfo.lastSignal = deviceInfo.lastSignal === 'K' ? 'K0000' : 'K';
                logger.info(`重试发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
                this.sendKeepAliveToDevice(deviceId);
            }
        }, RETRY_INTERVAL_MS);
    }
    // 停止针对每个设备的重试机制
    stopRetryMechanism(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        if (deviceInfo.retryInterval) {
            clearInterval(deviceInfo.retryInterval);  // 清除重试定时器
        if (deviceInfo?.retryInterval) {
            clearInterval(deviceInfo.retryInterval);
            deviceInfo.retryInterval = null;
        }
    }
    // 启动针对每个设备的定时发送保持连接信号的机制
    startKeepAlive(deviceId, signal) {
        const deviceInfo = this.devices.get(deviceId);
        if (deviceInfo.keepAliveInterval) clearInterval(deviceInfo.keepAliveInterval);  // 清除之前的定时器
        deviceInfo.lastSignal = signal;  // 更新最后发送的信号
        if (!deviceInfo) return;
        this.stopKeepAlive(deviceId);
        deviceInfo.lastSignal = signal;
        deviceInfo.keepAliveInterval = setInterval(() => {
            this.sendKeepAliveToDevice(deviceId);  // 每60秒发送一次保持连接信号
        }, 60000);  // 修改为每60秒发送一次
            this.sendKeepAliveToDevice(deviceId);
        }, KEEP_ALIVE_INTERVAL_MS);
    }
    // 停止针对每个设备的定时发送保持连接信号的机制
    stopKeepAlive(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        if (deviceInfo.keepAliveInterval) {
            clearInterval(deviceInfo.keepAliveInterval);  // 清除定时器
        if (deviceInfo?.keepAliveInterval) {
            clearInterval(deviceInfo.keepAliveInterval);
            deviceInfo.keepAliveInterval = null;
        }
    }
    // 向特定设备发送保持连接信号
    sendKeepAliveToDevice(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        if (!deviceInfo) return;
        deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`);  // 根据上次成功信号发送 'K' 或 'K0000'
        logger.info(`发送保持连接信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
    }
        if (!deviceInfo || !deviceInfo.socket.writable) return;
    // 处理来自客户端的数据
    handleData(deviceId, message) {
        try{
            const deviceInfo = this.devices.get(deviceId);
            if (!deviceInfo) return;
            logger.info(`接收到来自 ${deviceId} 的消息: ${message}`);
            deviceInfo.lastAck = Date.now();  // 更新最后收到 ACK 的时间
            const masData= toModel(message)
            deviceInfo.iotDeviceNo=masData.n
            deviceInfo.masData= masData
            // 如果已经是有效的接受数据了  就不用在启用定时发送了  直接改状体
            if(deviceInfo.status!=='valid'){
                this.emit('validResponse', deviceId);  // 触发 validResponse 事件
                deviceInfo.status = 'valid';  // 设备状态更新为有效
                console.log(`停止重试${deviceId}`)
                this.stopRetryMechanism(deviceId);  // 停止重试机制
                // 根据上次发送的信号启动相应的定时发送机制
                this.startKeepAlive(deviceId, deviceInfo.lastSignal);  // 启动定时发送 'K0000' 的机制
                logger.info(`${deviceId}启动定时发送 ${deviceInfo.lastSignal} 的机制60秒一次`);
                this.registerDevice(deviceId)
            }else{
                logger.info('注册成功后第二次就发送数据到阿里云')
                this.postPropsToDevice(deviceId)
            }
        }catch(err){
            logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`)
        try {
            deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`);
            logger.info(`发送信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
        } catch (err) {
            logger.error(`发送信号失败 ${deviceId}:`, err.message);
            this.removeDevice(deviceId);
        }
    }
    // 处理设备上阿里云
    async handleData(deviceId, message) {
        const deviceInfo = this.devices.get(deviceId);
        if (!deviceInfo) return;
        deviceInfo.lastAck = Date.now();
        try {
            const masData = toModel(message);
            deviceInfo.iotDeviceNo = masData.n;
            deviceInfo.masData = masData;
            if (deviceInfo.status !== 'valid') {
                deviceInfo.status = 'valid';
                this.emit('validResponse', deviceId);
                logger.info(`${deviceId} 首次有效响应,停止重试`);
                this.stopRetryMechanism(deviceId);
                // 成功后固定使用 'K0000' 保活(建议)
                this.startKeepAlive(deviceId, deviceInfo.lastSignal);
                await this.registerDevice(deviceId);
            } else {
                logger.info(`${deviceId} 已注册,直接上报数据到阿里云`);
                this.postPropsToDevice(deviceId);
                // this.onDeviceDataReceived(deviceId, masData);
            }
        } catch (err) {
            logger.error(`${deviceId} 处理消息出错:`, err.message);
        }
    }
    async registerDevice(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        logger.info(`${deviceId}注册阿里云iot设备`);
        try{
            if (!deviceInfo.iotDevice) {
                // 请求三元组信息
                logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`);
                const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo)
                logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data));
                if(data&&data.productKey&&data.deviceName&&data.deviceSecret){
                    // 设备配置信息
                    const productKey = data.productKey; // 替换为你的产品密钥
                    const deviceName = data.deviceName; // 替换为你的设备名称
                    const deviceSecret = data.deviceSecret; // 替换为你的设备密钥
                    // 创建设备实例aliyunIot.device(devConfig);
                    deviceInfo.iotDevice = aliyunIot.device({
                        ProductKey: productKey,
                        DeviceName: deviceName,
                        DeviceSecret: deviceSecret
                    });
                    // 监听设备连接状态变化
                    deviceInfo.iotDevice.on('connect', () => {
                        // logger.info(`${deviceId} 连接到阿里云IoT平台成功`);
                    });
                    deviceInfo.iotDevice.on('error', (err) => {
                        logger.info(`${deviceId} 设备连接到阿里云IoT平台错误:`, err);
                    });
                }
        if (!deviceInfo || deviceInfo.iotDevice) return;
        try {
            logger.info(`${deviceId} 请求三元组,设备号: ${deviceInfo.iotDeviceNo}`);
            const { data } = await getDeviceSYZ(deviceInfo.iotDeviceNo);
            logger.info(`${deviceId} 三元组返回: ${JSON.stringify(data)}`);
            const model=data.data
            if (model?.productKey && model?.deviceName && model?.deviceSecret) {
                deviceInfo.iotDevice = aliyunIot.device({
                    ProductKey: model.productKey,
                    DeviceName: model.deviceName,
                    DeviceSecret: model.deviceSecret
                });
                deviceInfo.iotDevice.on('connect', () => {
                    logger.info(`${deviceId} 连接阿里云 IoT 成功`);
                });
                deviceInfo.iotDevice.on('error', (err) => {
                    logger.error(`${deviceId} 阿里云 IoT 错误:`, err.message);
                });
            }
        }
        catch(err){
            logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`)
        } catch (err) {
            logger.error(`${deviceId} 阿里云注册失败:`, err.message);
        }
    }
    //发送消息到iot云端
    postPropsToDevice(deviceId) {
        try{
            const deviceInfo = this.devices.get(deviceId);
            if (deviceInfo.iotDevice) {
              // 上报属性数据
              const props = deviceInfo.masData
        const deviceInfo = this.devices.get(deviceId);
        if (!deviceInfo?.iotDevice || !deviceInfo.masData) return;
              onDeviceDataReceived(deviceInfo.masData); // 调用函数处理接收到的数据
              deviceInfo.iotDevice.postProps(props, (res) => {
                if (res.message==='success') {
                    logger.info(`${deviceId} 上报属性成功:`, res);
                } else {
                    logger.error(`${deviceId} 上报属性失败:`, res);
                }
              });
        const props = deviceInfo.masData;
        deviceInfo.iotDevice.postProps(props, (res) => {
            if (res?.message === 'success') {
                logger.info(`${deviceId} 上报属性成功`);
            } else {
                logger.error(`${deviceId} 上报属性失败:`, res?.message || 'unknown');
            }
        }catch(err){
            logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err);
        }
        });
    }
    // 移除设备连接
    onDeviceDataReceived(deviceId, data) {
        const topic = `touxiji/${data.n}`;
        const payload = JSON.stringify({
            ...data,
            timestamp: new Date().toISOString()
        });
        try {
            logger.info(`发布 MQTT 消息到主题 ${topic}: ${payload}`);
            publishMessage(topic, payload);
        } catch (error) {
            logger.error(`MQTT 发布失败 ${topic}:`, error.message);
        }
    }
    removeDevice(deviceId) {
        if (this.devices.has(deviceId)) {
            this.stopRetryMechanism(deviceId);  // 停止重试机制
            this.stopKeepAlive(deviceId);  // 停止定时发送保持连接信号的机制
            this.devices.delete(deviceId);  // 从设备列表中移除
            logger.info(`移除设备: ${deviceId}`);
        const deviceInfo = this.devices.get(deviceId);
        if (!deviceInfo) return;
        this.stopRetryMechanism(deviceId);
        this.stopKeepAlive(deviceId);
        if (deviceInfo.socket && !deviceInfo.socket.destroyed) {
            deviceInfo.socket.destroy();
        }
        if (deviceInfo.iotDevice) {
            deviceInfo.iotDevice.end(); // 安全关闭 IoT 连接
        }
        this.devices.delete(deviceId);
        logger.info(`设备已移除: ${deviceId}`);
    }
}
// 创建服务器实例
const server = net.createServer((socket) => {
    const manager = new DeviceManager();
    manager.handleDevice(socket);
    manager.on('validResponse', (deviceId) => {
        logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`);
        // 可以在这里添加更多处理逻辑
    });
    manager.on('invalidResponse', (deviceId) => {
        logger.info(`设备 ${deviceId} 响应了无效或没有响应`);
        // 可以在这里添加更多处理逻辑
    });
    // 处理错误
    socket.on('error', (err) => {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
        logger.error(`与 ${deviceId} 的套接字发生错误:`, err);
        manager.removeDevice(deviceId);  // 移除设备连接
    });
    // 客户端断开时触发
    socket.on('end', () => {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
        logger.info(`设备断开连接: ${deviceId}`);
        manager.removeDevice(deviceId);  // 移除设备连接
    });
});
// 绑定端口并开始监听
server.listen(961, () => {
    logger.info('socket服务已经启动端口号:961');
});
const getDeviceSYZ=async (devcieNo)=>{
    try{
        const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo)
        if(data){
            return data
        }else{
            return ''
        }
    }catch(err){
        logger.error(` ${devcieNo} 获取三元组信息错误:`, err);
        throw new CustomError("新错误", error);
    }
}
// 接收到的数据
const  onDeviceDataReceived=(data)=> {
    const topic = `touxiji/${data.n}`;
    const payload = JSON.stringify({
      ...data,
      timestamp: new Date().toISOString()
    });
// ========== 辅助函数 ==========
async function getDeviceSYZ(deviceNo) {
    try {
        logger.info(`发布消息到主题 ${topic}: ${payload}`);
        publishMessage(topic, payload);
    } catch (error) {
        logger.error(`发布消息到主题 ${topic} 失败:`, error);
        throw new CustomError("发布消息失败", error);
        const { data, message } = await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret', deviceNo);
        return { data };
    } catch (err) {
        logger.error(`${deviceNo} 获取三元组失败:`, err.message);
        throw new CustomError("获取三元组失败", err);
    }
}
     }
  }
// ========== 启动服务器 ==========
const manager = new DeviceManager(); // ✅ 单例!
manager.on('validResponse', (deviceId) => {
    logger.info(`设备 ${deviceId} 成功完成握手`);
});
manager.on('invalidResponse', (deviceId) => {
    logger.info(`设备 ${deviceId} 无效响应`);
});
const server = net.createServer((socket) => {
    manager.handleDevice(socket);
});
const PORT = process.env.PORT || 10961;
server.listen(PORT, () => {
    logger.info(`Socket 服务已启动,监听端口: ${PORT}`);
});