const logger = require("./logger");
|
|
/**
 * Tracks connected devices, keyed by the remote "ip:port" of their socket.
 *
 * For every connection it keeps a receive buffer and simple counters, splits
 * incoming bytes into frames through a caller-supplied protocol parser, and
 * periodically destroys sockets that have not delivered data within the idle
 * timeout.
 */
class DeviceManager {
  /**
   * @param {object} [options]
   * @param {number} [options.idleTimeoutMs] Idle time after which a device is
   *   closed by the sweep. Falsy values fall back to 2 minutes.
   * @param {Function} [options.onFrameParsed] Called as
   *   `onFrameParsed({ device, deviceNumber, data })` for each parsed frame.
   * @param {number} [options.maxBufferBytes] Per-connection receive buffer
   *   limit. Falsy values fall back to 1 MiB.
   * @param {number} [options.maxFramesPerChunk] Maximum number of frames
   *   parsed per `handleData` call. Falsy values fall back to 50.
   * @param {number} [options.cleanupIntervalMs] Period of the idle sweep.
   *   Falsy values fall back to 5 minutes. A device can therefore stay
   *   connected for up to roughly `idleTimeoutMs + cleanupIntervalMs` after
   *   its last data.
   */
  constructor({
    idleTimeoutMs,
    onFrameParsed,
    maxBufferBytes,
    maxFramesPerChunk,
    cleanupIntervalMs
  } = {}) {
    this.devices = new Map(); // key (ip:port) -> device record
    // socket -> key. A destroyed socket may no longer report its remote
    // address, so the key is remembered from the time the socket was live.
    this._socketKeys = new WeakMap();
    this.idleTimeoutMs = idleTimeoutMs || 2 * 60 * 1000;
    this.onFrameParsed = onFrameParsed;
    this.maxBufferBytes = maxBufferBytes || 1024 * 1024;
    this.maxFramesPerChunk = maxFramesPerChunk || 50;
    this.cleanupIntervalMs = cleanupIntervalMs || 5 * 60 * 1000;

    // Start the global idle sweep. The timer is unref'd so it never keeps
    // the process alive on its own.
    this._cleanupTimer = setInterval(
      () => this.cleanupIdleDevices(),
      this.cleanupIntervalMs
    );
    this._cleanupTimer.unref();
  }

  /**
   * Stops the periodic idle sweep. Tracked devices are left untouched.
   */
  stop() {
    clearInterval(this._cleanupTimer);
  }

  /**
   * Returns the "ip:port" key for a socket.
   *
   * The key is cached the first time it is computed from a defined address
   * and port, so later lookups still work after the socket has been destroyed.
   *
   * @param {object} socket Object exposing `remoteAddress` and `remotePort`.
   * @returns {string}
   */
  _keyFromSocket(socket) {
    const cachedKey = this._socketKeys.get(socket);
    if (cachedKey !== undefined) {
      return cachedKey;
    }
    const key = `${socket.remoteAddress}:${socket.remotePort}`;
    if (socket.remoteAddress !== undefined && socket.remotePort !== undefined) {
      this._socketKeys.set(socket, key);
    }
    return key;
  }

  /**
   * Finds the device record that belongs to a socket.
   *
   * Prefers the record stored under the socket's key, then falls back to a
   * scan by socket identity (covers sockets whose address is no longer
   * available and was never cached).
   *
   * @param {object} socket
   * @returns {object|undefined}
   */
  _findDevice(socket) {
    const byKey = this.devices.get(this._keyFromSocket(socket));
    if (byKey && byKey.socket === socket) {
      return byKey;
    }
    for (const device of this.devices.values()) {
      if (device.socket === socket) {
        return device;
      }
    }
    // Same key but a different socket object: keep the original key-based
    // behavior.
    return byKey;
  }

  /**
   * Registers a new connection and returns its device record. An existing
   * record under the same key is replaced.
   *
   * @param {object} socket
   * @returns {object} The created device record.
   */
  addConnection(socket) {
    const key = this._keyFromSocket(socket);
    const connectedAt = Date.now();
    const device = {
      key,
      socket,
      status: "pending",
      connectedAt,
      lastAck: connectedAt,
      buffer: Buffer.alloc(0),
      totalBytes: 0,
      totalFrames: 0
    };
    this.devices.set(key, device);
    logger.info("Device connected", {
      key,
      connectedAt: new Date(connectedAt).toISOString()
    });
    return device;
  }

  /**
   * Forgets the device that belongs to `socket` and logs its session
   * statistics. Does nothing when the socket is not tracked.
   *
   * @param {object} socket
   */
  removeConnection(socket) {
    const device = this._findDevice(socket);
    this._socketKeys.delete(socket);
    if (!device) {
      return;
    }

    const key = device.key;
    this.devices.delete(key);
    logger.info("Device disconnected", {
      key,
      deviceNumber: device.deviceNumber,
      connectedAt: device.connectedAt
        ? new Date(device.connectedAt).toISOString()
        : undefined,
      disconnectedAt: new Date().toISOString(),
      onlineMs: device.connectedAt ? Date.now() - device.connectedAt : undefined,
      totalBytes: device.totalBytes || 0,
      totalFrames: device.totalFrames || 0
    });
  }

