"use strict";

const net = require("net");
const protocol = require("./protocol");

/**
 * Upper bound (in bytes) for the frame reassembly buffer. When appending a
 * chunk would exceed it, the buffered bytes are discarded so malformed input
 * cannot grow memory without limit.
 */
const MAX_BUFFER_BYTES = 65536;

/**
 * Manages one TCP connection to a device: connects, sends a "K" request
 * immediately and then on an interval, reassembles and parses the replies,
 * publishes state to the data cache, forwards parsed items to the optional
 * upload manager, and reconnects with exponential backoff when the link drops.
 *
 * Collaborators (shapes inferred from how they are called in this file):
 * - `config`: optional `connectTimeoutMs`, `pollIntervalMs`,
 *   `reconnectBaseMs`, `reconnectMaxMs`.
 * - `dataCache`: `update(ip, data)`.
 * - `logger`: `connecting`, `connected`, `disconnected`, `reconnecting`,
 *   `sendK`, `recvK`, `warn`, `error`.
 * - `uploadManager` (optional): `upload(payload)`; its return value is used
 *   as a promise.
 */
class DeviceConnection {
  /**
   * @param {object} options
   * @param {string} options.ip - Device host; also used as the cache key.
   * @param {number} options.port - Device TCP port.
   * @param {string} options.serialNumber - Written into every cache update.
   * @param {object} options.config - Timing configuration (see class doc).
   * @param {object} options.dataCache - Cache receiving status/data updates.
   * @param {object} options.logger - Logger (see class doc).
   * @param {object} [options.uploadManager] - Optional uploader.
   */
  constructor({ ip, port, serialNumber, config, dataCache, logger, uploadManager }) {
    this.ip = ip;
    this.port = port;
    this.serialNumber = serialNumber;
    this.config = config;
    this.dataCache = dataCache;
    this.logger = logger;
    this.uploadManager = uploadManager;

    this.socket = null;
    this.buffer = Buffer.alloc(0);

    this._stopped = false;
    // True only between a successful "connect" event and the end of that
    // connection; initialised here so it is never read as undefined.
    this._hasConnected = false;
    this._connectTimer = null;
    this._pollTimer = null;
    this._reconnectTimer = null;
    this._reconnectAttempts = 0;
  }

  /** Marks the connection as active, resets the cached status and connects. */
  start() {
    this._stopped = false;
    this._updateCache({ status: "pending", errorMessage: null, reconnectCount: 0 });
    this.connect();
  }

  /**
   * Stops the connection: cancels all timers, destroys the socket and clears
   * the cached data for this device. No reconnect is scheduled afterwards.
   */
  stop() {
    this._stopped = true;
    this._clearAll();
    this._hasConnected = false;
    this._destroySocket();
    this._updateCache({
      status: "disconnected",
      lastPollAt: null,
      lastDataAt: null,
      rawFrame: null,
      parsed: null,
      errorMessage: null,
    });
  }

  // ── Connection ──

  /**
   * Opens a new TCP connection to the device. Does nothing once `stop()` has
   * been called. Any socket or timer left over from a previous attempt is
   * torn down first so at most one connection is ever live.
   */
  connect() {
    if (this._stopped) return;

    // Tear down leftovers from a previous attempt; without this a second
    // call would leak the old socket and its timers.
    this._clearTimers();
    this._clearConnectTimer();
    this._destroySocket();
    this._hasConnected = false;

    this.buffer = Buffer.alloc(0);
    this._reconnectAttempts++;
    this._updateCache({ status: "connecting", reconnectCount: this._reconnectAttempts - 1 });
    this.logger.connecting(this.ip);

    const socket = net.createConnection({
      host: this.ip,
      port: this.port,
    });
    this.socket = socket;
    socket.setNoDelay(true);
    socket.setKeepAlive(true, 30000);

    // Events from a socket that has been replaced or discarded must not touch
    // the state of the current connection attempt.
    const isCurrent = () => this.socket === socket;

    // Handshake timeout, timed manually so it does not affect the
    // long-lived connection afterwards.
    const timeout = this.config.connectTimeoutMs || 5000;
    this._connectTimer = setTimeout(() => {
      this._connectTimer = null;
      if (!isCurrent() || socket.destroyed || this._hasConnected) return;
      this.logger.error(this.ip, "TCP 握手超时");
      // Detaching the socket makes its upcoming "close" event stale, so the
      // timeout is handled exactly once (here) instead of a second time in
      // _onClose.
      this._destroySocket();
      this._updateCache({ status: "disconnected", errorMessage: "连接握手超时" });
      this._scheduleReconnect();
    }, timeout);

    socket.on("connect", () => {
      if (isCurrent()) this._onConnect();
    });
    socket.on("data", (chunk) => {
      if (isCurrent()) this._onData(chunk);
    });
    socket.on("close", (hadErr) => {
      if (isCurrent()) this._onClose(hadErr);
    });
    // Always handled (an unhandled "error" event would throw); the erroring
    // socket is passed along so only that socket gets destroyed.
    socket.on("error", (err) => this._onError(err, socket));
  }

