const logger = require("./logger");

/**
 * Tracks TCP device connections keyed by "ip:port", accumulates each
 * connection's incoming bytes, splits them into frames with a caller-supplied
 * protocol parser, and periodically drops connections that have gone idle.
 */
class DeviceManager {
  /**
   * @param {object} [options]
   * @param {number} [options.idleTimeoutMs] Idle time after which a device is
   *   closed by cleanupIdleDevices(). Defaults to 2 minutes.
   * @param {Function} [options.onFrameParsed] Called with
   *   `{ device, deviceNumber, data }` for every successfully parsed frame.
   * @param {number} [options.maxBufferBytes] Upper bound for a connection's
   *   pending buffer. Defaults to 1 MiB.
   * @param {number} [options.maxFramesPerChunk] Maximum number of frames
   *   parsed per handleData() call. Defaults to 50.
   * @param {number} [options.cleanupIntervalMs] Period of the idle sweep.
   *   Defaults to 5 minutes.
   */
  constructor({
    idleTimeoutMs,
    onFrameParsed,
    maxBufferBytes,
    maxFramesPerChunk,
    cleanupIntervalMs,
  } = {}) {
    this.devices = new Map(); // key (ip:port) -> device record

    // socket -> key captured at registration time. The remote address of a
    // socket may no longer be readable once it has been destroyed, so the key
    // must not be recomputed from the socket when it disconnects.
    this.socketKeys = new WeakMap();

    this.idleTimeoutMs = idleTimeoutMs || 2 * 60 * 1000;
    this.onFrameParsed = onFrameParsed;
    this.maxBufferBytes = maxBufferBytes || 1024 * 1024;
    this.maxFramesPerChunk = maxFramesPerChunk || 50;

    // Start the global idle sweep. The handle is kept so stop() can cancel
    // it; unref() keeps the timer from holding the process open.
    const intervalMs = cleanupIntervalMs || 5 * 60 * 1000;
    this.cleanupTimer = setInterval(() => this.cleanupIdleDevices(), intervalMs);
    this.cleanupTimer.unref();
  }

  /**
   * Cancels the periodic idle sweep. Safe to call more than once.
   */
  stop() {
    if (this.cleanupTimer) {
      clearInterval(this.cleanupTimer);
      this.cleanupTimer = null;
    }
  }

  /**
   * Returns the map key for a socket: the key remembered when the socket was
   * registered, or "remoteAddress:remotePort" for a socket not seen before.
   *
   * @param {object} socket Socket-like object with remoteAddress/remotePort.
   * @returns {string}
   */
  _keyFromSocket(socket) {
    const remembered = this.socketKeys.get(socket);
    if (remembered !== undefined) {
      return remembered;
    }
    return `${socket.remoteAddress}:${socket.remotePort}`;
  }

  /**
   * Registers a new connection and returns its device record. An existing
   * record under the same key is replaced.
   *
   * @param {object} socket
   * @returns {object} The newly created device record.
   */
  addConnection(socket) {
    const key = this._keyFromSocket(socket);
    this.socketKeys.set(socket, key);

    const connectedAt = Date.now();
    const device = {
      key,
      socket,
      status: "pending",
      connectedAt,
      lastAck: connectedAt,
      buffer: Buffer.alloc(0),
      totalBytes: 0,
      totalFrames: 0,
    };
    this.devices.set(key, device);

    logger.info("Device connected", {
      key,
      connectedAt: new Date(connectedAt).toISOString(),
    });
    return device;
  }

  /**
   * Forgets the device record that belongs to `socket` and logs a summary of
   * the session. Does nothing when the socket has no record.
   *
   * @param {object} socket
   */
  removeConnection(socket) {
    const key = this._keyFromSocket(socket);
    this.socketKeys.delete(socket);

    const device = this.devices.get(key);
    if (!device) {
      return;
    }
    // The same ip:port may already belong to a newer connection; a late
    // disconnect of the old socket must not evict that newer record.
    if (device.socket !== socket) {
      return;
    }

    this.devices.delete(key);
    logger.info("Device disconnected", {
      key,
      deviceNumber: device.deviceNumber,
      connectedAt: device.connectedAt
        ? new Date(device.connectedAt).toISOString()
        : undefined,
      disconnectedAt: new Date().toISOString(),
      onlineMs: device.connectedAt ? Date.now() - device.connectedAt : undefined,
      totalBytes: device.totalBytes || 0,
      totalFrames: device.totalFrames || 0,
    });
  }

