东丽网口版透析机 socket- server 通讯
chenyc
2026-01-01 ce98f732b9e4f32154d39454213e1abf3dc07f5b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
const net = require('net');  // 引入 Node.js 的 net 模块用于创建网络服务器和客户端
const logger = require('./logger'); // 引入上面创建的logger
const EventEmitter = require('events');  // 引入 Node.js 的 events 模块,用于事件驱动编程
const aliyunIot = require('aliyun-iot-device-sdk');
const { getAliyunDeviceSecret } = require('./api');
const toModel = require('./Strholp');
const { publishMessage } = require('./mqttClient');
// 定义 DeviceManager 类来管理设备连接
class DeviceManager extends EventEmitter {
    constructor() {
        super();  // 调用父类构造函数
        this.devices = new Map();  // 使用 Map 存储设备连接及其状态
    }
    // 处理新设备连接
    handleDevice(socket) {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;  // 根据 IP 和端口生成唯一设备ID
        try{
            logger.info(`建立新连接${deviceId}`)
            this.devices.set(deviceId, { 
                socket, 
                lastAck: Date.now(),  // 记录最后一次收到 ACK 的时间
                status: 'pending',  // 初始状态为 pending(等待确认)
                retryInterval: null,  // 重试机制定时器ID
                keepAliveInterval: null,  // 定时发送保持连接信号的定时器ID
                lastSignal: 'K',  // 最后发送的信号,默认为 'K'
                iotDevice:null,
                masData:'',
                iotDeviceNo:'' // SOCKET连接的设备号
            });
            // 发送初始 'K'
            this.sendKeepAliveToDevice(deviceId);
 
            // 启动重试机制
            this.startRetryMechanism(deviceId);
 
            // 每个客户端连接都有一个独立的缓冲区 buffer。
            let buffer = '';
            // 监听客户端发送的数据
            socket.on('data', (chunk) => {
                buffer += chunk.toString('ascii');  // 将新的数据片段添加到缓冲区
                console.log(buffer.length,'-------------------')
                while (true) {
                    console.log(buffer)
                    const messageBengIndex=buffer.indexOf('K1');
                    const messageEndIndex = buffer.indexOf('\r\n');  // 假设消息以 '\r\n' 结束
                    //如果没有K1说明没有开始符号  直接放弃之前接受的数据  如果拼接字符串太长还没有结束符号也丢弃
                    if(messageBengIndex===-1||buffer.length>300){
                        buffer=''
                        break
                    }
                    if (messageBengIndex===-1||messageEndIndex === -1) break;  // 如果没有找到结束符,跳出循环等待更多数据
                    
                    const completeMessage = buffer.substring(messageBengIndex, messageEndIndex).trim();  // 提取完整消息
                    buffer = ''; // 移除已处理的部分
                    logger.info(`${deviceId}接收到的完整消息`)
                    // 解析数据
                    this.handleData(deviceId, completeMessage.toString());  // 处理接收到的数据
                }
                
            });
        }catch(err){
            logger.error(`${deviceId}建立连接异常出错:${err}`)
        }
    }
    // 启动针对每个设备的重试机制
    startRetryMechanism(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        try{
            if (deviceInfo.retryInterval) clearInterval(deviceInfo.retryInterval);  // 清除之前的重试定时器
            deviceInfo.retryInterval = setInterval(() => {
                if (deviceInfo.status === 'pending'||deviceInfo.status==='invalid') {
                    // 来回切换发送,命令
                    if(deviceInfo.lastSignal==='K'){
                        deviceInfo.lastSignal='K0000'
                    }else{
                        deviceInfo.lastSignal='K'
                    }
                    logger.info(`尝试重新发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
                    this.sendKeepAliveToDevice(deviceId);  // 如果设备状态是待确认,则重发上次的信号
                }
            }, 10000);  // 每2秒重试一次
        }catch(err){
            logger.error(`${deviceId}设备重试机制出错:${err}`)
        }
    }
 
    // 停止针对每个设备的重试机制
    stopRetryMechanism(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        if (deviceInfo.retryInterval) {
            clearInterval(deviceInfo.retryInterval);  // 清除重试定时器
            deviceInfo.retryInterval = null;
        }
    }
 
    // 启动针对每个设备的定时发送保持连接信号的机制
    startKeepAlive(deviceId, signal) {
        const deviceInfo = this.devices.get(deviceId);
        if (deviceInfo.keepAliveInterval) clearInterval(deviceInfo.keepAliveInterval);  // 清除之前的定时器
        deviceInfo.lastSignal = signal;  // 更新最后发送的信号
        deviceInfo.keepAliveInterval = setInterval(() => {
            this.sendKeepAliveToDevice(deviceId);  // 每60秒发送一次保持连接信号
        }, 60000);  // 修改为每60秒发送一次
    }
 
    // 停止针对每个设备的定时发送保持连接信号的机制
    stopKeepAlive(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        if (deviceInfo.keepAliveInterval) {
            clearInterval(deviceInfo.keepAliveInterval);  // 清除定时器
            deviceInfo.keepAliveInterval = null;
        }
    }
 
    // 向特定设备发送保持连接信号
    sendKeepAliveToDevice(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        if (!deviceInfo) return;
        deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`);  // 根据上次成功信号发送 'K' 或 'K0000'
        logger.info(`发送保持连接信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
    }
 
    // 处理来自客户端的数据
    handleData(deviceId, message) {
        try{
            const deviceInfo = this.devices.get(deviceId);
            if (!deviceInfo) return;
            logger.info(`接收到来自 ${deviceId} 的消息: ${message}`);
            deviceInfo.lastAck = Date.now();  // 更新最后收到 ACK 的时间
            const masData= toModel(message)
            deviceInfo.iotDeviceNo=masData.n
            deviceInfo.masData= masData
            // 如果已经是有效的接受数据了  就不用在启用定时发送了  直接改状体
            if(deviceInfo.status!=='valid'){
                this.emit('validResponse', deviceId);  // 触发 validResponse 事件
                deviceInfo.status = 'valid';  // 设备状态更新为有效
                console.log(`停止重试${deviceId}`)
                this.stopRetryMechanism(deviceId);  // 停止重试机制
                // 根据上次发送的信号启动相应的定时发送机制
                this.startKeepAlive(deviceId, deviceInfo.lastSignal);  // 启动定时发送 'K0000' 的机制
                logger.info(`${deviceId}启动定时发送 ${deviceInfo.lastSignal} 的机制60秒一次`);
                this.registerDevice(deviceId)
            }else{
                logger.info('注册成功后第二次就发送数据到阿里云')
                this.postPropsToDevice(deviceId)
 
            }
        }catch(err){
            logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`)
        }
        
       
    }
    // 处理设备上阿里云
    async registerDevice(deviceId) {
        const deviceInfo = this.devices.get(deviceId);
        logger.info(`${deviceId}注册阿里云iot设备`);
        try{
            if (!deviceInfo.iotDevice) {
                // 请求三元组信息
                logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`);
                const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo)
                logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data));
                if(data&&data.productKey&&data.deviceName&&data.deviceSecret){
                    // 设备配置信息
                    const productKey = data.productKey; // 替换为你的产品密钥
                    const deviceName = data.deviceName; // 替换为你的设备名称
                    const deviceSecret = data.deviceSecret; // 替换为你的设备密钥
                    // 创建设备实例aliyunIot.device(devConfig);
                    deviceInfo.iotDevice = aliyunIot.device({
                        ProductKey: productKey,
                        DeviceName: deviceName,
                        DeviceSecret: deviceSecret
                    });
                    // 监听设备连接状态变化
                    deviceInfo.iotDevice.on('connect', () => {
                        // logger.info(`${deviceId} 连接到阿里云IoT平台成功`);
                    });
                
                    deviceInfo.iotDevice.on('error', (err) => {
                        logger.info(`${deviceId} 设备连接到阿里云IoT平台错误:`, err);
                    });
                }
                
            }
        }
        catch(err){
            logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`)
        }
    }
    //发送消息到iot云端
    postPropsToDevice(deviceId) {
        try{
            const deviceInfo = this.devices.get(deviceId);
            if (deviceInfo.iotDevice) {
              // 上报属性数据
              const props = deviceInfo.masData
 
              onDeviceDataReceived(deviceInfo.masData); // 调用函数处理接收到的数据
              deviceInfo.iotDevice.postProps(props, (res) => {
                if (res.message==='success') {
                    logger.info(`${deviceId} 上报属性成功:`, res);
                  
                } else {
                    logger.error(`${deviceId} 上报属性失败:`, res);
                }
              });
            }
        }catch(err){
            logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err);
        }
       
    }
    
 
    // 移除设备连接
    removeDevice(deviceId) {
        if (this.devices.has(deviceId)) {
            this.stopRetryMechanism(deviceId);  // 停止重试机制
            this.stopKeepAlive(deviceId);  // 停止定时发送保持连接信号的机制
            this.devices.delete(deviceId);  // 从设备列表中移除
            logger.info(`移除设备: ${deviceId}`);
        }
    }
}
 
// 创建服务器实例
const server = net.createServer((socket) => {
    const manager = new DeviceManager();
    manager.handleDevice(socket);
    manager.on('validResponse', (deviceId) => {
        logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`);
        // 可以在这里添加更多处理逻辑
    });
    manager.on('invalidResponse', (deviceId) => {
        logger.info(`设备 ${deviceId} 响应了无效或没有响应`);
        // 可以在这里添加更多处理逻辑
    });
   
    // 处理错误
    socket.on('error', (err) => {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
        logger.error(`与 ${deviceId} 的套接字发生错误:`, err);
        manager.removeDevice(deviceId);  // 移除设备连接
    });
 
    // 客户端断开时触发
    socket.on('end', () => {
        const deviceId = socket.remoteAddress + ':' + socket.remotePort;
        logger.info(`设备断开连接: ${deviceId}`);
        manager.removeDevice(deviceId);  // 移除设备连接
    });
});
 
// 绑定端口并开始监听
server.listen(961, () => {
    logger.info('socket服务已经启动端口号:961');
 
});
const getDeviceSYZ=async (devcieNo)=>{
    try{
        const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo)
        if(data){
            return data
        }else{
            return ''
        }
 
    }catch(err){
        logger.error(` ${devcieNo} 获取三元组信息错误:`, err);
        throw new CustomError("新错误", error);
        
    }
}
// 接收到的数据
const  onDeviceDataReceived=(data)=> {
    const topic = `touxiji/${data.n}`;
    const payload = JSON.stringify({
      ...data,
      timestamp: new Date().toISOString()
    });
    try {
        logger.info(`发布消息到主题 ${topic}: ${payload}`);
        publishMessage(topic, payload);
    } catch (error) {  
        logger.error(`发布消息到主题 ${topic} 失败:`, error);
        throw new CustomError("发布消息失败", error);
 
     }
   
  }