const net = require('net'); const { Jh2028Decoder, bytesToHex } = require('./decoder'); function normalizeIp(address) { if (!address) { return ''; } if (address.startsWith('::ffff:')) { return address.slice(7); } return address; } function formatReceivedTimestamp(date = new Date()) { const year = date.getFullYear(); const month = String(date.getMonth() + 1).padStart(2, '0'); const day = String(date.getDate()).padStart(2, '0'); const hour = String(date.getHours()).padStart(2, '0'); const minute = String(date.getMinutes()).padStart(2, '0'); const second = String(date.getSeconds()).padStart(2, '0'); return `${year}-${month}-${day} ${hour}:${minute}:${second}`; } class TcpService { constructor(options = {}) { this.tcpConfig = options.tcpConfig || {}; this.devices = options.devices || []; this.alModelPath = options.alModelPath; this.logger = options.logger || console; this.logRawHex = Boolean(options.logRawHex); this.onMetric = typeof options.onMetric === 'function' ? options.onMetric : async () => {}; this.onConnectionChange = typeof options.onConnectionChange === 'function' ? options.onConnectionChange : () => {}; this.server = null; this.sessions = new Map(); this.socketsByDeviceId = new Map(); this.deviceMap = this.buildDeviceMap(this.devices); } buildDeviceMap(devices) { const map = new Map(); for (const device of devices) { if (!device || !device.ip || !device.deviceId) { continue; } map.set(normalizeIp(device.ip), { ...device, name: device.name || device.deviceId, }); } return map; } async start() { this.server = net.createServer((socket) => { this.handleConnection(socket); }); this.server.maxConnections = this.tcpConfig.maxConnections || 100; this.server.on('error', (error) => { this.logger.error(`[TCP] 服务异常: ${error.message}`); }); await new Promise((resolve, reject) => { this.server.once('listening', resolve); this.server.once('error', reject); this.server.listen({ host: this.tcpConfig.host, port: this.tcpConfig.port, backlog: this.tcpConfig.backlog || 128, }); }); this.logger.info(`[TCP] 已开始监听 ${this.tcpConfig.host}:${this.tcpConfig.port} 设备数量=${this.deviceMap.size}`); } handleConnection(socket) { const clientIp = normalizeIp(socket.remoteAddress); const device = this.deviceMap.get(clientIp); if (!device) { this.logger.warn(`[TCP] 未配置设备接入 IP=${clientIp} 端口=${socket.remotePort},连接将关闭`); socket.destroy(); return; } const oldSocket = this.socketsByDeviceId.get(device.deviceId); if (oldSocket && oldSocket !== socket) { this.logger.warn(`[TCP] 同设备新连接接入,将关闭旧连接 设备=${device.deviceId} IP=${clientIp}`); oldSocket.destroy(); } const decoder = new Jh2028Decoder({ alModelPath: this.alModelPath, maxBufferBytes: this.tcpConfig.maxBufferBytes || 8192, }); this.sessions.set(socket, { clientIp, device, decoder, connectedAt: Date.now(), lastDataAt: 0, remotePort: socket.remotePort, }); this.socketsByDeviceId.set(device.deviceId, socket); if (this.tcpConfig.keepAlive) { socket.setKeepAlive(true, this.tcpConfig.keepAliveDelayMs || 10000); } socket.setNoDelay(Boolean(this.tcpConfig.noDelay)); socket.setTimeout(this.tcpConfig.socketTimeoutMs || 120000); this.logger.info(`[TCP] 设备已连接 设备=${device.deviceId} IP=${clientIp} 端口=${socket.remotePort}`); this.notifyConnectionChange(device); socket.on('data', (chunk) => { this.handleData(socket, chunk); }); socket.on('timeout', () => { this.logger.warn(`[TCP] 连接超时 设备=${device.deviceId} IP=${clientIp}`); socket.destroy(); }); socket.on('error', (error) => { this.logger.error(`[TCP] 连接异常 设备=${device.deviceId} IP=${clientIp}: ${error.message}`); }); socket.on('close', () => { this.sessions.delete(socket); if (this.socketsByDeviceId.get(device.deviceId) === socket) { this.socketsByDeviceId.delete(device.deviceId); } this.logger.info(`[TCP] 设备已断开 设备=${device.deviceId} IP=${clientIp}`); this.notifyConnectionChange(device); }); } handleData(socket, chunk) { const session = this.sessions.get(socket); if (!session) { return; } this.logger.info(`[TCP] 收到原始数据 设备=${session.device.deviceId} 字节数=${chunk.length}`); session.lastDataAt = Date.now(); if (this.logRawHex) { this.logger.debug(`[TCP] 原始报文 设备=${session.device.deviceId} ${bytesToHex(chunk)}`); } const results = session.decoder.push(chunk); if (results.length === 0) { this.logger.warn(`[TCP] 当前数据片段尚未组成完整报文 设备=${session.device.deviceId} 字节数=${chunk.length}`); return; } for (const result of results) { if (!result.publish) { const level = result.ok ? 'warn' : 'warn'; this.logger[level](`[TCP] 跳过未发布报文 设备=${session.device.deviceId} 原因=${result.reason} 原始报文=${result.rawHex || ''}`); continue; } const metric = { ...result.metric }; if (result.messageType === 'blood-pressure') { metric.M = formatReceivedTimestamp(); } this.logger.info(`[TCP] 解析到指标 设备=${session.device.deviceId} 类型=${result.messageType} 指标=${JSON.stringify(metric)}`); Promise.resolve(this.onMetric(session.device, metric, result)).catch((error) => { this.logger.error(`[TCP] 指标处理失败 设备=${session.device.deviceId}: ${error.message}`); }); } } async stop() { this.logger.info(`[TCP] 正在停止 TCP 服务 当前连接数=${this.sessions.size}`); for (const socket of this.sessions.keys()) { socket.destroy(); } this.sessions.clear(); this.socketsByDeviceId.clear(); if (!this.server) { return; } await new Promise((resolve) => { this.server.close(resolve); }); this.server = null; this.logger.info('[TCP] TCP 服务已停止'); } notifyConnectionChange(device) { Promise.resolve(this.onConnectionChange(device, this.getConnectionSnapshot())).catch((error) => { this.logger.error(`[TCP] 连接状态通知失败 设备=${device.deviceId}: ${error.message}`); }); } getConnectionSnapshot() { return Array.from(this.sessions.values()).map((session) => ({ deviceId: session.device.deviceId, ip: session.clientIp, name: session.device.name || session.device.deviceId, remotePort: session.remotePort, connectedAt: session.connectedAt, lastDataAt: session.lastDataAt, online: true, })); } } module.exports = { TcpService, formatReceivedTimestamp, normalizeIp, };