#!/usr/bin/env node
|
|
const fs = require('fs');
|
const path = require('path');
|
const { TcpService } = require('./tcp-service');
|
const { MqttService } = require('./mqtt-service');
|
const { AliyunService } = require('./aliyun-service');
|
const { MetricAggregator } = require('./metric-aggregator');
|
const { getDefaultConfigPath } = require('./runtime-paths');
|
|
/**
 * Numeric severity of each supported log level; a larger number is more severe.
 * A message is written when its severity is at least the configured level's.
 * Frozen because the code in this file only ever reads it.
 */
const LOG_LEVELS = Object.freeze({
  debug: 10,
  info: 20,
  warn: 30,
  error: 40,
});
|
|
/**
 * Format a date in the process's local time zone as `YYYY-MM-DD HH:mm:ss.SSS`.
 *
 * @param {Date} [date] - Moment to format; defaults to the current time.
 * @returns {string} Local timestamp with zero-padded fields and millisecond precision.
 */
function formatLocalTimestamp(date = new Date()) {
  const pad = (value, width = 2) => String(value).padStart(width, '0');

  // getMonth() is zero-based, hence the +1.
  const datePart = [date.getFullYear(), pad(date.getMonth() + 1), pad(date.getDate())].join('-');
  const timePart = [pad(date.getHours()), pad(date.getMinutes()), pad(date.getSeconds())].join(':');

  return `${datePart} ${timePart}.${pad(date.getMilliseconds(), 3)}`;
}
|
|
/**
 * Normalize a configured log level to one of the keys of LOG_LEVELS.
 *
 * @param {*} level - Raw level value; it is stringified, trimmed and lower-cased.
 * @returns {string} The matching level name, or 'info' when the value is empty or unknown.
 */
function normalizeLogLevel(level) {
  const normalized = String(level || 'info').trim().toLowerCase();

  // Own-property check: a plain truthiness lookup would also accept inherited
  // keys such as "constructor" or "toString" as if they were log levels.
  const isKnownLevel = Object.prototype.hasOwnProperty.call(LOG_LEVELS, normalized);

  return isKnownLevel ? normalized : 'info';
}
|
|
/**
 * Create a logger that writes formatted lines to the console and/or a per-day log file.
 *
 * The log file is `<dir>/<filePrefix>-YYYYMMDD.log`; the path is recomputed whenever
 * the local date changes, so a long-running process moves to a new file each day.
 *
 * @param {object} [loggingConfig]
 * @param {boolean} [loggingConfig.enabled] - File logging is on unless this is exactly `false`.
 * @param {boolean} [loggingConfig.console] - Console output is on unless this is exactly `false`.
 * @param {string} [loggingConfig.dir] - Log directory; defaults to `<cwd>/logs`.
 * @param {string} [loggingConfig.filePrefix] - File name prefix; defaults to 'jhm-service'.
 * @param {string} [loggingConfig.level] - Minimum level to write; unknown values fall back to 'info'.
 * @returns {{debug: Function, info: Function, warn: Function, error: Function,
 *            getLogFilePath: Function, close: Function}} Logger facade.
 */
