const { EventEmitter } = require('events'); class AliyunService { constructor(config = {}, logger = console, options = {}) { this.config = config; this.logger = logger; this.fetchImpl = options.fetchImpl || global.fetch; this.aliyunSdk = options.aliyunSdk || null; this.contexts = new Map(); this.started = false; } start() { if (!this.config.enabled) { this.logger.info('[ALIYUN] 阿里云发送未启用'); this.started = false; return; } if (typeof this.fetchImpl !== 'function') { throw new Error('当前 Node 环境不支持 fetch,无法调用三元组接口'); } this.started = true; this.logger.info(`[ALIYUN] 阿里云服务已启用 tupleApi=${this.getTupleUrl()} autoRegister=${this.config.autoRegister === false ? 0 : 1} retryMs=${this.config.registerRetryMs || 60000}`); } async stop() { const closeTasks = []; this.logger.info(`[ALIYUN] 开始停止阿里云服务 contexts=${this.contexts.size}`); for (const context of this.contexts.values()) { this.clearConnectWaiter(context); context.tuplePromise = null; if (context.iotDevice && typeof context.iotDevice.end === 'function') { closeTasks.push(new Promise((resolve) => { try { context.iotDevice.end(false, resolve); } catch (_error) { resolve(); } })); } } await Promise.all(closeTasks); this.contexts.clear(); this.started = false; this.logger.info('[ALIYUN] 阿里云服务已停止'); } normalizeEventPayload(payload) { if (Array.isArray(payload) && payload.length === 1) { return payload[0]; } return payload; } formatErrorMessage(errorLike) { const normalized = this.normalizeEventPayload(errorLike); if (normalized instanceof Error) { return normalized.message || normalized.name || 'unknown-error'; } if (Array.isArray(normalized)) { const parts = normalized .map((item) => this.formatErrorMessage(item)) .filter(Boolean); return parts.length > 0 ? parts.join(' | ') : 'unknown-error'; } if (normalized && typeof normalized === 'object') { if (typeof normalized.message === 'string' && normalized.message) { return normalized.message; } try { return JSON.stringify(normalized); } catch (_error) { return String(normalized); } } if (normalized === undefined || normalized === null || normalized === '') { return 'unknown-error'; } return String(normalized); } shouldLogClose(context) { const throttleMs = this.config.closeLogThrottleMs || 5000; const now = Date.now(); if (context.lastCloseLogAt > 0 && (now - context.lastCloseLogAt) < throttleMs) { return false; } context.lastCloseLogAt = now; return true; } formatSuedtime(date = new Date()) { const year = date.getFullYear(); const month = String(date.getMonth() + 1).padStart(2, '0'); const day = String(date.getDate()).padStart(2, '0'); const hour = String(date.getHours()).padStart(2, '0'); const minute = String(date.getMinutes()).padStart(2, '0'); const second = String(date.getSeconds()).padStart(2, '0'); return `${year}-${month}-${day} ${hour}:${minute}:${second}`; } buildPayload(message) { return { ...(message || {}), suedtime: this.formatSuedtime(), }; } getTupleUrl() { if (this.config.tupleApiUrl) { return this.config.tupleApiUrl; } const baseUrl = (this.config.tupleApiBaseUrl || '').replace(/\/$/, ''); const apiPath = this.config.tupleApiPath || '/device/info/getAliyunDeviceSecret'; return `${baseUrl}${apiPath}`; } getAliyunSdk() { if (this.aliyunSdk) { return this.aliyunSdk; } // eslint-disable-next-line global-require this.aliyunSdk = require('aliyun-iot-device-sdk'); return this.aliyunSdk; } getContext(device) { const deviceId = device.deviceId; if (!this.contexts.has(deviceId)) { this.contexts.set(deviceId, { deviceId, tuple: null, tuplePromise: null, iotDevice: null, connected: false, hasConnectedOnce: false, connectPromise: null, resolveConnectPromise: null, rejectConnectPromise: null, connectTimer: null, lastRegisterAttempt: 0, lastRegisterError: '', lastPropsError: '', lastCloseLogAt: 0, }); this.logger.info(`[ALIYUN] 创建设备上下文 deviceId=${deviceId}`); } return this.contexts.get(deviceId); } extractTuple(responseBody, deviceId) { const data = responseBody && typeof responseBody === 'object' ? (responseBody.data || responseBody.result || responseBody) : null; if (!data || typeof data !== 'object') { throw new Error('三元组接口返回为空'); } const tuple = { productKey: data.productKey || data.ProductKey || '', deviceName: data.deviceName || data.DeviceName || deviceId, deviceSecret: data.deviceSecret || data.DeviceSecret || '', }; if (!tuple.productKey || !tuple.deviceName || !tuple.deviceSecret) { throw new Error('三元组字段不完整'); } return tuple; } async requestTuple(deviceId) { const url = this.getTupleUrl(); const formData = new URLSearchParams(); formData.set('isAutoRegister', this.config.autoRegister === false ? '0' : '1'); formData.set('deviceName', deviceId); this.logger.info(`[ALIYUN] 开始请求三元组 deviceId=${deviceId} url=${url}`); const response = await this.fetchImpl(url, { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', }, body: formData.toString(), }); if (!response.ok) { throw new Error(`三元组接口请求失败 HTTP ${response.status}`); } const rawText = await response.text(); let responseBody = null; try { responseBody = JSON.parse(rawText); } catch (_error) { throw new Error('三元组接口返回不是有效 JSON'); } return this.extractTuple(responseBody, deviceId); } async ensureTuple(context, device) { if (context.tuple) { this.logger.info(`[ALIYUN] 复用已缓存三元组 deviceId=${device.deviceId} deviceName=${context.tuple.deviceName}`); return context.tuple; } if (context.tuplePromise) { this.logger.info(`[ALIYUN] 复用三元组请求中的会话 deviceId=${device.deviceId}`); return context.tuplePromise; } const retryMs = this.config.registerRetryMs || 60000; const now = Date.now(); if (context.lastRegisterAttempt > 0 && (now - context.lastRegisterAttempt) < retryMs && context.lastRegisterError) { this.logger.warn(`[ALIYUN] 三元组请求冷却中 deviceId=${device.deviceId} retryMs=${retryMs} lastError=${context.lastRegisterError}`); throw new Error(`阿里云三元组重试冷却中 ${context.lastRegisterError}`); } context.lastRegisterAttempt = now; context.tuplePromise = (async () => { try { const tuple = await this.requestTuple(device.deviceId); context.tuple = tuple; context.lastRegisterError = ''; this.logger.info(`[ALIYUN] 三元组获取成功 deviceId=${device.deviceId} deviceName=${tuple.deviceName}`); return tuple; } catch (error) { context.lastRegisterError = error.message; this.logger.error(`[ALIYUN] 三元组获取失败 deviceId=${device.deviceId}: ${error.message}`); throw error; } finally { context.tuplePromise = null; } })(); return context.tuplePromise; } createIotDevice(tuple) { const sdk = this.getAliyunSdk(); if (!sdk || typeof sdk.device !== 'function') { throw new Error('aliyun-iot-device-sdk 不可用'); } return sdk.device({ ProductKey: tuple.productKey, DeviceName: tuple.deviceName, DeviceSecret: tuple.deviceSecret, }); } clearConnectWaiter(context) { if (context.connectTimer) { clearTimeout(context.connectTimer); } context.connectPromise = null; context.resolveConnectPromise = null; context.rejectConnectPromise = null; context.connectTimer = null; } resolveConnectWaiter(context, iotDevice) { if (!context.connectPromise || typeof context.resolveConnectPromise !== 'function') { return false; } const resolve = context.resolveConnectPromise; this.clearConnectWaiter(context); resolve(iotDevice); return true; } rejectConnectWaiter(context, error) { if (!context.connectPromise || typeof context.rejectConnectPromise !== 'function') { return false; } const reject = context.rejectConnectPromise; this.clearConnectWaiter(context); reject(error instanceof Error ? error : new Error(this.formatErrorMessage(error))); return true; } ensureConnectWaiter(context, device) { if (context.connectPromise) { return context.connectPromise; } const connectTimeoutMs = this.config.connectTimeoutMs || 15000; context.connectPromise = new Promise((resolve, reject) => { context.resolveConnectPromise = resolve; context.rejectConnectPromise = reject; context.connectTimer = setTimeout(() => { this.rejectConnectWaiter(context, new Error('阿里云连接超时')); }, connectTimeoutMs); }); if (context.iotDevice && !context.connected) { this.logger.info(`[ALIYUN] 等待现有设备连接恢复 deviceId=${device.deviceId}`); } return context.connectPromise; } bindIotDeviceEvents(context, device, iotDevice) { const isCurrentDevice = () => context.iotDevice === iotDevice; iotDevice.on('connect', () => { if (!isCurrentDevice()) { return; } const wasConnected = context.connected; const hadConnectedOnce = context.hasConnectedOnce; context.connected = true; context.hasConnectedOnce = true; context.lastCloseLogAt = 0; const resolvedPending = this.resolveConnectWaiter(context, iotDevice); if (resolvedPending) { this.logger.info(`[ALIYUN] ${hadConnectedOnce ? '设备重新连接成功' : '设备连接成功'} deviceId=${device.deviceId}`); return; } if (!wasConnected) { this.logger.info(`[ALIYUN] 设备重新连接成功 deviceId=${device.deviceId}`); } }); iotDevice.on('error', (error) => { if (!isCurrentDevice()) { return; } context.connected = false; this.logger.error(`[ALIYUN] 连接错误 deviceId=${device.deviceId}: ${this.formatErrorMessage(error)}`); this.rejectConnectWaiter(context, error); }); iotDevice.on('reconnect', () => { if (!isCurrentDevice()) { return; } this.logger.info(`[ALIYUN] 正在重连 deviceId=${device.deviceId}`); }); iotDevice.on('offline', () => { if (!isCurrentDevice()) { return; } context.connected = false; this.logger.warn(`[ALIYUN] 连接离线 deviceId=${device.deviceId}`); }); iotDevice.on('close', () => { if (!isCurrentDevice()) { return; } context.connected = false; if (this.shouldLogClose(context)) { this.logger.info(`[ALIYUN] 连接关闭 deviceId=${device.deviceId}`); } }); } async ensureIotDevice(context, device) { if (context.iotDevice && context.connected) { this.logger.info(`[ALIYUN] 复用已连接设备 deviceId=${device.deviceId}`); return context.iotDevice; } if (context.connectPromise) { this.logger.info(`[ALIYUN] 复用连接中的设备会话 deviceId=${device.deviceId}`); return context.connectPromise; } if (!context.iotDevice) { const tuple = await this.ensureTuple(context, device); if (!context.iotDevice) { this.logger.info(`[ALIYUN] 开始创建设备连接 deviceId=${device.deviceId} deviceName=${tuple.deviceName}`); const iotDevice = this.createIotDevice(tuple); context.iotDevice = iotDevice; this.bindIotDeviceEvents(context, device, iotDevice); } } if (context.connected) { return context.iotDevice; } return this.ensureConnectWaiter(context, device); } async postProps(context, device, payload) { const iotDevice = await this.ensureIotDevice(context, device); this.logger.info(`[ALIYUN] 开始属性上报 deviceId=${device.deviceId} keys=${Object.keys(payload).join(',')}`); return new Promise((resolve, reject) => { iotDevice.postProps(payload, (result) => { if (result && (result.message === 'success' || result.success === true || result.code === 200)) { context.lastPropsError = ''; this.logger.info(`[ALIYUN] 属性上报成功 deviceId=${device.deviceId} payload=${JSON.stringify(payload)}`); resolve(result); return; } const errorMessage = result && (result.message || result.code) ? String(result.message || result.code) : '阿里云返回未知错误'; context.lastPropsError = errorMessage; reject(new Error(errorMessage)); }); }); } async publish(device, message) { if (!this.started || !this.config.enabled) { this.logger.warn(`[ALIYUN] 跳过发送 deviceId=${device.deviceId} reason=aliyun-disabled`); return { skipped: true, reason: 'aliyun-disabled' }; } const context = this.getContext(device); const payload = this.buildPayload(message); this.logger.info(`[ALIYUN] 收到发送请求 deviceId=${device.deviceId} payload=${JSON.stringify(payload)}`); try { await this.postProps(context, device, payload); return { ok: true, payload }; } catch (error) { this.logger.error(`[ALIYUN] 属性上报失败 deviceId=${device.deviceId}: ${error.message}`); return { ok: false, reason: error.message, payload }; } } } class FakeAliyunDevice extends EventEmitter { postProps(payload, callback) { callback({ message: 'success', payload }); } } module.exports = { AliyunService, FakeAliyunDevice, };