From 7885cede659f3255be56f77c1eef2ada7387d6f1 Mon Sep 17 00:00:00 2001
From: chenyc <501753378@qq.com>
Date: 星期日, 22 三月 2026 16:23:21 +0800
Subject: [PATCH] 初始化项目
---
src/deviceManager.js | 160 +++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 160 insertions(+), 0 deletions(-)
diff --git a/src/deviceManager.js b/src/deviceManager.js
new file mode 100644
index 0000000..0e6a52d
--- /dev/null
+++ b/src/deviceManager.js
@@ -0,0 +1,160 @@
+const logger = require("./logger");
+
+class DeviceManager {
+ constructor({ idleTimeoutMs, onFrameParsed, maxBufferBytes, maxFramesPerChunk }) {
+ this.devices = new Map(); // key(ip:port) -> device
+ this.idleTimeoutMs = idleTimeoutMs || 2 * 60 * 1000;
+ this.onFrameParsed = onFrameParsed;
+ this.maxBufferBytes = maxBufferBytes || 1024 * 1024;
+ this.maxFramesPerChunk = maxFramesPerChunk || 50;
+
+ // 启动全局超时清理
+ const intervalMs = 5 * 60 * 1000;
+ setInterval(() => this.cleanupIdleDevices(), intervalMs).unref();
+ }
+
+ _keyFromSocket(socket) {
+ return `${socket.remoteAddress}:${socket.remotePort}`;
+ }
+
+ addConnection(socket) {
+ const key = this._keyFromSocket(socket);
+ const connectedAt = Date.now();
+ const device = {
+ key,
+ socket,
+ status: "pending",
+ connectedAt,
+ lastAck: connectedAt,
+ buffer: Buffer.alloc(0),
+ totalBytes: 0,
+ totalFrames: 0
+ };
+ this.devices.set(key, device);
+ logger.info("Device connected", {
+ key,
+ connectedAt: new Date(connectedAt).toISOString()
+ });
+ return device;
+ }
+
+ removeConnection(socket) {
+ const key = this._keyFromSocket(socket);
+ const device = this.devices.get(key);
+ if (device) {
+ this.devices.delete(key);
+ logger.info("Device disconnected", {
+ key,
+ deviceNumber: device.deviceNumber,
+ connectedAt: device.connectedAt
+ ? new Date(device.connectedAt).toISOString()
+ : undefined,
+ disconnectedAt: new Date().toISOString(),
+ onlineMs: device.connectedAt ? Date.now() - device.connectedAt : undefined,
+ totalBytes: device.totalBytes || 0,
+ totalFrames: device.totalFrames || 0
+ });
+ }
+ }
+
+ handleData(socket, data, protocolParser) {
+ const key = this._keyFromSocket(socket);
+ let device = this.devices.get(key);
+ if (!device) {
+ device = this.addConnection(socket);
+ }
+
+ device.lastAck = Date.now();
+ device.status = "valid";
+ device.totalBytes = (device.totalBytes || 0) + data.length;
+ // 累加 buffer
+ device.buffer = Buffer.concat([device.buffer, data]);
+
+ // 防御:异常数据流导致 buffer 无上限增长时,保留尾部并告警
+ if (device.buffer.length > this.maxBufferBytes) {
+ const keepBytes = Math.floor(this.maxBufferBytes / 2);
+ device.buffer = device.buffer.slice(device.buffer.length - keepBytes);
+ logger.warn("Device buffer exceeded limit, truncated", {
+ key,
+ maxBufferBytes: this.maxBufferBytes,
+ keepBytes
+ });
+ }
+
+ // 按协议拆包
+ let frames = [];
+ try {
+ const extracted = protocolParser.extractFrames(device.buffer);
+ frames = extracted.frames || [];
+ device.buffer = extracted.remaining || Buffer.alloc(0);
+ } catch (err) {
+ logger.error("extractFrames failed", { key, error: err.message || err });
+ // 拆包失败时清空本连接 buffer,避免脏数据反复触发异常
+ device.buffer = Buffer.alloc(0);
+ return;
+ }
+
+ if (frames.length > this.maxFramesPerChunk) {
+ logger.warn("Too many frames in one chunk, capping parse count", {
+ key,
+ frames: frames.length,
+ cap: this.maxFramesPerChunk
+ });
+ frames = frames.slice(0, this.maxFramesPerChunk);
+ }
+
+ for (const frame of frames) {
+ try {
+ const { deviceNumber, data: parsed } = protocolParser.parseFrame(
+ frame,
+ socket.remoteAddress
+ );
+ const previousDeviceNumber = device.deviceNumber;
+ device.status = "registered";
+ device.deviceNumber = deviceNumber;
+ device.lastFrame = parsed;
+ device.totalFrames = (device.totalFrames || 0) + 1;
+
+ if (!previousDeviceNumber || previousDeviceNumber !== deviceNumber) {
+ logger.info("Device identified", { key, deviceNumber });
+ }
+
+ if (typeof this.onFrameParsed === "function") {
+ this.onFrameParsed({ device, deviceNumber, data: parsed });
+ }
+ } catch (err) {
+ logger.error("Failed to parse frame", { key, error: err.message || err });
+ }
+ }
+ }
+
+ cleanupIdleDevices() {
+ const now = Date.now();
+ for (const [key, device] of this.devices.entries()) {
+ if (now - device.lastAck > this.idleTimeoutMs) {
+ logger.warn("Device idle timeout, closing", { key });
+ try {
+ device.socket.destroy();
+ } catch (e) {
+ // ignore
+ }
+ this.devices.delete(key);
+ }
+ }
+ }
+
+ getStats() {
+ const list = [];
+ for (const [key, device] of this.devices.entries()) {
+ list.push({
+ key,
+ status: device.status,
+ lastAck: device.lastAck,
+ deviceNumber: device.deviceNumber
+ });
+ }
+ return list;
+ }
+}
+
+module.exports = DeviceManager;
--
Gitblit v1.8.0