const { EventEmitter } = require('events');
|
const { formatUploadTime } = require('./mqtt-service');
|
|
class AliyunService {
|
constructor(config = {}, logger = console, options = {}) {
|
this.config = config;
|
this.logger = logger;
|
this.fetchImpl = options.fetchImpl || global.fetch;
|
this.aliyunSdk = options.aliyunSdk || null;
|
this.contexts = new Map();
|
this.started = false;
|
}
|
|
start() {
|
if (!this.config.enabled) {
|
this.logger.info('[ALIYUN] 阿里云通道未启用');
|
this.started = false;
|
return;
|
}
|
|
if (typeof this.fetchImpl !== 'function') {
|
throw new Error('当前 Node.js 运行环境不支持 fetch,无法请求三元组接口');
|
}
|
|
this.started = true;
|
this.logger.info(`[ALIYUN] 阿里云通道已启用 三元组接口=${this.getTupleUrl()} 自动注册=${this.config.autoRegister === false ? 0 : 1}`);
|
}
|
|
async stop() {
|
const closeTasks = [];
|
|
for (const context of this.contexts.values()) {
|
this.clearConnectWaiter(context);
|
context.tuplePromise = null;
|
|
if (context.iotDevice && typeof context.iotDevice.end === 'function') {
|
closeTasks.push(new Promise((resolve) => {
|
try {
|
context.iotDevice.end(false, resolve);
|
} catch (_error) {
|
resolve();
|
}
|
}));
|
}
|
}
|
|
await Promise.all(closeTasks);
|
this.contexts.clear();
|
this.started = false;
|
this.logger.info('[ALIYUN] 阿里云通道已停止');
|
}
|
|
buildPayload(message) {
|
return {
|
...(message || {}),
|
suedtime: formatUploadTime(),
|
};
|
}
|
|
getTupleUrl() {
|
if (this.config.tupleApiUrl) {
|
return this.config.tupleApiUrl;
|
}
|
|
const baseUrl = String(this.config.tupleApiBaseUrl || '').replace(/\/$/, '');
|
const apiPath = this.config.tupleApiPath || '/device/info/getAliyunDeviceSecret';
|
return `${baseUrl}${apiPath}`;
|
}
|
|
getAliyunSdk() {
|
if (!this.aliyunSdk) {
|
this.aliyunSdk = require('aliyun-iot-device-sdk');
|
}
|
|
return this.aliyunSdk;
|
}
|
|
getContext(device) {
|
const deviceId = device.deviceId;
|
|
if (!this.contexts.has(deviceId)) {
|
this.contexts.set(deviceId, {
|
deviceId,
|
tuple: null,
|
tuplePromise: null,
|
iotDevice: null,
|
connected: false,
|
connectPromise: null,
|
resolveConnectPromise: null,
|
rejectConnectPromise: null,
|
connectTimer: null,
|
lastRegisterAttempt: 0,
|
lastRegisterError: '',
|
});
|
this.logger.info(`[ALIYUN] 创建设备上下文 设备=${deviceId}`);
|
}
|
|
return this.contexts.get(deviceId);
|
}
|
|
extractTuple(responseBody, deviceId) {
|
const data = responseBody && typeof responseBody === 'object'
|
? (responseBody.data || responseBody.result || responseBody)
|
: null;
|
|
if (!data || typeof data !== 'object') {
|
throw new Error('三元组接口返回为空');
|
}
|
|
const tuple = {
|
productKey: data.productKey || data.ProductKey || '',
|
deviceName: data.deviceName || data.DeviceName || deviceId,
|
deviceSecret: data.deviceSecret || data.DeviceSecret || '',
|
};
|
|
if (!tuple.productKey || !tuple.deviceName || !tuple.deviceSecret) {
|
throw new Error('三元组字段不完整');
|
}
|
|
return tuple;
|
}
|
|
async requestTuple(deviceId) {
|
const formData = new URLSearchParams();
|
formData.set('isAutoRegister', this.config.autoRegister === false ? '0' : '1');
|
formData.set('deviceName', deviceId);
|
|
const url = this.getTupleUrl();
|
this.logger.info(`[ALIYUN] 请求三元组 设备=${deviceId} 地址=${url}`);
|
|
const response = await this.fetchImpl(url, {
|
method: 'POST',
|
headers: {
|
'Content-Type': 'application/x-www-form-urlencoded',
|
},
|
body: formData.toString(),
|
});
|
|
if (!response.ok) {
|
throw new Error(`三元组接口请求失败 HTTP ${response.status}`);
|
}
|
|
const text = await response.text();
|
let body;
|
|
try {
|
body = JSON.parse(text);
|
} catch (_error) {
|
throw new Error('三元组接口返回不是有效 JSON');
|
}
|
|
return this.extractTuple(body, deviceId);
|
}
|
|
async ensureTuple(context, device) {
|
if (context.tuple) {
|
return context.tuple;
|
}
|
|
if (context.tuplePromise) {
|
return context.tuplePromise;
|
}
|
|
const retryMs = this.config.registerRetryMs || 60000;
|
const now = Date.now();
|
|
if (context.lastRegisterAttempt > 0 && (now - context.lastRegisterAttempt) < retryMs && context.lastRegisterError) {
|
throw new Error(`三元组请求重试冷却中: ${context.lastRegisterError}`);
|
}
|
|
context.lastRegisterAttempt = now;
|
context.tuplePromise = (async () => {
|
try {
|
const tuple = await this.requestTuple(device.deviceId);
|
context.tuple = tuple;
|
context.lastRegisterError = '';
|
this.logger.info(`[ALIYUN] 三元组获取成功 设备=${device.deviceId} 阿里云设备名=${tuple.deviceName}`);
|
return tuple;
|
} catch (error) {
|
context.lastRegisterError = error.message;
|
this.logger.error(`[ALIYUN] 三元组获取失败 设备=${device.deviceId}: ${error.message}`);
|
throw error;
|
} finally {
|
context.tuplePromise = null;
|
}
|
})();
|
|
return context.tuplePromise;
|
}
|
|
createIotDevice(tuple) {
|
const sdk = this.getAliyunSdk();
|
|
if (!sdk || typeof sdk.device !== 'function') {
|
throw new Error('aliyun-iot-device-sdk 不可用');
|
}
|
|
return sdk.device({
|
ProductKey: tuple.productKey,
|
DeviceName: tuple.deviceName,
|
DeviceSecret: tuple.deviceSecret,
|
});
|
}
|
|
clearConnectWaiter(context) {
|
if (context.connectTimer) {
|
clearTimeout(context.connectTimer);
|
}
|
|
context.connectPromise = null;
|
context.resolveConnectPromise = null;
|
context.rejectConnectPromise = null;
|
context.connectTimer = null;
|
}
|
|
resolveConnectWaiter(context, iotDevice) {
|
if (context.connectPromise && context.resolveConnectPromise) {
|
const resolve = context.resolveConnectPromise;
|
this.clearConnectWaiter(context);
|
resolve(iotDevice);
|
}
|
}
|
|
rejectConnectWaiter(context, error) {
|
if (context.connectPromise && context.rejectConnectPromise) {
|
const reject = context.rejectConnectPromise;
|
this.clearConnectWaiter(context);
|
reject(error instanceof Error ? error : new Error(String(error)));
|
}
|
}
|
|
bindIotDeviceEvents(context, device, iotDevice) {
|
const isCurrentDevice = () => context.iotDevice === iotDevice;
|
|
iotDevice.on('connect', () => {
|
if (!isCurrentDevice()) {
|
return;
|
}
|
|
context.connected = true;
|
this.logger.info(`[ALIYUN] 设备连接成功 设备=${device.deviceId}`);
|
this.resolveConnectWaiter(context, iotDevice);
|
});
|
|
iotDevice.on('error', (error) => {
|
if (!isCurrentDevice()) {
|
return;
|
}
|
|
context.connected = false;
|
this.logger.error(`[ALIYUN] 设备连接异常 设备=${device.deviceId}: ${error.message || error}`);
|
this.rejectConnectWaiter(context, error);
|
});
|
|
iotDevice.on('offline', () => {
|
if (isCurrentDevice()) {
|
context.connected = false;
|
this.logger.warn(`[ALIYUN] 设备离线 设备=${device.deviceId}`);
|
}
|
});
|
|
iotDevice.on('close', () => {
|
if (isCurrentDevice()) {
|
context.connected = false;
|
this.logger.warn(`[ALIYUN] 连接已关闭 设备=${device.deviceId}`);
|
}
|
});
|
}
|
|
async ensureIotDevice(context, device) {
|
if (context.iotDevice && context.connected) {
|
return context.iotDevice;
|
}
|
|
if (context.connectPromise) {
|
return context.connectPromise;
|
}
|
|
if (!context.iotDevice) {
|
const tuple = await this.ensureTuple(context, device);
|
const iotDevice = this.createIotDevice(tuple);
|
context.iotDevice = iotDevice;
|
this.bindIotDeviceEvents(context, device, iotDevice);
|
this.logger.info(`[ALIYUN] 创建 SDK 设备实例 设备=${device.deviceId}`);
|
}
|
|
if (context.connected) {
|
return context.iotDevice;
|
}
|
|
const connectTimeoutMs = this.config.connectTimeoutMs || 15000;
|
context.connectPromise = new Promise((resolve, reject) => {
|
context.resolveConnectPromise = resolve;
|
context.rejectConnectPromise = reject;
|
context.connectTimer = setTimeout(() => {
|
this.rejectConnectWaiter(context, new Error('阿里云连接超时'));
|
}, connectTimeoutMs);
|
});
|
|
return context.connectPromise;
|
}
|
|
async publish(device, message) {
|
if (!this.started || !this.config.enabled) {
|
this.logger.warn(`[ALIYUN] 跳过上报 设备=${device.deviceId} 原因=通道未启用`);
|
return { skipped: true, reason: 'disabled' };
|
}
|
|
const context = this.getContext(device);
|
const payload = this.buildPayload(message);
|
|
try {
|
const iotDevice = await this.ensureIotDevice(context, device);
|
this.logger.info(`[ALIYUN] 开始属性上报 设备=${device.deviceId} 字段=${Object.keys(payload).join(',')}`);
|
await new Promise((resolve, reject) => {
|
iotDevice.postProps(payload, (result) => {
|
if (result && (result.message === 'success' || result.success === true || result.code === 200)) {
|
resolve(result);
|
return;
|
}
|
|
reject(new Error(result && (result.message || result.code) ? String(result.message || result.code) : '属性上报失败'));
|
});
|
});
|
this.logger.info(`[ALIYUN] 属性上报成功 设备=${device.deviceId} 数据=${JSON.stringify(payload)}`);
|
return { ok: true, payload };
|
} catch (error) {
|
this.logger.error(`[ALIYUN] 上报失败 设备=${device.deviceId}: ${error.message}`);
|
return { ok: false, reason: error.message, payload };
|
}
|
}
|
}
|
|
class FakeAliyunDevice extends EventEmitter {
|
postProps(payload, callback) {
|
callback({ message: 'success', payload });
|
}
|
}
|
|
module.exports = {
|
AliyunService,
|
FakeAliyunDevice,
|
};
|