"use strict";
|
|
const net = require("net");
|
const protocol = require("./protocol");
|
|
/**
 * Manages one TCP connection to a device: connects, sends a "K" request on a
 * fixed interval, parses the frames that come back, mirrors the latest state
 * into `dataCache`, forwards parsed items to `uploadManager`, and reconnects
 * with exponential back-off when the link drops.
 *
 * Every socket event handler is bound to the socket that emitted it and is
 * ignored once that socket is no longer `this.socket`. This keeps late events
 * from a discarded socket (after `stop()`, a handshake timeout, or a new
 * `connect()`) from clobbering the state of the current attempt.
 */
class DeviceConnection {
  /**
   * @param {object} options
   * @param {string} options.ip - Device host; also used as the cache key and log tag.
   * @param {number} options.port - Device TCP port.
   * @param {string} options.serialNumber - Written into every cache update and upload.
   * @param {object} options.config - Reads `connectTimeoutMs` (default 5000),
   *   `pollIntervalMs` (default 10000), `reconnectBaseMs` (default 3000) and
   *   `reconnectMaxMs` (default 60000).
   * @param {object} options.dataCache - Must expose `update(ip, data)`.
   * @param {object} options.logger - Must expose `connecting`, `connected`,
   *   `disconnected`, `reconnecting`, `sendK`, `recvK`, `warn` and `error`.
   * @param {object} [options.uploadManager] - Optional; `upload(payload)` is
   *   called once per parsed frame.
   */
  constructor({ ip, port, serialNumber, config, dataCache, logger, uploadManager }) {
    this.ip = ip;
    this.port = port;
    this.serialNumber = serialNumber;
    this.config = config;
    this.dataCache = dataCache;
    this.logger = logger;
    this.uploadManager = uploadManager;

    this.socket = null;
    this.buffer = Buffer.alloc(0);
    this._stopped = false;
    // True between a successful 'connect' and the matching 'close'.
    this._hasConnected = false;
    this._connectTimer = null;
    this._pollTimer = null;
    this._reconnectTimer = null;
    this._reconnectAttempts = 0;
  }

  /** Marks the connection as active and starts the first connection attempt. */
  start() {
    this._stopped = false;
    this._updateCache({ status: "pending", errorMessage: null, reconnectCount: 0 });
    this.connect();
  }

  /** Cancels all timers, destroys the socket and resets the cached device state. */
  stop() {
    this._stopped = true;
    this._clearAll();
    this._hasConnected = false;
    if (this.socket) {
      const socket = this.socket;
      // Detach before destroying so the resulting 'close' event is ignored.
      this.socket = null;
      socket.destroy();
    }
    this._updateCache({
      status: "disconnected",
      lastPollAt: null, lastDataAt: null,
      rawFrame: null, parsed: null, errorMessage: null,
    });
  }

  // ── Connection ──

  /**
   * Opens a new socket to the device. Any previous socket and pending timers
   * are discarded first, so at most one attempt is live at a time.
   * Does nothing after `stop()`.
   */
  connect() {
    if (this._stopped) return;

    this._clearTimers();
    this._clearConnectTimer();
    if (this.socket) {
      const previous = this.socket;
      this.socket = null;
      previous.destroy();
    }
    this._hasConnected = false;
    this.buffer = Buffer.alloc(0);
    this._reconnectAttempts++;

    this._updateCache({ status: "connecting", reconnectCount: this._reconnectAttempts - 1 });
    this.logger.connecting(this.ip);

    const socket = net.createConnection({
      host: this.ip,
      port: this.port,
    });
    this.socket = socket;

    socket.setNoDelay(true);
    socket.setKeepAlive(true, 30000);

    // Handshake timeout, timed manually so it does not affect the
    // long-lived connection once established.
    const timeout = this.config.connectTimeoutMs || 5000;
    this._connectTimer = setTimeout(() => {
      this._connectTimer = null;
      if (this.socket !== socket || socket.destroyed || this._hasConnected) return;

      this.logger.error(this.ip, "TCP 握手超时");
      // Detach first: the 'close' emitted by destroy() is then treated as
      // stale, so the timeout is reported and the reconnect scheduled once.
      this.socket = null;
      socket.destroy();
      this._updateCache({ status: "disconnected", errorMessage: "连接握手超时" });
      this._scheduleReconnect();
    }, timeout);

    // Only the current socket may drive state; events from a replaced or
    // discarded socket are dropped.
    const isCurrent = () => this.socket === socket;
    socket.on("connect", () => { if (isCurrent()) this._onConnect(); });
    socket.on("data", (chunk) => { if (isCurrent()) this._onData(chunk); });
    socket.on("close", (hadErr) => { if (isCurrent()) this._onClose(hadErr); });
    socket.on("error", (err) => { if (isCurrent()) this._onError(err); });
  }

