chenyc
2026-03-22 7885cede659f3255be56f77c1eef2ada7387d6f1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
const logger = require("./logger");
 
class DeviceManager {
  constructor({ idleTimeoutMs, onFrameParsed, maxBufferBytes, maxFramesPerChunk }) {
    this.devices = new Map(); // key(ip:port) -> device
    this.idleTimeoutMs = idleTimeoutMs || 2 * 60 * 1000;
    this.onFrameParsed = onFrameParsed;
    this.maxBufferBytes = maxBufferBytes || 1024 * 1024;
    this.maxFramesPerChunk = maxFramesPerChunk || 50;
 
    // 启动全局超时清理
    const intervalMs = 5 * 60 * 1000;
    setInterval(() => this.cleanupIdleDevices(), intervalMs).unref();
  }
 
  _keyFromSocket(socket) {
    return `${socket.remoteAddress}:${socket.remotePort}`;
  }
 
  addConnection(socket) {
    const key = this._keyFromSocket(socket);
    const connectedAt = Date.now();
    const device = {
      key,
      socket,
      status: "pending",
      connectedAt,
      lastAck: connectedAt,
      buffer: Buffer.alloc(0),
      totalBytes: 0,
      totalFrames: 0
    };
    this.devices.set(key, device);
    logger.info("Device connected", {
      key,
      connectedAt: new Date(connectedAt).toISOString()
    });
    return device;
  }
 
  removeConnection(socket) {
    const key = this._keyFromSocket(socket);
    const device = this.devices.get(key);
    if (device) {
      this.devices.delete(key);
      logger.info("Device disconnected", {
        key,
        deviceNumber: device.deviceNumber,
        connectedAt: device.connectedAt
          ? new Date(device.connectedAt).toISOString()
          : undefined,
        disconnectedAt: new Date().toISOString(),
        onlineMs: device.connectedAt ? Date.now() - device.connectedAt : undefined,
        totalBytes: device.totalBytes || 0,
        totalFrames: device.totalFrames || 0
      });
    }
  }
 
  handleData(socket, data, protocolParser) {
    const key = this._keyFromSocket(socket);
    let device = this.devices.get(key);
    if (!device) {
      device = this.addConnection(socket);
    }
 
    device.lastAck = Date.now();
    device.status = "valid";
    device.totalBytes = (device.totalBytes || 0) + data.length;
    // 累加 buffer
    device.buffer = Buffer.concat([device.buffer, data]);
 
    // 防御:异常数据流导致 buffer 无上限增长时,保留尾部并告警
    if (device.buffer.length > this.maxBufferBytes) {
      const keepBytes = Math.floor(this.maxBufferBytes / 2);
      device.buffer = device.buffer.slice(device.buffer.length - keepBytes);
      logger.warn("Device buffer exceeded limit, truncated", {
        key,
        maxBufferBytes: this.maxBufferBytes,
        keepBytes
      });
    }
 
    // 按协议拆包
    let frames = [];
    try {
      const extracted = protocolParser.extractFrames(device.buffer);
      frames = extracted.frames || [];
      device.buffer = extracted.remaining || Buffer.alloc(0);
    } catch (err) {
      logger.error("extractFrames failed", { key, error: err.message || err });
      // 拆包失败时清空本连接 buffer,避免脏数据反复触发异常
      device.buffer = Buffer.alloc(0);
      return;
    }
 
    if (frames.length > this.maxFramesPerChunk) {
      logger.warn("Too many frames in one chunk, capping parse count", {
        key,
        frames: frames.length,
        cap: this.maxFramesPerChunk
      });
      frames = frames.slice(0, this.maxFramesPerChunk);
    }
 
    for (const frame of frames) {
      try {
        const { deviceNumber, data: parsed } = protocolParser.parseFrame(
          frame,
          socket.remoteAddress
        );
        const previousDeviceNumber = device.deviceNumber;
        device.status = "registered";
        device.deviceNumber = deviceNumber;
        device.lastFrame = parsed;
        device.totalFrames = (device.totalFrames || 0) + 1;
 
        if (!previousDeviceNumber || previousDeviceNumber !== deviceNumber) {
          logger.info("Device identified", { key, deviceNumber });
        }
 
        if (typeof this.onFrameParsed === "function") {
          this.onFrameParsed({ device, deviceNumber, data: parsed });
        }
      } catch (err) {
        logger.error("Failed to parse frame", { key, error: err.message || err });
      }
    }
  }
 
  cleanupIdleDevices() {
    const now = Date.now();
    for (const [key, device] of this.devices.entries()) {
      if (now - device.lastAck > this.idleTimeoutMs) {
        logger.warn("Device idle timeout, closing", { key });
        try {
          device.socket.destroy();
        } catch (e) {
          // ignore
        }
        this.devices.delete(key);
      }
    }
  }
 
  getStats() {
    const list = [];
    for (const [key, device] of this.devices.entries()) {
      list.push({
        key,
        status: device.status,
        lastAck: device.lastAck,
        deviceNumber: device.deviceNumber
      });
    }
    return list;
  }
}
 
module.exports = DeviceManager;