From 55c8181bbe4c198f9bda5520ea0d8ba148933f9e Mon Sep 17 00:00:00 2001
From: chenyc <501753378@qq.com>
Date: 星期五, 16 一月 2026 10:34:53 +0800
Subject: [PATCH] gx重试指令间隔
---
index.js | 545 +++++++++++++++++++++++++++++++-----------------------
1 files changed, 315 insertions(+), 230 deletions(-)
diff --git a/index.js b/index.js
index 6991b24..826b343 100644
--- a/index.js
+++ b/index.js
@@ -1,271 +1,356 @@
-const net = require('net'); // 引入 Node.js 的 net 模块用于创建网络服务器和客户端
-const logger = require('./logger'); // 引入上面创建的logger
-const EventEmitter = require('events'); // 引入 Node.js 的 events 模块,用于事件驱动编程
+
+
+// ✅ 正确写法(适用于 pkg 打包环境)
+const fs = require('fs');
+const path = require('path');
+
+
+// 获取 exe 所在目录(兼容 pkg 打包后的 __dirname)
+const appPath = process.pkg ? path.dirname(process.execPath) : __dirname;
+const mqttConfigPath = path.join(appPath, 'mqtt.json');
+const aliyunConfigPath = path.join(appPath, 'aliyun.json');
+const httpConfigPath = path.join(appPath, 'httpConfig.json');
+const homeConfigPath = path.join(appPath, 'homeConfig.json');
+
+const mqttConfig = JSON.parse(fs.readFileSync(mqttConfigPath, 'utf8'));
+const aliyunConfig = JSON.parse(fs.readFileSync(aliyunConfigPath, 'utf8'));
+const httpConfig = JSON.parse(fs.readFileSync(httpConfigPath, 'utf8'));
+const homeConfig = JSON.parse(fs.readFileSync(homeConfigPath, 'utf8'));
+
+
+console.log(aliyunConfig)
+
+
+
+const { initMqtt, publishMessage } = require('./mqttClient');
+
+const net = require('net');
+const logger = require('./logger');
+const EventEmitter = require('events');
const aliyunIot = require('aliyun-iot-device-sdk');
const { getAliyunDeviceSecret } = require('./api');
const toModel = require('./Strholp');
-// 定义 DeviceManager 类来管理设备连接
+const dataCache = require('./dataCache');
+const HttpServer = require('./httpServer');
+
+// 初始化 MQTT(独立于阿里云)
+initMqtt(mqttConfig);
+
+// ========== 自定义错误类(可选)==========
+class CustomError extends Error {
+ constructor(message, cause) {
+ super(message);
+ this.cause = cause;
+ }
+}
+
+// ========== 常量配置 ==========
+const MAX_BUFFER_SIZE = 500; // 缓冲区最大长度
+const RETRY_INTERVAL_MS = 30000; // 重试间隔 30s
+const KEEP_ALIVE_INTERVAL_MS = 60000; // 保活间隔 60s
+const DEVICE_TIMEOUT_MS = 120000; // 设备无响应超时 2分钟
+
+// ========== DeviceManager 类 ==========
class DeviceManager extends EventEmitter {
constructor() {
- super(); // 调用父类构造函数
- this.devices = new Map(); // 使用 Map 存储设备连接及其状态
- }
- // 处理新设备连接
- handleDevice(socket) {
- const deviceId = socket.remoteAddress + ':' + socket.remotePort; // 根据 IP 和端口生成唯一设备ID
- try{
- logger.info(`建立新连接${deviceId}`)
- this.devices.set(deviceId, {
- socket,
- lastAck: Date.now(), // 记录最后一次收到 ACK 的时间
- status: 'pending', // 初始状态为 pending(等待确认)
- retryInterval: null, // 重试机制定时器ID
- keepAliveInterval: null, // 定时发送保持连接信号的定时器ID
- lastSignal: 'K', // 最后发送的信号,默认为 'K'
- iotDevice:null,
- masData:'',
- iotDeviceNo:'' // SOCKET连接的设备号
- });
- // 发送初始 'K'
- this.sendKeepAliveToDevice(deviceId);
+ super();
+ this.devices = new Map();
- // 启动重试机制
- this.startRetryMechanism(deviceId);
-
- // 每个客户端连接都有一个独立的缓冲区 buffer。
- let buffer = '';
- // 监听客户端发送的数据
- socket.on('data', (chunk) => {
- buffer += chunk.toString('ascii'); // 将新的数据片段添加到缓冲区
- console.log(buffer.length,'-------------------')
- while (true) {
- console.log(buffer)
- const messageBengIndex=buffer.indexOf('K1');
- const messageEndIndex = buffer.indexOf('\r\n'); // 假设消息以 '\r\n' 结束
- //如果没有K1说明没有开始符号 直接放弃之前接受的数据 如果拼接字符串太长还没有结束符号也丢弃
- if(messageBengIndex===-1||buffer.length>300){
- buffer=''
- break
- }
- if (messageBengIndex===-1||messageEndIndex === -1) break; // 如果没有找到结束符,跳出循环等待更多数据
-
- const completeMessage = buffer.substring(messageBengIndex, messageEndIndex).trim(); // 提取完整消息
- buffer = ''; // 移除已处理的部分
- logger.info(`${deviceId}接收到的完整消息`)
- // 解析数据
- this.handleData(deviceId, completeMessage.toString()); // 处理接收到的数据
+ // 启动全局超时清理定时器
+ setInterval(() => {
+ const now = Date.now();
+ for (const [deviceId, info] of this.devices.entries()) {
+ if (now - info.lastAck > DEVICE_TIMEOUT_MS) {
+ logger.warn(`设备 ${deviceId} 超时(${DEVICE_TIMEOUT_MS / 1000}s 无响应),自动断开`);
+ this.removeDevice(deviceId);
}
-
- });
- }catch(err){
- logger.error(`${deviceId}建立连接异常出错:${err}`)
- }
+ }
+ }, 30000); // 每30秒检查一次
}
- // 启动针对每个设备的重试机制
+
+ handleDevice(socket) {
+ // 处理 IPv6 映射 IPv4 地址 (::ffff:127.0.0.1 -> 127.0.0.1)
+ let remoteAddress = socket.remoteAddress;
+ if (remoteAddress.startsWith('::ffff:')) {
+ remoteAddress = remoteAddress.slice(7); // 移除 ::ffff: 前缀
+ }
+ const deviceId = remoteAddress + ':' + socket.remotePort;
+ logger.info(`建立新连接: ${deviceId}`);
+
+ const deviceInfo = {
+ socket,
+ lastAck: Date.now(),
+ status: 'pending', // pending → valid → registered
+ retryInterval: null,
+ keepAliveInterval: null,
+ lastSignal: 'K', // 初始握手信号
+ iotDevice: null,
+ masData: '',
+ iotDeviceNo: ''
+ };
+
+ this.devices.set(deviceId, deviceInfo);
+
+ // 发送初始握手信号
+ this.sendKeepAliveToDevice(deviceId);
+ this.startRetryMechanism(deviceId);
+
+ let buffer = '';
+
+ socket.on('data', (chunk) => {
+ buffer += chunk.toString('ascii');
+
+ while (true) {
+ const startIdx = buffer.indexOf('K1');
+ if (startIdx === -1) {
+ // 没有起始标志,但保留数据(防分包)
+ if (buffer.length > MAX_BUFFER_SIZE) buffer = '';
+ break;
+ }
+
+ const endIdx = buffer.indexOf('\r\n', startIdx);
+ if (endIdx === -1) {
+ // 结束符未到,保留从 K1 开始的数据
+ if (buffer.length > MAX_BUFFER_SIZE) {
+ buffer = buffer.substring(startIdx);
+ }
+ break;
+ }
+
+ const message = buffer.substring(startIdx, endIdx).trim();
+ buffer = buffer.substring(endIdx + 2); // 移除已处理部分(含 \r\n)
+
+ logger.info(`${deviceId} 接收到完整消息: ${randomLetters(20)}${message}${randomLetters(20)}`);
+ this.handleData(deviceId, message);
+ }
+ });
+
+ socket.on('error', (err) => {
+ logger.error(`设备 ${deviceId} 套接字错误:`, err.message);
+ this.removeDevice(deviceId);
+ });
+
+ socket.on('end', () => {
+ logger.info(`设备断开连接: ${deviceId}`);
+ this.removeDevice(deviceId);
+ });
+ }
+
startRetryMechanism(deviceId) {
const deviceInfo = this.devices.get(deviceId);
- try{
- if (deviceInfo.retryInterval) clearInterval(deviceInfo.retryInterval); // 清除之前的重试定时器
- deviceInfo.retryInterval = setInterval(() => {
- if (deviceInfo.status === 'pending'||deviceInfo.status==='invalid') {
- // 来回切换发送,命令
- if(deviceInfo.lastSignal==='K'){
- deviceInfo.lastSignal='K0000'
- }else{
- deviceInfo.lastSignal='K'
- }
- logger.info(`尝试重新发送 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
- this.sendKeepAliveToDevice(deviceId); // 如果设备状态是待确认,则重发上次的信号
- }
- }, 5000); // 每2秒重试一次
- }catch(err){
- logger.error(`${deviceId}设备重试机制出错:${err}`)
- }
+ if (!deviceInfo) return;
+
+ this.stopRetryMechanism(deviceId);
+ deviceInfo.retryInterval = setInterval(() => {
+ if (deviceInfo.status === 'pending' || deviceInfo.status === 'invalid') {
+ // 握手阶段:在 'K' 和 'K0000' 之间切换(你的原始逻辑)
+ deviceInfo.lastSignal = deviceInfo.lastSignal === 'K' ? 'K0000' : 'K';
+ logger.info(`重试发送 '${randomLetters(10)}${deviceInfo.lastSignal==='K'?'a':'b'}${randomLetters(10)}' 给设备 ${deviceId}`);
+ this.sendKeepAliveToDevice(deviceId);
+ }
+ }, RETRY_INTERVAL_MS);
}
- // 停止针对每个设备的重试机制
stopRetryMechanism(deviceId) {
const deviceInfo = this.devices.get(deviceId);
- if (deviceInfo.retryInterval) {
- clearInterval(deviceInfo.retryInterval); // 清除重试定时器
+ if (deviceInfo?.retryInterval) {
+ clearInterval(deviceInfo.retryInterval);
deviceInfo.retryInterval = null;
}
}
- // 启动针对每个设备的定时发送保持连接信号的机制
startKeepAlive(deviceId, signal) {
const deviceInfo = this.devices.get(deviceId);
- if (deviceInfo.keepAliveInterval) clearInterval(deviceInfo.keepAliveInterval); // 清除之前的定时器
- deviceInfo.lastSignal = signal; // 更新最后发送的信号
+ if (!deviceInfo) return;
+
+ this.stopKeepAlive(deviceId);
+ deviceInfo.lastSignal = signal;
deviceInfo.keepAliveInterval = setInterval(() => {
- this.sendKeepAliveToDevice(deviceId); // 每60秒发送一次保持连接信号
- }, 60000); // 修改为每60秒发送一次
+ this.sendKeepAliveToDevice(deviceId);
+ }, KEEP_ALIVE_INTERVAL_MS);
}
- // 停止针对每个设备的定时发送保持连接信号的机制
stopKeepAlive(deviceId) {
const deviceInfo = this.devices.get(deviceId);
- if (deviceInfo.keepAliveInterval) {
- clearInterval(deviceInfo.keepAliveInterval); // 清除定时器
+ if (deviceInfo?.keepAliveInterval) {
+ clearInterval(deviceInfo.keepAliveInterval);
deviceInfo.keepAliveInterval = null;
}
}
- // 向特定设备发送保持连接信号
sendKeepAliveToDevice(deviceId) {
const deviceInfo = this.devices.get(deviceId);
- if (!deviceInfo) return;
- deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`); // 根据上次成功信号发送 'K' 或 'K0000'
- logger.info(`发送保持连接信号 '${deviceInfo.lastSignal}' 给设备 ${deviceId}`);
- }
+ if (!deviceInfo || !deviceInfo.socket.writable) return;
- // 处理来自客户端的数据
- handleData(deviceId, message) {
- try{
- const deviceInfo = this.devices.get(deviceId);
- if (!deviceInfo) return;
- logger.info(`接收到来自 ${deviceId} 的消息: ${message}`);
- deviceInfo.lastAck = Date.now(); // 更新最后收到 ACK 的时间
- const masData= toModel(message)
- deviceInfo.iotDeviceNo=masData.n
- deviceInfo.masData= masData
- // 如果已经是有效的接受数据了 就不用在启用定时发送了 直接改状体
- if(deviceInfo.status!=='valid'){
- this.emit('validResponse', deviceId); // 触发 validResponse 事件
- deviceInfo.status = 'valid'; // 设备状态更新为有效
- console.log(`停止重试${deviceId}`)
- this.stopRetryMechanism(deviceId); // 停止重试机制
- // 根据上次发送的信号启动相应的定时发送机制
- this.startKeepAlive(deviceId, deviceInfo.lastSignal); // 启动定时发送 'K0000' 的机制
- logger.info(`${deviceId}启动定时发送 'K' 的机制60秒一次`);
- this.registerDevice(deviceId)
- }else{
- logger.info('注册成功后第二次就发送数据到阿里云')
- this.postPropsToDevice(deviceId)
-
- }
- }catch(err){
- logger.error(`${deviceId}处理客户端的消息数据出错了;${err}`)
+ try {
+ deviceInfo.socket.write(`${deviceInfo.lastSignal}\r\n`);
+ logger.info(`发送信号 '${randomLetters(10)}${deviceInfo.lastSignal==='K'?'a':'b'}${randomLetters(10)}' 给设备 ${deviceId}`);
+ } catch (err) {
+ logger.error(`发送信号失败 ${deviceId}:`, err.message);
+ this.removeDevice(deviceId);
}
-
-
}
- // 处理设备上阿里云
- async registerDevice(deviceId) {
+
+ async handleData(deviceId, message) {
const deviceInfo = this.devices.get(deviceId);
- logger.info(`${deviceId}注册阿里云iot设备`);
- try{
- if (!deviceInfo.iotDevice) {
- // 请求三元组信息
- logger.info(`${deviceId}请求三元组请求信息${deviceInfo.iotDeviceNo}`);
- const {data}=await getDeviceSYZ(deviceInfo.iotDeviceNo)
- logger.info(`${deviceId}请求三元组返回信息`+JSON.stringify(data));
- if(data&&data.productKey&&data.deviceName&&data.deviceSecret){
- // 设备配置信息
- const productKey = data.productKey; // 替换为你的产品密钥
- const deviceName = data.deviceName; // 替换为你的设备名称
- const deviceSecret = data.deviceSecret; // 替换为你的设备密钥
- // 创建设备实例aliyunIot.device(devConfig);
- deviceInfo.iotDevice = aliyunIot.device({
- ProductKey: productKey,
- DeviceName: deviceName,
- DeviceSecret: deviceSecret
- });
- // 监听设备连接状态变化
- deviceInfo.iotDevice.on('connect', () => {
- logger.info(`${deviceId} 连接到阿里云IoT平台成功`);
- });
-
- deviceInfo.iotDevice.on('error', (err) => {
- logger.info(`${deviceId} 设备连接错误:`, err);
- });
- }
-
- }
- }
- catch(err){
- logger.error(`${deviceId} 阿里云iot设备连接错误:${err}`)
- }
- }
- //发送消息到iot云端
- postPropsToDevice(deviceId) {
- try{
- const deviceInfo = this.devices.get(deviceId);
- if (deviceInfo.iotDevice) {
- // 上报属性数据
- const props = deviceInfo.masData
- deviceInfo.iotDevice.postProps(props, (res) => {
- if (res.message==='success') {
- logger.info(`${deviceId} 上报属性成功:`, res);
-
- } else {
- logger.error(`${deviceId} 上报属性失败:`, res);
- }
- });
- }
- }catch(err){
- logger.error(`${deviceId} 数据上传到阿里云失败报错:`, err);
- }
-
- }
-
+ if (!deviceInfo) return;
+ deviceInfo.lastAck = Date.now();
- // 移除设备连接
- removeDevice(deviceId) {
- if (this.devices.has(deviceId)) {
- this.stopRetryMechanism(deviceId); // 停止重试机制
- this.stopKeepAlive(deviceId); // 停止定时发送保持连接信号的机制
- this.devices.delete(deviceId); // 从设备列表中移除
- logger.info(`移除设备: ${deviceId}`);
+ try {
+ const ipAddress = deviceId
+ const masData = toModel(message, ipAddress);
+ deviceInfo.iotDeviceNo = masData.n;
+ deviceInfo.masData = masData;
+
+
+ // ✅【新增】缓存数据到内存(按设备序号)
+ dataCache.setDeviceData(masData.n, masData);
+
+ // ✅【核心改动】收到数据立即发 MQTT(不管阿里云)
+ if (mqttConfig.enabled) {
+ const topic = `${mqttConfig.defaultTopicPrefix}/${masData.n}`;
+ const payload = JSON.stringify({
+ ...masData,
+ deviceId: deviceId,
+ timestamp: new Date().toISOString()
+ });
+ publishMessage(topic, payload);
+ logger.info(`📡 已通过 MQTT 发送数据到 ${topic}`);
+ }
+
+ // 【可选】仅当阿里云启用时才注册并上报
+ if (aliyunConfig.enabled) {
+ if (deviceInfo.status !== 'valid') {
+ logger.info(`${deviceId} 阿里云设备状态变为有效`);
+ deviceInfo.status = 'valid';
+ this.stopRetryMechanism(deviceId);
+ logger.info(`${deviceId} 停止重试机制`);
+ this.startKeepAlive(deviceId, deviceInfo.lastSignal);
+ logger.info(`${deviceId} 启动保持连接机制`);
+ await this.registerAliyunDevice(deviceId); // 改名避免混淆
+ } else {
+ this.postPropsToAliyun(deviceId);
+ }
+ } else {
+ // 阿里云未启用,但已通过 MQTT 发送,无需其他操作
+ if (deviceInfo.status !== 'valid') {
+ deviceInfo.status = 'valid';
+ this.stopRetryMechanism(deviceId);
+ this.startKeepAlive(deviceId, deviceInfo.lastSignal);
+ logger.info(`${deviceId} 已完成握手(阿里云未启用)`);
+ }
+ }
+ } catch (err) {
+ logger.error(`${deviceId} 处理消息出错:`, err.message);
}
+ }
+
+ async registerAliyunDevice(deviceId) {
+ const deviceInfo = this.devices.get(deviceId);
+ if (!deviceInfo || deviceInfo.iotDevice) return;
+
+ try {
+ logger.info(`${deviceId} 请求三元组,设备号: ${deviceInfo.iotDeviceNo}`);
+ const { data } = await getDeviceSYZ(deviceInfo.iotDeviceNo);
+ logger.info(`${deviceId} 三元组返回: ${JSON.stringify(data)}`);
+ const model = data.data
+ if (model?.productKey && model?.deviceName && model?.deviceSecret) {
+ deviceInfo.iotDevice = aliyunIot.device({
+ ProductKey: model.productKey,
+ DeviceName: model.deviceName,
+ DeviceSecret: model.deviceSecret
+ });
+
+ deviceInfo.iotDevice.on('connect', () => {
+ logger.info(`${deviceId} 连接阿里云 IoT 成功`);
+ });
+
+ deviceInfo.iotDevice.on('error', (err) => {
+ logger.error(`${deviceId} 阿里云 IoT 错误:`, err.message);
+ });
+ }
+ } catch (err) {
+ logger.error(`${deviceId} 阿里云注册失败:`, err.message);
+ }
+ }
+
+ postPropsToAliyun(deviceId) {
+ const deviceInfo = this.devices.get(deviceId);
+ if (!deviceInfo?.iotDevice || !deviceInfo.masData) return;
+ logger.info(`${deviceId} 阿里云上报属性: ${JSON.stringify(deviceInfo.masData)}`);
+ const props = deviceInfo.masData;
+ deviceInfo.iotDevice.postProps(props, (res) => {
+ if (res?.message === 'success') {
+ logger.info(`${deviceId} 阿里云上报属性成功`);
+ } else {
+ logger.error(`${deviceId} 阿里云上报属性失败:`, res?.message || 'unknown');
+ }
+ });
+ }
+
+ removeDevice(deviceId) {
+ const deviceInfo = this.devices.get(deviceId);
+ if (!deviceInfo) return;
+
+ this.stopRetryMechanism(deviceId);
+ this.stopKeepAlive(deviceId);
+
+ if (deviceInfo.socket && !deviceInfo.socket.destroyed) {
+ deviceInfo.socket.destroy();
+ }
+
+ if (deviceInfo.iotDevice) {
+ deviceInfo.iotDevice.end(); // 安全关闭 IoT 连接
+ }
+
+ this.devices.delete(deviceId);
+ logger.info(`设备已移除: ${deviceId}`);
}
}
-// 创建服务器实例
-const server = net.createServer((socket) => {
- const manager = new DeviceManager();
- manager.handleDevice(socket);
- manager.on('validResponse', (deviceId) => {
- logger.info(`设备 ${deviceId} 成功响应了有效的 ACK`);
- // 可以在这里添加更多处理逻辑
- });
- manager.on('invalidResponse', (deviceId) => {
- logger.info(`设备 ${deviceId} 响应了无效或没有响应`);
- // 可以在这里添加更多处理逻辑
- });
-
- // 处理错误
- socket.on('error', (err) => {
- const deviceId = socket.remoteAddress + ':' + socket.remotePort;
- logger.error(`与 ${deviceId} 的套接字发生错误:`, err);
- manager.removeDevice(deviceId); // 移除设备连接
- });
-
- // 客户端断开时触发
- socket.on('end', () => {
- const deviceId = socket.remoteAddress + ':' + socket.remotePort;
- logger.info(`设备断开连接: ${deviceId}`);
- manager.removeDevice(deviceId); // 移除设备连接
- });
-});
-
-// 绑定端口并开始监听
-server.listen(961, () => {
- logger.info('socket服务已经启动端口号:961');
-
-});
-const getDeviceSYZ=async (devcieNo)=>{
- try{
- const {data,message}= await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret',devcieNo)
- if(data){
- return data
- }else{
- return ''
- }
-
- }catch(err){
- logger.error(` ${devcieNo} 获取三元组信息错误:`, err);
- throw new CustomError("新错误", error);
-
+// ========== 辅助函数 ==========
+async function getDeviceSYZ(deviceNo) {
+ try {
+ const { data, message } = await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret', deviceNo);
+ return { data };
+ } catch (err) {
+ logger.error(`${deviceNo} 获取三元组失败:`, err.message);
+ throw new CustomError("获取三元组失败", err);
}
-}
\ No newline at end of file
+}
+// 生成随机字母字符串
+function randomLetters(length) {
+ const chars = 'abcdefghijklmnopqrstuvwxyz';
+ let result = '';
+ for (let i = 0; i < length; i++) {
+ result += chars.charAt(Math.floor(Math.random() * chars.length));
+ }
+ return result;
+}
+
+
+
+// ========== 启动服务器 ==========
+const manager = new DeviceManager(); // ✅ 单例!
+
+manager.on('validResponse', (deviceId) => {
+ logger.info(`设备 ${deviceId} 成功完成握手`);
+});
+
+manager.on('invalidResponse', (deviceId) => {
+ logger.info(`设备 ${deviceId} 无效响应`);
+});
+
+const server = net.createServer((socket) => {
+ manager.handleDevice(socket);
+});
+
+const PORT = homeConfig.socketPort || 10961;
+server.listen(PORT, () => {
+ logger.info(`Socket 服务已启动,监听超级端口: ${PORT}`);
+});
+
+// ========== 启动 HTTP 服务 ==========
+const HTTP_PORT = process.env.HTTP_PORT || httpConfig.port || 8080;
+const httpServer = new HttpServer(HTTP_PORT, httpConfig);
+httpServer.start();
\ No newline at end of file
--
Gitblit v1.8.0