  /**
   * Feeds a chunk of received bytes into the device's buffer, extracts the
   * complete frames and parses them.
   *
   * @param {object} socket Source socket; registered on the fly when unknown.
   * @param {Buffer} data Received bytes.
   * @param {object} protocolParser Must provide
   *   `extractFrames(buffer)` returning `{ frames, remaining }` and
   *   `parseFrame(frame, remoteAddress)` returning `{ deviceNumber, data }`.
   */
  handleData(socket, data, protocolParser) {
    const key = this._keyFromSocket(socket);
    let device = this.devices.get(key);
    if (!device) {
      device = this.addConnection(socket);
    }

    device.lastAck = Date.now();
    // Only promote a fresh connection; a device that is already
    // "registered" must not fall back to "valid" on a chunk without frames.
    if (device.status === "pending") {
      device.status = "valid";
    }
    device.totalBytes = (device.totalBytes || 0) + data.length;

    // Accumulate into the per-connection buffer.
    device.buffer = Buffer.concat([device.buffer, data]);

    // Defense: if an abnormal stream grows the buffer past the limit, keep
    // only the tail and warn.
    if (device.buffer.length > this.maxBufferBytes) {
      const keepBytes = Math.floor(this.maxBufferBytes / 2);
      device.buffer = device.buffer.subarray(device.buffer.length - keepBytes);
      logger.warn("Device buffer exceeded limit, truncated", {
        key,
        maxBufferBytes: this.maxBufferBytes,
        keepBytes
      });
    }

    // Split the buffer into frames according to the protocol.
    let frames = [];
    try {
      const extracted = protocolParser.extractFrames(device.buffer);
      frames = extracted.frames || [];
      device.buffer = extracted.remaining || Buffer.alloc(0);
    } catch (err) {
      logger.error("extractFrames failed", { key, error: err.message || err });
      // Drop this connection's buffer so the same dirty data does not
      // trigger the failure again on the next chunk.
      device.buffer = Buffer.alloc(0);
      return;
    }

    // Frames beyond the cap are discarded: the buffer already holds only
    // the unparsed remainder.
    if (frames.length > this.maxFramesPerChunk) {
      logger.warn("Too many frames in one chunk, capping parse count", {
        key,
        frames: frames.length,
        cap: this.maxFramesPerChunk
      });
      frames = frames.slice(0, this.maxFramesPerChunk);
    }

    for (const frame of frames) {
      // One bad frame must not stop the remaining frames. Exceptions thrown
      // by onFrameParsed are caught here too and logged with the same message.
      try {
        const { deviceNumber, data: parsed } = protocolParser.parseFrame(
          frame,
          socket.remoteAddress
        );
        const previousDeviceNumber = device.deviceNumber;
        device.status = "registered";
        device.deviceNumber = deviceNumber;
        device.lastFrame = parsed;
        device.totalFrames = (device.totalFrames || 0) + 1;

        if (!previousDeviceNumber || previousDeviceNumber !== deviceNumber) {
          logger.info("Device identified", { key, deviceNumber });
        }

        if (typeof this.onFrameParsed === "function") {
          this.onFrameParsed({ device, deviceNumber, data: parsed });
        }
      } catch (err) {
        logger.error("Failed to parse frame", { key, error: err.message || err });
      }
    }
  }

  /**
   * Destroys and forgets every device whose last received data is older
   * than `idleTimeoutMs`.
   */
  cleanupIdleDevices() {
    const now = Date.now();
    for (const [key, device] of this.devices.entries()) {
      if (now - device.lastAck <= this.idleTimeoutMs) {
        continue;
      }
      logger.warn("Device idle timeout, closing", { key });
      try {
        device.socket.destroy();
      } catch (err) {
        // Best effort: the record is dropped even if the socket refuses to
        // close, but the failure is no longer hidden.
        logger.warn("Failed to destroy idle socket", {
          key,
          error: (err && err.message) || err
        });
      }
      this._socketKeys.delete(device.socket);
      this.devices.delete(key);
    }
  }

  /**
   * Returns a snapshot of all tracked devices.
   *
   * @returns {Array<{key: string, status: string, lastAck: number, deviceNumber: *}>}
   */
  getStats() {
    const list = [];
    for (const [key, device] of this.devices.entries()) {
      list.push({
        key,
        status: device.status,
        lastAck: device.lastAck,
        deviceNumber: device.deviceNumber
      });
    }
    return list;
  }
}
|
|
// Export the class itself as this module's CommonJS export.
module.exports = DeviceManager;
|