const fs = require('fs');
|
const path = require('path');
|
const net = require('net');
|
const http = require('http');
|
const crypto = require('crypto');
|
const axios = require('axios');
|
const mqtt = require('mqtt');
|
const Qs = require('qs');
|
const util = require('util');
|
const winston = require('winston');
|
require('winston-daily-rotate-file');
|
const { extractFramesFromUtf16leBuffer } = require('./framing');
|
const { parseAndPrintData } = require('./index');
|
|
// ============================================================================
|
// 网关主流程(gateway.js)
|
// 1) 读取配置并初始化日志/缓存/发布器
|
// 2) 为每台透析机创建设备客户端 DeviceClient 并建立 TCP 连接
|
// 3) 收到字节流后进行分帧(UTF-16LE XML)
|
// 4) 调用 index.parseAndPrintData 解析 XML -> 结构化 frameObj
|
// 5) 统一标准化后写入本地缓存(latest-device-cache.json)
|
// 6) 按发布目标转发到 MQTT / 阿里云 IoT
|
// 7) 连接异常时记录日志并自动重连
|
// ============================================================================
|
|
function nowIso() {
|
return new Date().toISOString();
|
}
|
|
function readJson(filePath) {
|
return JSON.parse(fs.readFileSync(filePath, 'utf8'));
|
}
|
|
function stringifyLogArg(arg) {
|
if (typeof arg === 'string') return arg;
|
return util.inspect(arg, { depth: 6, colors: false, breakLength: 120 });
|
}
|
|
let auditLogger = null;
|
|
function setAuditLogger(logger) {
|
auditLogger = logger;
|
}
|
|
function logEvent(level, event, fields = {}) {
|
if (!auditLogger) return;
|
auditLogger.log({
|
level,
|
message: event,
|
event,
|
...fields,
|
});
|
}
|
|
// 日志级别对应的图标,提升控制台/文件可读性
|
const LEVEL_ICON = { error: '❌', warn: '⚠️ ', info: 'ℹ️ ', debug: '🔍' };
|
|
function setupPersistentConsoleLogger(config = {}) {
|
// 日志目录:统一存放到 logs/ 子目录,按日期自动分文件
|
// 文件命名:
|
// logs/gateway-YYYY-MM-DD.log 人类可读文本(每行一条)
|
// logs/gateway-YYYY-MM-DD.jsonl 结构化 JSON(按设备/事件检索)
|
// 保留天数:默认 30 天,超出自动删除
|
const logsDir = path.resolve(process.cwd(), config.logsDir || 'logs');
|
if (!fs.existsSync(logsDir)) fs.mkdirSync(logsDir, { recursive: true });
|
|
const maxDays = String(config.maxDays || 30) + 'd';
|
const logLevel = config.level || 'info';
|
|
// ── 格式1:人类可读文本 ──────────────────────────────────────────────
|
// 示例:
|
// 2026-03-15 14:23:01 INFO ✅ [设备接入] 连接成功 | name=床位01 endpoint=192.168.1.101:3021
|
// 2026-03-15 14:23:05 WARN ⚠️ [设备断开] 连接关闭 | name=床位01 abnormal=yes
|
const textFormat = winston.format.combine(
|
winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
|
winston.format.printf(({ timestamp, level, message, event, ...meta }) => {
|
const icon = LEVEL_ICON[level] || ' ';
|
const lvl = level.toUpperCase().padEnd(5);
|
// 结构化事件额外附加关键字段(设备名、主机等),方便肉眼扫描
|
const extras = [];
|
if (event) extras.push(`event=${event}`);
|
if (meta.deviceName) extras.push(`device=${meta.deviceName}`);
|
if (meta.deviceKey) extras.push(`key=${meta.deviceKey}`);
|
if (meta.host && meta.port) extras.push(`endpoint=${meta.host}:${meta.port}`);
|
if (meta.error) extras.push(`err=${meta.error}`);
|
if (meta.payloadBytes != null) extras.push(`bytes=${meta.payloadBytes}`);
|
const suffix = extras.length ? ` [${extras.join(' | ')}]` : '';
|
return `${timestamp} ${lvl} ${icon} ${message}${suffix}`;
|
})
|
);
|
|
// ── 格式2:结构化 JSON ───────────────────────────────────────────────
|
// 每行一个 JSON 对象,包含所有字段,供程序解析/查询
|
const jsonFormat = winston.format.combine(
|
winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss' }),
|
winston.format.json()
|
);
|
|
// ── 传输层:winston-daily-rotate-file ───────────────────────────────
|
const textTransport = new winston.transports.DailyRotateFile({
|
dirname: logsDir,
|
filename: 'gateway-%DATE%.log',
|
datePattern: 'YYYY-MM-DD',
|
maxFiles: maxDays,
|
format: textFormat,
|
auditFile: path.join(logsDir, '.audit-text.json'),
|
});
|
|
const jsonTransport = new winston.transports.DailyRotateFile({
|
dirname: logsDir,
|
filename: 'gateway-%DATE%.jsonl',
|
datePattern: 'YYYY-MM-DD',
|
maxFiles: maxDays,
|
format: jsonFormat,
|
auditFile: path.join(logsDir, '.audit-json.json'),
|
});
|
|
const logger = winston.createLogger({
|
level: logLevel,
|
transports: [textTransport, jsonTransport],
|
});
|
|
setAuditLogger(logger);
|
|
// ── 拦截 console,同时输出到终端 + 文件 ─────────────────────────────
|
const original = {
|
log: console.log.bind(console),
|
warn: console.warn.bind(console),
|
error: console.error.bind(console),
|
};
|
|
const write = (level, args, consoleWriter) => {
|
consoleWriter(...args);
|
try {
|
const content = args.map(stringifyLogArg).join(' ');
|
logger.log({ level, message: content });
|
} catch (e) {
|
original.error(`❌ 日志落盘失败: ${e.message}`);
|
}
|
};
|
|
console.log = (...args) => write('info', args, original.log);
|
console.warn = (...args) => write('warn', args, original.warn);
|
console.error = (...args) => write('error', args, original.error);
|
|
process.on('uncaughtException', (error) => console.error('❌ 未捕获异常:', error?.stack ?? error));
|
process.on('unhandledRejection', (reason) => console.error('❌ 未处理 Promise 拒绝:', reason));
|
|
// 当天日志文件路径(供启动提示用)
|
const today = new Date().toISOString().slice(0, 10);
|
const todayLog = path.join(logsDir, `gateway-${today}.log`);
|
const todayJson = path.join(logsDir, `gateway-${today}.jsonl`);
|
|
console.log(`📝 日志已启用 → ${logsDir}`);
|
console.log(` 文本: gateway-YYYY-MM-DD.log JSON: gateway-YYYY-MM-DD.jsonl 保留: ${maxDays}`);
|
logEvent('info', 'logger_initialized', {
|
logsDir,
|
todayLog,
|
todayJson,
|
level: logLevel,
|
maxDays,
|
});
|
|
return {
|
logsDir,
|
todayLog,
|
todayJson,
|
close: () => {
|
try { logger.end(); } catch (_) {}
|
},
|
};
|
}
|
|
function safePreview(text, maxLen = 220) {
|
const s = String(text || '').replace(/\s+/g, ' ').trim();
|
if (s.length <= maxLen) return s;
|
return `${s.slice(0, maxLen)}...`;
|
}
|
|
class LatestCacheStore {
|
constructor(cacheFilePath) {
|
this.cacheFilePath = cacheFilePath;
|
this.map = new Map();
|
this.flushTimer = null;
|
this.load();
|
}
|
|
load() {
|
if (!fs.existsSync(this.cacheFilePath)) return;
|
try {
|
const obj = readJson(this.cacheFilePath);
|
Object.entries(obj).forEach(([key, value]) => this.map.set(key, value));
|
console.log(`🗂️ 已加载本地缓存: ${this.cacheFilePath}, keys=${this.map.size}`);
|
} catch (error) {
|
console.warn(`⚠️ 缓存文件加载失败: ${error.message}`);
|
}
|
}
|
|
set(deviceKey, payload) {
|
this.map.set(deviceKey, payload);
|
this.scheduleFlush();
|
}
|
|
get(deviceKey) {
|
return this.map.get(deviceKey);
|
}
|
|
toObject() {
|
const obj = {};
|
for (const [key, value] of this.map.entries()) obj[key] = value;
|
return obj;
|
}
|
|
scheduleFlush() {
|
if (this.flushTimer) return;
|
this.flushTimer = setTimeout(() => {
|
this.flushTimer = null;
|
this.flushNow();
|
}, 200);
|
}
|
|
flushNow() {
|
try {
|
const folder = path.dirname(this.cacheFilePath);
|
if (!fs.existsSync(folder)) fs.mkdirSync(folder, { recursive: true });
|
fs.writeFileSync(this.cacheFilePath, JSON.stringify(this.toObject(), null, 2), 'utf8');
|
} catch (error) {
|
console.error(`❌ 缓存落盘失败: ${error.message}`);
|
logEvent('error', 'cache_flush_failed', {
|
cacheFile: this.cacheFilePath,
|
error: error.message,
|
});
|
}
|
}
|
}
|
|
class MqttPublisher {
|
constructor(config) {
|
this.config = config || {};
|
this.client = null;
|
|
this.failureQueueConfig = {
|
enabled: this.config.failureQueue?.enabled !== false,
|
filePath: path.resolve(process.cwd(), this.config.failureQueue?.filePath || './cache/mqtt-failed-queue.json'),
|
maxItems: Number(this.config.failureQueue?.maxItems) > 0 ? Number(this.config.failureQueue.maxItems) : 5000,
|
retryIntervalMs: Number(this.config.failureQueue?.retryIntervalMs) > 0 ? Number(this.config.failureQueue.retryIntervalMs) : 5000,
|
retryBatchSize: Number(this.config.failureQueue?.retryBatchSize) > 0 ? Number(this.config.failureQueue.retryBatchSize) : 100,
|
dedupeEnabled: this.config.failureQueue?.dedupeEnabled !== false,
|
dedupeFields: Array.isArray(this.config.failureQueue?.dedupeFields) && this.config.failureQueue.dedupeFields.length > 0
|
? this.config.failureQueue.dedupeFields
|
: ['topic', 'deviceKey', 'frameTimestamp'],
|
};
|
this.failureQueue = [];
|
this.queueFlushTimer = null;
|
this.retryTimer = null;
|
this.retryInProgress = false;
|
|
this.loadFailureQueue();
|
}
|
|
isEnabled() {
|
return !!this.config.enabled;
|
}
|
|
loadFailureQueue() {
|
if (!this.failureQueueConfig.enabled) return;
|
const fp = this.failureQueueConfig.filePath;
|
if (!fs.existsSync(fp)) return;
|
|
try {
|
const parsed = readJson(fp);
|
if (Array.isArray(parsed)) {
|
this.failureQueue = parsed;
|
console.log(`📦 已加载 MQTT 失败队列: ${fp}, items=${this.failureQueue.length}`);
|
logEvent('info', 'mqtt_queue_loaded', {
|
filePath: fp,
|
items: this.failureQueue.length,
|
});
|
}
|
} catch (error) {
|
console.error(`❌ MQTT 失败队列加载失败: ${error.message}`);
|
logEvent('error', 'mqtt_queue_load_failed', {
|
filePath: fp,
|
error: error.message,
|
});
|
}
|
}
|
|
flushFailureQueueNow() {
|
if (!this.failureQueueConfig.enabled) return;
|
try {
|
const fp = this.failureQueueConfig.filePath;
|
const folder = path.dirname(fp);
|
if (!fs.existsSync(folder)) fs.mkdirSync(folder, { recursive: true });
|
fs.writeFileSync(fp, JSON.stringify(this.failureQueue, null, 2), 'utf8');
|
} catch (error) {
|
console.error(`❌ MQTT 失败队列落盘失败: ${error.message}`);
|
logEvent('error', 'mqtt_queue_flush_failed', {
|
filePath: this.failureQueueConfig.filePath,
|
error: error.message,
|
});
|
}
|
}
|
|
scheduleFailureQueueFlush() {
|
if (!this.failureQueueConfig.enabled) return;
|
if (this.queueFlushTimer) return;
|
this.queueFlushTimer = setTimeout(() => {
|
this.queueFlushTimer = null;
|
this.flushFailureQueueNow();
|
}, 300);
|
}
|
|
buildDedupeKey(item) {
|
const fields = this.failureQueueConfig.dedupeFields || [];
|
if (fields.length === 0) return null;
|
const key = fields.map((field) => {
|
const val = item?.[field];
|
return `${field}=${val == null ? '' : String(val)}`;
|
}).join('|');
|
return key || null;
|
}
|
|
enqueueFailedPublish(item, reason = 'publish_failed') {
|
if (!this.failureQueueConfig.enabled) return;
|
|
const queueItem = {
|
...item,
|
reason,
|
enqueuedAt: nowIso(),
|
};
|
|
if (this.failureQueueConfig.dedupeEnabled) {
|
const key = this.buildDedupeKey(queueItem);
|
if (key) {
|
const idx = this.failureQueue.findIndex((it) => this.buildDedupeKey(it) === key);
|
if (idx >= 0) {
|
this.failureQueue[idx] = queueItem;
|
this.scheduleFailureQueueFlush();
|
this.scheduleRetryDrain();
|
logEvent('info', 'mqtt_queue_deduped', {
|
dedupeKey: key,
|
queueSize: this.failureQueue.length,
|
topic: item.topic,
|
deviceKey: item.deviceKey || null,
|
});
|
return;
|
}
|
}
|
}
|
|
if (this.failureQueue.length >= this.failureQueueConfig.maxItems) {
|
const dropped = this.failureQueue.shift();
|
logEvent('warn', 'mqtt_queue_drop_oldest', {
|
maxItems: this.failureQueueConfig.maxItems,
|
droppedTopic: dropped?.topic || null,
|
});
|
}
|
|
this.failureQueue.push(queueItem);
|
this.scheduleFailureQueueFlush();
|
this.scheduleRetryDrain();
|
|
logEvent('warn', 'mqtt_queue_enqueued', {
|
topic: item.topic,
|
deviceKey: item.deviceKey || null,
|
queueSize: this.failureQueue.length,
|
reason,
|
});
|
}
|
|
attemptPublish(topic, payload, qos) {
|
return new Promise((resolve, reject) => {
|
if (!this.client || !this.client.connected) {
|
reject(new Error('MQTT 未连接'));
|
return;
|
}
|
|
this.client.publish(topic, payload, { qos }, (err) => {
|
if (err) {
|
reject(err);
|
return;
|
}
|
resolve();
|
});
|
});
|
}
|
|
scheduleRetryDrain() {
|
if (!this.failureQueueConfig.enabled) return;
|
if (this.retryTimer) return;
|
this.retryTimer = setTimeout(() => {
|
this.retryTimer = null;
|
this.drainFailureQueue('timer').catch((error) => {
|
console.warn(`⚠️ MQTT 队列重试异常: ${error.message}`);
|
});
|
}, this.failureQueueConfig.retryIntervalMs);
|
}
|
|
async drainFailureQueue(trigger = 'manual') {
|
if (!this.failureQueueConfig.enabled) return;
|
if (this.retryInProgress) return;
|
if (!this.client || !this.client.connected) {
|
this.scheduleRetryDrain();
|
return;
|
}
|
if (this.failureQueue.length === 0) return;
|
|
this.retryInProgress = true;
|
let sent = 0;
|
try {
|
const batchSize = this.failureQueueConfig.retryBatchSize;
|
const totalBefore = this.failureQueue.length;
|
|
while (this.failureQueue.length > 0 && sent < batchSize) {
|
const item = this.failureQueue[0];
|
try {
|
await this.attemptPublish(item.topic, item.payload, item.qos);
|
this.failureQueue.shift();
|
sent += 1;
|
} catch (error) {
|
logEvent('warn', 'mqtt_queue_retry_failed', {
|
topic: item.topic,
|
queueSize: this.failureQueue.length,
|
error: error.message,
|
});
|
break;
|
}
|
}
|
|
if (sent > 0) {
|
this.scheduleFailureQueueFlush();
|
logEvent('info', 'mqtt_queue_drained', {
|
trigger,
|
sent,
|
remain: this.failureQueue.length,
|
totalBefore,
|
});
|
}
|
} finally {
|
this.retryInProgress = false;
|
if (this.failureQueue.length > 0) this.scheduleRetryDrain();
|
}
|
}
|
|
ensureConnected() {
|
if (this.client) return;
|
|
const mqttConfig = this.config;
|
this.client = mqtt.connect(mqttConfig.brokerUrl, {
|
clientId: mqttConfig.clientId || `artis-gateway-${Date.now()}`,
|
username: mqttConfig.username,
|
password: mqttConfig.password,
|
reconnectPeriod: mqttConfig.reconnectMs || 3000,
|
clean: true,
|
connectTimeout: 10_000,
|
});
|
|
this.client.on('connect', () => {
|
console.log(`✅ MQTT 已连接: ${mqttConfig.brokerUrl}`);
|
logEvent('info', 'mqtt_connected', {
|
brokerUrl: mqttConfig.brokerUrl,
|
clientId: mqttConfig.clientId || null,
|
});
|
this.drainFailureQueue('connect').catch((error) => {
|
console.warn(`⚠️ MQTT 队列重放失败: ${error.message}`);
|
});
|
});
|
|
this.client.on('error', (error) => {
|
console.error(`❌ MQTT 错误: ${error.message}`);
|
logEvent('error', 'mqtt_error', {
|
brokerUrl: mqttConfig.brokerUrl,
|
error: error.message,
|
});
|
});
|
|
this.client.on('offline', () => {
|
logEvent('warn', 'mqtt_offline', { brokerUrl: mqttConfig.brokerUrl });
|
});
|
|
this.client.on('close', () => {
|
logEvent('warn', 'mqtt_closed', { brokerUrl: mqttConfig.brokerUrl });
|
});
|
}
|
|
publish(frameData) {
|
// 发布输入:标准化后的 frameData(包含 device/timestamp/params/latest/source)
|
// 发布输出:topic = {topicPrefix}/{deviceKey}/latest
|
// payload 为完全扁平的物模型对象,所有字段在同一层,与服务端接收格式一致。
|
if (!this.isEnabled()) return;
|
this.ensureConnected();
|
|
const topicPrefix = this.config.topicPrefix || 'dialysis';
|
// 统一用 deviceName 作为 key
|
const topic = `${topicPrefix}/${frameData.deviceName}`;
|
const qos = Number.isInteger(this.config.qos) ? this.config.qos : 1;
|
|
// 从 latest(OBX id -> {value,unit})中取指定 OBX 字段的值
|
const latest = frameData.latest || {};
|
const pick = (obxId) => {
|
const p = latest[String(obxId)];
|
return p != null ? p.value : undefined;
|
};
|
|
// 构造与物模型一致的扁平 payload,所有字段在同一层
|
const mqttPayload = {
|
ts: Date.now(), // 毫秒时间戳
|
suedtime: frameData.timestamp || null, // 帧原始时间(YYYYMMDDHHmmss)
|
deviceId: frameData.deviceName || null, // 设备唯一标识(用name)
|
deviceNo: frameData.device || null, // 机器序列号
|
deviceType: this.config.deviceType || 'Artis', // 设备型号
|
IPAddress: frameData.source?.host || null, // 设备 IP
|
zljd: pick(0), // 治疗阶段
|
sysj: pick(1), // 剩余时间(s)
|
A: pick(34), // 设定超滤量(l)
|
B: pick(2), // 超滤量(l)
|
C: pick(3), // 超滤率(l/hr)
|
D: pick(6), // 血流速(ml/min)
|
sdxll: pick(32), // 设定血流速(ml/min)
|
F: pick(7), // 温度(cel)
|
J: pick(12), // TMP(mmHg)
|
H: pick(13), // 静脉压(mmHg)
|
o: pick(14), // 动脉压(mmHg)
|
L: pick(8), // 透析液流速(ml/min)
|
G: pick(4), // 脱水量(l)
|
xinlv: pick(61), // 心率(pulse)
|
qfqb: pick(65), // QF/QB(%)
|
convVol: pick(67), // Conv Vol(l)
|
convClear: pick(68), // Conv Clear(l/hr)
|
};
|
|
// 去除 undefined 值,保持 payload 干净
|
Object.keys(mqttPayload).forEach(k => {
|
if (mqttPayload[k] === undefined) delete mqttPayload[k];
|
});
|
|
const payload = JSON.stringify(mqttPayload);
|
|
this.attemptPublish(topic, payload, qos).then(() => {
|
const ts = new Date().toISOString().replace('T', ' ').slice(0, 19);
|
console.log(`\n📤 [${ts}] MQTT 发布成功`);
|
console.log(` Topic : ${topic}`);
|
console.log(` Device : ${frameData.deviceName || '-'}`);
|
console.log(` Time : ${frameData.timestamp || '-'}`);
|
const fieldCount = Object.keys(mqttPayload).length;
|
console.log(` Fields : ${fieldCount} 个物模型字段`);
|
console.log(` Size : ${Buffer.byteLength(payload)} bytes`);
|
logEvent('info', 'mqtt_publish_success', {
|
topic,
|
deviceKey: frameData.deviceName,
|
deviceNo: frameData.device || null,
|
sourceHost: frameData.source?.host || null,
|
sourcePort: frameData.source?.port || null,
|
frameTimestamp: frameData.timestamp || null,
|
sentAt: nowIso(),
|
payloadBytes: Buffer.byteLength(payload),
|
payload: mqttPayload,
|
});
|
}).catch((err) => {
|
console.error(`❌ MQTT 发布失败 (${topic}): ${err.message}`);
|
logEvent('error', 'mqtt_publish_failed', {
|
topic,
|
deviceKey: frameData.deviceKey,
|
timestamp: frameData.timestamp,
|
error: err.message,
|
});
|
|
this.enqueueFailedPublish({
|
topic,
|
payload,
|
qos,
|
deviceKey: frameData.deviceName,
|
frameTimestamp: frameData.timestamp || null,
|
}, err.message || 'publish_failed');
|
});
|
}
|
|
close() {
|
if (this.queueFlushTimer) {
|
clearTimeout(this.queueFlushTimer);
|
this.queueFlushTimer = null;
|
}
|
if (this.retryTimer) {
|
clearTimeout(this.retryTimer);
|
this.retryTimer = null;
|
}
|
|
this.flushFailureQueueNow();
|
|
if (!this.client) return;
|
try {
|
this.client.end(true);
|
logEvent('info', 'mqtt_client_closed', {
|
queueRemain: this.failureQueue.length,
|
});
|
} catch (error) {
|
console.warn(`⚠️ MQTT 关闭异常: ${error.message}`);
|
} finally {
|
this.client = null;
|
}
|
}
|
}
|
|
class AliyunIotPublisher {
|
constructor(config) {
|
this.config = config || {};
|
this.tripleCache = new Map();
|
this.clientMap = new Map();
|
this.propertyTypeMap = new Map();
|
this.allowedIdentifiers = null;
|
|
this.productKey = this.config.productKey || null;
|
this.region = this.config.region || 'cn-shanghai';
|
this.deviceNamePrefix = this.config.deviceNameRule?.prefix || '';
|
this.deviceSecretStoreFile = path.resolve(
|
process.cwd(),
|
this.config.deviceSecretStoreFile || './cache/aliyun-device-secrets.json'
|
);
|
this.deviceSecretStore = {};
|
|
this.tripleApi = this.config.tripleApi || {};
|
this.defaultTripleApiConfig = {
|
baseURL: 'https://things.icoldchain.cn/',
|
url: 'device/info/getAliyunDeviceSecret',
|
method: 'post',
|
deviceNameField: 'deviceName',
|
isAutoRegister: 1,
|
timeoutMs: 15000,
|
};
|
|
this.loadDeviceSecretStore();
|
this.loadThingModel();
|
}
|
|
isEnabled() {
|
return !!this.config.enabled;
|
}
|
|
loadDeviceSecretStore() {
|
if (!fs.existsSync(this.deviceSecretStoreFile)) {
|
this.deviceSecretStore = {};
|
return;
|
}
|
try {
|
const parsed = readJson(this.deviceSecretStoreFile);
|
this.deviceSecretStore = parsed && typeof parsed === 'object' ? parsed : {};
|
console.log(`🔐 已加载阿里云设备密钥缓存: ${this.deviceSecretStoreFile}, count=${Object.keys(this.deviceSecretStore).length}`);
|
} catch (error) {
|
this.deviceSecretStore = {};
|
console.warn(`⚠️ 阿里云设备密钥缓存加载失败: ${error.message}`);
|
}
|
}
|
|
saveDeviceSecretStore() {
|
try {
|
const folder = path.dirname(this.deviceSecretStoreFile);
|
if (!fs.existsSync(folder)) fs.mkdirSync(folder, { recursive: true });
|
fs.writeFileSync(this.deviceSecretStoreFile, JSON.stringify(this.deviceSecretStore, null, 2), 'utf8');
|
} catch (error) {
|
console.error(`❌ 阿里云设备密钥缓存保存失败: ${error.message}`);
|
logEvent('error', 'aliyun_secret_store_save_failed', {
|
filePath: this.deviceSecretStoreFile,
|
error: error.message,
|
});
|
}
|
}
|
|
buildDeviceName(deviceNo) {
|
const mode = this.config.deviceNameRule?.mode || 'fromDeviceNo';
|
const base = String(deviceNo || '').trim();
|
if (!base) throw new Error('设备编号为空,无法生成 deviceName');
|
const safe = base.replace(/[^a-zA-Z0-9._-]/g, '_');
|
if (mode !== 'fromDeviceNo') {
|
throw new Error(`不支持的 deviceNameRule.mode: ${mode}`);
|
}
|
return `${this.deviceNamePrefix}${safe}`;
|
}
|
|
async getTripleByApi(deviceNo, deviceName) {
|
const api = { ...this.defaultTripleApiConfig, ...(this.tripleApi || {}) };
|
const requestBody = {
|
isAutoRegister: api.isAutoRegister,
|
[api.deviceNameField || 'deviceName']: deviceName,
|
...(api.extraForm || {}),
|
};
|
|
console.log(`🛰️ [三元组接口请求] baseURL=${api.baseURL} url=${api.url} deviceName=${deviceName}`);
|
logEvent('info', 'aliyun_triple_api_request', {
|
baseURL: api.baseURL,
|
url: api.url,
|
method: api.method,
|
request: requestBody,
|
});
|
|
try {
|
const res = await axios({
|
url: api.url,
|
method: api.method || 'post',
|
baseURL: api.baseURL,
|
headers: {
|
'Content-Type': 'application/x-www-form-urlencoded',
|
...(api.headers || {}),
|
},
|
timeout: Number(api.timeoutMs) > 0 ? Number(api.timeoutMs) : 15000,
|
data: Qs.stringify(requestBody),
|
});
|
|
const responseData = res.data;
|
try {
|
const preview = JSON.stringify(responseData, null, 2);
|
console.log('🛰️ [三元组接口原始响应]', `status=${res.status}`, '\n', preview);
|
} catch (_) {
|
console.log('🛰️ [三元组接口原始响应]', `status=${res.status}`, responseData);
|
}
|
logEvent('info', 'aliyun_triple_api_response', {
|
httpStatus: res.status,
|
response: responseData,
|
});
|
|
const body = responseData?.data && typeof responseData.data === 'object'
|
? responseData.data
|
: responseData;
|
|
const code = responseData?.code ?? responseData?.Code ?? body?.code ?? body?.Code ?? 200;
|
const success = code === 200 || code === '200' || code === 0 || responseData?.success === true || body?.success === true;
|
const message = responseData?.message || responseData?.Message || body?.message || body?.Message || null;
|
|
const productKey = body?.productKey || responseData?.productKey || this.productKey;
|
const outDeviceName = body?.deviceName || responseData?.deviceName || deviceName;
|
const deviceSecret = body?.deviceSecret || responseData?.deviceSecret;
|
const region = body?.region || responseData?.region || this.region;
|
|
if (!success || !productKey || !outDeviceName || !deviceSecret) {
|
throw new Error(`三元组接口返回异常(code=${code}): ${message || JSON.stringify(responseData)}`);
|
}
|
|
return {
|
productKey,
|
deviceName: outDeviceName,
|
deviceSecret,
|
region,
|
};
|
} catch (error) {
|
const status = error?.response?.status;
|
const data = error?.response?.data;
|
const detail = data ? JSON.stringify(data) : error.message;
|
logEvent('error', 'aliyun_triple_api_failed', {
|
status: status || null,
|
error: detail,
|
deviceNo,
|
deviceName,
|
});
|
throw new Error(`三元组接口调用失败: ${status || '-'} ${detail}`);
|
}
|
}
|
|
getPreRegisteredTriple(deviceNo, deviceName) {
|
const map = this.config.preRegisteredDevices || {};
|
const direct = map[deviceNo] || map[deviceName];
|
if (!direct) return null;
|
|
// 支持两种写法:
|
// 1) "设备编号": "deviceSecret"
|
// 2) "设备编号": { "deviceName": "xxx", "deviceSecret": "xxx" }
|
if (typeof direct === 'string') {
|
return {
|
productKey: this.productKey,
|
deviceName,
|
deviceSecret: direct,
|
};
|
}
|
|
if (typeof direct === 'object' && direct.deviceSecret) {
|
return {
|
productKey: this.productKey,
|
deviceName: direct.deviceName || deviceName,
|
deviceSecret: direct.deviceSecret,
|
};
|
}
|
|
return null;
|
}
|
|
loadThingModel() {
|
const thingModelPath = this.config.thingModelPath
|
? path.resolve(process.cwd(), this.config.thingModelPath)
|
: path.resolve(process.cwd(), '阿里物模型.json');
|
|
if (!fs.existsSync(thingModelPath)) {
|
console.warn(`⚠️ 未找到物模型文件: ${thingModelPath},将按默认映射直接上报`);
|
return;
|
}
|
|
try {
|
const model = readJson(thingModelPath);
|
const props = Array.isArray(model.properties) ? model.properties : [];
|
const allowed = new Set();
|
|
props.forEach((prop) => {
|
if (!prop || !prop.identifier) return;
|
allowed.add(prop.identifier);
|
const type = prop.dataType?.type || 'text';
|
this.propertyTypeMap.set(prop.identifier, type);
|
});
|
|
this.allowedIdentifiers = allowed;
|
console.log(`✅ 已加载阿里物模型: ${thingModelPath}, properties=${allowed.size}`);
|
} catch (error) {
|
console.error(`❌ 物模型加载失败: ${error.message}`);
|
}
|
}
|
|
toInt(value) {
|
if (value == null || value === '') return undefined;
|
const n = Number(value);
|
if (Number.isNaN(n)) return undefined;
|
return Math.round(n);
|
}
|
|
pickLatestValue(latestMap, obxId) {
|
const p = latestMap?.[String(obxId)];
|
return p ? p.value : undefined;
|
}
|
|
normalizeByPropertyType(identifier, value) {
|
if (value == null) return undefined;
|
const type = this.propertyTypeMap.get(identifier);
|
if (!type) return value;
|
|
if (type === 'int') {
|
return this.toInt(value);
|
}
|
if (type === 'float' || type === 'double') {
|
const n = Number(value);
|
return Number.isNaN(n) ? undefined : n;
|
}
|
if (type === 'bool') {
|
if (typeof value === 'boolean') return value;
|
const s = String(value).toLowerCase();
|
if (s === 'true' || s === '1') return true;
|
if (s === 'false' || s === '0') return false;
|
return undefined;
|
}
|
return String(value);
|
}
|
|
mapFrameToThingModelParams(frameData) {
|
// 将 OBX 最新值映射为阿里物模型 identifier -> value。
|
// 若加载了物模型文件,则会按 allowedIdentifiers + dataType 做过滤与类型归一。
|
const latest = frameData.latest || {};
|
|
const mapped = {
|
n: frameData.device || frameData.deviceKey,
|
deviceType: this.config.deviceType || 'Artis',
|
IPAddress: frameData.source?.host,
|
zljd: this.pickLatestValue(latest, 0),
|
sysj: this.pickLatestValue(latest, 1),
|
A: this.pickLatestValue(latest, 34),
|
B: this.pickLatestValue(latest, 2),
|
C: this.pickLatestValue(latest, 3),
|
D: this.pickLatestValue(latest, 6),
|
sdxll: this.pickLatestValue(latest, 32),
|
F: this.pickLatestValue(latest, 7),
|
J: this.pickLatestValue(latest, 12),
|
H: this.pickLatestValue(latest, 13),
|
o: this.pickLatestValue(latest, 14),
|
L: this.pickLatestValue(latest, 8),
|
G: this.pickLatestValue(latest, 4),
|
};
|
|
const out = {};
|
Object.entries(mapped).forEach(([identifier, rawVal]) => {
|
if (rawVal == null || rawVal === '') return;
|
if (this.allowedIdentifiers && !this.allowedIdentifiers.has(identifier)) return;
|
|
const normalizedVal = this.normalizeByPropertyType(identifier, rawVal);
|
if (normalizedVal == null) return;
|
out[identifier] = normalizedVal;
|
});
|
|
return out;
|
}
|
|
async getTripleByDeviceNo(deviceNo) {
|
if (!deviceNo) throw new Error('设备编号为空,无法获取三元组');
|
|
if (this.tripleCache.has(deviceNo)) {
|
return this.tripleCache.get(deviceNo);
|
}
|
|
const deviceName = this.buildDeviceName(deviceNo);
|
const localCacheKey = this.productKey ? `${this.productKey}:${deviceName}` : null;
|
|
// 先用本地持久化密钥
|
const existedSecret = localCacheKey ? this.deviceSecretStore?.[localCacheKey] : null;
|
if (existedSecret && this.productKey) {
|
const triple = {
|
productKey: this.productKey,
|
deviceName,
|
deviceSecret: existedSecret,
|
};
|
this.tripleCache.set(deviceNo, triple);
|
return triple;
|
}
|
|
let triple;
|
if (this.tripleApi && (this.tripleApi.url || this.tripleApi.baseURL)) {
|
triple = await this.getTripleByApi(deviceNo, deviceName);
|
} else {
|
const fallback = this.getPreRegisteredTriple(deviceNo, deviceName);
|
if (fallback) {
|
console.warn(`⚠️ 未配置三元组接口,使用预注册设备密钥 | deviceNo=${deviceNo} deviceName=${fallback.deviceName}`);
|
logEvent('warn', 'aliyun_use_preregistered_triple', {
|
deviceNo,
|
deviceName: fallback.deviceName,
|
});
|
triple = fallback;
|
} else {
|
throw new Error('未配置三元组接口 (services.aliyunIot.tripleApi),且没有预注册设备密钥,无法获取三元组');
|
}
|
}
|
|
const cacheKey = `${triple.productKey}:${triple.deviceName}`;
|
this.deviceSecretStore[cacheKey] = triple.deviceSecret;
|
this.saveDeviceSecretStore();
|
logEvent('info', 'aliyun_device_registered', {
|
productKey: triple.productKey,
|
deviceName: triple.deviceName,
|
mode: (this.tripleApi && (this.tripleApi.url || this.tripleApi.baseURL)) ? 'tripleApi' : 'preRegistered',
|
});
|
|
this.tripleCache.set(deviceNo, triple);
|
return triple;
|
}
|
|
createAliyunPassword(clientId, deviceName, productKey, deviceSecret) {
|
const plain = `clientId${clientId}deviceName${deviceName}productKey${productKey}`;
|
return crypto.createHmac('sha1', deviceSecret).update(plain).digest('hex');
|
}
|
|
ensureClient(triple) {
|
const key = `${triple.productKey}:${triple.deviceName}`;
|
if (this.clientMap.has(key)) return this.clientMap.get(key);
|
|
// 按阿里云官方示例:签名使用基础 clientId,扩展参数通过 |securemode=3,signmethod=hmacsha1| 追加
|
const clientIdBase = `artisGateway_${triple.deviceName}`;
|
const clientIdRaw = `${clientIdBase}|securemode=3,signmethod=hmacsha1|`;
|
const password = this.createAliyunPassword(clientIdBase, triple.deviceName, triple.productKey, triple.deviceSecret);
|
const username = `${triple.deviceName}&${triple.productKey}`;
|
const region = triple.region || this.config.region || 'cn-shanghai';
|
const host = this.config.brokerHost || `${triple.productKey}.iot-as-mqtt.${region}.aliyuncs.com`;
|
const url = `mqtt://${host}:1883`;
|
|
const client = mqtt.connect(url, {
|
clientId: clientIdRaw,
|
username,
|
password,
|
reconnectPeriod: 3000,
|
clean: true,
|
connectTimeout: 10_000,
|
});
|
|
client.on('connect', () => {
|
console.log(`✅ 阿里云IoT已连接: ${key}`);
|
});
|
|
client.on('error', (error) => {
|
console.error(`❌ 阿里云IoT错误 (${key}): ${error.message}`);
|
});
|
|
this.clientMap.set(key, client);
|
return client;
|
}
|
|
buildThingModelPayload(frameData) {
|
const params = this.mapFrameToThingModelParams(frameData);
|
|
return {
|
id: `${Date.now()}`,
|
version: '1.0',
|
params,
|
method: 'thing.event.property.post',
|
};
|
}
|
|
async publish(frameData) {
|
if (!this.isEnabled()) return;
|
|
// 直接用 deviceName 作为唯一标识
|
const deviceName = frameData.deviceName;
|
if (!deviceName) {
|
console.warn('⚠️ 阿里云IoT跳过:未解析到设备名称');
|
return;
|
}
|
|
// 兼容 getTripleByDeviceNo 只用 name
|
const triple = await this.getTripleByDeviceNo(deviceName);
|
const client = this.ensureClient(triple);
|
const topic = `/sys/${triple.productKey}/${triple.deviceName}/thing/event/property/post`;
|
const payload = this.buildThingModelPayload(frameData);
|
|
client.publish(topic, JSON.stringify(payload), { qos: 1 }, (err) => {
|
if (err) {
|
console.error(`❌ 阿里云IoT发布失败 (${topic}): ${err.message}`);
|
return;
|
}
|
console.log(`📤 阿里云IoT发布成功: ${topic}`);
|
});
|
}
|
|
close() {
|
for (const [key, client] of this.clientMap.entries()) {
|
try {
|
client.end(true);
|
logEvent('info', 'aliyun_client_closed', { key });
|
} catch (error) {
|
console.warn(`⚠️ 阿里云IoT客户端关闭异常 (${key}): ${error.message}`);
|
}
|
}
|
this.clientMap.clear();
|
}
|
}
|
|
class DeviceClient {
|
constructor(deviceConfig, globalConfig, onFrame) {
|
this.deviceConfig = deviceConfig;
|
this.globalConfig = globalConfig;
|
this.onFrame = onFrame;
|
|
this.bufferCache = Buffer.alloc(0);
|
this.socket = null;
|
this.reconnectTimer = null;
|
this.stopped = false;
|
this.reconnectAttempts = 0;
|
this.maxBufferBytes = Number(this.globalConfig.maxBufferBytes) > 0
|
? Number(this.globalConfig.maxBufferBytes)
|
: 8 * 1024 * 1024;
|
|
this.lastConnectedAt = null;
|
this.lastDisconnectedAt = null;
|
this.lastDataAt = null;
|
this.lastFrameAt = null;
|
this.lastErrorAt = null;
|
this.lastError = null;
|
}
|
|
start() {
|
this.stopped = false;
|
this.connect();
|
}
|
|
stop() {
|
this.stopped = true;
|
if (this.reconnectTimer) {
|
clearTimeout(this.reconnectTimer);
|
this.reconnectTimer = null;
|
}
|
if (this.socket) {
|
try {
|
this.socket.removeAllListeners();
|
this.socket.destroy();
|
} catch (_) {
|
}
|
this.socket = null;
|
}
|
}
|
|
getHealthSnapshot() {
|
const isConnected = !!(this.socket && !this.socket.destroyed);
|
return {
|
name: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
connected: isConnected,
|
reconnectAttempts: this.reconnectAttempts,
|
bufferBytes: this.bufferCache.length,
|
maxBufferBytes: this.maxBufferBytes,
|
lastConnectedAt: this.lastConnectedAt,
|
lastDisconnectedAt: this.lastDisconnectedAt,
|
lastDataAt: this.lastDataAt,
|
lastFrameAt: this.lastFrameAt,
|
lastErrorAt: this.lastErrorAt,
|
lastError: this.lastError,
|
};
|
}
|
|
calcReconnectDelayMs() {
|
const policy = this.globalConfig.reconnectPolicy || {};
|
const baseMs = Number(policy.baseMs) > 0
|
? Number(policy.baseMs)
|
: (this.globalConfig.reconnectIntervalMs || 5000);
|
const maxMs = Number(policy.maxMs) > 0 ? Number(policy.maxMs) : 300000;
|
const factor = Number(policy.factor) > 1 ? Number(policy.factor) : 2;
|
const jitterRatio = Number(policy.jitterRatio) >= 0 ? Number(policy.jitterRatio) : 0.2;
|
|
const expDelay = Math.min(maxMs, Math.round(baseMs * Math.pow(factor, Math.max(0, this.reconnectAttempts))));
|
const jitter = Math.round(expDelay * jitterRatio);
|
const minDelay = Math.max(1000, expDelay - jitter);
|
const maxDelay = expDelay + jitter;
|
return Math.floor(Math.random() * (maxDelay - minDelay + 1)) + minDelay;
|
}
|
|
connect() {
|
if (this.stopped) return;
|
// 每台设备一个独立 socket:独立收包、独立重连、独立日志。
|
if (this.socket) {
|
this.socket.destroy();
|
this.socket = null;
|
}
|
|
const socket = new net.Socket();
|
this.socket = socket;
|
socket.setKeepAlive(true, 10_000);
|
|
console.log(`🔄 [设备接入] 准备连接 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port}`);
|
logEvent('info', 'device_connecting', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
});
|
|
socket.connect(this.deviceConfig.port, this.deviceConfig.host, () => {
|
this.reconnectAttempts = 0;
|
this.lastConnectedAt = nowIso();
|
console.log(`✅ [设备接入] 连接成功 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port}`);
|
logEvent('info', 'device_connected', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
connectedAt: nowIso(),
|
});
|
});
|
|
socket.on('end', () => {
|
console.warn(`⚠️ [设备断开] 远端主动结束连接(FIN) | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port}`);
|
logEvent('warn', 'device_remote_end', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
});
|
});
|
|
socket.on('data', (chunk) => {
|
// 数据链路:接收 chunk -> 追加缓存 -> 分帧 -> 逐帧解析 -> 回调上层分发。
|
this.lastDataAt = nowIso();
|
const cacheBefore = this.bufferCache.length;
|
console.log(`📥 [数据接收] 收到chunk | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} bytes=${chunk.length} cacheBefore=${cacheBefore}`);
|
logEvent('info', 'device_data_received', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
bytes: chunk.length,
|
cacheBefore,
|
receivedAt: nowIso(),
|
});
|
this.bufferCache = Buffer.concat([this.bufferCache, chunk]);
|
|
if (this.bufferCache.length > this.maxBufferBytes) {
|
const droppedBytes = this.bufferCache.length - this.maxBufferBytes;
|
this.bufferCache = this.bufferCache.slice(-this.maxBufferBytes);
|
console.warn(`⚠️ [内存保护] 缓存超限已截断 | name=${this.deviceConfig.name || '-'} dropped=${droppedBytes} keep=${this.maxBufferBytes}`);
|
logEvent('warn', 'device_buffer_trimmed', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
droppedBytes,
|
keepBytes: this.maxBufferBytes,
|
});
|
}
|
|
try {
|
const { frames, remainderBuffer } = extractFramesFromUtf16leBuffer(this.bufferCache);
|
this.bufferCache = remainderBuffer;
|
console.log(`🧱 [分帧结果] frames=${frames.length} cacheAfter=${this.bufferCache.length} | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port}`);
|
for (const frame of frames) {
|
const normalized = frame.replace(/^\uFEFF/, '').trim();
|
console.log(`🔍 [解析前] XML预览 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} xmlLen=${normalized.length} preview="${safePreview(normalized)}"`);
|
logEvent('info', 'device_frame_before_parse', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
xmlLen: normalized.length,
|
rawXmlPreview: safePreview(normalized, 1200),
|
});
|
parseAndPrintData(normalized, {
|
onFrame: (frameObj) => {
|
// 解析成功后补充来源信息与设备级覆盖发布目标,再交给 onFrame。
|
const paramCount = Array.isArray(frameObj.params) ? frameObj.params.length : 0;
|
console.log(`✅ [解析后] 结构化完成 | name=${this.deviceConfig.name || '-'} device=${frameObj.device || '-'} ts=${frameObj.timestamp || '-'} params=${paramCount}`);
|
logEvent('info', 'device_frame_parsed', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
deviceNo: frameObj.device || null,
|
frameTimestamp: frameObj.timestamp || null,
|
paramCount,
|
parsedParams: frameObj.params || [],
|
});
|
this.lastFrameAt = nowIso();
|
Promise.resolve(this.onFrame({
|
...frameObj,
|
source: {
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
},
|
devicePublishTargets: Array.isArray(this.deviceConfig.subscriptions)
|
? this.deviceConfig.subscriptions
|
: undefined,
|
deviceName: this.deviceConfig.name,
|
})).catch((error) => {
|
console.error(`❌ [分发异常] onFrame处理失败 | name=${this.deviceConfig.name || '-'} err=${error.message}`);
|
logEvent('error', 'device_onframe_failed', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
error: error.message,
|
});
|
});
|
},
|
});
|
}
|
} catch (error) {
|
console.warn(`⚠️ [分帧等待] 数据尚不足以形成完整帧 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} cacheNow=${this.bufferCache.length} err=${error.message}`);
|
logEvent('warn', 'device_frame_waiting', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
cacheNow: this.bufferCache.length,
|
error: error.message,
|
});
|
}
|
});
|
|
socket.on('error', (error) => {
|
this.lastErrorAt = nowIso();
|
this.lastError = error.message;
|
console.error(`❌ [设备异常] 连接错误 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} err=${error.message}`);
|
logEvent('error', 'device_socket_error', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
error: error.message,
|
});
|
});
|
|
socket.on('close', (hadError) => {
|
this.lastDisconnectedAt = nowIso();
|
console.warn(`🔌 [设备断开] 连接关闭 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} abnormal=${hadError ? 'yes' : 'no'}`);
|
logEvent('warn', 'device_closed', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
abnormal: !!hadError,
|
closedAt: nowIso(),
|
});
|
if (this.stopped) return;
|
this.scheduleReconnect();
|
});
|
}
|
|
scheduleReconnect() {
|
if (this.stopped) return;
|
if (this.reconnectTimer) return;
|
const ms = this.calcReconnectDelayMs();
|
this.reconnectAttempts += 1;
|
console.log(`⏳ [重连计划] ${ms}ms 后重连 | name=${this.deviceConfig.name || '-'} endpoint=${this.deviceConfig.host}:${this.deviceConfig.port} attempt=${this.reconnectAttempts}`);
|
logEvent('warn', 'device_reconnect_scheduled', {
|
deviceName: this.deviceConfig.name || null,
|
host: this.deviceConfig.host,
|
port: this.deviceConfig.port,
|
reconnectInMs: ms,
|
attempt: this.reconnectAttempts,
|
});
|
this.reconnectTimer = setTimeout(() => {
|
this.reconnectTimer = null;
|
this.connect();
|
}, ms);
|
}
|
}
|
|
function setupHealthProbe(config, clients) {
|
const probeConfig = config || {};
|
if (probeConfig.enabled === false) return null;
|
|
const host = probeConfig.host || '127.0.0.1';
|
const port = Number(probeConfig.port) > 0 ? Number(probeConfig.port) : 18080;
|
const staleFrameMs = Number(probeConfig.staleFrameMs) > 0 ? Number(probeConfig.staleFrameMs) : 180000;
|
|
const server = http.createServer((req, res) => {
|
const pathname = (req.url || '').split('?')[0];
|
if (pathname !== '/health' && pathname !== '/ready') {
|
res.statusCode = 404;
|
res.setHeader('Content-Type', 'application/json; charset=utf-8');
|
res.end(JSON.stringify({ error: 'not_found' }));
|
return;
|
}
|
|
const now = Date.now();
|
const devices = clients.map((client) => client.getHealthSnapshot());
|
const connectedDevices = devices.filter((d) => d.connected).length;
|
|
const freshDevices = devices.filter((d) => {
|
if (!d.lastFrameAt) return false;
|
return (now - Date.parse(d.lastFrameAt)) <= staleFrameMs;
|
}).length;
|
|
let status = 'ok';
|
if (connectedDevices === 0) status = 'degraded';
|
if (devices.length > 0 && freshDevices === 0) status = 'degraded';
|
|
const body = {
|
status,
|
timestamp: nowIso(),
|
pid: process.pid,
|
uptimeSec: Math.round(process.uptime()),
|
memory: process.memoryUsage(),
|
summary: {
|
totalDevices: devices.length,
|
connectedDevices,
|
freshDevices,
|
staleFrameMs,
|
},
|
devices,
|
};
|
|
res.statusCode = status === 'ok' ? 200 : 503;
|
res.setHeader('Content-Type', 'application/json; charset=utf-8');
|
res.end(JSON.stringify(body));
|
});
|
|
server.listen(port, host, () => {
|
console.log(`🩺 健康探针已启动: http://${host}:${port}/health`);
|
logEvent('info', 'health_probe_started', { host, port, staleFrameMs });
|
});
|
|
server.on('error', (error) => {
|
console.error(`❌ 健康探针启动/运行异常: ${error.message}`);
|
logEvent('error', 'health_probe_error', { error: error.message, host, port });
|
});
|
|
return server;
|
}
|
|
function normalizeFrameForCache(frame) {
|
// deviceKey 直接取配置文件 devices 中的 name 字段(通过 frame.deviceName 传递)
|
const deviceKey = frame.deviceName || frame.device || `${frame.source?.host || 'unknown'}:${frame.source?.port || 0}`;
|
const out = {
|
deviceKey,
|
deviceName: frame.deviceName, // 补充deviceName字段,确保mqtt topic正确
|
device: frame.device,
|
timestamp: frame.timestamp,
|
receivedAt: frame.receivedAt || nowIso(),
|
source: frame.source,
|
params: frame.params || [],
|
latest: frame.latest || {},
|
};
|
return out;
|
}
|
|
function normalizePublishTargets(targets, fallback = ['mqtt']) {
|
// 仅允许已支持的发布目标,自动去重;非法值会被忽略。
|
const source = Array.isArray(targets) && targets.length > 0 ? targets : fallback;
|
const allowed = new Set(['mqtt', 'aliyunIot']);
|
const out = [];
|
|
source.forEach((item) => {
|
const key = String(item || '').trim();
|
if (!allowed.has(key)) return;
|
if (!out.includes(key)) out.push(key);
|
});
|
|
return out.length > 0 ? out : fallback;
|
}
|
|
async function main() {
|
// main 负责装配所有组件,不直接处理协议细节。
|
const configArgIndex = process.argv.indexOf('--config');
|
// 配置文件默认查找策略:
|
// - pkg 打包运行:使用 exe 同级目录
|
// - node 脚本运行:使用当前工作目录(npm run gateway 时即项目根目录)
|
const baseDir = process.pkg ? path.dirname(process.execPath) : process.cwd();
|
const configPath = (configArgIndex !== -1 && process.argv[configArgIndex + 1])
|
? path.resolve(process.cwd(), process.argv[configArgIndex + 1])
|
: path.resolve(baseDir, 'gateway.config.json');
|
|
if (!fs.existsSync(configPath)) {
|
console.error(`❌ 配置文件不存在: ${configPath}`);
|
console.error('请先复制 gateway.config.example.json 为 gateway.config.json 并填写参数。');
|
process.exit(1);
|
}
|
|
const config = readJson(configPath);
|
const runtimeLogger = setupPersistentConsoleLogger(config.runtimeLog || {});
|
|
const cacheStore = new LatestCacheStore(path.resolve(process.cwd(), config.cacheFile || './cache/latest-device-cache.json'));
|
const mqttPublisher = new MqttPublisher(config.services?.mqtt || {});
|
const aliyunPublisher = new AliyunIotPublisher(config.services?.aliyunIot || {});
|
const globalPublishTargets = normalizePublishTargets(config.publishTargets, ['mqtt']);
|
console.log(`🚀 网关启动完成 | config=${configPath}`);
|
console.log(`📦 全局转发目标: ${globalPublishTargets.join(', ')}`);
|
|
if ((config.services?.aliyunIot?.enabled === true) && !globalPublishTargets.includes('aliyunIot')) {
|
console.warn('⚠️ 配置检查: aliyunIot.enabled=true,但 publishTargets 未包含 aliyunIot,当前不会上报阿里云。');
|
}
|
if ((config.services?.mqtt?.enabled === true) && !globalPublishTargets.includes('mqtt')) {
|
console.warn('⚠️ 配置检查: mqtt.enabled=true,但 publishTargets 未包含 mqtt,当前不会上报 MQTT。');
|
}
|
|
const onFrame = async (frameObj) => {
|
// 统一分发入口:缓存 -> 计算发布目标 -> 执行发布 -> 记录结果。
|
const frameData = normalizeFrameForCache(frameObj);
|
cacheStore.set(frameData.deviceKey, frameData);
|
console.log(`💾 缓存更新: key=${frameData.deviceKey} params=${frameData.params.length}`);
|
|
const publishTargets = normalizePublishTargets(frameObj.devicePublishTargets, globalPublishTargets);
|
console.log(`📮 [转发准备] device=${frameData.deviceKey} targets=${publishTargets.join(',')} ts=${frameData.timestamp || '-'}`);
|
logEvent('info', 'dispatch_prepare', {
|
deviceKey: frameData.deviceKey,
|
deviceNo: frameData.device || null,
|
frameTimestamp: frameData.timestamp || null,
|
targets: publishTargets,
|
});
|
try {
|
if (publishTargets.includes('mqtt')) {
|
console.log(`➡️ [转发执行] MQTT | device=${frameData.deviceKey}`);
|
logEvent('info', 'dispatch_execute_mqtt', {
|
deviceKey: frameData.deviceKey,
|
frameTimestamp: frameData.timestamp || null,
|
});
|
mqttPublisher.publish(frameData);
|
}
|
if (publishTargets.includes('aliyunIot')) {
|
console.log(`➡️ [转发执行] 阿里云IoT | device=${frameData.deviceKey}`);
|
logEvent('info', 'dispatch_execute_aliyun', {
|
deviceKey: frameData.deviceKey,
|
frameTimestamp: frameData.timestamp || null,
|
});
|
await aliyunPublisher.publish(frameData);
|
}
|
console.log(`✅ [转发完成] device=${frameData.deviceKey}`);
|
logEvent('info', 'dispatch_success', {
|
deviceKey: frameData.deviceKey,
|
frameTimestamp: frameData.timestamp || null,
|
targets: publishTargets,
|
dispatchAt: nowIso(),
|
});
|
} catch (error) {
|
console.error(`❌ 分发失败 (${frameData.deviceKey}): ${error.message}`);
|
logEvent('error', 'dispatch_failed', {
|
deviceKey: frameData.deviceKey,
|
frameTimestamp: frameData.timestamp || null,
|
targets: publishTargets,
|
error: error.message,
|
});
|
}
|
};
|
|
const devices = config.devices || [];
|
if (devices.length === 0) {
|
console.error('❌ 未配置设备列表(devices 为空)');
|
process.exit(1);
|
}
|
|
const clients = devices.map(device => new DeviceClient(device, config, onFrame));
|
clients.forEach(client => client.start());
|
const healthServer = setupHealthProbe(config.healthProbe || {}, clients);
|
|
let shuttingDown = false;
|
const shutdown = (signal = 'SIGINT') => {
|
if (shuttingDown) return;
|
shuttingDown = true;
|
console.warn(`🛑 收到 ${signal},准备安全退出...`);
|
|
clients.forEach((client) => client.stop());
|
mqttPublisher.close();
|
aliyunPublisher.close();
|
if (healthServer) {
|
try {
|
healthServer.close();
|
} catch (_) {
|
}
|
}
|
cacheStore.flushNow();
|
runtimeLogger.close();
|
process.exit(0);
|
};
|
|
process.on('SIGINT', () => shutdown('SIGINT'));
|
process.on('SIGTERM', () => shutdown('SIGTERM'));
|
}
|
|
if (require.main === module) {
|
main().catch((error) => {
|
console.error(`❌ 程序启动失败: ${error.message}`);
|
process.exit(1);
|
});
|
}
|