const fs = require('fs'); const path = require('path'); const net = require('net'); const http = require('http'); const crypto = require('crypto'); const axios = require('axios'); const mqtt = require('mqtt'); const Qs = require('qs'); const util = require('util'); const winston = require('winston'); require('winston-daily-rotate-file'); const { extractFramesFromUtf16leBuffer } = require('./framing'); const { parseAndPrintData } = require('./index'); // ============================================================================ // 网关主流程(gateway.js) // 1) 读取配置并初始化日志/缓存/发布器 // 2) 为每台透析机创建设备客户端 DeviceClient 并建立 TCP 连接 // 3) 收到字节流后进行分帧(UTF-16LE XML) // 4) 调用 index.parseAndPrintData 解析 XML -> 结构化 frameObj // 5) 统一标准化后写入本地缓存(latest-device-cache.json) // 6) 按发布目标转发到 MQTT / 阿里云 IoT // 7) 连接异常时记录日志并自动重连 // ============================================================================ function nowIso() { return new Date().toISOString(); } function readJson(filePath) { return JSON.parse(fs.readFileSync(filePath, 'utf8')); } function stringifyLogArg(arg) { if (typeof arg === 'string') return arg; return util.inspect(arg, { depth: 6, colors: false, breakLength: 120 }); } let auditLogger = null; function setAuditLogger(logger) { auditLogger = logger; } function logEvent(level, event, fields = {}) { if (!auditLogger) return; auditLogger.log({ level, message: event, event, ...fields, }); } // 日志级别对应的图标,提升控制台/文件可读性 const LEVEL_ICON = { error: '❌', warn: '⚠️ ', info: 'ℹ️ ', debug: '🔍' }; function setupPersistentConsoleLogger(config = {}) { // 日志目录:统一存放到 logs/ 子目录,按日期自动分文件 // 文件命名: // logs/gateway-YYYY-MM-DD.log 人类可读文本(每行一条) // logs/gateway-YYYY-MM-DD.jsonl 结构化 JSON(按设备/事件检索) // 保留天数:默认 30 天,超出自动删除 const logsDir = path.resolve(process.cwd(), config.logsDir || 'logs'); if (!fs.existsSync(logsDir)) fs.mkdirSync(logsDir, { recursive: true }); const maxDays = String(config.maxDays || 30) + 'd'; const logLevel = config.level || 'info'; // ── 格式1:人类可读文本 ────────────────────────────────────────────── // 示例: // 2026-03-15 14:23:01 INFO ✅ [设备接入] 连接成功 | name=床位01 endpoint=192.168.1.101:3021 // 2026-03-15 14:23:05 WARN ⚠️ [设备断开] 连接关闭 | name=床位01 abnormal=yes const textFormat = winston.format.combine( winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), winston.format.printf(({ timestamp, level, message, event, ...meta }) => { const icon = LEVEL_ICON[level] || ' '; const lvl = level.toUpperCase().padEnd(5); // 结构化事件额外附加关键字段(设备名、主机等),方便肉眼扫描 const extras = []; if (event) extras.push(`event=${event}`); if (meta.deviceName) extras.push(`device=${meta.deviceName}`); if (meta.deviceKey) extras.push(`key=${meta.deviceKey}`); if (meta.host && meta.port) extras.push(`endpoint=${meta.host}:${meta.port}`); if (meta.error) extras.push(`err=${meta.error}`); if (meta.payloadBytes != null) extras.push(`bytes=${meta.payloadBytes}`); const suffix = extras.length ? ` [${extras.join(' | ')}]` : ''; return `${timestamp} ${lvl} ${icon} ${message}${suffix}`; }) ); // ── 格式2:结构化 JSON ─────────────────────────────────────────────── // 每行一个 JSON 对象,包含所有字段,供程序解析/查询 const jsonFormat = winston.format.combine( winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }), winston.format.json() ); // ── 传输层:winston-daily-rotate-file ─────────────────────────────── const textTransport = new winston.transports.DailyRotateFile({ dirname: logsDir, filename: 'gateway-%DATE%.log', datePattern: 'YYYY-MM-DD', maxFiles: maxDays, format: textFormat, auditFile: path.join(logsDir, '.audit-text.json'), }); const jsonTransport = new winston.transports.DailyRotateFile({ dirname: logsDir, filename: 'gateway-%DATE%.jsonl', datePattern: 'YYYY-MM-DD', maxFiles: maxDays, format: jsonFormat, auditFile: path.join(logsDir, '.audit-json.json'), }); const logger = winston.createLogger({ level: logLevel, transports: [textTransport, jsonTransport], }); setAuditLogger(logger); // ── 拦截 console,同时输出到终端 + 文件 ───────────────────────────── const original = { log: console.log.bind(console), warn: console.warn.bind(console), error: console.error.bind(console), }; const write = (level, args, consoleWriter) => { consoleWriter(...args); try { const content = args.map(stringifyLogArg).join(' '); logger.log({ level, message: content }); } catch (e) { original.error(`❌ 日志落盘失败: ${e.message}`); } }; console.log = (...args) => write('info', args, original.log); console.warn = (...args) => write('warn', args, original.warn); console.error = (...args) => write('error', args, original.error); process.on('uncaughtException', (error) => console.error('❌ 未捕获异常:', error?.stack ?? error)); process.on('unhandledRejection', (reason) => console.error('❌ 未处理 Promise 拒绝:', reason)); // 当天日志文件路径(供启动提示用) const today = new Date().toISOString().slice(0, 10); const todayLog = path.join(logsDir, `gateway-${today}.log`); const todayJson = path.join(logsDir, `gateway-${today}.jsonl`); console.log(`📝 日志已启用 → ${logsDir}`); console.log(` 文本: gateway-YYYY-MM-DD.log JSON: gateway-YYYY-MM-DD.jsonl 保留: ${maxDays}`); logEvent('info', 'logger_initialized', { logsDir, todayLog, todayJson, level: logLevel, maxDays, }); return { logsDir, todayLog, todayJson, close: () => { try { logger.end(); } catch (_) {} }, }; } function safePreview(text, maxLen = 220) { const s = String(text || '').replace(/\s+/g, ' ').trim(); if (s.length <= maxLen) return s; return `${s.slice(0, maxLen)}...`; } class LatestCacheStore { constructor(cacheFilePath) { this.cacheFilePath = cacheFilePath; this.map = new Map(); this.flushTimer = null; this.load(); } load() { if (!fs.existsSync(this.cacheFilePath)) return; try { const obj = readJson(this.cacheFilePath); Object.entries(obj).forEach(([key, value]) => this.map.set(key, value)); console.log(`🗂️ 已加载本地缓存: ${this.cacheFilePath}, keys=${this.map.size}`); } catch (error) { console.warn(`⚠️ 缓存文件加载失败: ${error.message}`); } } set(deviceKey, payload) { this.map.set(deviceKey, payload); this.scheduleFlush(); } get(deviceKey) { return this.map.get(deviceKey); } toObject() { const obj = {}; for (const [key, value] of this.map.entries()) obj[key] = value; return obj; } scheduleFlush() { if (this.flushTimer) return; this.flushTimer = setTimeout(() => { this.flushTimer = null; this.flushNow(); }, 200); } flushNow() { try { const folder = path.dirname(this.cacheFilePath); if (!fs.existsSync(folder)) fs.mkdirSync(folder, { recursive: true }); fs.writeFileSync(this.cacheFilePath, JSON.stringify(this.toObject(), null, 2), 'utf8'); } catch (error) { console.error(`❌ 缓存落盘失败: ${error.message}`); logEvent('error', 'cache_flush_failed', { cacheFile: this.cacheFilePath, error: error.message, }); } } } class MqttPublisher { constructor(config) { this.config = config || {}; this.client = null; this.failureQueueConfig = { enabled: this.config.failureQueue?.enabled !== false, filePath: path.resolve(process.cwd(), this.config.failureQueue?.filePath || './cache/mqtt-failed-queue.json'), maxItems: Number(this.config.failureQueue?.maxItems) > 0 ? Number(this.config.failureQueue.maxItems) : 5000, retryIntervalMs: Number(this.config.failureQueue?.retryIntervalMs) > 0 ? Number(this.config.failureQueue.retryIntervalMs) : 5000, retryBatchSize: Number(this.config.failureQueue?.retryBatchSize) > 0 ? Number(this.config.failureQueue.retryBatchSize) : 100, dedupeEnabled: this.config.failureQueue?.dedupeEnabled !== false, dedupeFields: Array.isArray(this.config.failureQueue?.dedupeFields) && this.config.failureQueue.dedupeFields.length > 0 ? this.config.failureQueue.dedupeFields : ['topic', 'deviceKey', 'frameTimestamp'], }; this.failureQueue = []; this.queueFlushTimer = null; this.retryTimer = null; this.retryInProgress = false; this.loadFailureQueue(); } isEnabled() { return !!this.config.enabled; } loadFailureQueue() { if (!this.failureQueueConfig.enabled) return; const fp = this.failureQueueConfig.filePath; if (!fs.existsSync(fp)) return; try { const parsed = readJson(fp); if (Array.isArray(parsed)) { this.failureQueue = parsed; console.log(`📦 已加载 MQTT 失败队列: ${fp}, items=${this.failureQueue.length}`); logEvent('info', 'mqtt_queue_loaded', { filePath: fp, items: this.failureQueue.length, }); } } catch (error) { console.error(`❌ MQTT 失败队列加载失败: ${error.message}`); logEvent('error', 'mqtt_queue_load_failed', { filePath: fp, error: error.message, }); } } flushFailureQueueNow() { if (!this.failureQueueConfig.enabled) return; try { const fp = this.failureQueueConfig.filePath; const folder = path.dirname(fp); if (!fs.existsSync(folder)) fs.mkdirSync(folder, { recursive: true }); fs.writeFileSync(fp, JSON.stringify(this.failureQueue, null, 2), 'utf8'); } catch (error) { console.error(`❌ MQTT 失败队列落盘失败: ${error.message}`); logEvent('error', 'mqtt_queue_flush_failed', { filePath: this.failureQueueConfig.filePath, error: error.message, }); } } scheduleFailureQueueFlush() { if (!this.failureQueueConfig.enabled) return; if (this.queueFlushTimer) return; this.queueFlushTimer = setTimeout(() => { this.queueFlushTimer = null; this.flushFailureQueueNow(); }, 300); } buildDedupeKey(item) { const fields = this.failureQueueConfig.dedupeFields || []; if (fields.length === 0) return null; const key = fields.map((field) => { const val = item?.[field]; return `${field}=${val == null ? '' : String(val)}`; }).join('|'); return key || null; } enqueueFailedPublish(item, reason = 'publish_failed') { if (!this.failureQueueConfig.enabled) return; const queueItem = { ...item, reason, enqueuedAt: nowIso(), }; if (this.failureQueueConfig.dedupeEnabled) { const key = this.buildDedupeKey(queueItem); if (key) { const idx = this.failureQueue.findIndex((it) => this.buildDedupeKey(it) === key); if (idx >= 0) { this.failureQueue[idx] = queueItem; this.scheduleFailureQueueFlush(); this.scheduleRetryDrain(); logEvent('info', 'mqtt_queue_deduped', { dedupeKey: key, queueSize: this.failureQueue.length, topic: item.topic, deviceKey: item.deviceKey || null, }); return; } } } if (this.failureQueue.length >= this.failureQueueConfig.maxItems) { const dropped = this.failureQueue.shift(); logEvent('warn', 'mqtt_queue_drop_oldest', { maxItems: this.failureQueueConfig.maxItems, droppedTopic: dropped?.topic || null, }); } this.failureQueue.push(queueItem); this.scheduleFailureQueueFlush(); this.scheduleRetryDrain(); logEvent('warn', 'mqtt_queue_enqueued', { topic: item.topic, deviceKey: item.deviceKey || null, queueSize: this.failureQueue.length, reason, }); } attemptPublish(topic, payload, qos) { return new Promise((resolve, reject) => { if (!this.client || !this.client.connected) { reject(new Error('MQTT 未连接')); return; } this.client.publish(topic, payload, { qos }, (err) => { if (err) { reject(err); return; } resolve(); }); }); } scheduleRetryDrain() { if (!this.failureQueueConfig.enabled) return; if (this.retryTimer) return; this.retryTimer = setTimeout(() => { this.retryTimer = null; this.drainFailureQueue('timer').catch((error) => { console.warn(`⚠️ MQTT 队列重试异常: ${error.message}`); }); }, this.failureQueueConfig.retryIntervalMs); } async drainFailureQueue(trigger = 'manual') { if (!this.failureQueueConfig.enabled) return; if (this.retryInProgress) return; if (!this.client || !this.client.connected) { this.scheduleRetryDrain(); return; } if (this.failureQueue.length === 0) return; this.retryInProgress = true; let sent = 0; try { const batchSize = this.failureQueueConfig.retryBatchSize; const totalBefore = this.failureQueue.length; while (this.failureQueue.length > 0 && sent < batchSize) { const item = this.failureQueue[0]; try { await this.attemptPublish(item.topic, item.payload, item.qos); this.failureQueue.shift(); sent += 1; } catch (error) { logEvent('warn', 'mqtt_queue_retry_failed', { topic: item.topic, queueSize: this.failureQueue.length, error: error.message, }); break; } } if (sent > 0) { this.scheduleFailureQueueFlush(); logEvent('info', 'mqtt_queue_drained', { trigger, sent, remain: this.failureQueue.length, totalBefore, }); } } finally { this.retryInProgress = false; if (this.failureQueue.length > 0) this.scheduleRetryDrain(); } } ensureConnected() { if (this.client) return; const mqttConfig = this.config; this.client = mqtt.connect(mqttConfig.brokerUrl, { clientId: mqttConfig.clientId || `artis-gateway-${Date.now()}`, username: mqttConfig.username, password: mqttConfig.password, reconnectPeriod: mqttConfig.reconnectMs || 3000, clean: true, connectTimeout: 10_000, }); this.client.on('connect', () => { console.log(`✅ MQTT 已连接: ${mqttConfig.brokerUrl}`); logEvent('info', 'mqtt_connected', { brokerUrl: mqttConfig.brokerUrl, clientId: mqttConfig.clientId || null, }); this.drainFailureQueue('connect').catch((error) => { console.warn(`⚠️ MQTT 队列重放失败: ${error.message}`); }); }); this.client.on('error', (error) => { console.error(`❌ MQTT 错误: ${error.message}`); logEvent('error', 'mqtt_error', { brokerUrl: mqttConfig.brokerUrl, error: error.message, }); }); this.client.on('offline', () => { logEvent('warn', 'mqtt_offline', { brokerUrl: mqttConfig.brokerUrl }); }); this.client.on('close', () => { logEvent('warn', 'mqtt_closed', { brokerUrl: mqttConfig.brokerUrl }); }); } publish(frameData) { // 发布输入:标准化后的 frameData(包含 device/timestamp/params/latest/source) // 发布输出:topic = {topicPrefix}/{deviceKey}/latest // payload 为完全扁平的物模型对象,所有字段在同一层,与服务端接收格式一致。 if (!this.isEnabled()) return; this.ensureConnected(); const topicPrefix = this.config.topicPrefix || 'dialysis'; // 统一用 deviceName 作为 key const topic = `${topicPrefix}/${frameData.deviceName}`; const qos = Number.isInteger(this.config.qos) ? this.config.qos : 1; // 从 latest(OBX id -> {value,unit})中取指定 OBX 字段的值 const latest = frameData.latest || {}; const pick = (obxId) => { const p = latest[String(obxId)]; return p != null ? p.value : undefined; }; // 构造与物模型一致的扁平 payload,所有字段在同一层 const mqttPayload = { ts: Date.now(), // 毫秒时间戳 suedtime: frameData.timestamp || null, // 帧原始时间(YYYYMMDDHHmmss) deviceId: frameData.deviceName || null, // 设备唯一标识(用name) deviceNo: frameData.device || null, // 机器序列号 deviceType: this.config.deviceType || 'Artis', // 设备型号 IPAddress: frameData.source?.host || null, // 设备 IP zljd: pick(0), // 治疗阶段 sysj: pick(1), // 剩余时间(s) A: pick(34), // 设定超滤量(l) B: pick(2), // 超滤量(l) C: pick(3), // 超滤率(l/hr) D: pick(6), // 血流速(ml/min) sdxll: pick(32), // 设定血流速(ml/min) F: pick(7), // 温度(cel) J: pick(12), // TMP(mmHg) H: pick(13), // 静脉压(mmHg) o: pick(14), // 动脉压(mmHg) L: pick(8), // 透析液流速(ml/min) G: pick(4), // 脱水量(l) xinlv: pick(61), // 心率(pulse) qfqb: pick(65), // QF/QB(%) convVol: pick(67), // Conv Vol(l) convClear: pick(68), // Conv Clear(l/hr) }; // 去除 undefined 值,保持 payload 干净 Object.keys(mqttPayload).forEach(k => { if (mqttPayload[k] === undefined) delete mqttPayload[k]; }); const payload = JSON.stringify(mqttPayload); this.attemptPublish(topic, payload, qos).then(() => { const ts = new Date().toISOString().replace('T', ' ').slice(0, 19); console.log(`\n📤 [${ts}] MQTT 发布成功`); console.log(` Topic : ${topic}`); console.log(` Device : ${frameData.deviceName || '-'}`); console.log(` Time : ${frameData.timestamp || '-'}`); const fieldCount = Object.keys(mqttPayload).length; console.log(` Fields : ${fieldCount} 个物模型字段`); console.log(` Size : ${Buffer.byteLength(payload)} bytes`); logEvent('info', 'mqtt_publish_success', { topic, deviceKey: frameData.deviceName, deviceNo: frameData.device || null, sourceHost: frameData.source?.host || null, sourcePort: frameData.source?.port || null, frameTimestamp: frameData.timestamp || null, sentAt: nowIso(), payloadBytes: Buffer.byteLength(payload), payload: mqttPayload, }); }).catch((err) => { console.error(`❌ MQTT 发布失败 (${topic}): ${err.message}`); logEvent('error', 'mqtt_publish_failed', { topic, deviceKey: frameData.deviceKey, timestamp: frameData.timestamp, error: err.message, }); this.enqueueFailedPublish({ topic, payload, qos, deviceKey: frameData.deviceName, frameTimestamp: frameData.timestamp || null, }, err.message || 'publish_failed'); }); } close() { if (this.queueFlushTimer) { clearTimeout(this.queueFlushTimer); this.queueFlushTimer = null; } if (this.retryTimer) { clearTimeout(this.retryTimer); this.retryTimer = null; } this.flushFailureQueueNow(); if (!this.client) return; try { this.client.end(true); logEvent('info', 'mqtt_client_closed', { queueRemain: this.failureQueue.length, }); } catch (error) { console.warn(`⚠️ MQTT 关闭异常: ${error.message}`); } finally { this.client = null; } } } class AliyunIotPublisher { constructor(config) { this.config = config || {}; this.tripleCache = new Map(); this.clientMap = new Map(); this.propertyTypeMap = new Map(); this.allowedIdentifiers = null; this.productKey = this.config.productKey || null; this.region = this.config.region || 'cn-shanghai'; this.deviceNamePrefix = this.config.deviceNameRule?.prefix || ''; this.deviceSecretStoreFile = path.resolve( process.cwd(), this.config.deviceSecretStoreFile || './cache/aliyun-device-secrets.json' ); this.deviceSecretStore = {}; this.tripleApi = this.config.tripleApi || {}; this.defaultTripleApiConfig = { baseURL: 'https://things.icoldchain.cn/', url: 'device/info/getAliyunDeviceSecret', method: 'post', deviceNameField: 'deviceName', isAutoRegister: 1, timeoutMs: 15000, }; this.loadDeviceSecretStore(); this.loadThingModel(); } isEnabled() { return !!this.config.enabled; } loadDeviceSecretStore() { if (!fs.existsSync(this.deviceSecretStoreFile)) { this.deviceSecretStore = {}; return; } try { const parsed = readJson(this.deviceSecretStoreFile); this.deviceSecretStore = parsed && typeof parsed === 'object' ? parsed : {}; console.log(`🔐 已加载阿里云设备密钥缓存: ${this.deviceSecretStoreFile}, count=${Object.keys(this.deviceSecretStore).length}`); } catch (error) { this.deviceSecretStore = {}; console.warn(`⚠️ 阿里云设备密钥缓存加载失败: ${error.message}`); } } saveDeviceSecretStore() { try { const folder = path.dirname(this.deviceSecretStoreFile); if (!fs.existsSync(folder)) fs.mkdirSync(folder, { recursive: true }); fs.writeFileSync(this.deviceSecretStoreFile, JSON.stringify(this.deviceSecretStore, null, 2), 'utf8'); } catch (error) { console.error(`❌ 阿里云设备密钥缓存保存失败: ${error.message}`); logEvent('error', 'aliyun_secret_store_save_failed', { filePath: this.deviceSecretStoreFile, error: error.message, }); } } buildDeviceName(deviceNo) { const mode = this.config.deviceNameRule?.mode || 'fromDeviceNo'; const base = String(deviceNo || '').trim(); if (!base) throw new Error('设备编号为空,无法生成 deviceName'); const safe = base.replace(/[^a-zA-Z0-9._-]/g, '_'); if (mode !== 'fromDeviceNo') { throw new Error(`不支持的 deviceNameRule.mode: ${mode}`); } return `${this.deviceNamePrefix}${safe}`; } async getTripleByApi(deviceNo, deviceName) { const api = { ...this.defaultTripleApiConfig, ...(this.tripleApi || {}) }; const requestBody = { isAutoRegister: api.isAutoRegister, [api.deviceNameField || 'deviceName']: deviceName, ...(api.extraForm || {}), }; console.log(`🛰️ [三元组接口请求] baseURL=${api.baseURL} url=${api.url} deviceName=${deviceName}`); logEvent('info', 'aliyun_triple_api_request', { baseURL: api.baseURL, url: api.url, method: api.method, request: requestBody, }); try { const res = await axios({ url: api.url, method: api.method || 'post', baseURL: api.baseURL, headers: { 'Content-Type': 'application/x-www-form-urlencoded', ...(api.headers || {}), }, timeout: Number(api.timeoutMs) > 0 ? Number(api.timeoutMs) : 15000, data: Qs.stringify(requestBody), }); const responseData = res.data; try { const preview = JSON.stringify(responseData, null, 2); console.log('🛰️ [三元组接口原始响应]', `status=${res.status}`, '\n', preview); } catch (_) { console.log('🛰️ [三元组接口原始响应]', `status=${res.status}`, responseData); } logEvent('info', 'aliyun_triple_api_response', { httpStatus: res.status, response: responseData, }); const body = responseData?.data && typeof responseData.data === 'object' ? responseData.data : responseData; const code = responseData?.code ?? responseData?.Code ?? body?.code ?? body?.Code ?? 200; const success = code === 200 || code === '200' || code === 0 || responseData?.success === true || body?.success === true; const message = responseData?.message || responseData?.Message || body?.message || body?.Message || null; const productKey = body?.productKey || responseData?.productKey || this.productKey; const outDeviceName = body?.deviceName || responseData?.deviceName || deviceName; const deviceSecret = body?.deviceSecret || responseData?.deviceSecret; const region = body?.region || responseData?.region || this.region; if (!success || !productKey || !outDeviceName || !deviceSecret) { throw new Error(`三元组接口返回异常(code=${code}): ${message || JSON.stringify(responseData)}`); } return { productKey, deviceName: outDeviceName, deviceSecret, region, }; } catch (error) { const status = error?.response?.status; const data = error?.response?.data; const detail = data ? JSON.stringify(data) : error.message; logEvent('error', 'aliyun_triple_api_failed', { status: status || null, error: detail, deviceNo, deviceName, }); throw new Error(`三元组接口调用失败: ${status || '-'} ${detail}`); } } getPreRegisteredTriple(deviceNo, deviceName) { const map = this.config.preRegisteredDevices || {}; const direct = map[deviceNo] || map[deviceName]; if (!direct) return null; // 支持两种写法: // 1) "设备编号": "deviceSecret" // 2) "设备编号": { "deviceName": "xxx", "deviceSecret": "xxx" } if (typeof direct === 'string') { return { productKey: this.productKey, deviceName, deviceSecret: direct, }; } if (typeof direct === 'object' && direct.deviceSecret) { return { productKey: this.productKey, deviceName: direct.deviceName || deviceName, deviceSecret: direct.deviceSecret, }; } return null; } loadThingModel() { const thingModelPath = this.config.thingModelPath ? path.resolve(process.cwd(), this.config.thingModelPath) : path.resolve(process.cwd(), '阿里物模型.json'); if (!fs.existsSync(thingModelPath)) { console.warn(`⚠️ 未找到物模型文件: ${thingModelPath},将按默认映射直接上报`); return; } try { const model = readJson(thingModelPath); const props = Array.isArray(model.properties) ? model.properties : []; const allowed = new Set(); props.forEach((prop) => { if (!prop || !prop.identifier) return; allowed.add(prop.identifier); const type = prop.dataType?.type || 'text'; this.propertyTypeMap.set(prop.identifier, type); }); this.allowedIdentifiers = allowed; console.log(`✅ 已加载阿里物模型: ${thingModelPath}, properties=${allowed.size}`); } catch (error) { console.error(`❌ 物模型加载失败: ${error.message}`); } } toInt(value) { if (value == null || value === '') return undefined; const n = Number(value); if (Number.isNaN(n)) return undefined; return Math.round(n); } pickLatestValue(latestMap, obxId) { const p = latestMap?.[String(obxId)]; return p ? p.value : undefined; } normalizeByPropertyType(identifier, value) { if (value == null) return undefined; const type = this.propertyTypeMap.get(identifier); if (!type) return value; if (type === 'int') { return this.toInt(value); } if (type === 'float' || type === 'double') { const n = Number(value); return Number.isNaN(n) ? undefined : n; } if (type === 'bool') { if (typeof value === 'boolean') return value; const s = String(value).toLowerCase(); if (s === 'true' || s === '1') return true; if (s === 'false' || s === '0') return false; return undefined; } return String(value); } mapFrameToThingModelParams(frameData) { // 将 OBX 最新值映射为阿里物模型 identifier -> value。 // 若加载了物模型文件,则会按 allowedIdentifiers + dataType 做过滤与类型归一。 const latest = frameData.latest || {}; const mapped = { n: frameData.device || frameData.deviceKey, deviceType: this.config.deviceType || 'Artis', IPAddress: frameData.source?.host, zljd: this.pickLatestValue(latest, 0), sysj: this.pickLatestValue(latest, 1), A: this.pickLatestValue(latest, 34), B: this.pickLatestValue(latest, 2), C: this.pickLatestValue(latest, 3), D: this.pickLatestValue(latest, 6), sdxll: this.pickLatestValue(latest, 32), F: this.pickLatestValue(latest, 7), J: this.pickLatestValue(latest, 12), H: this.pickLatestValue(latest, 13), o: this.pickLatestValue(latest, 14), L: this.pickLatestValue(latest, 8), G: this.pickLatestValue(latest, 4), }; const out = {}; Object.entries(mapped).forEach(([identifier, rawVal]) => { if (rawVal == null || rawVal === '') return; if (this.allowedIdentifiers && !this.allowedIdentifiers.has(identifier)) return; const normalizedVal = this.normalizeByPropertyType(identifier, rawVal); if (normalizedVal == null) return; out[identifier] = normalizedVal; }); return out; } async getTripleByDeviceNo(deviceNo) { if (!deviceNo) throw new Error('设备编号为空,无法获取三元组'); if (this.tripleCache.has(deviceNo)) { return this.tripleCache.get(deviceNo); } const deviceName = this.buildDeviceName(deviceNo); const localCacheKey = this.productKey ? `${this.productKey}:${deviceName}` : null; // 先用本地持久化密钥 const existedSecret = localCacheKey ? this.deviceSecretStore?.[localCacheKey] : null; if (existedSecret && this.productKey) { const triple = { productKey: this.productKey, deviceName, deviceSecret: existedSecret, }; this.tripleCache.set(deviceNo, triple); return triple; } let triple; if (this.tripleApi && (this.tripleApi.url || this.tripleApi.baseURL)) { triple = await this.getTripleByApi(deviceNo, deviceName); } else { const fallback = this.getPreRegisteredTriple(deviceNo, deviceName); if (fallback) { console.warn(`⚠️ 未配置三元组接口,使用预注册设备密钥 | deviceNo=${deviceNo} deviceName=${fallback.deviceName}`); logEvent('warn', 'aliyun_use_preregistered_triple', { deviceNo, deviceName: fallback.deviceName, }); triple = fallback; } else { throw new Error('未配置三元组接口 (services.aliyunIot.tripleApi),且没有预注册设备密钥,无法获取三元组'); } } const cacheKey = `${triple.productKey}:${triple.deviceName}`; this.deviceSecretStore[cacheKey] = triple.deviceSecret; this.saveDeviceSecretStore(); logEvent('info', 'aliyun_device_registered', { productKey: triple.productKey, deviceName: triple.deviceName, mode: (this.tripleApi && (this.tripleApi.url || this.tripleApi.baseURL)) ? 'tripleApi' : 'preRegistered', }); this.tripleCache.set(deviceNo, triple); return triple; } createAliyunPassword(clientId, deviceName, productKey, deviceSecret) { const plain = `clientId${clientId}deviceName${deviceName}productKey${productKey}`; return crypto.createHmac('sha1', deviceSecret).update(plain).digest('hex'); } ensureClient(triple) { const key = `${triple.productKey}:${triple.deviceName}`; if (this.clientMap.has(key)) return this.clientMap.get(key); // 按阿里云官方示例:签名使用基础 clientId,扩展参数通过 |securemode=3,signmethod=hmacsha1| 追加 const clientIdBase = `artisGateway_${triple.deviceName}`; const clientIdRaw = `${clientIdBase}|securemode=3,signmethod=hmacsha1|`; const password = this.createAliyunPassword(clientIdBase, triple.deviceName, triple.productKey, triple.deviceSecret); const username = `${triple.deviceName}&${triple.productKey}`; const region = triple.region || this.config.region || 'cn-shanghai'; const host = this.config.brokerHost || `${triple.productKey}.iot-as-mqtt.${region}.aliyuncs.com`; const url = `mqtt://${host}:1883`; const client = mqtt.connect(url, { clientId: clientIdRaw, username, password, reconnectPeriod: 3000, clean: true, connectTimeout: 10_000, }); client.on('connect', () => { console.log(`✅ 阿里云IoT已连接: ${key}`); }); client.on('error', (error) => { console.error(`❌ 阿里云IoT错误 (${key}): ${error.message}`); }); this.clientMap.set(key, client); return client; } buildThingModelPayload(frameData) { const params = this.mapFrameToThingModelParams(frameData); return { id: `${Date.now()}`, version: '1.0', params, method: 'thing.event.property.post', }; } async publish(frameData) { if (!this.isEnabled()) return; // 直接用 deviceName 作为唯一标识 const deviceName = frameData.deviceName; if (!deviceName) { console.warn('⚠️ 阿里云IoT跳过:未解析到设备名称'); return; } // 兼容 getTripleByDeviceNo 只用 name const triple = await this.getTripleByDeviceNo(deviceName); const client = this.ensureClient(triple); const topic = `/sys/${triple.productKey}/${triple.deviceName}/thing/event/property/post`; const payload = this.buildThingModelPayload(frameData); client.publish(topic, JSON.stringify(payload), { qos: 1 }, (err) => { if (err) { console.error(`❌ 阿里云IoT发布失败 (${topic}): ${err.message}`); return; } console.log(`📤 阿里云IoT发布成功: ${topic}`); }); } close() { for (const [key, client] of this.clientMap.entries()) { try { client.end(true); logEvent('info', 'aliyun_client_closed', { key }); } catch (error) { console.warn(`⚠️ 阿里云IoT客户端关闭异常 (${key}): ${error.message}`); } } this.clientMap.clear(); } } class DeviceClient { constructor(deviceConfig, globalConfig, onFrame) { this.deviceConfig = deviceConfig; this.globalConfig = globalConfig; this.onFrame = onFrame; this.bufferCache = Buffer.alloc(0); this.socket = null; this.reconnectTimer = null; this.stopped = false; this.reconnectAttempts = 0; this.maxBufferBytes = Number(this.globalConfig.maxBufferBytes) > 0 ? Number(this.globalConfig.maxBufferBytes) : 8 * 1024 * 1024; this.lastConnectedAt = null; this.lastDisconnectedAt = null; this.lastDataAt = null; this.lastFrameAt = null; this.lastErrorAt = null; this.lastError = null; } start() { this.stopped = false; this.connect(); } stop() { this.stopped = true; if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } if (this.socket) { try { this.socket.removeAllListeners(); this.socket.destroy(); } catch (_) { } this.socket = null; } } getHealthSnapshot() { const isConnected = !!(this.socket && !this.socket.destroyed); return { name: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, connected: isConnected, reconnectAttempts: this.reconnectAttempts, bufferBytes: this.bufferCache.length, maxBufferBytes: this.maxBufferBytes, lastConnectedAt: this.lastConnectedAt, lastDisconnectedAt: this.lastDisconnectedAt, lastDataAt: this.lastDataAt, lastFrameAt: this.lastFrameAt, lastErrorAt: this.lastErrorAt, lastError: this.lastError, }; } calcReconnectDelayMs() { const policy = this.globalConfig.reconnectPolicy || {}; const baseMs = Number(policy.baseMs) > 0 ? Number(policy.baseMs) : (this.globalConfig.reconnectIntervalMs || 5000); const maxMs = Number(policy.maxMs) > 0 ? Number(policy.maxMs) : 300000; const factor = Number(policy.factor) > 1 ? Number(policy.factor) : 2; const jitterRatio = Number(policy.jitterRatio) >= 0 ? Number(policy.jitterRatio) : 0.2; const expDelay = Math.min(maxMs, Math.round(baseMs * Math.pow(factor, Math.max(0, this.reconnectAttempts)))); const jitter = Math.round(expDelay * jitterRatio); const minDelay = Math.max(1000, expDelay - jitter); const maxDelay = expDelay + jitter; return Math.floor(Math.random() * (maxDelay - minDelay + 1)) + minDelay; } connect() { if (this.stopped) return; // 每台设备一个独立 socket:独立收包、独立重连、独立日志。 if (this.socket) { this.socket.destroy(); this.socket = null; } const socket = new net.Socket(); this.socket = socket; socket.setKeepAlive(true, 10_000); console.log(`🔄 [设备接入] 准备连接 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port}`); logEvent('info', 'device_connecting', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, }); socket.connect(this.deviceConfig.port, this.deviceConfig.host, () => { this.reconnectAttempts = 0; this.lastConnectedAt = nowIso(); console.log(`✅ [设备接入] 连接成功 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port}`); logEvent('info', 'device_connected', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, connectedAt: nowIso(), }); }); socket.on('end', () => { console.warn(`⚠️ [设备断开] 远端主动结束连接(FIN) | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port}`); logEvent('warn', 'device_remote_end', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, }); }); socket.on('data', (chunk) => { // 数据链路:接收 chunk -> 追加缓存 -> 分帧 -> 逐帧解析 -> 回调上层分发。 this.lastDataAt = nowIso(); const cacheBefore = this.bufferCache.length; console.log(`📥 [数据接收] 收到chunk | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} bytes=${chunk.length} cacheBefore=${cacheBefore}`); logEvent('info', 'device_data_received', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, bytes: chunk.length, cacheBefore, receivedAt: nowIso(), }); this.bufferCache = Buffer.concat([this.bufferCache, chunk]); if (this.bufferCache.length > this.maxBufferBytes) { const droppedBytes = this.bufferCache.length - this.maxBufferBytes; this.bufferCache = this.bufferCache.slice(-this.maxBufferBytes); console.warn(`⚠️ [内存保护] 缓存超限已截断 | name=${this.deviceConfig.name || '-'} dropped=${droppedBytes} keep=${this.maxBufferBytes}`); logEvent('warn', 'device_buffer_trimmed', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, droppedBytes, keepBytes: this.maxBufferBytes, }); } try { const { frames, remainderBuffer } = extractFramesFromUtf16leBuffer(this.bufferCache); this.bufferCache = remainderBuffer; console.log(`🧱 [分帧结果] frames=${frames.length} cacheAfter=${this.bufferCache.length} | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port}`); for (const frame of frames) { const normalized = frame.replace(/^\uFEFF/, '').trim(); console.log(`🔍 [解析前] XML预览 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} xmlLen=${normalized.length} preview="${safePreview(normalized)}"`); logEvent('info', 'device_frame_before_parse', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, xmlLen: normalized.length, rawXmlPreview: safePreview(normalized, 1200), }); parseAndPrintData(normalized, { onFrame: (frameObj) => { // 解析成功后补充来源信息与设备级覆盖发布目标,再交给 onFrame。 const paramCount = Array.isArray(frameObj.params) ? frameObj.params.length : 0; console.log(`✅ [解析后] 结构化完成 | name=${this.deviceConfig.name || '-'} device=${frameObj.device || '-'} ts=${frameObj.timestamp || '-'} params=${paramCount}`); logEvent('info', 'device_frame_parsed', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, deviceNo: frameObj.device || null, frameTimestamp: frameObj.timestamp || null, paramCount, parsedParams: frameObj.params || [], }); this.lastFrameAt = nowIso(); Promise.resolve(this.onFrame({ ...frameObj, source: { host: this.deviceConfig.host, port: this.deviceConfig.port, }, devicePublishTargets: Array.isArray(this.deviceConfig.subscriptions) ? this.deviceConfig.subscriptions : undefined, deviceName: this.deviceConfig.name, })).catch((error) => { console.error(`❌ [分发异常] onFrame处理失败 | name=${this.deviceConfig.name || '-'} err=${error.message}`); logEvent('error', 'device_onframe_failed', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, error: error.message, }); }); }, }); } } catch (error) { console.warn(`⚠️ [分帧等待] 数据尚不足以形成完整帧 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} cacheNow=${this.bufferCache.length} err=${error.message}`); logEvent('warn', 'device_frame_waiting', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, cacheNow: this.bufferCache.length, error: error.message, }); } }); socket.on('error', (error) => { this.lastErrorAt = nowIso(); this.lastError = error.message; console.error(`❌ [设备异常] 连接错误 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} err=${error.message}`); logEvent('error', 'device_socket_error', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, error: error.message, }); }); socket.on('close', (hadError) => { this.lastDisconnectedAt = nowIso(); console.warn(`🔌 [设备断开] 连接关闭 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} abnormal=${hadError ? 'yes' : 'no'}`); logEvent('warn', 'device_closed', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, abnormal: !!hadError, closedAt: nowIso(), }); if (this.stopped) return; this.scheduleReconnect(); }); } scheduleReconnect() { if (this.stopped) return; if (this.reconnectTimer) return; const ms = this.calcReconnectDelayMs(); this.reconnectAttempts += 1; console.log(`⏳ [重连计划] ${ms}ms 后重连 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} attempt=${this.reconnectAttempts}`); logEvent('warn', 'device_reconnect_scheduled', { deviceName: this.deviceConfig.name || null, host: this.deviceConfig.host, port: this.deviceConfig.port, reconnectInMs: ms, attempt: this.reconnectAttempts, }); this.reconnectTimer = setTimeout(() => { this.reconnectTimer = null; this.connect(); }, ms); } } function setupHealthProbe(config, clients) { const probeConfig = config || {}; if (probeConfig.enabled === false) return null; const host = probeConfig.host || '127.0.0.1'; const port = Number(probeConfig.port) > 0 ? Number(probeConfig.port) : 18080; const staleFrameMs = Number(probeConfig.staleFrameMs) > 0 ? Number(probeConfig.staleFrameMs) : 180000; const server = http.createServer((req, res) => { const pathname = (req.url || '').split('?')[0]; if (pathname !== '/health' && pathname !== '/ready') { res.statusCode = 404; res.setHeader('Content-Type', 'application/json; charset=utf-8'); res.end(JSON.stringify({ error: 'not_found' })); return; } const now = Date.now(); const devices = clients.map((client) => client.getHealthSnapshot()); const connectedDevices = devices.filter((d) => d.connected).length; const freshDevices = devices.filter((d) => { if (!d.lastFrameAt) return false; return (now - Date.parse(d.lastFrameAt)) <= staleFrameMs; }).length; let status = 'ok'; if (connectedDevices === 0) status = 'degraded'; if (devices.length > 0 && freshDevices === 0) status = 'degraded'; const body = { status, timestamp: nowIso(), pid: process.pid, uptimeSec: Math.round(process.uptime()), memory: process.memoryUsage(), summary: { totalDevices: devices.length, connectedDevices, freshDevices, staleFrameMs, }, devices, }; res.statusCode = status === 'ok' ? 200 : 503; res.setHeader('Content-Type', 'application/json; charset=utf-8'); res.end(JSON.stringify(body)); }); server.listen(port, host, () => { console.log(`🩺 健康探针已启动: http://${host}:${port}/health`); logEvent('info', 'health_probe_started', { host, port, staleFrameMs }); }); server.on('error', (error) => { console.error(`❌ 健康探针启动/运行异常: ${error.message}`); logEvent('error', 'health_probe_error', { error: error.message, host, port }); }); return server; } function normalizeFrameForCache(frame) { // deviceKey 直接取配置文件 devices 中的 name 字段(通过 frame.deviceName 传递) const deviceKey = frame.deviceName || frame.device || `${frame.source?.host || 'unknown'}:${frame.source?.port || 0}`; const out = { deviceKey, deviceName: frame.deviceName, // 补充deviceName字段,确保mqtt topic正确 device: frame.device, timestamp: frame.timestamp, receivedAt: frame.receivedAt || nowIso(), source: frame.source, params: frame.params || [], latest: frame.latest || {}, }; return out; } function normalizePublishTargets(targets, fallback = ['mqtt']) { // 仅允许已支持的发布目标,自动去重;非法值会被忽略。 const source = Array.isArray(targets) && targets.length > 0 ? targets : fallback; const allowed = new Set(['mqtt', 'aliyunIot']); const out = []; source.forEach((item) => { const key = String(item || '').trim(); if (!allowed.has(key)) return; if (!out.includes(key)) out.push(key); }); return out.length > 0 ? out : fallback; } async function main() { // main 负责装配所有组件,不直接处理协议细节。 const configArgIndex = process.argv.indexOf('--config'); // 配置文件默认查找策略: // - pkg 打包运行:使用 exe 同级目录 // - node 脚本运行:使用当前工作目录(npm run gateway 时即项目根目录) const baseDir = process.pkg ? path.dirname(process.execPath) : process.cwd(); const configPath = (configArgIndex !== -1 && process.argv[configArgIndex + 1]) ? path.resolve(process.cwd(), process.argv[configArgIndex + 1]) : path.resolve(baseDir, 'gateway.config.json'); if (!fs.existsSync(configPath)) { console.error(`❌ 配置文件不存在: ${configPath}`); console.error('请先复制 gateway.config.example.json 为 gateway.config.json 并填写参数。'); process.exit(1); } const config = readJson(configPath); const runtimeLogger = setupPersistentConsoleLogger(config.runtimeLog || {}); const cacheStore = new LatestCacheStore(path.resolve(process.cwd(), config.cacheFile || './cache/latest-device-cache.json')); const mqttPublisher = new MqttPublisher(config.services?.mqtt || {}); const aliyunPublisher = new AliyunIotPublisher(config.services?.aliyunIot || {}); const globalPublishTargets = normalizePublishTargets(config.publishTargets, ['mqtt']); console.log(`🚀 网关启动完成 | config=${configPath}`); console.log(`📦 全局转发目标: ${globalPublishTargets.join(', ')}`); if ((config.services?.aliyunIot?.enabled === true) && !globalPublishTargets.includes('aliyunIot')) { console.warn('⚠️ 配置检查: aliyunIot.enabled=true,但 publishTargets 未包含 aliyunIot,当前不会上报阿里云。'); } if ((config.services?.mqtt?.enabled === true) && !globalPublishTargets.includes('mqtt')) { console.warn('⚠️ 配置检查: mqtt.enabled=true,但 publishTargets 未包含 mqtt,当前不会上报 MQTT。'); } const onFrame = async (frameObj) => { // 统一分发入口:缓存 -> 计算发布目标 -> 执行发布 -> 记录结果。 const frameData = normalizeFrameForCache(frameObj); cacheStore.set(frameData.deviceKey, frameData); console.log(`💾 缓存更新: key=${frameData.deviceKey} params=${frameData.params.length}`); const publishTargets = normalizePublishTargets(frameObj.devicePublishTargets, globalPublishTargets); console.log(`📮 [转发准备] device=${frameData.deviceKey} targets=${publishTargets.join(',')} ts=${frameData.timestamp || '-'}`); logEvent('info', 'dispatch_prepare', { deviceKey: frameData.deviceKey, deviceNo: frameData.device || null, frameTimestamp: frameData.timestamp || null, targets: publishTargets, }); try { if (publishTargets.includes('mqtt')) { console.log(`➡️ [转发执行] MQTT | device=${frameData.deviceKey}`); logEvent('info', 'dispatch_execute_mqtt', { deviceKey: frameData.deviceKey, frameTimestamp: frameData.timestamp || null, }); mqttPublisher.publish(frameData); } if (publishTargets.includes('aliyunIot')) { console.log(`➡️ [转发执行] 阿里云IoT | device=${frameData.deviceKey}`); logEvent('info', 'dispatch_execute_aliyun', { deviceKey: frameData.deviceKey, frameTimestamp: frameData.timestamp || null, }); await aliyunPublisher.publish(frameData); } console.log(`✅ [转发完成] device=${frameData.deviceKey}`); logEvent('info', 'dispatch_success', { deviceKey: frameData.deviceKey, frameTimestamp: frameData.timestamp || null, targets: publishTargets, dispatchAt: nowIso(), }); } catch (error) { console.error(`❌ 分发失败 (${frameData.deviceKey}): ${error.message}`); logEvent('error', 'dispatch_failed', { deviceKey: frameData.deviceKey, frameTimestamp: frameData.timestamp || null, targets: publishTargets, error: error.message, }); } }; const devices = config.devices || []; if (devices.length === 0) { console.error('❌ 未配置设备列表(devices 为空)'); process.exit(1); } const clients = devices.map(device => new DeviceClient(device, config, onFrame)); clients.forEach(client => client.start()); const healthServer = setupHealthProbe(config.healthProbe || {}, clients); let shuttingDown = false; const shutdown = (signal = 'SIGINT') => { if (shuttingDown) return; shuttingDown = true; console.warn(`🛑 收到 ${signal},准备安全退出...`); clients.forEach((client) => client.stop()); mqttPublisher.close(); aliyunPublisher.close(); if (healthServer) { try { healthServer.close(); } catch (_) { } } cacheStore.flushNow(); runtimeLogger.close(); process.exit(0); }; process.on('SIGINT', () => shutdown('SIGINT')); process.on('SIGTERM', () => shutdown('SIGTERM')); } if (require.main === module) { main().catch((error) => { console.error(`❌ 程序启动失败: ${error.message}`); process.exit(1); }); }