/**
 * Keeps one merged metric payload per device and hands dirty payloads to an
 * `onFlush` callback.
 *
 * - In `'batch'` mode (the default) a timer started by `start()` flushes every
 *   dirty entry once per interval.
 * - In `'immediate'` mode every `ingest()` triggers a flush for that device.
 * - Any other mode string only caches; the owner must call `flush()` itself.
 */
class MetricAggregator {
  /**
   * @param {object} [options]
   * @param {{error: Function}} [options.logger=console] - Receives flush error messages.
   * @param {Function} [options.onFlush] - `async (device, payload, meta) => boolean|any`.
   *   Returning exactly `false` (or throwing) leaves the entry dirty so it is retried.
   * @param {string} [options.mode='batch'] - Trimmed and lower-cased before use.
   * @param {number} [options.flushIntervalMs=60000] - Must be a finite number > 0;
   *   anything else falls back to 60000. Capped at the largest delay setTimeout honours.
   * @param {boolean} [options.alignToMinute=true] - When enabled, timer flushes land on
   *   multiples of `flushIntervalMs` of the epoch clock (minute boundaries for the default).
   * @param {boolean} [options.includeDeviceIdField=true] - Write the device id into the payload.
   * @param {string} [options.deviceIdField='n'] - Payload key used for the device id.
   * @param {boolean} [options.publishOnShutdown=true] - Flush once more from `stop()` in batch mode.
   */
  constructor(options = {}) {
    // setTimeout treats delays above this value (or non-finite ones) as 1 ms,
    // which would turn the batch timer into a busy loop.
    const MAX_TIMER_DELAY_MS = 2147483647;
    const requestedIntervalMs = Number(options.flushIntervalMs);
    const intervalIsUsable = Number.isFinite(requestedIntervalMs) && requestedIntervalMs > 0;

    this.logger = options.logger || console;
    this.onFlush = typeof options.onFlush === 'function' ? options.onFlush : async () => true;
    this.mode = String(options.mode || 'batch').trim().toLowerCase();
    this.flushIntervalMs = intervalIsUsable
      ? Math.min(requestedIntervalMs, MAX_TIMER_DELAY_MS)
      : 60000;
    this.alignToMinute = options.alignToMinute !== false;
    this.includeDeviceIdField = options.includeDeviceIdField !== false;
    this.deviceIdField = options.deviceIdField || 'n';
    this.publishOnShutdown = options.publishOnShutdown !== false;

    /** deviceId -> cache entry (see getOrCreateEntry for the entry shape). */
    this.cache = new Map();
    this.timer = null;
    this.started = false;
    this.flushing = false;
    this.flushQueued = false;
  }

  /**
   * Turns a caught value into log text without assuming it is an Error.
   * Values carrying a `message` property log that message; everything else
   * (including `null`/`undefined`) is stringified.
   *
   * @param {*} error
   * @returns {*} Text (or the raw `message` value) to interpolate into a log line.
   */
  static formatError(error) {
    if (error !== null && error !== undefined && error.message !== undefined) {
      return error.message;
    }
    return String(error);
  }

  /**
   * Marks the aggregator as started and, in batch mode, arms the flush timer.
   * Calling it again while started is a no-op.
   */
  start() {
    if (this.started) {
      return;
    }

    this.started = true;

    if (this.mode === 'batch') {
      this.scheduleNextFlush();
    }
  }

  /**
   * Stops the timer and, in batch mode with `publishOnShutdown`, performs a
   * final flush with reason `'shutdown'`.
   *
   * Note: if another flush is already running, the shutdown flush is only
   * queued (see `flush`), so this promise can resolve before it completes.
   *
   * @returns {Promise<void>}
   */
  async stop() {
    this.started = false;

    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    if (this.mode === 'batch' && this.publishOnShutdown) {
      await this.flush({ reason: 'shutdown' });
    }
  }

  /**
   * Merges `metric` into the cached payload of `device` and marks it dirty.
   *
   * @param {{deviceId: *}} device - Must be truthy with a truthy `deviceId`.
   * @param {object} [metric={}] - Shallow-merged over the existing payload.
   * @returns {Promise<boolean>} `false` for invalid input; in immediate mode the
   *   result of `flush()`; otherwise `true`.
   */
  ingest(device, metric = {}) {
    if (!device || !device.deviceId || !metric || typeof metric !== 'object') {
      return Promise.resolve(false);
    }

    const entry = this.getOrCreateEntry(device);
    entry.device = device;
    entry.dirty = true;
    entry.lastUpdateAt = Date.now();
    // Lets an in-flight flush detect that the payload changed underneath it.
    entry.revision = (entry.revision || 0) + 1;

    if (this.includeDeviceIdField) {
      entry.payload[this.deviceIdField] = device.deviceId;
    }

    Object.assign(entry.payload, metric);

    if (this.mode === 'immediate') {
      return this.flush({ reason: 'immediate', deviceId: device.deviceId });
    }

    return Promise.resolve(true);
  }

