const net = require('net'); const fs = require('fs'); const path = require('path'); const crypto = require('crypto'); const { startHttpApi } = require('./http-api'); const logger = require('./logger'); // 阿里云 IoT 长连接上报工具 const { sendDataToAliyun, closeAllConnections } = require('./aliyun-iot'); // 阿里云配置 const { getAliyunConfig, getMqttConfig } = require('./api'); // MQTT 客户端 const { initMqtt, publishToMqtt, closeMqtt } = require('./mqtt-client'); // ====== TCP 配置(从 tcp-config.json 读取,可打包部署)====== function loadTcpConfig() { const baseDir = (typeof process !== 'undefined' && process.pkg) ? path.dirname(process.execPath) : __dirname; const candidates = [ { path: path.join(baseDir, 'tcp-config.json'), source: 'exedir' }, { path: path.join(__dirname, 'tcp-config.json'), source: 'srcdir' } ]; for (const c of candidates) { try { const raw = fs.readFileSync(c.path, 'utf8'); const obj = JSON.parse(raw); obj.__meta = { source: c.source, path: c.path }; return obj; } catch (_) { /* continue */ } } const def = { host: '0.0.0.0', port: 8234, maxClients: 50 }; def.__meta = { source: 'default', path: null }; return def; } const tcpCfg = loadTcpConfig(); const PORT = tcpCfg.port || 8234; const HOST = tcpCfg.host || '0.0.0.0'; const MAX_CLIENTS = tcpCfg.maxClients || 50; // 指令下发间隔(毫秒),从配置读取并做安全兜底 const GET_DATA_INTERVAL = Math.max(1000, tcpCfg.getDataIntervalMs || 60000); const ACTIVATE_INTERVAL = Math.max(60000, tcpCfg.activateIntervalMs || 600000); // ================================= // ====== 阿里云配置(从 aliyun-config.json 读取) ====== const aliyunCfg = getAliyunConfig(); const ALIYUN_ENABLED = aliyunCfg.enabled !== false; // 默认启用,除非显式为 false // ========================================= // ====== MQTT 配置(从 mqtt-config.json 读取) ====== const mqttCfg = getMqttConfig(); const MQTT_ENABLED = mqttCfg.enabled === true; // 默认禁用,需显式启用 if (MQTT_ENABLED) { initMqtt(mqttCfg); } // ========================================= // 指令定义(HEX 字符串) const ACTIVATE_CMD_HEX = '16 2f 40 31 04 03'; const GET_DATA_CMD_HEX = '16 31 3f 43 44 04 03'; // ===== 工具函数 ===== function hexToBuffer(hexStr) { const hex = hexStr.replace(/\s+/g, ''); const bytes = []; for (let i = 0; i < hex.length; i += 2) { bytes.push(parseInt(hex.substr(i, 2), 16)); } return Buffer.from(bytes); } function bufferToHex(buf) { return buf.toString('hex').replace(/(.{2})/g, '$1 ').trim().toUpperCase(); } // 格式化时间为 yyyy-MM-dd HH:mm:ss function formatDateTime(date = new Date()) { const pad = (n) => String(n).padStart(2, '0'); return `${date.getFullYear()}-${pad(date.getMonth() + 1)}-${pad(date.getDate())} ${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`; } const ACTIVATE_CMD = hexToBuffer(ACTIVATE_CMD_HEX); const GET_DATA_CMD = hexToBuffer(GET_DATA_CMD_HEX); const FRAME_END = Buffer.from([0x06, 0x03]); // 采样十六进制输出,避免日志过长 function bufferToHexSample(buf, limit = 64) { const hex = bufferToHex(buf); if (buf.length <= limit) return hex; const sliced = buf.subarray(0, limit); return bufferToHex(sliced) + ' ...'; } // 指令脱敏:只输出长度与短摘要,避免泄露明文HEX function obfuscateHex(hexStr) { if (!hexStr) return ''; const normalized = hexStr.replace(/\s+/g, '').toUpperCase(); const len = Math.floor(normalized.length / 2); const sha = crypto.createHash('sha256').update(normalized).digest('hex').slice(0, 8).toUpperCase(); return ``; } // ===== 发送辅助:处理回压 ===== function safeWrite(socket, buffer, label, log) { try { const ok = socket.write(buffer); if (!ok) { console.log(`⏳ 写入等待回压恢复: ${label}`); if (log) log.warn('tx.backpressure', { label, bytes: buffer.length }); socket.once('drain', () => { console.log(`✅ 回压恢复,已继续发送: ${label}`); if (log) log.info('tx.drain', { label }); }); } } catch (err) { console.error(`❌ 发送失败 (${label}):`, err.message); if (log) log.error('tx.error', { label, error: err.message }); } } // ===== 解析逻辑(复用 app.js 思路) ===== function extractAllFields(frame) { const ascii = frame.toString('ascii'); const fields = {}; const regex = /([A-Z])([+-]?\d{3,6})/g; let match; while ((match = regex.exec(ascii)) !== null) { const tag = match[1]; const valueStr = match[2]; const num = parseInt(valueStr, 10); if (!isNaN(num) && !(tag in fields)) { fields[tag] = num; } } return fields; } function extractSerialNumber(frame) { const ascii = frame.toString('ascii'); console.log(` └─ 尝试提取序列号,完整 ASCII: "${ascii}"`); const iMatches = [...ascii.matchAll(/I[A-Za-z0-9]{8}(?=[A-Z])/g)]; console.log(` • 找到 ${iMatches.length} 个匹配的 I 字段`); if (iMatches.length < 2) { console.log(' • 未找到第二个 I 字段'); return null; } const secondMatch = iMatches[1]; const serial = secondMatch[0].slice(1); console.log(` • 找到序列号: "${serial}"`); if (/[A-Za-z]/.test(serial) && /\d/.test(serial)) return serial; return null; } // 尝试从原始 ASCII 中探测 Kt/V(例如: KTV034, Kt/V=035 等) function extractKTVFromAscii(ascii) { // 常见书写: KTV034、Kt/V=035、ktv: 034 const patterns = [ /(KTV)\s*[:=]?\s*([0-9]{2,3})/i, /(KT\s*\/\s*V)\s*[:=]?\s*([0-9]{2,3})/i, /(KTV)\s*([0-9]{2,3})/i ]; for (const re of patterns) { const m = ascii.match(re); if (m && m[2]) { const raw = parseInt(m[2], 10); if (!isNaN(raw)) { return +(raw / 100).toFixed(2); } } } return null; } function printClinicalParams(params) { console.log('\n🩺 临床参数解析(TCP 服务端接收):'); const known = {}; if ('V' in params) { console.log(` • 静脉压: ${params.V} mmHg`); known.V = true; } if ('A' in params) { console.log(` • 动脉压: ${params.A} mmHg`); known.A = true; } if ('U' in params) { console.log(` • 跨膜压 (TMP): ${params.U} mmHg`); known.U = true; } if ('T' in params) { console.log(` • 透析液温度: ${(params.T / 10).toFixed(1)} °C`); known.T = true; } if ('C' in params) { console.log(` • 电导率: ${(params.C / 10).toFixed(1)} mS/cm`); known.C = true; } if ('I' in params) { console.log(` • 实际透析液流量: ${params.I} mL/min`); known.I = true; } if ('Q' in params) { console.log(` • 有效血流量: ${params.Q} mL/min`); known.Q = true; } if ('R' in params) { console.log(` • 超滤速率: ${params.R} mL/h`); known.R = true; } if ('P' in params) { console.log(` • 已完成超滤量: ${(params.P / 1000).toFixed(2)} L`); known.P = true; } if ('G' in params) { console.log(` • 超滤目标量: ${(params.G / 1000).toFixed(2)} L`); known.G = true; } if ('H' in params) { const mins = params.H; console.log(` • 剩余治疗时间: ${mins} 分钟 (${Math.floor(mins / 60)}h ${mins % 60}m)`); known.H = true; } if ('N' in params) { console.log(` • 钠浓度: ${(params.N / 10).toFixed(1)} mmol/L`); known.N = true; } if ('Y' in params) { console.log(` • 累计血容量: ${(params.Y / 10).toFixed(1)} L`); known.Y = true; } const maybe = { B: '血泵设定值?', S: '系统状态码?', X: '静脉壶状态?', M: '总治疗时间?', L: '肝素累计?', Z: 'TMP补偿?' }; for (const [tag, desc] of Object.entries(maybe)) { if (tag in params && !(tag in known)) { console.log(` • ${desc} (${tag}): ${params[tag]}`); known[tag] = true; } } const unknownTags = Object.keys(params).filter(t => !(t in known)); if (unknownTags.length > 0) { console.log(` • 未识别字段: ${unknownTags.map(t => `${t}=${params[t]}`).join(', ')}`); } } // 构建中文属性的 JSON 结果 function buildChineseJson(params, serialFromState) { const map = { V: '静脉压', A: '动脉压', U: '跨膜压', T: '透析液温度(°C)', C: '电导率(mS/cm)', I: '实际透析液流量', Q: '有效血流量', R: '超滤速率', P: '已完成超滤量(L)', G: '超滤目标量(L)', H: '剩余治疗时间(分钟)', N: '钠浓度(mmol/L)', Y: '累计血容量(L)', B: '血泵设定值', S: '系统状态码', X: '静脉壶状态', M: '总治疗时间(分钟)', L: '肝素累计', Z: 'TMP补偿' }; function convert(tag, val) { if (val === undefined || val === null) return val; switch (tag) { case 'T': case 'C': case 'N': return parseFloat((val / 10).toFixed(1)); case 'Y': return parseFloat((val / 10).toFixed(1)); case 'P': case 'G': return parseFloat((val / 1000).toFixed(2)); default: return val; } } const obj = { '接收时间': formatDateTime(), '设备序列号': serialFromState || null }; const knownTags = new Set(Object.keys(map)); for (const [tag, cname] of Object.entries(map)) { if (tag in params) { obj[cname] = convert(tag, params[tag]); } } const others = {}; for (const [tag, value] of Object.entries(params)) { if (!knownTags.has(tag)) { others[tag] = value; } } if (Object.keys(others).length > 0) { obj['未识别字段'] = others; } return obj; } function handleFrame(frame, state) { const hexStr = frame.toString('hex').replace(/(.{2})/g, '$1 ').trim().toUpperCase(); const asciiFull = frame.toString('ascii').replace(/[^\x20-\x7E]/g, '.'); console.log(`\n📥 [${new Date().toLocaleTimeString()}] 接收到完整数据帧`); console.log(` HEX (${frame.length} bytes): ${hexStr}`); console.log(` ASCII: "${asciiFull}"`); state.log.info('rx.frame', { clientId: state.id, serial: state.detectedSerial, remote: state.remoteAddress, bytes: frame.length, hex: hexStr, ascii: asciiFull }); if (frame.length < 15) { console.log(' └─ ⚠️ 忽略短帧(ACK)'); return; } if (!state.detectedSerial) { const serial = extractSerialNumber(frame); if (serial) { state.detectedSerial = serial; // 切换到设备子日志器,携带 IP 与 clientId state.log = logger.forDevice(serial, state.remoteAddress, state.id); console.log(` 🏷️ 设备序列号: ${serial}`); state.log.info('id.detected', { clientId: state.id, serial, remote: state.remoteAddress }); } } const params = extractAllFields(frame); printClinicalParams(params); state.log.info('parse.params', { clientId: state.id, serial: state.detectedSerial, params }); // 额外探测 Kt/V 值 const asciiRaw = frame.toString('ascii'); const ktvDetected = extractKTVFromAscii(asciiRaw); if (ktvDetected != null) { console.log(` • Kt/V(探测): ${ktvDetected}`); } // === 生成并打印中文属性的 JSON === const dataJson = buildChineseJson(params, state.detectedSerial); if (ktvDetected != null) { dataJson['Kt/V'] = ktvDetected; } console.log(`\n🧾 数据JSON [#${state.id}]:`); console.log(JSON.stringify(dataJson, null, 2)); // 写入设备数据缓存(按设备序列号) if (state.detectedSerial) { deviceCache.set(state.detectedSerial, { data: dataJson, updatedAt: new Date().toISOString(), clientId: state.id }); state.log.info('cache.update', { serial: state.detectedSerial, clientId: state.id }); // 上报到阿里云 IoT(使用序列号作为设备ID,受 aliyun-config.json 的 enabled 控制) if (ALIYUN_ENABLED) { try { const aliyunProps = mapToAliyunProps(params, state); if (Object.keys(aliyunProps).length > 0) { console.log(`\n📤 准备上报阿里云 [${state.detectedSerial}] 属性:`); console.log(JSON.stringify(aliyunProps, null, 2)); state.log.info('aliyun.sendProps', { serial: state.detectedSerial, clientId: state.id, ip: state.remoteAddress, props: aliyunProps }); sendDataToAliyun(state.detectedSerial, aliyunProps, { ip: state.remoteAddress, clientId: state.id }); } } catch (e) { console.error('❌ 上报阿里云失败:', e.message); } } else { state.log.info('aliyun.disabled', { serial: state.detectedSerial, clientId: state.id, reason: 'aliyun-config.json enabled=false' }); } // 发布到 MQTT(受 mqtt-config.json 的 enabled 控制) if (MQTT_ENABLED) { try { // MQTT 使用与阿里云物模型相同的格式 const aliyunProps = mapToAliyunProps(params, state); publishToMqtt(state.detectedSerial, aliyunProps, `${state.remoteAddress}:${state.id}`); } catch (e) { console.error('❌ MQTT 发布失败:', e.message); state.log.error('mqtt.publish.error', { serial: state.detectedSerial, error: e.message }); } } } } // 每个连接维护独立缓冲区 function createConnectionHandler(socket) { // 连接数量限制 if (clients.size >= MAX_CLIENTS) { console.warn('⚠️ 超出最大并发限制,拒绝新连接'); try { socket.destroy(); } catch {} return; } const id = ++connectionSeq; const state = { id, rxBuffer: Buffer.alloc(0), detectedSerial: null, timerId: null, activateTimerId: null, lastActivityAt: Date.now(), log: logger.child({ clientId: id }) }; clients.set(id, { socket, state }); socket.setKeepAlive(true, 10000); socket.setNoDelay(true); const remote = `${socket.remoteAddress}:${socket.remotePort}`; state.remoteAddress = socket.remoteAddress; // 将连接级子日志器包含 IP,便于排错 state.log = logger.child({ clientId: id, ip: state.remoteAddress }); console.log(`🔗 [#${id}] 客户端已连接: ${remote}`); state.log.info('conn.open', { clientId: id, remote: remote }); // 连接后立即发送激活指令(日志脱敏) console.log(`📤 [#${id}] ${new Date().toLocaleTimeString()} 发送 激活指令: ${obfuscateHex(ACTIVATE_CMD_HEX)}`); safeWrite(socket, ACTIVATE_CMD, '激活指令', state.log); state.log.info('tx.activate', { clientId: id, hexObf: obfuscateHex(ACTIVATE_CMD_HEX), bytes: ACTIVATE_CMD.length }); // 周期发送获取治疗数据指令(日志脱敏) state.timerId = setInterval(() => { console.log(`📤 [#${id}] ${new Date().toLocaleTimeString()} 发送 获取治疗数据: ${obfuscateHex(GET_DATA_CMD_HEX)}`); safeWrite(socket, GET_DATA_CMD, '获取治疗数据', state.log); state.log.info('tx.getData', { clientId: id, hexObf: obfuscateHex(GET_DATA_CMD_HEX), bytes: GET_DATA_CMD.length }); }, GET_DATA_INTERVAL); // 周期发送激活指令(保活,日志脱敏) state.activateTimerId = setInterval(() => { console.log(`📤 [#${id}] ${new Date().toLocaleTimeString()} 发送 保活激活指令: ${obfuscateHex(ACTIVATE_CMD_HEX)}`); safeWrite(socket, ACTIVATE_CMD, '保活激活指令', state.log); state.log.info('tx.activate.periodic', { clientId: id, hexObf: obfuscateHex(ACTIVATE_CMD_HEX), bytes: ACTIVATE_CMD.length }); }, ACTIVATE_INTERVAL); socket.on('data', (chunk) => { state.lastActivityAt = Date.now(); state.log.info('rx.chunk', { clientId: id, bytes: chunk.length, hexSample: bufferToHexSample(chunk), truncated: chunk.length > 64 }); state.rxBuffer = Buffer.concat([state.rxBuffer, chunk]); let endIndex; while ((endIndex = state.rxBuffer.indexOf(FRAME_END)) !== -1) { const frame = state.rxBuffer.subarray(0, endIndex + FRAME_END.length); handleFrame(frame, state); state.rxBuffer = state.rxBuffer.subarray(endIndex + FRAME_END.length); } if (state.rxBuffer.length > 1024 * 1024) { console.warn('⚠️ 接收缓冲区异常增长,清空'); state.log.warn('rx.buffer.reset', { reason: 'overflow', size: state.rxBuffer.length }); state.rxBuffer = Buffer.alloc(0); } }); socket.on('error', (err) => { console.error(`❌ [#${id}] 连接错误 (${remote}):`, err.message); state.log.error('conn.error', { clientId: id, remote, error: err.message }); }); socket.on('close', () => { console.log(`🔌 [#${id}] 客户端断开: ${remote}`); state.log.info('conn.close', { clientId: id, remote }); if (state.timerId) clearInterval(state.timerId); if (state.activateTimerId) clearInterval(state.activateTimerId); clients.delete(id); }); } // 创建并启动服务器 const server = net.createServer(createConnectionHandler); server.maxConnections = MAX_CLIENTS; const clients = new Map(); let connectionSeq = 0; const deviceCache = new Map(); // 序列号 -> 最新数据 let shutdownInitiated = false; const SOCKET_GRACE_MS = 2000; // socket优雅关闭等待 const FORCE_EXIT_MS = 8000; // 最长等待后强制退出 // 将解析到的参数映射为阿里云物模型的属性标识,只保留能匹配的字段 function mapToAliyunProps(params, state) { const out = {}; // A 脱水目标量 ← G 超滤目标量 if (params.G != null) out.A = (params.G / 1000).toFixed(2); // B 脱水量 ← P 已完成超滤量 if (params.P != null) out.B = (params.P / 1000).toFixed(2); // C 脱水速率 ← R 超滤速率 if (params.R != null) out.C = String(params.R); // D 血液流速 ← Q 有效血流量 if (params.Q != null) out.D = String(params.Q); // o 动脉压 ← A 动脉压 if (params.A != null) out.o = String(params.A); // E 肝素速率(暂无解析源,跳过) // F 透析液温度 ← T/10 if (params.T != null) out.F = (params.T / 10).toFixed(1); // G 透析液电导度 ← C/10 if (params.C != null) out.G = (params.C / 10).toFixed(1); // H 静脉压 ← V if (params.V != null) out.H = String(params.V); // I 透析液压(暂无解析源,跳过) // J 跨膜压 ← U if (params.U != null) out.J = String(params.U); // K 透析时间 ← 优先 M(总治疗时间),其次 H(剩余时间) if (params.H != null) out.K = String(params.H); else if (params.H != null) out.K = String(params.H); // L 透析液流速 ← I 实际透析液流量 if (params.I != null) out.L = String(params.I); // U 血流总量 ← Y 累计血容量,转换为 L(一位小数) if (params.Y != null) out.U = (params.Y / 10).toFixed(1); // 辅助属性 if (state.detectedSerial) out.n = state.detectedSerial; // 序列号 out.suedtime = formatDateTime(); // 传输时间 out.deviceType = 'Fresenius4008S'; out.deviceName = state.detectedSerial || 'unknown'; if (state.remoteAddress) out.IPAddress = state.remoteAddress; return out; } server.on('error', (err) => { console.error('❌ 服务器错误:', err.message); logger.error('server.error', { error: err.message }); }); server.listen(PORT, HOST, () => { console.log(`🩺 Fresenius 4008S TCP 服务端已启动`); console.log(`🛰️ 监听: ${HOST}:${PORT} | 最大并发: ${MAX_CLIENTS}`); console.log(`⚙️ TCP配置来源: ${tcpCfg.__meta?.source || 'unknown'}${tcpCfg.__meta?.path ? ' (' + tcpCfg.__meta.path + ')' : ''}`); console.log(`📏 下发指令: 激活=${obfuscateHex(ACTIVATE_CMD_HEX)},取数=${obfuscateHex(GET_DATA_CMD_HEX)}`); console.log(`⏱️ 间隔: 获取数据=${GET_DATA_INTERVAL}ms;保活激活=${ACTIVATE_INTERVAL}ms`); console.log(`☁️ 阿里云IoT: ${ALIYUN_ENABLED ? '✅ 已启用' : '❌ 已禁用'} (配置来源: ${aliyunCfg.__meta?.source || 'unknown'})`); console.log(`📡 MQTT: ${MQTT_ENABLED ? '✅ 已启用' : '❌ 已禁用'} ${MQTT_ENABLED ? `(${mqttCfg.brokerUrl}:${mqttCfg.port})` : '(配置来源: ' + (mqttCfg.__meta?.source || 'unknown') + ')'}`); console.log('📜 行为: 连接即下发激活;按配置周期保活激活与获取数据;收到数据帧后解析'); logger.info('server.start.config', { tcp: { source: tcpCfg.__meta?.source, path: tcpCfg.__meta?.path, host: HOST, port: PORT, maxClients: MAX_CLIENTS, getDataIntervalMs: GET_DATA_INTERVAL, activateIntervalMs: ACTIVATE_INTERVAL, }, http: { enabled: httpCfg.enabled, source: httpCfg.__meta?.source, path: httpCfg.__meta?.path, host: httpCfg.host, port: httpCfg.port, }, aliyun: { enabled: ALIYUN_ENABLED, source: aliyunCfg.__meta?.source, path: aliyunCfg.__meta?.path, }, mqtt: { enabled: MQTT_ENABLED, source: mqttCfg.__meta?.source, path: mqttCfg.__meta?.path, brokerUrl: MQTT_ENABLED ? mqttCfg.brokerUrl : undefined, port: MQTT_ENABLED ? mqttCfg.port : undefined, } }); }); // ===== HTTP 服务:使用独立模块与配置启动 ===== function loadHttpConfig() { const baseDir = (typeof process !== 'undefined' && process.pkg) ? path.dirname(process.execPath) : __dirname; const candidates = [ { path: path.join(baseDir, 'http-config.json'), source: 'exedir' }, { path: path.join(__dirname, 'http-config.json'), source: 'srcdir' } ]; for (const c of candidates) { try { const raw = fs.readFileSync(c.path, 'utf8'); const obj = JSON.parse(raw); obj.__meta = { source: c.source, path: c.path }; // 默认值兜底 if (obj.enabled === undefined) obj.enabled = false; if (obj.host === undefined) obj.host = '0.0.0.0'; if (obj.port === undefined) obj.port = 8080; return obj; } catch (_) { /* continue */ } } const def = { enabled: false, host: '0.0.0.0', port: 8080 }; def.__meta = { source: 'default', path: null }; return def; } const httpCfg = loadHttpConfig(); if (httpCfg.enabled) { const host = httpCfg.host || HOST; const port = httpCfg.port || 8080; console.log(`🌐 HTTP API 配置来源: ${httpCfg.__meta?.source || 'unknown'}${httpCfg.__meta?.path ? ' (' + httpCfg.__meta.path + ')' : ''}`); console.log(`🌐 HTTP API 监听: ${host}:${port}`); logger.info('http.config', { source: httpCfg.__meta?.source, path: httpCfg.__meta?.path, host, port }); startHttpApi({ host, port, getDevices: () => { const serials = []; for (const { state } of clients.values()) { if (state.detectedSerial) serials.push(state.detectedSerial); } return serials; }, getAllData: () => { const all = {}; for (const [serial, entry] of deviceCache.entries()) { all[serial] = { updatedAt: entry.updatedAt, data: entry.data }; } return all; }, getDataBySerial: (serial) => { const entry = deviceCache.get(serial); return entry ? { serial, updatedAt: entry.updatedAt, data: entry.data } : null; } }); } else { console.log('HTTP API 未启用(可通过 http-config.json 启用)'); } // 优雅关闭 function shutdown() { if (shutdownInitiated) return; shutdownInitiated = true; console.log('\n🛑 正在关闭服务器...'); logger.info('server.shutdown.start'); // 停止新连接 try { server.close(() => { console.log('✅ 已关闭监听 (不再接受新连接)'); logger.info('server.stop.listen'); }); } catch (e) { console.error('server.close 出错:', e.message); logger.error('server.stop.error', { error: e.message }); } // 逐个关闭现有连接:清理计时器,优雅结束,再强制销毁 for (const [id, { socket, state }] of clients.entries()) { try { if (state && state.timerId) { clearInterval(state.timerId); state.timerId = null; } if (state && state.activateTimerId) { clearInterval(state.activateTimerId); state.activateTimerId = null; } const remote = `${socket.remoteAddress}:${socket.remotePort}`; console.log(`🔻 关闭连接 [#${id}] ${remote}`); (state.log || logger).info('conn.shutdown', { clientId: id, remote }); // 优雅结束 try { socket.end(); } catch {} // 超时后强制销毁 setTimeout(() => { if (!socket.destroyed) { try { socket.destroy(); } catch {} (state.log || logger).warn('conn.destroy', { clientId: id, remote }); } clients.delete(id); }, SOCKET_GRACE_MS); } catch (e) { logger.error('conn.shutdown.error', { clientId: id, error: e.message }); } } // 关闭阿里云 IoT 所有设备连接 try { closeAllConnections(); logger.info('aliyun.close.all'); } catch (e) { console.error('关闭阿里云 IoT 连接失败:', e.message); logger.error('aliyun.close.error', { error: e.message }); } // 关闭 MQTT 连接 try { if (MQTT_ENABLED) { closeMqtt(); } } catch (e) { console.error('关闭 MQTT 连接失败:', e.message); logger.error('mqtt.close.error', { error: e.message }); } // 最长等待后强制退出 const forceTimer = setTimeout(() => { const remaining = clients.size; if (remaining > 0) { console.warn(`⏱️ 强制退出,仍有 ${remaining} 个连接未释放`); logger.warn('server.stop.force', { remaining }); } process.exit(0); }, FORCE_EXIT_MS); // 若全部释放,尽快退出 const checkClear = () => { if (clients.size === 0) { clearTimeout(forceTimer); console.log('🟢 连接已清空,退出进程'); logger.info('server.stop'); process.exit(0); } }; setTimeout(checkClear, SOCKET_GRACE_MS + 200); } process.on('SIGINT', shutdown); process.on('SIGTERM', shutdown);