const { EventEmitter } = require('events');
|
|
class AliyunService {
|
constructor(config = {}, logger = console, options = {}) {
|
this.config = config;
|
this.logger = logger;
|
this.fetchImpl = options.fetchImpl || global.fetch;
|
this.aliyunSdk = options.aliyunSdk || null;
|
this.contexts = new Map();
|
this.started = false;
|
}
|
|
start() {
|
if (!this.config.enabled) {
|
this.logger.info('[ALIYUN] 阿里云发送未启用');
|
this.started = false;
|
return;
|
}
|
|
if (typeof this.fetchImpl !== 'function') {
|
throw new Error('当前 Node 环境不支持 fetch,无法调用三元组接口');
|
}
|
|
this.started = true;
|
this.logger.info(`[ALIYUN] 阿里云服务已启用 tupleApi=${this.getTupleUrl()} autoRegister=${this.config.autoRegister === false ? 0 : 1} retryMs=${this.config.registerRetryMs || 60000}`);
|
}
|
|
async stop() {
|
const closeTasks = [];
|
|
this.logger.info(`[ALIYUN] 开始停止阿里云服务 contexts=${this.contexts.size}`);
|
|
for (const context of this.contexts.values()) {
|
this.clearConnectWaiter(context);
|
context.tuplePromise = null;
|
|
if (context.iotDevice && typeof context.iotDevice.end === 'function') {
|
closeTasks.push(new Promise((resolve) => {
|
try {
|
context.iotDevice.end(false, resolve);
|
} catch (_error) {
|
resolve();
|
}
|
}));
|
}
|
}
|
|
await Promise.all(closeTasks);
|
this.contexts.clear();
|
this.started = false;
|
this.logger.info('[ALIYUN] 阿里云服务已停止');
|
}
|
|
normalizeEventPayload(payload) {
|
if (Array.isArray(payload) && payload.length === 1) {
|
return payload[0];
|
}
|
|
return payload;
|
}
|
|
formatErrorMessage(errorLike) {
|
const normalized = this.normalizeEventPayload(errorLike);
|
|
if (normalized instanceof Error) {
|
return normalized.message || normalized.name || 'unknown-error';
|
}
|
|
if (Array.isArray(normalized)) {
|
const parts = normalized
|
.map((item) => this.formatErrorMessage(item))
|
.filter(Boolean);
|
|
return parts.length > 0 ? parts.join(' | ') : 'unknown-error';
|
}
|
|
if (normalized && typeof normalized === 'object') {
|
if (typeof normalized.message === 'string' && normalized.message) {
|
return normalized.message;
|
}
|
|
try {
|
return JSON.stringify(normalized);
|
} catch (_error) {
|
return String(normalized);
|
}
|
}
|
|
if (normalized === undefined || normalized === null || normalized === '') {
|
return 'unknown-error';
|
}
|
|
return String(normalized);
|
}
|
|
shouldLogClose(context) {
|
const throttleMs = this.config.closeLogThrottleMs || 5000;
|
const now = Date.now();
|
|
if (context.lastCloseLogAt > 0 && (now - context.lastCloseLogAt) < throttleMs) {
|
return false;
|
}
|
|
context.lastCloseLogAt = now;
|
return true;
|
}
|
|
formatSuedtime(date = new Date()) {
|
const year = date.getFullYear();
|
const month = String(date.getMonth() + 1).padStart(2, '0');
|
const day = String(date.getDate()).padStart(2, '0');
|
const hour = String(date.getHours()).padStart(2, '0');
|
const minute = String(date.getMinutes()).padStart(2, '0');
|
const second = String(date.getSeconds()).padStart(2, '0');
|
|
return `${year}-${month}-${day} ${hour}:${minute}:${second}`;
|
}
|
|
buildPayload(message) {
|
return {
|
...(message || {}),
|
suedtime: this.formatSuedtime(),
|
};
|
}
|
|
getTupleUrl() {
|
if (this.config.tupleApiUrl) {
|
return this.config.tupleApiUrl;
|
}
|
|
const baseUrl = (this.config.tupleApiBaseUrl || '').replace(/\/$/, '');
|
const apiPath = this.config.tupleApiPath || '/device/info/getAliyunDeviceSecret';
|
|
return `${baseUrl}${apiPath}`;
|
}
|
|
getAliyunSdk() {
|
if (this.aliyunSdk) {
|
return this.aliyunSdk;
|
}
|
|
// eslint-disable-next-line global-require
|
this.aliyunSdk = require('aliyun-iot-device-sdk');
|
return this.aliyunSdk;
|
}
|
|
getContext(device) {
|
const deviceId = device.deviceId;
|
|
if (!this.contexts.has(deviceId)) {
|
this.contexts.set(deviceId, {
|
deviceId,
|
tuple: null,
|
tuplePromise: null,
|
iotDevice: null,
|
connected: false,
|
hasConnectedOnce: false,
|
connectPromise: null,
|
resolveConnectPromise: null,
|
rejectConnectPromise: null,
|
connectTimer: null,
|
lastRegisterAttempt: 0,
|
lastRegisterError: '',
|
lastPropsError: '',
|
lastCloseLogAt: 0,
|
});
|
|
this.logger.info(`[ALIYUN] 创建设备上下文 deviceId=${deviceId}`);
|
}
|
|
return this.contexts.get(deviceId);
|
}
|
|
extractTuple(responseBody, deviceId) {
|
const data = responseBody && typeof responseBody === 'object'
|
? (responseBody.data || responseBody.result || responseBody)
|
: null;
|
|
if (!data || typeof data !== 'object') {
|
throw new Error('三元组接口返回为空');
|
}
|
|
const tuple = {
|
productKey: data.productKey || data.ProductKey || '',
|
deviceName: data.deviceName || data.DeviceName || deviceId,
|
deviceSecret: data.deviceSecret || data.DeviceSecret || '',
|
};
|
|
if (!tuple.productKey || !tuple.deviceName || !tuple.deviceSecret) {
|
throw new Error('三元组字段不完整');
|
}
|
|
return tuple;
|
}
|
|
async requestTuple(deviceId) {
|
const url = this.getTupleUrl();
|
const formData = new URLSearchParams();
|
formData.set('isAutoRegister', this.config.autoRegister === false ? '0' : '1');
|
formData.set('deviceName', deviceId);
|
|
this.logger.info(`[ALIYUN] 开始请求三元组 deviceId=${deviceId} url=${url}`);
|
|
const response = await this.fetchImpl(url, {
|
method: 'POST',
|
headers: {
|
'Content-Type': 'application/x-www-form-urlencoded',
|
},
|
body: formData.toString(),
|
});
|
|
if (!response.ok) {
|
throw new Error(`三元组接口请求失败 HTTP ${response.status}`);
|
}
|
|
const rawText = await response.text();
|
let responseBody = null;
|
|
try {
|
responseBody = JSON.parse(rawText);
|
} catch (_error) {
|
throw new Error('三元组接口返回不是有效 JSON');
|
}
|
|
return this.extractTuple(responseBody, deviceId);
|
}
|
|
async ensureTuple(context, device) {
|
if (context.tuple) {
|
this.logger.info(`[ALIYUN] 复用已缓存三元组 deviceId=${device.deviceId} deviceName=${context.tuple.deviceName}`);
|
return context.tuple;
|
}
|
|
if (context.tuplePromise) {
|
this.logger.info(`[ALIYUN] 复用三元组请求中的会话 deviceId=${device.deviceId}`);
|
return context.tuplePromise;
|
}
|
|
const retryMs = this.config.registerRetryMs || 60000;
|
const now = Date.now();
|
|
if (context.lastRegisterAttempt > 0 && (now - context.lastRegisterAttempt) < retryMs && context.lastRegisterError) {
|
this.logger.warn(`[ALIYUN] 三元组请求冷却中 deviceId=${device.deviceId} retryMs=${retryMs} lastError=${context.lastRegisterError}`);
|
throw new Error(`阿里云三元组重试冷却中 ${context.lastRegisterError}`);
|
}
|
|
context.lastRegisterAttempt = now;
|
context.tuplePromise = (async () => {
|
try {
|
const tuple = await this.requestTuple(device.deviceId);
|
context.tuple = tuple;
|
context.lastRegisterError = '';
|
this.logger.info(`[ALIYUN] 三元组获取成功 deviceId=${device.deviceId} deviceName=${tuple.deviceName}`);
|
return tuple;
|
} catch (error) {
|
context.lastRegisterError = error.message;
|
this.logger.error(`[ALIYUN] 三元组获取失败 deviceId=${device.deviceId}: ${error.message}`);
|
throw error;
|
} finally {
|
context.tuplePromise = null;
|
}
|
})();
|
|
return context.tuplePromise;
|
}
|
|
createIotDevice(tuple) {
|
const sdk = this.getAliyunSdk();
|
|
if (!sdk || typeof sdk.device !== 'function') {
|
throw new Error('aliyun-iot-device-sdk 不可用');
|
}
|
|
return sdk.device({
|
ProductKey: tuple.productKey,
|
DeviceName: tuple.deviceName,
|
DeviceSecret: tuple.deviceSecret,
|
});
|
}
|
|
clearConnectWaiter(context) {
|
if (context.connectTimer) {
|
clearTimeout(context.connectTimer);
|
}
|
|
context.connectPromise = null;
|
context.resolveConnectPromise = null;
|
context.rejectConnectPromise = null;
|
context.connectTimer = null;
|
}
|
|
resolveConnectWaiter(context, iotDevice) {
|
if (!context.connectPromise || typeof context.resolveConnectPromise !== 'function') {
|
return false;
|
}
|
|
const resolve = context.resolveConnectPromise;
|
this.clearConnectWaiter(context);
|
resolve(iotDevice);
|
return true;
|
}
|
|
rejectConnectWaiter(context, error) {
|
if (!context.connectPromise || typeof context.rejectConnectPromise !== 'function') {
|
return false;
|
}
|
|
const reject = context.rejectConnectPromise;
|
this.clearConnectWaiter(context);
|
reject(error instanceof Error ? error : new Error(this.formatErrorMessage(error)));
|
return true;
|
}
|
|
ensureConnectWaiter(context, device) {
|
if (context.connectPromise) {
|
return context.connectPromise;
|
}
|
|
const connectTimeoutMs = this.config.connectTimeoutMs || 15000;
|
|
context.connectPromise = new Promise((resolve, reject) => {
|
context.resolveConnectPromise = resolve;
|
context.rejectConnectPromise = reject;
|
context.connectTimer = setTimeout(() => {
|
this.rejectConnectWaiter(context, new Error('阿里云连接超时'));
|
}, connectTimeoutMs);
|
});
|
|
if (context.iotDevice && !context.connected) {
|
this.logger.info(`[ALIYUN] 等待现有设备连接恢复 deviceId=${device.deviceId}`);
|
}
|
|
return context.connectPromise;
|
}
|
|
bindIotDeviceEvents(context, device, iotDevice) {
|
const isCurrentDevice = () => context.iotDevice === iotDevice;
|
|
iotDevice.on('connect', () => {
|
if (!isCurrentDevice()) {
|
return;
|
}
|
|
const wasConnected = context.connected;
|
const hadConnectedOnce = context.hasConnectedOnce;
|
context.connected = true;
|
context.hasConnectedOnce = true;
|
context.lastCloseLogAt = 0;
|
|
const resolvedPending = this.resolveConnectWaiter(context, iotDevice);
|
|
if (resolvedPending) {
|
this.logger.info(`[ALIYUN] ${hadConnectedOnce ? '设备重新连接成功' : '设备连接成功'} deviceId=${device.deviceId}`);
|
return;
|
}
|
|
if (!wasConnected) {
|
this.logger.info(`[ALIYUN] 设备重新连接成功 deviceId=${device.deviceId}`);
|
}
|
});
|
|
iotDevice.on('error', (error) => {
|
if (!isCurrentDevice()) {
|
return;
|
}
|
|
context.connected = false;
|
this.logger.error(`[ALIYUN] 连接错误 deviceId=${device.deviceId}: ${this.formatErrorMessage(error)}`);
|
this.rejectConnectWaiter(context, error);
|
});
|
|
iotDevice.on('reconnect', () => {
|
if (!isCurrentDevice()) {
|
return;
|
}
|
|
this.logger.info(`[ALIYUN] 正在重连 deviceId=${device.deviceId}`);
|
});
|
|
iotDevice.on('offline', () => {
|
if (!isCurrentDevice()) {
|
return;
|
}
|
|
context.connected = false;
|
this.logger.warn(`[ALIYUN] 连接离线 deviceId=${device.deviceId}`);
|
});
|
|
iotDevice.on('close', () => {
|
if (!isCurrentDevice()) {
|
return;
|
}
|
|
context.connected = false;
|
if (this.shouldLogClose(context)) {
|
this.logger.info(`[ALIYUN] 连接关闭 deviceId=${device.deviceId}`);
|
}
|
});
|
}
|
|
async ensureIotDevice(context, device) {
|
if (context.iotDevice && context.connected) {
|
this.logger.info(`[ALIYUN] 复用已连接设备 deviceId=${device.deviceId}`);
|
return context.iotDevice;
|
}
|
|
if (context.connectPromise) {
|
this.logger.info(`[ALIYUN] 复用连接中的设备会话 deviceId=${device.deviceId}`);
|
return context.connectPromise;
|
}
|
|
if (!context.iotDevice) {
|
const tuple = await this.ensureTuple(context, device);
|
|
if (!context.iotDevice) {
|
this.logger.info(`[ALIYUN] 开始创建设备连接 deviceId=${device.deviceId} deviceName=${tuple.deviceName}`);
|
const iotDevice = this.createIotDevice(tuple);
|
context.iotDevice = iotDevice;
|
this.bindIotDeviceEvents(context, device, iotDevice);
|
}
|
}
|
|
if (context.connected) {
|
return context.iotDevice;
|
}
|
|
return this.ensureConnectWaiter(context, device);
|
}
|
|
async postProps(context, device, payload) {
|
const iotDevice = await this.ensureIotDevice(context, device);
|
|
this.logger.info(`[ALIYUN] 开始属性上报 deviceId=${device.deviceId} keys=${Object.keys(payload).join(',')}`);
|
|
return new Promise((resolve, reject) => {
|
iotDevice.postProps(payload, (result) => {
|
if (result && (result.message === 'success' || result.success === true || result.code === 200)) {
|
context.lastPropsError = '';
|
this.logger.info(`[ALIYUN] 属性上报成功 deviceId=${device.deviceId} payload=${JSON.stringify(payload)}`);
|
resolve(result);
|
return;
|
}
|
|
const errorMessage = result && (result.message || result.code)
|
? String(result.message || result.code)
|
: '阿里云返回未知错误';
|
|
context.lastPropsError = errorMessage;
|
reject(new Error(errorMessage));
|
});
|
});
|
}
|
|
async publish(device, message) {
|
if (!this.started || !this.config.enabled) {
|
this.logger.warn(`[ALIYUN] 跳过发送 deviceId=${device.deviceId} reason=aliyun-disabled`);
|
return { skipped: true, reason: 'aliyun-disabled' };
|
}
|
|
const context = this.getContext(device);
|
const payload = this.buildPayload(message);
|
|
this.logger.info(`[ALIYUN] 收到发送请求 deviceId=${device.deviceId} payload=${JSON.stringify(payload)}`);
|
|
try {
|
await this.postProps(context, device, payload);
|
return { ok: true, payload };
|
} catch (error) {
|
this.logger.error(`[ALIYUN] 属性上报失败 deviceId=${device.deviceId}: ${error.message}`);
|
return { ok: false, reason: error.message, payload };
|
}
|
}
|
}
|
|
class FakeAliyunDevice extends EventEmitter {
|
postProps(payload, callback) {
|
callback({ message: 'success', payload });
|
}
|
}
|
|
module.exports = {
|
AliyunService,
|
FakeAliyunDevice,
|
};
|