function createLogger(loggingConfig = {}) {
  const settings = {
    enabled: loggingConfig.enabled !== false,
    console: loggingConfig.console !== false,
    dir: loggingConfig.dir || path.join(process.cwd(), 'logs'),
    filePrefix: loggingConfig.filePrefix || 'jhm-service',
    level: normalizeLogLevel(loggingConfig.level),
  };

  // Date stamp and path of the file currently being appended to.
  let currentDate = '';
  let currentFilePath = '';

  /** Local date as `YYYYMMDD`, used as the log file suffix. */
  function getDatePart(date = new Date()) {
    const year = date.getFullYear();
    const month = String(date.getMonth() + 1).padStart(2, '0');
    const day = String(date.getDate()).padStart(2, '0');

    return `${year}${month}${day}`;
  }

  /**
   * Return today's log file path, creating the log directory when the date
   * stamp changed (or on first use). Returns null when file logging is disabled.
   * May throw if the directory cannot be created.
   */
  function ensureLogFilePath() {
    if (!settings.enabled) {
      return null;
    }

    const today = getDatePart();

    if (currentFilePath && currentDate === today) {
      return currentFilePath;
    }

    fs.mkdirSync(settings.dir, { recursive: true });
    currentDate = today;
    currentFilePath = path.join(settings.dir, `${settings.filePrefix}-${today}.log`);

    return currentFilePath;
  }

  /** True when `level` is at least as severe as the configured minimum. */
  function shouldWrite(level) {
    return LOG_LEVELS[level] >= LOG_LEVELS[settings.level];
  }

  /** Prefix the message with a local timestamp and the upper-cased level. */
  function format(level, message) {
    return `[${formatLocalTimestamp()}] [${level.toUpperCase()}] ${message}`;
  }

  /** Emit one log line to the enabled sinks. */
  function write(level, message) {
    const normalizedLevel = normalizeLogLevel(level);

    if (!shouldWrite(normalizedLevel)) {
      return;
    }

    const line = format(normalizedLevel, message);

    if (settings.console) {
      if (normalizedLevel === 'error') {
        console.error(line);
      } else if (normalizedLevel === 'warn') {
        console.warn(line);
      } else {
        console.log(line);
      }
    }

    // Resolving the path (which may create the directory) sits inside the same
    // try as the append: a failing log sink must never throw into the caller.
    try {
      const logFilePath = ensureLogFilePath();

      if (logFilePath) {
        fs.appendFileSync(logFilePath, `${line}\n`, 'utf8');
      }
    } catch (error) {
      console.error(`[LOGGER] Failed to append log file: ${error.message}`);
    }
  }

  return {
    debug(message) {
      write('debug', message);
    },
    info(message) {
      write('info', message);
    },
    warn(message) {
      write('warn', message);
    },
    error(message) {
      write('error', message);
    },
    /**
     * Current log file path, or '' when file logging is disabled.
     * Unlike the write methods this propagates directory-creation errors.
     */
    getLogFilePath() {
      ensureLogFilePath();
      return currentFilePath;
    },
    /** Forget the cached date stamp so the next write re-resolves the file path. */
    async close() {
      currentDate = '';
    },
  };
}
|
|
/**
 * Parse the command line into startup options.
 *
 * Recognised arguments: `--config <path>` (resolved to an absolute path) and
 * `--help` / `-h`. Anything else is ignored, as is a trailing `--config`
 * that has no value after it.
 *
 * @param {string[]} argv - Full argument vector (e.g. `process.argv`); the first two entries are skipped.
 * @returns {{configPath: string, help?: boolean}} Parsed options; `configPath`
 *   defaults to the value of `getDefaultConfigPath()`.
 */
function parseArgs(argv) {
  const options = {
    configPath: getDefaultConfigPath(),
  };

  for (let index = 2; index < argv.length; index += 1) {
    const arg = argv[index];
    const value = argv[index + 1];

    if (arg === '--config' && value) {
      options.configPath = path.resolve(value);
      // The value was consumed together with the flag.
      index += 1;
    } else if (arg === '--help' || arg === '-h') {
      options.help = true;
    }
  }

  return options;
}
|
|
/**
 * Print command-line usage to stdout.
 */
function printHelp() {
  console.log(`JHM TCP Socket Gateway

Usage:
  node app.js
  node app.js --config ./config.json
  jhm-service.exe --config .\\runtime\\config.json
  ./jhm-service --config ./runtime/config.json

Options:
  --config <path>  set config file path
  --help, -h       show help
`);
}
|
|
/**
 * Read, parse and validate the JSON config file.
 *
 * @param {string} configPath - Path of the config file.
 * @returns {object} The parsed config, after `validateConfig` accepted it.
 * @throws {Error} When the file does not exist or validation fails.
 * @throws {SyntaxError} When the file content is not valid JSON.
 */
function loadConfig(configPath) {
  if (!fs.existsSync(configPath)) {
    throw new Error(`config file not found: ${configPath}`);
  }

  // Drop a leading UTF-8 byte-order mark (some editors add one); JSON.parse rejects it.
  const content = fs.readFileSync(configPath, 'utf8').replace(/^\uFEFF/, '');

  let config;

  try {
    config = JSON.parse(content);
  } catch (error) {
    // Same error type as JSON.parse, but the message now names the offending file.
    throw new SyntaxError(`config file is not valid JSON: ${configPath}: ${error.message}`);
  }

  validateConfig(config);

  return config;
}
|
|
/**
 * List the enabled outbound channels from `config.send.channels`.
 *
 * Entries are stringified, trimmed and lower-cased; empty entries and
 * duplicates are dropped, keeping first-seen order. When `send.channels`
 * is not an array the result is `['mqtt']`.
 *
 * @param {object} config - Parsed config object.
 * @returns {string[]} Normalized channel names (possibly empty).
 */
