chenyc
2025-08-07 9b83b704063a0ec9db6c3ed8b0665783f1961c19
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
const net = require('net');  // 引入 Node.js 的 net 模块用于创建网络服务器和客户端
const { crc16 } = require('./crc16'); // 引入上面的文件
const logger = require('./logger'); // 引入上面创建的logger
const { toModel,format } = require('./modeTool'); // 引入数据解析工具
const EventEmitter = require('events');  // 引入 Node.js 的 events 模块,用于事件驱动编程
const aliyunIot = require('aliyun-iot-device-sdk');
const { getAliyunDeviceSecret } = require('./api');
const { publishMessage } = require('./mqttClient');
// 定义 DeviceManager 类来管理设备连接
class DeviceManager extends EventEmitter {
    constructor() {
        super();  // 调用父类构造函数
        this.devices = new Map();  // 使用 Map 存储设备连接及其状态
    }
    // 处理新设备连接
    handleDevice(socket) {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;  // 根据 IP 和端口生成唯一设备ID
        try{
            logger.info(`建立新连接${deviceId}`)
            this.devices.set(deviceId, { 
                socket, 
                lastAck: Date.now(),  // 记录最后一次收到 ACK 的时间
                status: 'pending',  // 初始状态为 pending(等待确认)
                retryInterval: null,  // 重试机制定时器ID
                keepAliveInterval: null,  // 定时发送保持连接信号的定时器ID
                lastSignal: 'K',  // 最后发送的信号,默认为 'K'
                iotDevice:null,
                masData:'',
                iotDeviceNo:'' // SOCKET连接的设备号
            });
            // 每个客户端连接都有一个独立的缓冲区 buffer。
            let buffer = '';
            // 监听客户端发送的数据
            socket.on('data', (data) => {
                const buffer = Buffer.from(data);
                // 检查数据长度是否符合预期
                if (buffer.length < 4) {
                    console.error('Invalid packet length');
                    return;
                }
                // 提取头信息和 CRC16
                const header = buffer.slice(0, 2).toString('hex');
                const receivedCrc = buffer.readUInt16LE(2); // 假设 CRC 在第3-4字节
                const payload = buffer.slice(4);
                // 计算 CRC16
                const calculatedCrc = crc16(0x0000, payload);
                if (receivedCrc === calculatedCrc&&header=='aabb') {
                    logger.info(`来自 ${deviceId} 的数据 CRC 校验通过`);
                    // CRC 校验通过,处理数据
                    this.handleData(deviceId, buffer);  // 处理数据
                }else{
                    logger.error(`来自 ${deviceId} 的数据 CRC 校验失败`);
                }
            });
        }catch(err){
            logger.error(`${deviceId}建立连接异常出错:${err}`)
        }
    }
 
 
    // 处理来自客户端的数据
    handleData(deviceId, message) {
        try{
            const deviceInfo = this.devices.get(deviceId);
            if (!deviceInfo) return;
            logger.info(`接收到来自 ${deviceId} 的消息: ${message.toString('hex')}`);
             const masData= toModel(message)
            deviceInfo.iotDeviceNo=masData.设备编号前16位+masData.设备编号后16位.toString() // 设备号
            deviceInfo.masData = {
                deviceType:'宝特莱D800',
                suedtime:format(new Date()), // 使用当前时间作为使用时间
                xycsML: masData.血氧参数脉率, // 血氧参数脉率
                xycsSOP2: masData.血氧参数Spo2值, // 血氧参数 Spo2 值
                xyjpjxy: masData.血压计平均血压, // 血压计平均血压
                xdxrl: masData.相对血容量, // 相对血容量
                n: deviceInfo.iotDeviceNo, // 设备号
                qcl: masData.清除率, // 清除率
                KTV: masData.KtV, // Kt/V
                sbzt:masData.模式, // 设备状态
                mb:masData.血压计脉搏, // 血压计脉搏
                szy: masData.血压计舒张血压, // 血压计舒张血压
                ssy: masData.血压计收缩血压, // 血压计收缩血压
                hybyl: masData.后已补液量, // 后已补液量
                hbyzl: masData.后补液总量, // 后补液总量
                qybyl: masData.前已补液量, // 前已补液量
                qbyzl: masData.前补液总量, // 前补液总量
                gszjl: masData.追加量, // 追加量
                gsdzsh: masData.肝素停止时间, // 肝素停止时间
                E:masData.肝素速率, // 肝素速率
                F: masData.透析液实际温度, // 透析液实际温度
                L: masData.透析液实际流量, // 透析液实际流量
                G:masData.电导度,// 电导度
                J:masData.跨膜压, // 跨膜压
                o:masData.动脉压, // 动脉压
                H:masData.静脉压, // 静脉压
                D:masData.实际血流量, // 实际血流量
                SDXYLS:masData.设定血流量, // 设定血流量
                A:masData.超滤目标, // 超滤目标
                B:masData.当前已超滤量, // 当前已超滤量
                C:masData.超滤率, // 超滤率
                sysj:masData.治疗剩余时间, // 治疗剩余时间
            };  // 保存解析后的数据
            console.log(deviceInfo.masData)
            if(deviceInfo.status!=='valid'){
                deviceInfo.status = 'valid';  // 设备状态更新为有效
                this.registerDevice(deviceId)
            }else{
                logger.info('注册成功后第二次就发送数据到阿里云')
                this.postPropsToDevice(deviceId)
            }
 
        }catch(err){
            logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`)
        }
        
       
    }
    // 处理设备上阿里云
    /**
     * 用设备编号换阿里云注册信息上云 
     * @param {*} deviceId 
     */
    async registerDevice(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        logger.info(`${deviceId}注册阿里云iot设备`);
        try{
            if (!deviceInfo.iotDevice) {
                // 请求三元组信息
                logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`);
                const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo)
                logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data));
                if(data&&data.productKey&&data.deviceName&&data.deviceSecret){
                    // 设备配置信息
                    const productKey = data.productKey; // 替换为你的产品密钥
                    const deviceName = data.deviceName; // 替换为你的设备名称
                    const deviceSecret = data.deviceSecret; // 替换为你的设备密钥
                    // 创建设备实例aliyunIot.device(devConfig);
                    deviceInfo.iotDevice = aliyunIot.device({
                        ProductKey: productKey,
                        DeviceName: deviceName,
                        DeviceSecret: deviceSecret
                    });
                    // 监听设备连接状态变化
                    deviceInfo.iotDevice.on('connect', () => {
                    });
                
                    deviceInfo.iotDevice.on('error', (err) => {
                        logger.info(`${deviceId} 设备连接到阿里云IoT平台错误:`, err);
                    });
                }
                
            }
        }
        catch(err){
            logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`)
        }
    }
    //发送消息到iot云端
    postPropsToDevice(deviceId) {
        try{
            const deviceInfo = this.devices.get(deviceId);
            if (deviceInfo.iotDevice) {
              // 上报属性数据
              const props = deviceInfo.masData
 
              onDeviceDataReceived(deviceInfo.masData); // 调用函数处理接收到的数据
              deviceInfo.iotDevice.postProps(props, (res) => {
                if (res.message==='success') {
                    logger.info(`${deviceId} 上报属性成功:`, res);
                  
                } else {
                    logger.error(`${deviceId} 上报属性失败:`, res);
                }
              });
            }
        }catch(err){
            logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err);
        }
       
    }
    
 
    // 移除设备连接
    removeDevice(deviceId) {
        if (this.devices.has(deviceId)) {
            this.devices.delete(deviceId);  // 从设备列表中移除
            logger.info(`移除设备: ${deviceId}`);
        }
    }
}
 
// 创建服务器实例
const server = net.createServer((socket) => {
    const manager = new DeviceManager();
    manager.handleDevice(socket);
    manager.on('validResponse', (deviceId) => {
        logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`);
        // 可以在这里添加更多处理逻辑
    });
    manager.on('invalidResponse', (deviceId) => {
        logger.info(`设备 ${deviceId} 响应了无效或没有响应`);
        // 可以在这里添加更多处理逻辑
    });
   
    // 处理错误
    socket.on('error', (err) => {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
        logger.error(`与 ${deviceId} 的套接字发生错误:`, err);
        manager.removeDevice(deviceId);  // 移除设备连接
    });
 
    // 客户端断开时触发
    socket.on('end', () => {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
        logger.info(`设备断开连接: ${deviceId}`);
        manager.removeDevice(deviceId);  // 移除设备连接
    });
});
 
// 绑定端口并开始监听
server.listen(961, () => {
    logger.info('socket服务已经启动端口号:961');
 
});
const getDeviceSYZ=async (devcieNo)=>{
    try{
        const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo)
        if(data){
            return data
        }else{
            return ''
        }
 
    }catch(err){
        logger.error(` ${devcieNo} 获取三元组信息错误:`, err);
        throw new CustomError("新错误", error);
        
    }
}
// 接收到的数据
const  onDeviceDataReceived=(data)=> {
    const topic = `touxiji/${data.n}`;
    const payload = JSON.stringify({
      ...data,
      timestamp: new Date().toISOString()
    });
    try {
        logger.info(`发布消息到主题 ${topic}: ${payload}`);
        publishMessage(topic, payload);
    } catch (error) {  
        logger.error(`发布消息到主题 ${topic} 失败:`, error);
        throw new CustomError("发布消息失败", error);
 
     }
   
  }