  /**
   * Handles a successful TCP handshake: cancels the handshake timer, resets
   * the retry counter, sends the first K request and starts periodic polling.
   */
  _onConnect() {
    this._clearConnectTimer();
    this._reconnectAttempts = 0;
    this._hasConnected = true;
    this._updateCache({ status: "connected", reconnectCount: 0, errorMessage: null });
    this.logger.connected(this.ip, this.port);
    this._sendK();
    this._startPolling();
  }

  // ── Data reception ──

  /**
   * Appends a received chunk to the reassembly buffer, extracts complete
   * frames, parses each one and publishes the result to the cache and, when
   * configured, to the upload manager.
   *
   * @param {Buffer} chunk - Bytes received from the socket.
   */
  _onData(chunk) {
    // Buffer cap: drop what is buffered when the total would exceed the
    // limit, so malformed data cannot leak memory.
    const pendingSize = this.buffer.length + chunk.length;
    if (pendingSize > MAX_BUFFER_BYTES) {
      this.logger.warn(this.ip, `缓冲区超限 (${pendingSize}B),强制清理`);
      this.buffer = Buffer.alloc(0);
    }

    this.buffer = Buffer.concat([this.buffer, chunk]);
    const extracted = protocol.extractFrames(this.buffer);
    this.buffer = extracted.remaining;

    if (extracted.frames.length === 0) {
      return;
    }

    for (const frame of extracted.frames) {
      let parsed;
      try {
        parsed = protocol.parseFrameAuto(frame);
      } catch (err) {
        this.logger.error(this.ip, `报文解析失败: ${err.message}`);
        continue;
      }

      if (!parsed || !parsed.items) {
        this.logger.warn(this.ip, `无法识别报文: ${frame.toString("ascii").slice(0, 40)}`);
        continue;
      }

      const now = new Date().toISOString();
      const rawFrame = frame.toString("ascii");
      const fieldCount = parsed.items.length;

      this.logger.recvK(this.ip, rawFrame, fieldCount, parsed.statusCode);

      // Only the fields below are copied into the cache entry.
      const parsedFields = parsed.items.map(({ id, name, value, displayValue, unit }) => ({
        id,
        name,
        value,
        displayValue,
        unit,
      }));

      this._updateCache({
        status: "connected",
        lastPollAt: now,
        lastDataAt: now,
        rawFrame,
        statusCode: parsed.statusCode,
        fieldCount,
        parsed: parsedFields,
      });

      // Hand the parsed items to the upload module (MQTT + Aliyun, per the
      // original comment). Failures are logged and do not affect the
      // connection.
      if (this.uploadManager) {
        this.uploadManager
          .upload({
            serialNumber: this.serialNumber,
            ip: this.ip,
            items: parsed.items,
          })
          .then(() => {
            this._updateCache({ lastUploadAt: new Date().toISOString() });
          })
          .catch((err) => {
            this.logger.error(this.ip, `上传异常: ${err.message}`);
          });
      }
    }
  }

  // ── Disconnect handling ──

  /**
   * Handles the "close" event of the current socket: cancels timers, records
   * the disconnect and schedules a reconnect unless the connection was
   * stopped.
   *
   * @param {boolean} hadErr - Value passed by the socket's "close" event.
   */
  _onClose(hadErr) {
    this._clearTimers();
    this._clearConnectTimer();
    const wasConnected = this._hasConnected;
    this._hasConnected = false;
    this.socket = null;

    if (this._stopped) return;

    const reason = hadErr ? "异常断开" : "连接关闭";
    if (!wasConnected) {
      // The handshake never completed, so a lighter log level is used.
      this.logger.warn(this.ip, `TCP 连接失败 (重试#${this._reconnectAttempts})`);
      this._updateCache({ status: "disconnected", errorMessage: "连接失败" });
    } else {
      this.logger.disconnected(this.ip, reason);
      this._updateCache({ status: "disconnected", errorMessage: reason });
    }

    this._scheduleReconnect();
  }