  /** Handles a successful connect: resets back-off, sends the first K request, starts polling. */
  _onConnect() {
    this._clearConnectTimer();
    this._reconnectAttempts = 0;
    this._hasConnected = true;

    this._updateCache({ status: "connected", reconnectCount: 0, errorMessage: null });
    this.logger.connected(this.ip, this.port);

    this._sendK();
    this._startPolling();
  }

  // ── Data reception ──

  /**
   * Appends a chunk to the receive buffer, extracts complete frames and, for
   * each frame that parses, updates the cache and triggers an upload.
   *
   * @param {Buffer} chunk - Bytes received from the socket.
   */
  _onData(chunk) {
    // Buffer cap: past 64KB the pending bytes are dropped so malformed
    // input cannot grow memory without bound.
    if (this.buffer.length + chunk.length > 65536) {
      this.logger.warn(this.ip, `缓冲区超限 (${this.buffer.length + chunk.length}B),强制清理`);
      this.buffer = Buffer.alloc(0);
    }

    this.buffer = Buffer.concat([this.buffer, chunk]);

    const extracted = protocol.extractFrames(this.buffer);
    this.buffer = extracted.remaining;

    for (const frame of extracted.frames) {
      let parsed;
      try {
        parsed = protocol.parseFrameAuto(frame);
      } catch (err) {
        this.logger.error(this.ip, "报文解析失败: " + err.message);
        continue;
      }

      if (!parsed || !parsed.items) {
        this.logger.warn(this.ip, "无法识别报文: " + frame.toString("ascii").slice(0, 40));
        continue;
      }

      const now = new Date().toISOString();
      const rawFrame = frame.toString("ascii");
      const fieldCount = parsed.items.length;

      this.logger.recvK(this.ip, rawFrame, fieldCount, parsed.statusCode);

      // Only the display-relevant fields of each item are cached.
      const parsedFields = parsed.items.map((item) => ({
        id: item.id,
        name: item.name,
        value: item.value,
        displayValue: item.displayValue,
        unit: item.unit,
      }));

      this._updateCache({
        status: "connected",
        lastPollAt: now,
        lastDataAt: now,
        rawFrame,
        statusCode: parsed.statusCode,
        fieldCount,
        parsed: parsedFields,
      });

      this._upload(parsed.items);
    }
  }

  /**
   * Hands parsed items to the upload manager, if one is configured.
   * Synchronous throws and rejections are both logged rather than propagated,
   * so an upload failure cannot escape the socket 'data' handler.
   *
   * @param {Array<object>} items - Parsed items of a single frame.
   */
  _upload(items) {
    if (!this.uploadManager) return;

    let pending;
    try {
      pending = this.uploadManager.upload({
        serialNumber: this.serialNumber,
        ip: this.ip,
        items,
      });
    } catch (err) {
      this.logger.error(this.ip, "上传异常: " + err.message);
      return;
    }

    Promise.resolve(pending).then(() => {
      this._updateCache({ lastUploadAt: new Date().toISOString() });
    }).catch((err) => {
      this.logger.error(this.ip, "上传异常: " + err.message);
    });
  }

  // ── Disconnect handling ──

