class MetricAggregator { constructor(options = {}) { this.logger = options.logger || console; this.onFlush = typeof options.onFlush === 'function' ? options.onFlush : async () => true; this.mode = String(options.mode || 'batch').trim().toLowerCase(); this.flushIntervalMs = Number(options.flushIntervalMs) > 0 ? Number(options.flushIntervalMs) : 60000; this.alignToMinute = options.alignToMinute !== false; this.includeDeviceIdField = options.includeDeviceIdField !== false; this.deviceIdField = options.deviceIdField || 'n'; this.publishOnShutdown = options.publishOnShutdown !== false; this.cache = new Map(); this.timer = null; this.started = false; this.flushing = false; this.flushQueued = false; } start() { if (this.started) { return; } this.started = true; if (this.mode === 'batch') { this.scheduleNextFlush(); } } async stop() { this.started = false; if (this.timer) { clearTimeout(this.timer); this.timer = null; } if (this.mode === 'batch' && this.publishOnShutdown) { await this.flush({ reason: 'shutdown' }); } } ingest(device, metric = {}) { if (!device || !device.deviceId || !metric || typeof metric !== 'object') { return Promise.resolve(false); } const entry = this.getOrCreateEntry(device); entry.device = device; entry.dirty = true; entry.lastUpdateAt = Date.now(); if (this.includeDeviceIdField) { entry.payload[this.deviceIdField] = device.deviceId; } Object.assign(entry.payload, metric); if (this.mode === 'immediate') { return this.flush({ reason: 'immediate', deviceId: device.deviceId }); } return Promise.resolve(true); } getOrCreateEntry(device) { if (!this.cache.has(device.deviceId)) { this.cache.set(device.deviceId, { device, payload: {}, dirty: false, firstSeenAt: Date.now(), lastUpdateAt: 0, lastFlushAt: 0, }); } return this.cache.get(device.deviceId); } computeDelayMs() { if (!this.alignToMinute) { return this.flushIntervalMs; } const remainder = Date.now() % this.flushIntervalMs; return remainder === 0 ? this.flushIntervalMs : this.flushIntervalMs - remainder; } scheduleNextFlush() { if (!this.started || this.mode !== 'batch') { return; } if (this.timer) { clearTimeout(this.timer); } this.timer = setTimeout(() => { this.flush({ reason: 'timer' }).catch((error) => { this.logger.error(`[APP] Aggregator flush failed: ${error.message}`); }); }, this.computeDelayMs()); if (typeof this.timer.unref === 'function') { this.timer.unref(); } } async flush(options = {}) { const reason = options.reason || 'manual'; const deviceId = options.deviceId || ''; if (this.flushing) { this.flushQueued = true; return false; } this.flushing = true; if (this.timer) { clearTimeout(this.timer); this.timer = null; } try { const entries = Array.from(this.cache.values()).filter((entry) => { if (!entry.dirty) { return false; } if (deviceId && entry.device.deviceId !== deviceId) { return false; } return true; }); for (const entry of entries) { const payload = { ...entry.payload, }; if (this.includeDeviceIdField) { payload[this.deviceIdField] = entry.device.deviceId; } try { const result = await this.onFlush(entry.device, payload, { reason, firstSeenAt: entry.firstSeenAt, lastUpdateAt: entry.lastUpdateAt, lastFlushAt: entry.lastFlushAt, }); if (result === false) { continue; } entry.dirty = false; entry.lastFlushAt = Date.now(); } catch (error) { this.logger.error(`[APP] Aggregator device flush failed deviceId=${entry.device.deviceId}: ${error.message}`); } } return true; } finally { this.flushing = false; if (this.started && this.mode === 'batch') { this.scheduleNextFlush(); } if (this.flushQueued) { this.flushQueued = false; await this.flush({ reason: 'queued' }); } } } getSnapshot(deviceId) { if (!deviceId) { return Array.from(this.cache.values()).map((entry) => ({ deviceId: entry.device.deviceId, payload: { ...entry.payload }, dirty: entry.dirty, })); } const entry = this.cache.get(deviceId); if (!entry) { return null; } return { deviceId: entry.device.deviceId, payload: { ...entry.payload }, dirty: entry.dirty, }; } } module.exports = { MetricAggregator, };