function getSendChannels(config) {
  const configured = config.send && config.send.channels;
  const channels = Array.isArray(configured) ? configured : ['mqtt'];

  const normalized = channels
    .map((item) => String(item || '').trim().toLowerCase())
    .filter(Boolean);

  return Array.from(new Set(normalized));
}
|
|
/**
 * Derive the send/aggregation options from `config.send`, applying defaults.
 *
 * @param {object} config - Parsed config object.
 * @returns {{mode: string, flushIntervalMs: number, alignToMinute: boolean,
 *            includeDeviceIdField: boolean, deviceIdField: string,
 *            publishOnShutdown: boolean}}
 *   `mode` is 'immediate' or 'batch' (default 'batch'); `flushIntervalMs`
 *   falls back to 60000 unless it is a positive number; the boolean flags are
 *   true unless set to exactly `false`; `deviceIdField` defaults to 'n'.
 */
function getSendOptions(config) {
  const send = config.send || {};

  const requestedMode = String(send.mode || '').trim().toLowerCase();
  const mode = ['immediate', 'batch'].includes(requestedMode) ? requestedMode : 'batch';

  const requestedInterval = Number(send.flushIntervalMs);

  return {
    mode,
    flushIntervalMs: requestedInterval > 0 ? requestedInterval : 60000,
    alignToMinute: send.alignToMinute !== false,
    includeDeviceIdField: send.includeDeviceIdField !== false,
    deviceIdField: send.deviceIdField || 'n',
    publishOnShutdown: send.publishOnShutdown !== false,
  };
}
|
|
/**
 * Read the blood-pressure options from `config.protocol.bloodPressure`.
 *
 * @param {object} config - Parsed config object.
 * @returns {{publishTime: boolean}} `publishTime` is true unless the config
 *   sets `protocol.bloodPressure.publishTime` to exactly `false`.
 */
function getBloodPressureOptions(config) {
  const bloodPressure = config.protocol && config.protocol.bloodPressure;
  const publishTime = bloodPressure ? bloodPressure.publishTime !== false : true;

  return { publishTime };
}
|
|
/**
 * Validate the parsed config and throw on the first problem found.
 *
 * Checks, in order: `tcp` and `devices` exist; `tcp.host` / `tcp.port` are set;
 * at least one send channel is enabled; each enabled channel has its required
 * settings; `protocol.alModelPath` is set; `send.flushIntervalMs`, when given,
 * is a positive number; `logging.level`, when given, is a supported level.
 *
 * @param {object} config - Parsed config object.
 * @returns {void}
 * @throws {Error} Describing the first failed check.
 */
function validateConfig(config) {
  // The !config guard turns a `null` / non-object config into the regular
  // validation error instead of a TypeError on property access.
  if (!config || !config.tcp || !Array.isArray(config.devices)) {
    throw new Error('config.json must include tcp and devices');
  }

  if (!config.tcp.host || !config.tcp.port) {
    throw new Error('config.json must include tcp.host and tcp.port');
  }

  const channels = getSendChannels(config);

  if (channels.length === 0) {
    throw new Error('config.json send.channels must enable at least one channel');
  }

  if (channels.includes('mqtt')) {
    if (!config.mqtt) {
      throw new Error('config.json enabled mqtt but missing mqtt config');
    }

    // The broker is given either as one URL or as protocol + host + port.
    const hasBrokerUrl = Boolean(config.mqtt.brokerUrl);
    const hasHostMode = Boolean(config.mqtt.protocol && config.mqtt.host && config.mqtt.port);

    if (!hasBrokerUrl && !hasHostMode) {
      throw new Error('config.json mqtt requires brokerUrl or protocol/host/port');
    }

    if (!config.mqtt.topicTemplate && !config.mqtt.defaultTopicPrefix) {
      throw new Error('config.json mqtt requires topicTemplate or defaultTopicPrefix');
    }
  }

  if (channels.includes('aliyun')) {
    if (!config.aliyun) {
      throw new Error('config.json enabled aliyun but missing aliyun config');
    }

    if (!config.aliyun.tupleApiBaseUrl && !config.aliyun.tupleApiUrl) {
      throw new Error('config.json aliyun requires tupleApiBaseUrl or tupleApiUrl');
    }
  }

  if (!config.protocol || !config.protocol.alModelPath) {
    throw new Error('config.json protocol.alModelPath is required');
  }

  // Check the raw configured value. getSendOptions() replaces every
  // non-positive interval with its default, so validating its output could
  // never fail and a bad setting would be silently ignored.
  const configuredInterval = config.send ? config.send.flushIntervalMs : undefined;
  const hasConfiguredInterval = configuredInterval !== undefined && configuredInterval !== null;

  if (hasConfiguredInterval && !(Number(configuredInterval) > 0)) {
    throw new Error('config.json send.flushIntervalMs must be > 0');
  }

  if (config.logging && config.logging.level) {
    const supportedLevels = ['debug', 'info', 'warn', 'error'];
    // Trimmed and lower-cased, matching how normalizeLogLevel() reads the value.
    const requestedLevel = String(config.logging.level).trim().toLowerCase();

    if (!supportedLevels.includes(requestedLevel)) {
      throw new Error(`config.json logging.level only supports ${supportedLevels.join(', ')}`);
    }
  }
}
|
|
/**
 * Resolve a path taken from the config file.
 *
 * @param {string} configFilePath - Path of the config file itself.
 * @param {string} targetPath - Path read from the config.
 * @returns {string} `targetPath` unchanged when it is absolute; otherwise
 *   `targetPath` joined onto the directory that contains the config file.
 */
