const { EventEmitter } = require('events'); const { formatUploadTime } = require('./mqtt-service'); class AliyunService { constructor(config = {}, logger = console, options = {}) { this.config = config; this.logger = logger; this.fetchImpl = options.fetchImpl || global.fetch; this.aliyunSdk = options.aliyunSdk || null; this.contexts = new Map(); this.started = false; } start() { if (!this.config.enabled) { this.logger.info('[ALIYUN] 阿里云通道未启用'); this.started = false; return; } if (typeof this.fetchImpl !== 'function') { throw new Error('当前 Node.js 运行环境不支持 fetch,无法请求三元组接口'); } this.started = true; this.logger.info(`[ALIYUN] 阿里云通道已启用 三元组接口=${this.getTupleUrl()} 自动注册=${this.config.autoRegister === false ? 0 : 1}`); } async stop() { const closeTasks = []; for (const context of this.contexts.values()) { this.clearConnectWaiter(context); context.tuplePromise = null; if (context.iotDevice && typeof context.iotDevice.end === 'function') { closeTasks.push(new Promise((resolve) => { try { context.iotDevice.end(false, resolve); } catch (_error) { resolve(); } })); } } await Promise.all(closeTasks); this.contexts.clear(); this.started = false; this.logger.info('[ALIYUN] 阿里云通道已停止'); } buildPayload(message) { return { ...(message || {}), suedtime: formatUploadTime(), }; } getTupleUrl() { if (this.config.tupleApiUrl) { return this.config.tupleApiUrl; } const baseUrl = String(this.config.tupleApiBaseUrl || '').replace(/\/$/, ''); const apiPath = this.config.tupleApiPath || '/device/info/getAliyunDeviceSecret'; return `${baseUrl}${apiPath}`; } getAliyunSdk() { if (!this.aliyunSdk) { this.aliyunSdk = require('aliyun-iot-device-sdk'); } return this.aliyunSdk; } getContext(device) { const deviceId = device.deviceId; if (!this.contexts.has(deviceId)) { this.contexts.set(deviceId, { deviceId, tuple: null, tuplePromise: null, iotDevice: null, connected: false, connectPromise: null, resolveConnectPromise: null, rejectConnectPromise: null, connectTimer: null, lastRegisterAttempt: 0, lastRegisterError: '', }); this.logger.info(`[ALIYUN] 创建设备上下文 设备=${deviceId}`); } return this.contexts.get(deviceId); } extractTuple(responseBody, deviceId) { const data = responseBody && typeof responseBody === 'object' ? (responseBody.data || responseBody.result || responseBody) : null; if (!data || typeof data !== 'object') { throw new Error('三元组接口返回为空'); } const tuple = { productKey: data.productKey || data.ProductKey || '', deviceName: data.deviceName || data.DeviceName || deviceId, deviceSecret: data.deviceSecret || data.DeviceSecret || '', }; if (!tuple.productKey || !tuple.deviceName || !tuple.deviceSecret) { throw new Error('三元组字段不完整'); } return tuple; } async requestTuple(deviceId) { const formData = new URLSearchParams(); formData.set('isAutoRegister', this.config.autoRegister === false ? '0' : '1'); formData.set('deviceName', deviceId); const url = this.getTupleUrl(); this.logger.info(`[ALIYUN] 请求三元组 设备=${deviceId} 地址=${url}`); const response = await this.fetchImpl(url, { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded', }, body: formData.toString(), }); if (!response.ok) { throw new Error(`三元组接口请求失败 HTTP ${response.status}`); } const text = await response.text(); let body; try { body = JSON.parse(text); } catch (_error) { throw new Error('三元组接口返回不是有效 JSON'); } return this.extractTuple(body, deviceId); } async ensureTuple(context, device) { if (context.tuple) { return context.tuple; } if (context.tuplePromise) { return context.tuplePromise; } const retryMs = this.config.registerRetryMs || 60000; const now = Date.now(); if (context.lastRegisterAttempt > 0 && (now - context.lastRegisterAttempt) < retryMs && context.lastRegisterError) { throw new Error(`三元组请求重试冷却中: ${context.lastRegisterError}`); } context.lastRegisterAttempt = now; context.tuplePromise = (async () => { try { const tuple = await this.requestTuple(device.deviceId); context.tuple = tuple; context.lastRegisterError = ''; this.logger.info(`[ALIYUN] 三元组获取成功 设备=${device.deviceId} 阿里云设备名=${tuple.deviceName}`); return tuple; } catch (error) { context.lastRegisterError = error.message; this.logger.error(`[ALIYUN] 三元组获取失败 设备=${device.deviceId}: ${error.message}`); throw error; } finally { context.tuplePromise = null; } })(); return context.tuplePromise; } createIotDevice(tuple) { const sdk = this.getAliyunSdk(); if (!sdk || typeof sdk.device !== 'function') { throw new Error('aliyun-iot-device-sdk 不可用'); } return sdk.device({ ProductKey: tuple.productKey, DeviceName: tuple.deviceName, DeviceSecret: tuple.deviceSecret, }); } clearConnectWaiter(context) { if (context.connectTimer) { clearTimeout(context.connectTimer); } context.connectPromise = null; context.resolveConnectPromise = null; context.rejectConnectPromise = null; context.connectTimer = null; } resolveConnectWaiter(context, iotDevice) { if (context.connectPromise && context.resolveConnectPromise) { const resolve = context.resolveConnectPromise; this.clearConnectWaiter(context); resolve(iotDevice); } } rejectConnectWaiter(context, error) { if (context.connectPromise && context.rejectConnectPromise) { const reject = context.rejectConnectPromise; this.clearConnectWaiter(context); reject(error instanceof Error ? error : new Error(String(error))); } } bindIotDeviceEvents(context, device, iotDevice) { const isCurrentDevice = () => context.iotDevice === iotDevice; iotDevice.on('connect', () => { if (!isCurrentDevice()) { return; } context.connected = true; this.logger.info(`[ALIYUN] 设备连接成功 设备=${device.deviceId}`); this.resolveConnectWaiter(context, iotDevice); }); iotDevice.on('error', (error) => { if (!isCurrentDevice()) { return; } context.connected = false; this.logger.error(`[ALIYUN] 设备连接异常 设备=${device.deviceId}: ${error.message || error}`); this.rejectConnectWaiter(context, error); }); iotDevice.on('offline', () => { if (isCurrentDevice()) { context.connected = false; this.logger.warn(`[ALIYUN] 设备离线 设备=${device.deviceId}`); } }); iotDevice.on('close', () => { if (isCurrentDevice()) { context.connected = false; this.logger.warn(`[ALIYUN] 连接已关闭 设备=${device.deviceId}`); } }); } async ensureIotDevice(context, device) { if (context.iotDevice && context.connected) { return context.iotDevice; } if (context.connectPromise) { return context.connectPromise; } if (!context.iotDevice) { const tuple = await this.ensureTuple(context, device); const iotDevice = this.createIotDevice(tuple); context.iotDevice = iotDevice; this.bindIotDeviceEvents(context, device, iotDevice); this.logger.info(`[ALIYUN] 创建 SDK 设备实例 设备=${device.deviceId}`); } if (context.connected) { return context.iotDevice; } const connectTimeoutMs = this.config.connectTimeoutMs || 15000; context.connectPromise = new Promise((resolve, reject) => { context.resolveConnectPromise = resolve; context.rejectConnectPromise = reject; context.connectTimer = setTimeout(() => { this.rejectConnectWaiter(context, new Error('阿里云连接超时')); }, connectTimeoutMs); }); return context.connectPromise; } async publish(device, message) { if (!this.started || !this.config.enabled) { this.logger.warn(`[ALIYUN] 跳过上报 设备=${device.deviceId} 原因=通道未启用`); return { skipped: true, reason: 'disabled' }; } const context = this.getContext(device); const payload = this.buildPayload(message); try { const iotDevice = await this.ensureIotDevice(context, device); this.logger.info(`[ALIYUN] 开始属性上报 设备=${device.deviceId} 字段=${Object.keys(payload).join(',')}`); await new Promise((resolve, reject) => { iotDevice.postProps(payload, (result) => { if (result && (result.message === 'success' || result.success === true || result.code === 200)) { resolve(result); return; } reject(new Error(result && (result.message || result.code) ? String(result.message || result.code) : '属性上报失败')); }); }); this.logger.info(`[ALIYUN] 属性上报成功 设备=${device.deviceId} 数据=${JSON.stringify(payload)}`); return { ok: true, payload }; } catch (error) { this.logger.error(`[ALIYUN] 上报失败 设备=${device.deviceId}: ${error.message}`); return { ok: false, reason: error.message, payload }; } } } class FakeAliyunDevice extends EventEmitter { postProps(payload, callback) { callback({ message: 'success', payload }); } } module.exports = { AliyunService, FakeAliyunDevice, };