  /**
   * Handles the current socket's 'close': clears timers, records the
   * disconnect and schedules a reconnect unless the connection was stopped.
   *
   * @param {boolean} hadErr - Value passed by the socket 'close' event.
   */
  _onClose(hadErr) {
    this._clearTimers();
    this._clearConnectTimer();
    const wasConnected = this._hasConnected;
    this._hasConnected = false;
    this.socket = null;

    if (this._stopped) return;

    const reason = hadErr ? "异常断开" : "连接关闭";
    if (!wasConnected) {
      // Never got connected (timed out or refused): log at a lighter level.
      this.logger.warn(this.ip, `TCP 连接失败 (重试#${this._reconnectAttempts})`);
      this._updateCache({ status: "disconnected", errorMessage: "连接失败" });
    } else {
      this.logger.disconnected(this.ip, reason);
      this._updateCache({ status: "disconnected", errorMessage: reason });
    }
    this._scheduleReconnect();
  }

  /**
   * Logs a socket error and destroys the socket; the 'close' event that
   * follows drives the reconnect.
   *
   * @param {Error} err - Error emitted by the socket.
   */
  _onError(err) {
    this.logger.error(this.ip, `TCP 错误: ${err.message}`, { code: err.code });
    if (this.socket && !this.socket.destroyed) {
      this.socket.destroy();
    }
  }

  // ── K request ──

  /** Writes the "K\r\n" request to the socket and records the poll time. */
  _sendK() {
    if (!this.socket || this.socket.destroyed) return;
    const kMsg = Buffer.from("K\r\n", "ascii");
    this.socket.write(kMsg);
    this.logger.sendK(this.ip);
    this._updateCache({ lastPollAt: new Date().toISOString() });
  }

  // ── Timers ──

  /** (Re)starts the interval that sends a K request every `pollIntervalMs`. */
  _startPolling() {
    this._clearPollTimer();
    const interval = this.config.pollIntervalMs || 10000;
    this._pollTimer = setInterval(() => {
      this._sendK();
    }, interval);
    // The poll timer alone should not keep the process alive.
    this._pollTimer.unref();
  }

  /**
   * Schedules the next `connect()` with exponential back-off:
   * `base * 2^(attempts - 1)`, capped at `reconnectMaxMs`. Replaces any
   * reconnect that is already pending.
   */
  _scheduleReconnect() {
    if (this._stopped) return;

    const base = this.config.reconnectBaseMs || 3000;
    const max = this.config.reconnectMaxMs || 60000;
    // `_reconnectAttempts` is 0 right after a successful connection; clamp
    // the exponent at 0 so the first retry waits `base`, not `base / 2`.
    const exponent = Math.min(Math.max(this._reconnectAttempts - 1, 0), 10);
    const delay = Math.min(base * Math.pow(2, exponent), max);

    this.logger.reconnecting(this.ip, delay / 1000);

    this._clearReconnectTimer();
    this._reconnectTimer = setTimeout(() => {
      this._reconnectTimer = null;
      this.connect();
    }, delay);
    this._reconnectTimer.unref();
  }

  /** Clears every timer and resets the back-off counter. */
  _clearAll() {
    this._clearTimers();
    this._clearConnectTimer();
    this._reconnectAttempts = 0;
  }

  /** Clears the poll and reconnect timers (not the handshake timer). */
  _clearTimers() {
    this._clearPollTimer();
    this._clearReconnectTimer();
  }

  /** Clears the handshake-timeout timer, if armed. */
  _clearConnectTimer() {
    if (this._connectTimer) { clearTimeout(this._connectTimer); this._connectTimer = null; }
  }

  /** Clears the polling interval, if running. */
  _clearPollTimer() {
    if (this._pollTimer) { clearInterval(this._pollTimer); this._pollTimer = null; }
  }

  /** Clears the pending reconnect timer, if any. */
  _clearReconnectTimer() {
    if (this._reconnectTimer) { clearTimeout(this._reconnectTimer); this._reconnectTimer = null; }
  }

  /**
   * Merges `data` into this device's cache entry, always including the serial number.
   *
   * @param {object} data - Fields to write.
   */
  _updateCache(data) {
    this.dataCache.update(this.ip, { serialNumber: this.serialNumber, ...data });
  }
}
|
|
module.exports = { DeviceConnection };
|