function resolveConfigPath(configFilePath, targetPath) {
  if (path.isAbsolute(targetPath)) {
    return targetPath;
  }

  const configDir = path.dirname(configFilePath);

  return path.join(configDir, targetPath);
}
|
|
/**
 * Program entry point: parse arguments, load the config, wire the services
 * together, start them and install signal / error handlers.
 *
 * Wiring visible here: metrics reported by the TCP service are handed to the
 * aggregator; when the aggregator flushes, the payload is published to every
 * enabled channel (MQTT and/or Aliyun) and the flush callback returns whether
 * all channels succeeded.
 *
 * @returns {Promise<void>} Resolves once startup finished (or help was printed).
 *   Rejects when loading the config or starting the TCP service fails.
 */
async function main() {
  const options = parseArgs(process.argv);

  if (options.help) {
    printHelp();
    return;
  }

  // Console-only logger for the phase before the config's logging settings are known.
  const bootstrapLogger = createLogger({
    enabled: false,
    console: true,
  });
  bootstrapLogger.info(`[APP] Startup arguments parsed config=${options.configPath}`);

  const config = loadConfig(options.configPath);

  // Relative paths in the config are resolved against the config file's directory.
  const logDir = resolveConfigPath(options.configPath, (config.logging && config.logging.dir) || './logs');
  const logger = createLogger({
    ...(config.logging || {}),
    dir: logDir,
  });
  const sendChannels = getSendChannels(config);
  const sendOptions = getSendOptions(config);
  const bloodPressureOptions = getBloodPressureOptions(config);
  const alModelPath = resolveConfigPath(options.configPath, config.protocol.alModelPath);

  // Stack when available, otherwise the message, otherwise the stringified value.
  const describeError = (error) => (error && (error.stack || error.message)) || String(error);

  if (config.logging && config.logging.enabled === false) {
    logger.info('[APP] File logging is disabled; console output only');
  } else {
    logger.info(`[APP] Local logging enabled dir=${logDir} file=${logger.getLogFilePath()} level=${normalizeLogLevel(config.logging && config.logging.level)}`);
  }

  logger.info(`[APP] Config loaded tcp=${config.tcp.host}:${config.tcp.port} devices=${config.devices.length} channels=${sendChannels.join(',')} sendMode=${sendOptions.mode} flushIntervalMs=${sendOptions.flushIntervalMs} alModel=${alModelPath}`);

  const mqttService = sendChannels.includes('mqtt') ? new MqttService(config.mqtt, logger) : null;
  const aliyunService = sendChannels.includes('aliyun') ? new AliyunService(config.aliyun, logger) : null;

  const aggregator = new MetricAggregator({
    logger,
    ...sendOptions,
    onFlush: async (device, payload, meta) => {
      logger.info(`[APP] Batch dispatch deviceId=${device.deviceId} channels=${sendChannels.join(',')} reason=${meta.reason} payload=${JSON.stringify(payload)}`);

      // Becomes false as soon as any enabled channel fails or skips the payload.
      let ok = true;

      if (mqttService) {
        try {
          // NOTE(review): whether MqttService.publish returns a promise is not
          // visible from this file. Awaiting is a no-op for a synchronous
          // return and lets this catch see an asynchronous rejection.
          await mqttService.publish(device, payload);
        } catch (error) {
          ok = false;
          logger.error(`[APP] MQTT batch dispatch failed deviceId=${device.deviceId}: ${error.message}`);
        }
      }

      if (aliyunService) {
        try {
          const result = await aliyunService.publish(device, payload);

          if (result && result.skipped) {
            ok = false;
            logger.warn(`[APP] Aliyun batch dispatch skipped deviceId=${device.deviceId} reason=${result.reason}`);
          } else if (result && result.ok === false) {
            ok = false;
            logger.error(`[APP] Aliyun batch dispatch failed deviceId=${device.deviceId}: ${result.reason}`);
          }
        } catch (error) {
          ok = false;
          logger.error(`[APP] Aliyun batch dispatch exception deviceId=${device.deviceId}: ${error.message}`);
        }
      }

      return ok;
    },
  });

  const tcpService = new TcpService({
    tcpConfig: config.tcp,
    devices: config.devices,
    alModelPath,
    publishBloodPressureTime: bloodPressureOptions.publishTime,
    logger,
    onMetric(device, metric) {
      logger.info(`[APP] Metric cached deviceId=${device.deviceId} mode=${sendOptions.mode} metric=${JSON.stringify(metric)}`);

      // Not awaited: ingest failures are logged here instead of being
      // propagated back into the TCP service.
      aggregator.ingest(device, metric).catch((error) => {
        logger.error(`[APP] Metric cache failed deviceId=${device.deviceId}: ${error.message}`);
      });
    },
  });

  if (mqttService) {
    logger.info('[APP] Starting MQTT channel');
    mqttService.start();
  }

  if (aliyunService) {
    logger.info('[APP] Starting Aliyun channel');
    aliyunService.start();
  }

  aggregator.start();
  await tcpService.start();
  logger.info('[APP] All services started');

  let shuttingDown = false;

  /**
   * Stop every service in order (TCP, aggregator, MQTT, Aliyun) and exit.
   * A failing stop() is logged and the remaining services are still stopped,
   * so the process always terminates: exit code 0 when everything stopped
   * cleanly, 1 otherwise.
   */
  async function shutdown(signal) {
    if (shuttingDown) {
      return;
    }

    shuttingDown = true;
    logger.warn(`[APP] Received ${signal}, shutting down`);

    const stopSteps = [
      ['tcp', tcpService],
      ['aggregator', aggregator],
      ['mqtt', mqttService],
      ['aliyun', aliyunService],
    ];
    let exitCode = 0;

    for (const [name, service] of stopSteps) {
      if (!service) {
        continue;
      }

      try {
        await service.stop();
      } catch (error) {
        exitCode = 1;
        logger.error(`[APP] Failed to stop ${name} during shutdown: ${describeError(error)}`);
      }
    }

    if (exitCode === 0) {
      logger.info('[APP] Service shutdown completed');
    } else {
      logger.warn('[APP] Service shutdown completed with errors');
    }

    try {
      await logger.close();
    } finally {
      process.exit(exitCode);
    }
  }

  process.on('SIGINT', () => {
    shutdown('SIGINT');
  });

  process.on('SIGTERM', () => {
    shutdown('SIGTERM');
  });

  // Both handlers only log; the process keeps running afterwards.
  process.on('uncaughtException', (error) => {
    logger.error(`[APP] Uncaught exception: ${error.stack || error.message}`);
  });

  process.on('unhandledRejection', (reason) => {
    logger.error(`[APP] Unhandled promise rejection: ${describeError(reason)}`);
  });
}
|
|
// Start the gateway only when this file is run directly, not when it is
// loaded with require() by another module.
if (require.main === module) {
  main().catch((error) => {
    // Startup failed: print the reason and exit with a non-zero status.
    console.error(error.message);
    process.exit(1);
  });
}
|
|
module.exports = {
|
createLogger,
|
formatLocalTimestamp,
|
getBloodPressureOptions,
|
getSendChannels,
|
getSendOptions,
|
loadConfig,
|
main,
|
normalizeLogLevel,
|
parseArgs,
|
printHelp,
|
resolveConfigPath,
|
validateConfig,
|
};
|