  /**
   * Feeds a chunk of received bytes into the connection's buffer, extracts
   * complete frames and parses each of them. A socket without a record is
   * registered first.
   *
   * @param {object} socket
   * @param {Buffer} data Bytes just received on `socket`.
   * @param {object} protocolParser Must provide `extractFrames(buffer)`
   *   returning `{ frames, remaining }` and `parseFrame(frame, remoteAddress)`
   *   returning `{ deviceNumber, data }`.
   */
  handleData(socket, data, protocolParser) {
    const key = this._keyFromSocket(socket);
    const device = this.devices.get(key) || this.addConnection(socket);

    device.lastAck = Date.now();
    // Only promote a fresh connection; a device that already identified
    // itself stays "registered" even when this chunk holds no full frame.
    if (device.status === "pending") {
      device.status = "valid";
    }
    device.totalBytes = (device.totalBytes || 0) + data.length;

    this._appendToBuffer(device, data);

    const frames = this._extractFrames(device, protocolParser);
    for (const frame of frames) {
      this._processFrame(device, frame, socket, protocolParser);
    }
  }

  /**
   * Appends `data` to the device buffer, truncating to the newest half of
   * maxBufferBytes when the limit is exceeded.
   */
  _appendToBuffer(device, data) {
    device.buffer = Buffer.concat([device.buffer, data]);

    // Defensive: if a malformed stream makes the buffer grow without bound,
    // keep only the tail and warn.
    if (device.buffer.length > this.maxBufferBytes) {
      const keepBytes = Math.floor(this.maxBufferBytes / 2);
      device.buffer = device.buffer.subarray(device.buffer.length - keepBytes);
      logger.warn("Device buffer exceeded limit, truncated", {
        key: device.key,
        maxBufferBytes: this.maxBufferBytes,
        keepBytes,
      });
    }
  }

  /**
   * Splits the device buffer into frames using the protocol parser and stores
   * the unconsumed remainder back on the device. Returns at most
   * maxFramesPerChunk frames; frames beyond the cap are discarded.
   *
   * @returns {Array} Frames to parse; empty when extraction failed.
   */
  _extractFrames(device, protocolParser) {
    let frames;
    try {
      const extracted = protocolParser.extractFrames(device.buffer);
      frames = extracted.frames || [];
      device.buffer = extracted.remaining || Buffer.alloc(0);
    } catch (err) {
      logger.error("extractFrames failed", {
        key: device.key,
        error: err.message || err,
      });
      // Drop this connection's buffer so the same bad bytes do not trigger
      // the failure again on every subsequent chunk.
      device.buffer = Buffer.alloc(0);
      return [];
    }

    if (frames.length > this.maxFramesPerChunk) {
      logger.warn("Too many frames in one chunk, capping parse count", {
        key: device.key,
        frames: frames.length,
        cap: this.maxFramesPerChunk,
      });
      frames = frames.slice(0, this.maxFramesPerChunk);
    }
    return frames;
  }

  /**
   * Parses one frame, updates the device record and notifies onFrameParsed.
   * Any error thrown by the parser or by the callback is logged and does not
   * stop the processing of later frames.
   */
  _processFrame(device, frame, socket, protocolParser) {
    try {
      const { deviceNumber, data: parsed } = protocolParser.parseFrame(
        frame,
        socket.remoteAddress
      );

      const previousDeviceNumber = device.deviceNumber;
      device.status = "registered";
      device.deviceNumber = deviceNumber;
      device.lastFrame = parsed;
      device.totalFrames = (device.totalFrames || 0) + 1;

      if (!previousDeviceNumber || previousDeviceNumber !== deviceNumber) {
        logger.info("Device identified", { key: device.key, deviceNumber });
      }

      if (typeof this.onFrameParsed === "function") {
        this.onFrameParsed({ device, deviceNumber, data: parsed });
      }
    } catch (err) {
      logger.error("Failed to parse frame", {
        key: device.key,
        error: err.message || err,
      });
    }
  }

  /**
   * Destroys and forgets every device whose last activity is older than
   * idleTimeoutMs.
   */
  cleanupIdleDevices() {
    const now = Date.now();
    for (const [key, device] of this.devices.entries()) {
      if (now - device.lastAck > this.idleTimeoutMs) {
        logger.warn("Device idle timeout, closing", { key });
        try {
          device.socket.destroy();
        } catch (e) {
          // Best effort: the record is dropped below even if destroy() fails.
        }
        this.devices.delete(key);
      }
    }
  }

  /**
   * Returns a snapshot list with key, status, lastAck and deviceNumber of
   * every tracked device.
   *
   * @returns {Array<object>}
   */
  getStats() {
    return Array.from(this.devices.entries(), ([key, device]) => ({
      key,
      status: device.status,
      lastAck: device.lastAck,
      deviceNumber: device.deviceNumber,
    }));
  }
}

module.exports = DeviceManager;