  /**
   * Logs a socket error and destroys the socket that raised it; the
   * following "close" event is what triggers the reconnect.
   *
   * @param {Error} err - The socket error.
   * @param {object|null} [socket=this.socket] - Socket that emitted the
   *   error. Defaults to the current socket, matching the previous
   *   single-argument behaviour.
   */
  _onError(err, socket = this.socket) {
    this.logger.error(this.ip, `TCP 错误: ${err.message}`, { code: err.code });
    if (socket && !socket.destroyed) {
      socket.destroy();
    }
  }

  // ── K request ──

  /** Writes a "K\r\n" request to the socket, if one is open. */
  _sendK() {
    if (!this.socket || this.socket.destroyed) return;
    const kMsg = Buffer.from("K\r\n", "ascii");
    this.socket.write(kMsg);
    this.logger.sendK(this.ip);
    this._updateCache({ lastPollAt: new Date().toISOString() });
  }

  // ── Timers ──

  /** (Re)starts the interval that periodically sends K requests. */
  _startPolling() {
    this._clearPollTimer();
    const interval = this.config.pollIntervalMs || 10000;
    this._pollTimer = setInterval(() => {
      this._sendK();
    }, interval);
    // unref: this timer alone does not keep the process alive.
    this._pollTimer.unref();
  }

  /**
   * Schedules the next `connect()` with exponential backoff:
   * `base * 2^(attempts - 1)`, capped at `max`. Replaces any reconnect that
   * is already pending.
   */
  _scheduleReconnect() {
    if (this._stopped) return;
    // Never stack reconnect timers; each one would open its own socket.
    this._clearReconnectTimer();

    const base = this.config.reconnectBaseMs || 3000;
    const max = this.config.reconnectMaxMs || 60000;
    // The attempt counter is 0 right after a successful connection; clamp the
    // exponent at 0 so the first retry waits `base` instead of `base / 2`.
    const exponent = Math.max(0, Math.min(this._reconnectAttempts - 1, 10));
    const delay = Math.min(base * Math.pow(2, exponent), max);

    this.logger.reconnecting(this.ip, delay / 1000);

    this._reconnectTimer = setTimeout(() => {
      this._reconnectTimer = null;
      this.connect();
    }, delay);
    this._reconnectTimer.unref();
  }

  /** Cancels every timer and resets the retry counter. */
  _clearAll() {
    this._clearTimers();
    this._clearConnectTimer();
    this._reconnectAttempts = 0;
  }

  /** Cancels the poll and reconnect timers (not the handshake timer). */
  _clearTimers() {
    this._clearPollTimer();
    this._clearReconnectTimer();
  }

  /** Cancels the handshake timeout timer, if pending. */
  _clearConnectTimer() {
    if (this._connectTimer) {
      clearTimeout(this._connectTimer);
      this._connectTimer = null;
    }
  }

  /** Cancels the polling interval, if running. */
  _clearPollTimer() {
    if (this._pollTimer) {
      clearInterval(this._pollTimer);
      this._pollTimer = null;
    }
  }

  /** Cancels the pending reconnect timer, if any. */
  _clearReconnectTimer() {
    if (this._reconnectTimer) {
      clearTimeout(this._reconnectTimer);
      this._reconnectTimer = null;
    }
  }

  /**
   * Detaches and destroys the current socket, if any. The reference is
   * cleared before destroying so the socket's later events are recognised as
   * stale and ignored.
   */
  _destroySocket() {
    const socket = this.socket;
    if (!socket) return;
    this.socket = null;
    socket.destroy();
  }

  /**
   * Merges `data` into this device's cache entry (keyed by IP), always
   * including the serial number.
   *
   * @param {object} data - Fields to update.
   */
  _updateCache(data) {
    this.dataCache.update(this.ip, { serialNumber: this.serialNumber, ...data });
  }
}

module.exports = { DeviceConnection };