// TCP device gateway entry point. Written so that it works both when run from
// source and when packaged into an executable with `pkg`.
const fs = require('fs');
const path = require('path');

// Directory that holds the JSON config files: the executable's directory when
// running under pkg, otherwise this file's directory.
const appPath = process.pkg ? path.dirname(process.execPath) : __dirname;

const mqttConfigPath = path.join(appPath, 'mqtt.json');
const aliyunConfigPath = path.join(appPath, 'aliyun.json');
const httpConfigPath = path.join(appPath, 'httpConfig.json');
const homeConfigPath = path.join(appPath, 'homeConfig.json');

// Config files are read synchronously at start-up; a missing or malformed file
// aborts the process before any server is started.
const mqttConfig = JSON.parse(fs.readFileSync(mqttConfigPath, 'utf8'));
const aliyunConfig = JSON.parse(fs.readFileSync(aliyunConfigPath, 'utf8'));
const httpConfig = JSON.parse(fs.readFileSync(httpConfigPath, 'utf8'));
const homeConfig = JSON.parse(fs.readFileSync(homeConfigPath, 'utf8'));
// NOTE(review): this prints the whole Aliyun config to stdout on every start;
// confirm it holds no credentials, or remove it.
console.log(aliyunConfig);

const { initMqtt, publishMessage } = require('./mqttClient');
const net = require('net');
const logger = require('./logger');
const EventEmitter = require('events');
const aliyunIot = require('aliyun-iot-device-sdk');
const { getAliyunDeviceSecret } = require('./api');
const toModel = require('./Strholp');
const dataCache = require('./dataCache');
const HttpServer = require('./httpServer');

// Initialise MQTT (independent of Aliyun).
initMqtt(mqttConfig);

// ========== Custom error class ==========

/**
 * Error that carries the underlying failure in `cause`.
 */
class CustomError extends Error {
  /**
   * @param {string} message - Human-readable description.
   * @param {*} cause - The original error that triggered this one.
   */
  constructor(message, cause) {
    super(message);
    this.cause = cause;
  }
}

// ========== Constants ==========
const MAX_BUFFER_SIZE = 500; // soft limit on the receive buffer, in characters
// Hard limit on a started-but-unterminated frame. The soft limit alone cannot
// shrink a buffer that already begins with the start marker.
const MAX_PENDING_FRAME_SIZE = 64 * 1024;
const RETRY_INTERVAL_MS = 10000; // handshake retry interval: 10 s
const KEEP_ALIVE_INTERVAL_MS = 60000; // keep-alive interval: 60 s
const DEVICE_TIMEOUT_MS = 120000; // drop a device after 2 min without a message

// ========== DeviceManager ==========

/**
 * Tracks every connected TCP device, drives the handshake / keep-alive
 * signalling, parses incoming frames and forwards the decoded data to the
 * in-memory cache, MQTT and (when enabled) Aliyun IoT.
 */
class DeviceManager extends EventEmitter {
  constructor() {
    super();
    // deviceId ("ip:port") -> per-connection state object
    this.devices = new Map();

    // Global sweep that removes devices which have not delivered a complete
    // message within DEVICE_TIMEOUT_MS. Runs every 30 seconds.
    setInterval(() => {
      const now = Date.now();
      for (const [deviceId, info] of this.devices.entries()) {
        if (now - info.lastAck > DEVICE_TIMEOUT_MS) {
          logger.warn(`设备 ${deviceId} 超时(${DEVICE_TIMEOUT_MS / 1000}s 无响应),自动断开`);
          this.removeDevice(deviceId);
        }
      }
    }, 30000);
  }

