const net = require('net'); // 引入 Node.js 的 net 模块用于创建网络服务器和客户端
|
const { crc16 } = require('./crc16'); // 引入上面的文件
|
const logger = require('./logger'); // 引入上面创建的logger
|
const { toModel,format } = require('./modeTool'); // 引入数据解析工具
|
const EventEmitter = require('events'); // 引入 Node.js 的 events 模块,用于事件驱动编程
|
const aliyunIot = require('aliyun-iot-device-sdk');
|
const { getAliyunDeviceSecret } = require('./api');
|
const { publishMessage } = require('./mqttClient');
|
// 定义 DeviceManager 类来管理设备连接
|
class DeviceManager extends EventEmitter {
|
constructor() {
|
super(); // 调用父类构造函数
|
this.devices = new Map(); // 使用 Map 存储设备连接及其状态
|
}
|
// 处理新设备连接
|
handleDevice(socket) {
|
const deviceId = socket.remoteAddress + ':' + socket.remotePort; // 根据 IP 和端口生成唯一设备ID
|
try{
|
logger.info(`建立新连接${deviceId}`)
|
this.devices.set(deviceId, {
|
socket,
|
lastAck: Date.now(), // 记录最后一次收到 ACK 的时间
|
status: 'pending', // 初始状态为 pending(等待确认)
|
retryInterval: null, // 重试机制定时器ID
|
keepAliveInterval: null, // 定时发送保持连接信号的定时器ID
|
lastSignal: 'K', // 最后发送的信号,默认为 'K'
|
iotDevice:null,
|
masData:'',
|
iotDeviceNo:'' // SOCKET连接的设备号
|
});
|
// 每个客户端连接都有一个独立的缓冲区 buffer。
|
let buffer = '';
|
// 监听客户端发送的数据
|
socket.on('data', (data) => {
|
const buffer = Buffer.from(data);
|
// 检查数据长度是否符合预期
|
if (buffer.length < 4) {
|
console.error('Invalid packet length');
|
return;
|
}
|
// 提取头信息和 CRC16
|
const header = buffer.slice(0, 2).toString('hex');
|
const receivedCrc = buffer.readUInt16LE(2); // 假设 CRC 在第3-4字节
|
const payload = buffer.slice(4);
|
// 计算 CRC16
|
const calculatedCrc = crc16(0x0000, payload);
|
if (receivedCrc === calculatedCrc&&header=='aabb') {
|
logger.info(`来自 ${deviceId} 的数据 CRC 校验通过`);
|
// CRC 校验通过,处理数据
|
this.handleData(deviceId, buffer); // 处理数据
|
}else{
|
logger.error(`来自 ${deviceId} 的数据 CRC 校验失败`);
|
}
|
});
|
}catch(err){
|
logger.error(`${deviceId}建立连接异常出错:${err}`)
|
}
|
}
|
|
|
// 处理来自客户端的数据
|
handleData(deviceId, message) {
|
try{
|
const deviceInfo = this.devices.get(deviceId);
|
if (!deviceInfo) return;
|
logger.info(`接收到来自 ${deviceId} 的消息: ${message.toString('hex')}`);
|
const masData= toModel(message)
|
deviceInfo.iotDeviceNo=masData.设备编号前16位+masData.设备编号后16位.toString() // 设备号
|
deviceInfo.masData = {
|
deviceType:'宝特莱D800',
|
suedtime:format(new Date()), // 使用当前时间作为使用时间
|
xycsML: masData.血氧参数脉率, // 血氧参数脉率
|
xycsSOP2: masData.血氧参数Spo2值, // 血氧参数 Spo2 值
|
xyjpjxy: masData.血压计平均血压, // 血压计平均血压
|
xdxrl: masData.相对血容量, // 相对血容量
|
n: deviceInfo.iotDeviceNo, // 设备号
|
qcl: masData.清除率, // 清除率
|
KTV: masData.KtV, // Kt/V
|
sbzt:masData.模式, // 设备状态
|
mb:masData.血压计脉搏, // 血压计脉搏
|
szy: masData.血压计舒张血压, // 血压计舒张血压
|
ssy: masData.血压计收缩血压, // 血压计收缩血压
|
hybyl: masData.后已补液量, // 后已补液量
|
hbyzl: masData.后补液总量, // 后补液总量
|
qybyl: masData.前已补液量, // 前已补液量
|
qbyzl: masData.前补液总量, // 前补液总量
|
gszjl: masData.追加量, // 追加量
|
gsdzsh: masData.肝素停止时间, // 肝素停止时间
|
E:masData.肝素速率, // 肝素速率
|
F: masData.透析液实际温度, // 透析液实际温度
|
L: masData.透析液实际流量, // 透析液实际流量
|
G:masData.电导度,// 电导度
|
J:masData.跨膜压, // 跨膜压
|
o:masData.动脉压, // 动脉压
|
H:masData.静脉压, // 静脉压
|
D:masData.实际血流量, // 实际血流量
|
SDXYLS:masData.设定血流量, // 设定血流量
|
A:masData.超滤目标, // 超滤目标
|
B:masData.当前已超滤量, // 当前已超滤量
|
C:masData.超滤率, // 超滤率
|
sysj:masData.治疗剩余时间, // 治疗剩余时间
|
}; // 保存解析后的数据
|
console.log(deviceInfo.masData)
|
if(deviceInfo.status!=='valid'){
|
deviceInfo.status = 'valid'; // 设备状态更新为有效
|
this.registerDevice(deviceId)
|
}else{
|
logger.info('注册成功后第二次就发送数据到阿里云')
|
this.postPropsToDevice(deviceId)
|
}
|
|
}catch(err){
|
logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`)
|
}
|
|
|
}
|
// 处理设备上阿里云
|
/**
|
* 用设备编号换阿里云注册信息上云
|
* @param {*} deviceId
|
*/
|
async registerDevice(deviceId) {
|
const deviceInfo = this.devices.get(deviceId);
|
logger.info(`${deviceId}注册阿里云iot设备`);
|
try{
|
if (!deviceInfo.iotDevice) {
|
// 请求三元组信息
|
logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`);
|
const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo)
|
logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data));
|
if(data&&data.productKey&&data.deviceName&&data.deviceSecret){
|
// 设备配置信息
|
const productKey = data.productKey; // 替换为你的产品密钥
|
const deviceName = data.deviceName; // 替换为你的设备名称
|
const deviceSecret = data.deviceSecret; // 替换为你的设备密钥
|
// 创建设备实例aliyunIot.device(devConfig);
|
deviceInfo.iotDevice = aliyunIot.device({
|
ProductKey: productKey,
|
DeviceName: deviceName,
|
DeviceSecret: deviceSecret
|
});
|
// 监听设备连接状态变化
|
deviceInfo.iotDevice.on('connect', () => {
|
});
|
|
deviceInfo.iotDevice.on('error', (err) => {
|
logger.info(`${deviceId} 设备连接到阿里云IoT平台错误:`, err);
|
});
|
}
|
|
}
|
}
|
catch(err){
|
logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`)
|
}
|
}
|
//发送消息到iot云端
|
postPropsToDevice(deviceId) {
|
try{
|
const deviceInfo = this.devices.get(deviceId);
|
if (deviceInfo.iotDevice) {
|
// 上报属性数据
|
const props = deviceInfo.masData
|
|
onDeviceDataReceived(deviceInfo.masData); // 调用函数处理接收到的数据
|
deviceInfo.iotDevice.postProps(props, (res) => {
|
if (res.message==='success') {
|
logger.info(`${deviceId} 上报属性成功:`, res);
|
|
} else {
|
logger.error(`${deviceId} 上报属性失败:`, res);
|
}
|
});
|
}
|
}catch(err){
|
logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err);
|
}
|
|
}
|
|
|
// 移除设备连接
|
removeDevice(deviceId) {
|
if (this.devices.has(deviceId)) {
|
this.devices.delete(deviceId); // 从设备列表中移除
|
logger.info(`移除设备: ${deviceId}`);
|
}
|
}
|
}
|
|
// 创建服务器实例
|
const server = net.createServer((socket) => {
|
const manager = new DeviceManager();
|
manager.handleDevice(socket);
|
manager.on('validResponse', (deviceId) => {
|
logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`);
|
// 可以在这里添加更多处理逻辑
|
});
|
manager.on('invalidResponse', (deviceId) => {
|
logger.info(`设备 ${deviceId} 响应了无效或没有响应`);
|
// 可以在这里添加更多处理逻辑
|
});
|
|
// 处理错误
|
socket.on('error', (err) => {
|
const deviceId = socket.remoteAddress + ':' + socket.remotePort;
|
logger.error(`与 ${deviceId} 的套接字发生错误:`, err);
|
manager.removeDevice(deviceId); // 移除设备连接
|
});
|
|
// 客户端断开时触发
|
socket.on('end', () => {
|
const deviceId = socket.remoteAddress + ':' + socket.remotePort;
|
logger.info(`设备断开连接: ${deviceId}`);
|
manager.removeDevice(deviceId); // 移除设备连接
|
});
|
});
|
|
// 绑定端口并开始监听
|
server.listen(961, () => {
|
logger.info('socket服务已经启动端口号:961');
|
|
});
|
const getDeviceSYZ=async (devcieNo)=>{
|
try{
|
const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo)
|
if(data){
|
return data
|
}else{
|
return ''
|
}
|
|
}catch(err){
|
logger.error(` ${devcieNo} 获取三元组信息错误:`, err);
|
throw new CustomError("新错误", error);
|
|
}
|
}
|
// 接收到的数据
|
const onDeviceDataReceived=(data)=> {
|
const topic = `touxiji/${data.n}`;
|
const payload = JSON.stringify({
|
...data,
|
timestamp: new Date().toISOString()
|
});
|
try {
|
logger.info(`发布消息到主题 ${topic}: ${payload}`);
|
publishMessage(topic, payload);
|
} catch (error) {
|
logger.error(`发布消息到主题 ${topic} 失败:`, error);
|
throw new CustomError("发布消息失败", error);
|
|
}
|
|
}
|
|