#!/usr/bin/env node const fs = require('fs'); const path = require('path'); const { TcpService } = require('./tcp-service'); const { MqttService } = require('./mqtt-service'); const { AliyunService } = require('./aliyun-service'); const { MetricAggregator } = require('./metric-aggregator'); const { getDefaultConfigPath } = require('./runtime-paths'); const LOG_LEVELS = { debug: 10, info: 20, warn: 30, error: 40, }; function formatLocalTimestamp(date = new Date()) { const year = date.getFullYear(); const month = String(date.getMonth() + 1).padStart(2, '0'); const day = String(date.getDate()).padStart(2, '0'); const hour = String(date.getHours()).padStart(2, '0'); const minute = String(date.getMinutes()).padStart(2, '0'); const second = String(date.getSeconds()).padStart(2, '0'); const millisecond = String(date.getMilliseconds()).padStart(3, '0'); return `${year}-${month}-${day} ${hour}:${minute}:${second}.${millisecond}`; } function normalizeLogLevel(level) { const normalized = String(level || 'info').trim().toLowerCase(); return LOG_LEVELS[normalized] ? normalized : 'info'; } function createLogger(loggingConfig = {}) { const settings = { enabled: loggingConfig.enabled !== false, console: loggingConfig.console !== false, dir: loggingConfig.dir || path.join(process.cwd(), 'logs'), filePrefix: loggingConfig.filePrefix || 'jhm-service', level: normalizeLogLevel(loggingConfig.level), }; let currentDate = ''; let currentFilePath = ''; function getDatePart(date = new Date()) { const year = date.getFullYear(); const month = String(date.getMonth() + 1).padStart(2, '0'); const day = String(date.getDate()).padStart(2, '0'); return `${year}${month}${day}`; } function ensureLogFilePath() { if (!settings.enabled) { return null; } const today = getDatePart(); if (currentFilePath && currentDate === today) { return currentFilePath; } fs.mkdirSync(settings.dir, { recursive: true }); currentDate = today; currentFilePath = path.join(settings.dir, `${settings.filePrefix}-${today}.log`); return currentFilePath; } function shouldWrite(level) { return LOG_LEVELS[level] >= LOG_LEVELS[settings.level]; } function format(level, message) { return `[${formatLocalTimestamp()}] [${level.toUpperCase()}] ${message}`; } function write(level, message) { const normalizedLevel = normalizeLogLevel(level); if (!shouldWrite(normalizedLevel)) { return; } const line = format(normalizedLevel, message); if (settings.console) { if (normalizedLevel === 'error') { console.error(line); } else if (normalizedLevel === 'warn') { console.warn(line); } else { console.log(line); } } const logFilePath = ensureLogFilePath(); if (logFilePath) { try { fs.appendFileSync(logFilePath, `${line}\n`, 'utf8'); } catch (error) { console.error(`[LOGGER] Failed to append log file: ${error.message}`); } } } return { debug(message) { write('debug', message); }, info(message) { write('info', message); }, warn(message) { write('warn', message); }, error(message) { write('error', message); }, getLogFilePath() { ensureLogFilePath(); return currentFilePath; }, async close() { currentDate = ''; }, }; } function parseArgs(argv) { const options = { configPath: getDefaultConfigPath(), }; for (let index = 2; index < argv.length; index += 1) { const arg = argv[index]; const value = argv[index + 1]; if (arg === '--config' && value) { options.configPath = path.resolve(value); index += 1; } else if (arg === '--help' || arg === '-h') { options.help = true; } } return options; } function printHelp() { console.log(`JHM TCP Socket Gateway Usage: node app.js node app.js --config ./config.json jhm-service.exe --config .\\runtime\\config.json ./jhm-service --config ./runtime/config.json Options: --config set config file path --help, -h show help `); } function loadConfig(configPath) { if (!fs.existsSync(configPath)) { throw new Error(`config file not found: ${configPath}`); } const content = fs.readFileSync(configPath, 'utf8'); const config = JSON.parse(content); validateConfig(config); return config; } function getSendChannels(config) { const channels = Array.isArray(config.send && config.send.channels) ? config.send.channels : ['mqtt']; return Array.from(new Set(channels .map((item) => String(item || '').trim().toLowerCase()) .filter(Boolean))); } function getSendOptions(config) { const send = config.send || {}; const mode = ['immediate', 'batch'].includes(String(send.mode || '').trim().toLowerCase()) ? String(send.mode).trim().toLowerCase() : 'batch'; return { mode, flushIntervalMs: Number(send.flushIntervalMs) > 0 ? Number(send.flushIntervalMs) : 60000, alignToMinute: send.alignToMinute !== false, includeDeviceIdField: send.includeDeviceIdField !== false, deviceIdField: send.deviceIdField || 'n', publishOnShutdown: send.publishOnShutdown !== false, }; } function getBloodPressureOptions(config) { const bloodPressure = config.protocol && config.protocol.bloodPressure; return { publishTime: bloodPressure ? bloodPressure.publishTime !== false : true, }; } function validateConfig(config) { if (!config.tcp || !Array.isArray(config.devices)) { throw new Error('config.json must include tcp and devices'); } if (!config.tcp.host || !config.tcp.port) { throw new Error('config.json must include tcp.host and tcp.port'); } const channels = getSendChannels(config); if (channels.length === 0) { throw new Error('config.json send.channels must enable at least one channel'); } if (channels.includes('mqtt')) { if (!config.mqtt) { throw new Error('config.json enabled mqtt but missing mqtt config'); } const hasBrokerUrl = Boolean(config.mqtt.brokerUrl); const hasHostMode = Boolean(config.mqtt.protocol && config.mqtt.host && config.mqtt.port); if (!hasBrokerUrl && !hasHostMode) { throw new Error('config.json mqtt requires brokerUrl or protocol/host/port'); } if (!config.mqtt.topicTemplate && !config.mqtt.defaultTopicPrefix) { throw new Error('config.json mqtt requires topicTemplate or defaultTopicPrefix'); } } if (channels.includes('aliyun')) { if (!config.aliyun) { throw new Error('config.json enabled aliyun but missing aliyun config'); } if (!config.aliyun.tupleApiBaseUrl && !config.aliyun.tupleApiUrl) { throw new Error('config.json aliyun requires tupleApiBaseUrl or tupleApiUrl'); } } if (!config.protocol || !config.protocol.alModelPath) { throw new Error('config.json protocol.alModelPath is required'); } const sendOptions = getSendOptions(config); if (sendOptions.flushIntervalMs <= 0) { throw new Error('config.json send.flushIntervalMs must be > 0'); } if (config.logging && config.logging.level) { const supportedLevels = ['debug', 'info', 'warn', 'error']; if (!supportedLevels.includes(String(config.logging.level).toLowerCase())) { throw new Error(`config.json logging.level only supports ${supportedLevels.join(', ')}`); } } } function resolveConfigPath(configFilePath, targetPath) { if (path.isAbsolute(targetPath)) { return targetPath; } return path.join(path.dirname(configFilePath), targetPath); } async function main() { const options = parseArgs(process.argv); if (options.help) { printHelp(); return; } const bootstrapLogger = createLogger({ enabled: false, console: true, }); bootstrapLogger.info(`[APP] Startup arguments parsed config=${options.configPath}`); const config = loadConfig(options.configPath); const logDir = resolveConfigPath(options.configPath, (config.logging && config.logging.dir) || './logs'); const logger = createLogger({ ...(config.logging || {}), dir: logDir, }); const sendChannels = getSendChannels(config); const sendOptions = getSendOptions(config); const bloodPressureOptions = getBloodPressureOptions(config); const alModelPath = resolveConfigPath(options.configPath, config.protocol.alModelPath); if (config.logging && config.logging.enabled === false) { logger.info('[APP] File logging is disabled; console output only'); } else { logger.info(`[APP] Local logging enabled dir=${logDir} file=${logger.getLogFilePath()} level=${normalizeLogLevel(config.logging && config.logging.level)}`); } logger.info(`[APP] Config loaded tcp=${config.tcp.host}:${config.tcp.port} devices=${config.devices.length} channels=${sendChannels.join(',')} sendMode=${sendOptions.mode} flushIntervalMs=${sendOptions.flushIntervalMs} alModel=${alModelPath}`); const mqttService = sendChannels.includes('mqtt') ? new MqttService(config.mqtt, logger) : null; const aliyunService = sendChannels.includes('aliyun') ? new AliyunService(config.aliyun, logger) : null; const aggregator = new MetricAggregator({ logger, ...sendOptions, onFlush: async (device, payload, meta) => { logger.info(`[APP] Batch dispatch deviceId=${device.deviceId} channels=${sendChannels.join(',')} reason=${meta.reason} payload=${JSON.stringify(payload)}`); let ok = true; if (mqttService) { try { mqttService.publish(device, payload); } catch (error) { ok = false; logger.error(`[APP] MQTT batch dispatch failed deviceId=${device.deviceId}: ${error.message}`); } } if (aliyunService) { try { const result = await aliyunService.publish(device, payload); if (result && result.skipped) { ok = false; logger.warn(`[APP] Aliyun batch dispatch skipped deviceId=${device.deviceId} reason=${result.reason}`); } else if (result && result.ok === false) { ok = false; logger.error(`[APP] Aliyun batch dispatch failed deviceId=${device.deviceId}: ${result.reason}`); } } catch (error) { ok = false; logger.error(`[APP] Aliyun batch dispatch exception deviceId=${device.deviceId}: ${error.message}`); } } return ok; }, }); const tcpService = new TcpService({ tcpConfig: config.tcp, devices: config.devices, alModelPath, publishBloodPressureTime: bloodPressureOptions.publishTime, logger, onMetric(device, metric) { logger.info(`[APP] Metric cached deviceId=${device.deviceId} mode=${sendOptions.mode} metric=${JSON.stringify(metric)}`); aggregator.ingest(device, metric).catch((error) => { logger.error(`[APP] Metric cache failed deviceId=${device.deviceId}: ${error.message}`); }); }, }); if (mqttService) { logger.info('[APP] Starting MQTT channel'); mqttService.start(); } if (aliyunService) { logger.info('[APP] Starting Aliyun channel'); aliyunService.start(); } aggregator.start(); await tcpService.start(); logger.info('[APP] All services started'); let shuttingDown = false; async function shutdown(signal) { if (shuttingDown) { return; } shuttingDown = true; logger.warn(`[APP] Received ${signal}, shutting down`); await tcpService.stop(); await aggregator.stop(); if (mqttService) { await mqttService.stop(); } if (aliyunService) { await aliyunService.stop(); } logger.info('[APP] Service shutdown completed'); await logger.close(); process.exit(0); } process.on('SIGINT', () => { shutdown('SIGINT'); }); process.on('SIGTERM', () => { shutdown('SIGTERM'); }); process.on('uncaughtException', (error) => { logger.error(`[APP] Uncaught exception: ${error.stack || error.message}`); }); process.on('unhandledRejection', (reason) => { logger.error(`[APP] Unhandled promise rejection: ${reason}`); }); } if (require.main === module) { main().catch((error) => { console.error(error.message); process.exit(1); }); } module.exports = { createLogger, formatLocalTimestamp, getBloodPressureOptions, getSendChannels, getSendOptions, loadConfig, main, normalizeLogLevel, parseArgs, printHelp, resolveConfigPath, validateConfig, };