  /**
   * Returns the cache entry for `device.deviceId`, creating an empty, clean
   * one on first sight.
   *
   * @param {{deviceId: *}} device
   * @returns {{device: object, payload: object, dirty: boolean, revision: number,
   *   firstSeenAt: number, lastUpdateAt: number, lastFlushAt: number}}
   */
  getOrCreateEntry(device) {
    if (!this.cache.has(device.deviceId)) {
      this.cache.set(device.deviceId, {
        device,
        payload: {},
        dirty: false,
        revision: 0,
        firstSeenAt: Date.now(),
        lastUpdateAt: 0,
        lastFlushAt: 0,
      });
    }

    return this.cache.get(device.deviceId);
  }

  /**
   * Computes the delay until the next timer flush.
   *
   * @returns {number} `flushIntervalMs` when alignment is off; otherwise the
   *   time left until the next multiple of `flushIntervalMs` on the epoch
   *   clock (a full interval when exactly on a boundary).
   */
  computeDelayMs() {
    if (!this.alignToMinute) {
      return this.flushIntervalMs;
    }

    const remainder = Date.now() % this.flushIntervalMs;
    return remainder === 0 ? this.flushIntervalMs : this.flushIntervalMs - remainder;
  }

  /**
   * (Re)arms the one-shot flush timer. Does nothing unless the aggregator is
   * started and in batch mode.
   */
  scheduleNextFlush() {
    if (!this.started || this.mode !== 'batch') {
      return;
    }

    if (this.timer) {
      clearTimeout(this.timer);
    }

    this.timer = setTimeout(() => {
      this.flush({ reason: 'timer' }).catch((error) => {
        this.logger.error(`[APP] Aggregator flush failed: ${MetricAggregator.formatError(error)}`);
      });
    }, this.computeDelayMs());

    // Do not keep the process alive just for the flush timer.
    if (typeof this.timer.unref === 'function') {
      this.timer.unref();
    }
  }

  /**
   * Sends every dirty entry (optionally only one device) to `onFlush`, one
   * after another.
   *
   * An entry becomes clean only if `onFlush` did not return `false`, did not
   * throw, and no `ingest()` touched the entry while the callback was pending.
   * If a flush is already running, a single follow-up flush of all dirty
   * entries (reason `'queued'`) is scheduled instead.
   *
   * @param {object} [options]
   * @param {string} [options.reason='manual'] - Passed through to `onFlush`.
   * @param {*} [options.deviceId] - Restrict the flush to this device.
   * @returns {Promise<boolean>} `false` when the call was queued behind a
   *   running flush, `true` once this flush (and any queued follow-up) ran.
   */
  async flush(options = {}) {
    const reason = options.reason || 'manual';
    const deviceId = options.deviceId || '';

    if (this.flushing) {
      this.flushQueued = true;
      return false;
    }

    this.flushing = true;

    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    try {
      const pending = Array.from(this.cache.values()).filter(
        (entry) => entry.dirty && (!deviceId || entry.device.deviceId === deviceId),
      );

      for (const entry of pending) {
        const payload = { ...entry.payload };

        if (this.includeDeviceIdField) {
          payload[this.deviceIdField] = entry.device.deviceId;
        }

        // Remember which revision this snapshot represents; ingest() bumps it.
        const flushedRevision = entry.revision;

        try {
          const result = await this.onFlush(entry.device, payload, {
            reason,
            firstSeenAt: entry.firstSeenAt,
            lastUpdateAt: entry.lastUpdateAt,
            lastFlushAt: entry.lastFlushAt,
          });

          if (result === false) {
            continue;
          }

          // Only clear the flag when nothing newer arrived during the await;
          // otherwise the fresh data would never be published.
          if (entry.revision === flushedRevision) {
            entry.dirty = false;
          }

          entry.lastFlushAt = Date.now();
        } catch (error) {
          this.logger.error(`[APP] Aggregator device flush failed deviceId=${entry.device.deviceId}: ${MetricAggregator.formatError(error)}`);
        }
      }

      return true;
    } finally {
      this.flushing = false;

      if (this.started && this.mode === 'batch') {
        this.scheduleNextFlush();
      }

      if (this.flushQueued) {
        this.flushQueued = false;
        await this.flush({ reason: 'queued' });
      }
    }
  }

  /**
   * Returns a copy of the cached state.
   *
   * @param {*} [deviceId] - When falsy, all entries are returned.
   * @returns {Array<{deviceId: *, payload: object, dirty: boolean}>
   *   |{deviceId: *, payload: object, dirty: boolean}|null}
   *   An array for all devices, a single snapshot, or `null` if the device is unknown.
   */
  getSnapshot(deviceId) {
    const toSnapshot = (entry) => ({
      deviceId: entry.device.deviceId,
      payload: { ...entry.payload },
      dirty: entry.dirty,
    });

    if (!deviceId) {
      return Array.from(this.cache.values()).map(toSnapshot);
    }

    const entry = this.cache.get(deviceId);
    return entry ? toSnapshot(entry) : null;
  }
}

module.exports = {
|
MetricAggregator,
|
};
|