  /**
   * Registers a freshly accepted socket, sends the first handshake signal and
   * wires up frame parsing and disconnect handling.
   *
   * @param {net.Socket} socket - The accepted client connection.
   */
  handleDevice(socket) {
    let remoteAddress = socket.remoteAddress;
    // remoteAddress is undefined when the peer has already disconnected;
    // calling string methods on it would throw inside the connection callback
    // and take the whole process down.
    if (typeof remoteAddress !== 'string') {
      logger.warn('新连接在处理前已断开,忽略');
      socket.destroy();
      return;
    }

    // Normalise IPv4-mapped IPv6 addresses (::ffff:127.0.0.1 -> 127.0.0.1).
    if (remoteAddress.startsWith('::ffff:')) {
      remoteAddress = remoteAddress.slice(7); // strip the "::ffff:" prefix
    }

    const deviceId = remoteAddress + ':' + socket.remotePort;
    logger.info(`建立新连接: ${deviceId}`);

    const deviceInfo = {
      socket,
      lastAck: Date.now(),
      status: 'pending', // becomes 'valid' after the first decoded message
      retryInterval: null,
      keepAliveInterval: null,
      lastSignal: 'K', // initial handshake signal
      iotDevice: null,
      masData: '',
      iotDeviceNo: ''
    };
    this.devices.set(deviceId, deviceInfo);

    // Send the initial handshake signal and keep retrying until the device answers.
    this.sendKeepAliveToDevice(deviceId);
    this.startRetryMechanism(deviceId);

    let buffer = '';

    socket.on('data', (chunk) => {
      buffer += chunk.toString('ascii');

      // A frame starts at "K1" and ends at CRLF; one chunk may hold several.
      while (true) {
        const startIdx = buffer.indexOf('K1');
        if (startIdx === -1) {
          // No start marker yet: keep the data in case a frame is split across
          // chunks, unless the buffer has grown past the soft limit.
          if (buffer.length > MAX_BUFFER_SIZE) buffer = '';
          break;
        }

        const endIdx = buffer.indexOf('\r\n', startIdx);
        if (endIdx === -1) {
          // Terminator not received yet: keep everything from "K1" onwards.
          if (buffer.length > MAX_BUFFER_SIZE) {
            buffer = buffer.substring(startIdx);
          }
          // The trim above is a no-op when "K1" is already at index 0, so cap
          // the pending frame explicitly.
          if (buffer.length > MAX_PENDING_FRAME_SIZE) {
            logger.warn(`${deviceId} 消息超过 ${MAX_PENDING_FRAME_SIZE} 字符仍未结束,丢弃缓冲区`);
            buffer = '';
          }
          break;
        }

        const message = buffer.substring(startIdx, endIdx).trim();
        buffer = buffer.substring(endIdx + 2); // drop the consumed frame including CRLF

        logger.info(`${deviceId} 接收到完整消息: q1261A00.00B00.00C00.00D0DSDASDSADSADSADSew5w5${message}nq4ds65g6m6767m`);
        // handleData catches its own errors, so the returned promise never rejects.
        this.handleData(deviceId, message);
      }
    });

    socket.on('error', (err) => {
      logger.error(`设备 ${deviceId} 套接字错误:`, err.message);
      this.removeDevice(deviceId);
    });

    socket.on('end', () => {
      logger.info(`设备断开连接: ${deviceId}`);
      this.removeDevice(deviceId);
    });
  }

  /**
   * Starts (or restarts) the handshake retry timer for a device. While the
   * device status is 'pending' or 'invalid', the signal alternates between
   * 'K' and 'K0000' on every tick and is re-sent.
   *
   * @param {string} deviceId
   */
  startRetryMechanism(deviceId) {
    const deviceInfo = this.devices.get(deviceId);
    if (!deviceInfo) return;

    this.stopRetryMechanism(deviceId);

    deviceInfo.retryInterval = setInterval(() => {
      if (deviceInfo.status === 'pending' || deviceInfo.status === 'invalid') {
        // Handshake phase: toggle between 'K' and 'K0000'.
        deviceInfo.lastSignal = deviceInfo.lastSignal === 'K' ? 'K0000' : 'K';
        logger.info(`重试发送 'abc${deviceInfo.lastSignal === 'K' ? '1' : '0000'}abc' 给设备 ${deviceId}`);
        this.sendKeepAliveToDevice(deviceId);
      }
    }, RETRY_INTERVAL_MS);
  }

  /**
   * Cancels the handshake retry timer of a device, if one is running.
   *
   * @param {string} deviceId
   */
  stopRetryMechanism(deviceId) {
    const deviceInfo = this.devices.get(deviceId);
    if (deviceInfo?.retryInterval) {
      clearInterval(deviceInfo.retryInterval);
      deviceInfo.retryInterval = null;
    }
  }

  /**
   * Starts (or restarts) the periodic keep-alive for a device.
   *
   * @param {string} deviceId
   * @param {string} signal - Signal stored as `lastSignal` and sent on every tick.
   */
  startKeepAlive(deviceId, signal) {
    const deviceInfo = this.devices.get(deviceId);
    if (!deviceInfo) return;

    this.stopKeepAlive(deviceId);
    deviceInfo.lastSignal = signal;

    deviceInfo.keepAliveInterval = setInterval(() => {
      this.sendKeepAliveToDevice(deviceId);
    }, KEEP_ALIVE_INTERVAL_MS);
  }

