From 7885cede659f3255be56f77c1eef2ada7387d6f1 Mon Sep 17 00:00:00 2001
From: chenyc <501753378@qq.com>
Date: 星期日, 22 三月 2026 16:23:21 +0800
Subject: [PATCH] 初始化项目

---
 src/deviceManager.js |  160 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 160 insertions(+), 0 deletions(-)

diff --git a/src/deviceManager.js b/src/deviceManager.js
new file mode 100644
index 0000000..0e6a52d
--- /dev/null
+++ b/src/deviceManager.js
@@ -0,0 +1,160 @@
+const logger = require("./logger");
+
+class DeviceManager {
+  constructor({ idleTimeoutMs, onFrameParsed, maxBufferBytes, maxFramesPerChunk }) {
+    this.devices = new Map(); // key(ip:port) -> device
+    this.idleTimeoutMs = idleTimeoutMs || 2 * 60 * 1000;
+    this.onFrameParsed = onFrameParsed;
+    this.maxBufferBytes = maxBufferBytes || 1024 * 1024;
+    this.maxFramesPerChunk = maxFramesPerChunk || 50;
+
+    // 启动全局超时清理
+    const intervalMs = 5 * 60 * 1000;
+    setInterval(() => this.cleanupIdleDevices(), intervalMs).unref();
+  }
+
+  _keyFromSocket(socket) {
+    return `${socket.remoteAddress}:${socket.remotePort}`;
+  }
+
+  addConnection(socket) {
+    const key = this._keyFromSocket(socket);
+    const connectedAt = Date.now();
+    const device = {
+      key,
+      socket,
+      status: "pending",
+      connectedAt,
+      lastAck: connectedAt,
+      buffer: Buffer.alloc(0),
+      totalBytes: 0,
+      totalFrames: 0
+    };
+    this.devices.set(key, device);
+    logger.info("Device connected", {
+      key,
+      connectedAt: new Date(connectedAt).toISOString()
+    });
+    return device;
+  }
+
+  removeConnection(socket) {
+    const key = this._keyFromSocket(socket);
+    const device = this.devices.get(key);
+    if (device) {
+      this.devices.delete(key);
+      logger.info("Device disconnected", {
+        key,
+        deviceNumber: device.deviceNumber,
+        connectedAt: device.connectedAt
+          ? new Date(device.connectedAt).toISOString()
+          : undefined,
+        disconnectedAt: new Date().toISOString(),
+        onlineMs: device.connectedAt ? Date.now() - device.connectedAt : undefined,
+        totalBytes: device.totalBytes || 0,
+        totalFrames: device.totalFrames || 0
+      });
+    }
+  }
+
+  handleData(socket, data, protocolParser) {
+    const key = this._keyFromSocket(socket);
+    let device = this.devices.get(key);
+    if (!device) {
+      device = this.addConnection(socket);
+    }
+
+    device.lastAck = Date.now();
+    device.status = "valid";
+    device.totalBytes = (device.totalBytes || 0) + data.length;
+    // 累加 buffer
+    device.buffer = Buffer.concat([device.buffer, data]);
+
+    // 防御:异常数据流导致 buffer 无上限增长时,保留尾部并告警
+    if (device.buffer.length > this.maxBufferBytes) {
+      const keepBytes = Math.floor(this.maxBufferBytes / 2);
+      device.buffer = device.buffer.slice(device.buffer.length - keepBytes);
+      logger.warn("Device buffer exceeded limit, truncated", {
+        key,
+        maxBufferBytes: this.maxBufferBytes,
+        keepBytes
+      });
+    }
+
+    // 按协议拆包
+    let frames = [];
+    try {
+      const extracted = protocolParser.extractFrames(device.buffer);
+      frames = extracted.frames || [];
+      device.buffer = extracted.remaining || Buffer.alloc(0);
+    } catch (err) {
+      logger.error("extractFrames failed", { key, error: err.message || err });
+      // 拆包失败时清空本连接 buffer,避免脏数据反复触发异常
+      device.buffer = Buffer.alloc(0);
+      return;
+    }
+
+    if (frames.length > this.maxFramesPerChunk) {
+      logger.warn("Too many frames in one chunk, capping parse count", {
+        key,
+        frames: frames.length,
+        cap: this.maxFramesPerChunk
+      });
+      frames = frames.slice(0, this.maxFramesPerChunk);
+    }
+
+    for (const frame of frames) {
+      try {
+        const { deviceNumber, data: parsed } = protocolParser.parseFrame(
+          frame,
+          socket.remoteAddress
+        );
+        const previousDeviceNumber = device.deviceNumber;
+        device.status = "registered";
+        device.deviceNumber = deviceNumber;
+        device.lastFrame = parsed;
+        device.totalFrames = (device.totalFrames || 0) + 1;
+
+        if (!previousDeviceNumber || previousDeviceNumber !== deviceNumber) {
+          logger.info("Device identified", { key, deviceNumber });
+        }
+
+        if (typeof this.onFrameParsed === "function") {
+          this.onFrameParsed({ device, deviceNumber, data: parsed });
+        }
+      } catch (err) {
+        logger.error("Failed to parse frame", { key, error: err.message || err });
+      }
+    }
+  }
+
+  cleanupIdleDevices() {
+    const now = Date.now();
+    for (const [key, device] of this.devices.entries()) {
+      if (now - device.lastAck > this.idleTimeoutMs) {
+        logger.warn("Device idle timeout, closing", { key });
+        try {
+          device.socket.destroy();
+        } catch (e) {
+          // ignore
+        }
+        this.devices.delete(key);
+      }
+    }
+  }
+
+  getStats() {
+    const list = [];
+    for (const [key, device] of this.devices.entries()) {
+      list.push({
+        key,
+        status: device.status,
+        lastAck: device.lastAck,
+        deviceNumber: device.deviceNumber
+      });
+    }
+    return list;
+  }
+}
+
+module.exports = DeviceManager;

--
Gitblit v1.8.0