  /**
   * Cancels the keep-alive timer of a device, if one is running.
   *
   * @param {string} deviceId
   */
  stopKeepAlive(deviceId) {
    const deviceInfo = this.devices.get(deviceId);
    if (deviceInfo?.keepAliveInterval) {
      clearInterval(deviceInfo.keepAliveInterval);
      deviceInfo.keepAliveInterval = null;
    }
  }

  /**
   * Writes the device's current signal to its socket. Does nothing when the
   * device is unknown or its socket is no longer writable; removes the device
   * when the write throws.
   *
   * @param {string} deviceId
   */
  sendKeepAliveToDevice(deviceId) {
    const deviceInfo = this.devices.get(deviceId);
    if (!deviceInfo || !deviceInfo.socket.writable) return;

    try {
      deviceInfo.socket.write(`abc${deviceInfo.lastSignal}abc\r\n`);
      logger.info(`发送信号 'abc${deviceInfo.lastSignal === 'K' ? '1' : '0000'}abc' 给设备 ${deviceId}`);
    } catch (err) {
      logger.error(`发送信号失败 ${deviceId}:`, err.message);
      this.removeDevice(deviceId);
    }
  }

  /**
   * Handles one complete frame: decodes it, caches it, publishes it over MQTT
   * (when enabled) and, on the first frame of a connection, switches the
   * device from handshake retries to keep-alive. With Aliyun enabled the first
   * frame triggers registration and later frames are reported as properties.
   * All errors are logged and swallowed, so the returned promise never rejects.
   *
   * @param {string} deviceId
   * @param {string} message - Frame text from the "K1" marker up to, not including, CRLF.
   * @returns {Promise<void>}
   */
  async handleData(deviceId, message) {
    const deviceInfo = this.devices.get(deviceId);
    if (!deviceInfo) return;

    // Any complete frame counts as a sign of life for the timeout sweep.
    deviceInfo.lastAck = Date.now();

    try {
      const ipAddress = deviceId;
      const masData = toModel(message, ipAddress);
      deviceInfo.iotDeviceNo = masData.n;
      deviceInfo.masData = masData;

      // Cache the latest data in memory, keyed by device number.
      dataCache.setDeviceData(masData.n, masData);

      // Publish over MQTT right away, regardless of Aliyun.
      if (mqttConfig.enabled) {
        const topic = `${mqttConfig.defaultTopicPrefix}/${masData.n}`;
        const payload = JSON.stringify({
          ...masData,
          deviceId: deviceId,
          timestamp: new Date().toISOString()
        });
        publishMessage(topic, payload);
        logger.info(`📡 已通过 MQTT 发送数据到 ${topic}`);
      }

      // Register with / report to Aliyun only when it is enabled.
      if (aliyunConfig.enabled) {
        if (deviceInfo.status !== 'valid') {
          logger.info(`${deviceId} 阿里云设备状态变为有效`);
          deviceInfo.status = 'valid';
          this.stopRetryMechanism(deviceId);
          logger.info(`${deviceId} 停止重试机制`);
          this.startKeepAlive(deviceId, deviceInfo.lastSignal);
          logger.info(`${deviceId} 启动保持连接机制`);
          await this.registerAliyunDevice(deviceId);
        } else {
          this.postPropsToAliyun(deviceId);
        }
      } else {
        // Aliyun disabled: the data already went out over MQTT, so only the
        // handshake needs to be completed.
        if (deviceInfo.status !== 'valid') {
          deviceInfo.status = 'valid';
          this.stopRetryMechanism(deviceId);
          this.startKeepAlive(deviceId, deviceInfo.lastSignal);
          logger.info(`${deviceId} 已完成握手(阿里云未启用)`);
        }
      }
    } catch (err) {
      logger.error(`${deviceId} 处理消息出错:`, err.message);
    }
  }

  /**
   * Fetches the Aliyun credential triple for the device and opens its IoT
   * connection. Does nothing when the device is unknown or already has an IoT
   * connection. Failures are logged, never thrown.
   *
   * @param {string} deviceId
   * @returns {Promise<void>}
   */
  async registerAliyunDevice(deviceId) {
    const deviceInfo = this.devices.get(deviceId);
    if (!deviceInfo || deviceInfo.iotDevice) return;

    try {
      logger.info(`${deviceId} 请求三元组,设备号: ${deviceInfo.iotDeviceNo}`);
      const { data } = await getDeviceSYZ(deviceInfo.iotDeviceNo);
      logger.info(`${deviceId} 三元组返回: ${JSON.stringify(data)}`);

      // The device may have been removed (timeout, socket error) while the
      // request was in flight. Connecting now would create an IoT connection
      // that removeDevice can no longer reach and close.
      if (this.devices.get(deviceId) !== deviceInfo) {
        logger.warn(`${deviceId} 在获取三元组期间已断开,跳过阿里云连接`);
        return;
      }

      const model = data.data;
      if (model?.productKey && model?.deviceName && model?.deviceSecret) {
        deviceInfo.iotDevice = aliyunIot.device({
          ProductKey: model.productKey,
          DeviceName: model.deviceName,
          DeviceSecret: model.deviceSecret
        });

        deviceInfo.iotDevice.on('connect', () => {
          logger.info(`${deviceId} 连接阿里云 IoT 成功`);
        });

        deviceInfo.iotDevice.on('error', (err) => {
          logger.error(`${deviceId} 阿里云 IoT 错误:`, err.message);
        });
      } else {
        logger.warn(`${deviceId} 三元组不完整,跳过阿里云连接`);
      }
    } catch (err) {
      logger.error(`${deviceId} 阿里云注册失败:`, err.message);
    }
  }

  /**
   * Reports the device's most recently decoded data to Aliyun as properties.
   * Does nothing when the device has no IoT connection or no data yet.
   *
   * @param {string} deviceId
   */
  postPropsToAliyun(deviceId) {
    const deviceInfo = this.devices.get(deviceId);
    if (!deviceInfo?.iotDevice || !deviceInfo.masData) return;

    logger.info(`${deviceId} 阿里云上报属性: ${JSON.stringify(deviceInfo.masData)}`);
    const props = deviceInfo.masData;
    deviceInfo.iotDevice.postProps(props, (res) => {
      if (res?.message === 'success') {
        logger.info(`${deviceId} 阿里云上报属性成功`);
      } else {
        logger.error(`${deviceId} 阿里云上报属性失败:`, res?.message || 'unknown');
      }
    });
  }

  /**
   * Tears down everything held for a device: timers, socket, IoT connection
   * and the map entry. Safe to call for an unknown or already removed device.
   *
   * @param {string} deviceId
   */
  removeDevice(deviceId) {
    const deviceInfo = this.devices.get(deviceId);
    if (!deviceInfo) return;

    this.stopRetryMechanism(deviceId);
    this.stopKeepAlive(deviceId);

    if (deviceInfo.socket && !deviceInfo.socket.destroyed) {
      deviceInfo.socket.destroy();
    }

    if (deviceInfo.iotDevice) {
      // A failing close must not escape (this also runs from the cleanup
      // timer) nor leave the device stuck in the map.
      try {
        deviceInfo.iotDevice.end();
      } catch (err) {
        logger.error(`${deviceId} 关闭阿里云 IoT 连接失败:`, err.message);
      }
    }

    this.devices.delete(deviceId);
    logger.info(`设备已移除: ${deviceId}`);
  }
}

// ========== Helpers ==========

/**
 * Requests the Aliyun credential triple for a device number.
 *
 * @param {string} deviceNo - Device number taken from the decoded message.
 * @returns {Promise<{data: *}>} The `data` field of the API response.
 * @throws {CustomError} When the request fails; the original error is in `cause`.
 */
async function getDeviceSYZ(deviceNo) {
  try {
    const { data } = await getAliyunDeviceSecret('device/info/getAliyunDeviceSecret', deviceNo);
    return { data };
  } catch (err) {
    logger.error(`${deviceNo} 获取三元组失败:`, err.message);
    throw new CustomError("获取三元组失败", err);
  }
}

// ========== Start the socket server ==========
const manager = new DeviceManager(); // single shared instance

// NOTE(review): nothing in this file emits 'validResponse' or
// 'invalidResponse'; confirm whether these listeners are still needed.
manager.on('validResponse', (deviceId) => {
  logger.info(`设备 ${deviceId} 成功完成握手`);
});

manager.on('invalidResponse', (deviceId) => {
  logger.info(`设备 ${deviceId} 无效响应`);
});

const server = net.createServer((socket) => {
  manager.handleDevice(socket);
});

const PORT = homeConfig.socketPort || 10961;
server.listen(PORT, () => {
  logger.info(`Socket 服务已启动,监听超级端口: ${PORT}`);
});

// ========== Start the HTTP server ==========
const HTTP_PORT = process.env.HTTP_PORT || httpConfig.port || 8080;
const httpServer = new HttpServer(HTTP_PORT, httpConfig);
httpServer.start();