#!/usr/bin/env node const fs = require('fs'); const path = require('path'); const { createLogger, normalizeLogLevel } = require('./logger'); const { getDefaultConfigPath } = require('./runtime-paths'); const { TcpService } = require('./tcp-service'); const { StateCache } = require('./state-cache'); const { MqttService } = require('./mqtt-service'); const { AliyunService } = require('./aliyun-service'); const { DashboardService, buildDashboardSnapshot } = require('./dashboard-service'); function parseArgs(argv) { const options = { configPath: getDefaultConfigPath(), }; for (let index = 2; index < argv.length; index += 1) { const arg = argv[index]; const value = argv[index + 1]; if (arg === '--config' && value) { options.configPath = path.resolve(value); index += 1; } else if (arg === '--help' || arg === '-h') { options.help = true; } } return options; } function printHelp() { console.log(`JH2028 TCP 网关服务 用法: node app.js node app.js --config ./config.json jh2028-service.exe --config .\\runtime\\config.json ./jh2028-service --config ./runtime/config.json 参数: --config 指定配置文件路径 --help, -h 显示帮助 `); } function loadConfig(configPath) { if (!fs.existsSync(configPath)) { throw new Error(`未找到配置文件: ${configPath}`); } const content = fs.readFileSync(configPath, 'utf8'); const config = JSON.parse(content); validateConfig(config); return config; } function resolveConfigPath(configFilePath, targetPath) { if (path.isAbsolute(targetPath)) { return targetPath; } return path.join(path.dirname(configFilePath), targetPath); } function getSendChannels(config) { const channels = Array.isArray(config.send && config.send.channels) ? config.send.channels : ['mqtt']; return Array.from(new Set(channels .map((item) => String(item || '').trim().toLowerCase()) .filter(Boolean))); } function getSendOptions(config) { const send = config.send || {}; return { includeDeviceIdField: send.includeDeviceIdField !== false, deviceIdField: send.deviceIdField || 'n', }; } function validateConfig(config) { if (!config.tcp || !Array.isArray(config.devices)) { throw new Error('config.json 必须包含 tcp 和 devices'); } if (!config.tcp.host || !config.tcp.port) { throw new Error('config.json 必须包含 tcp.host 和 tcp.port'); } if (!config.protocol || !config.protocol.alModelPath) { throw new Error('config.json 必须包含 protocol.alModelPath'); } const channels = getSendChannels(config); if (channels.length === 0) { throw new Error('config.json 的 send.channels 至少要启用一个通道'); } if (channels.includes('mqtt')) { if (!config.mqtt) { throw new Error('已启用 MQTT 通道,但缺少 mqtt 配置'); } const hasBrokerUrl = Boolean(config.mqtt.brokerUrl); const hasHostMode = Boolean(config.mqtt.protocol && config.mqtt.host && config.mqtt.port); if (!hasBrokerUrl && !hasHostMode) { throw new Error('mqtt 必须配置 brokerUrl 或 protocol/host/port'); } if (!config.mqtt.topicTemplate && !config.mqtt.defaultTopicPrefix) { throw new Error('mqtt 必须配置 topicTemplate 或 defaultTopicPrefix'); } } if (channels.includes('aliyun')) { if (!config.aliyun) { throw new Error('已启用阿里云通道,但缺少 aliyun 配置'); } if (!config.aliyun.tupleApiBaseUrl && !config.aliyun.tupleApiUrl) { throw new Error('aliyun 必须配置 tupleApiBaseUrl 或 tupleApiUrl'); } } } function createOnMetricHandler({ logger, cache, mqttService, aliyunService, dashboardService }) { return async function onMetric(device, metric, result) { const payload = cache.update(device, metric, result); logger.info(`[APP] 缓存已更新 设备=${device.deviceId} 类型=${result.messageType} 数据=${JSON.stringify(payload)}`); if (dashboardService) { dashboardService.broadcastSnapshot(); } const tasks = []; if (mqttService) { tasks.push(mqttService.publish(device, payload).catch((error) => { logger.error(`[APP] MQTT 上报失败 设备=${device.deviceId}: ${error.message}`); return { ok: false, reason: error.message }; })); } if (aliyunService) { tasks.push(aliyunService.publish(device, payload).catch((error) => { logger.error(`[APP] 阿里云上报失败 设备=${device.deviceId}: ${error.message}`); return { ok: false, reason: error.message }; })); } await Promise.all(tasks); }; } async function main() { const options = parseArgs(process.argv); if (options.help) { printHelp(); return; } const config = loadConfig(options.configPath); const logDir = resolveConfigPath(options.configPath, (config.logging && config.logging.dir) || './logs'); const logger = createLogger({ ...(config.logging || {}), dir: logDir, }); const sendChannels = getSendChannels(config); const sendOptions = getSendOptions(config); const alModelPath = resolveConfigPath(options.configPath, config.protocol.alModelPath); logger.info(`[APP] 配置加载完成 TCP=${config.tcp.host}:${config.tcp.port} 设备数量=${config.devices.length} 上报通道=${sendChannels.join(',')} 物模型=${alModelPath} 日志级别=${normalizeLogLevel(config.logging && config.logging.level)}`); const cache = new StateCache(sendOptions); const mqttService = sendChannels.includes('mqtt') ? new MqttService(config.mqtt, logger) : null; const aliyunService = sendChannels.includes('aliyun') ? new AliyunService(config.aliyun, logger) : null; let tcpService = null; const dashboardService = new DashboardService({ config: config.dashboard || {}, logger, getSnapshot: () => buildDashboardSnapshot({ devices: config.devices, cache, tcpService, config: config.dashboard || {}, }), }); if (mqttService) { mqttService.start(); } if (aliyunService) { aliyunService.start(); } tcpService = new TcpService({ tcpConfig: config.tcp, devices: config.devices, alModelPath, logger, logRawHex: config.logging && config.logging.logRawHex, onConnectionChange: () => { dashboardService.broadcastSnapshot(); }, onMetric: createOnMetricHandler({ logger, cache, mqttService, aliyunService, dashboardService, }), }); await dashboardService.start(); await tcpService.start(); let shuttingDown = false; async function shutdown(signal) { if (shuttingDown) { return; } shuttingDown = true; logger.warn(`[APP] 收到关闭信号 信号=${signal}`); await tcpService.stop(); await dashboardService.stop(); if (mqttService) { await mqttService.stop(); } if (aliyunService) { await aliyunService.stop(); } logger.info('[APP] 服务已停止'); await logger.close(); process.exit(0); } process.on('SIGINT', () => { shutdown('SIGINT'); }); process.on('SIGTERM', () => { shutdown('SIGTERM'); }); process.on('uncaughtException', (error) => { logger.error(`[APP] 未捕获异常: ${error.stack || error.message}`); }); process.on('unhandledRejection', (reason) => { logger.error(`[APP] 未处理的 Promise 拒绝: ${reason}`); }); } if (require.main === module) { main().catch((error) => { console.error(error.message); process.exit(1); }); } module.exports = { createOnMetricHandler, getSendChannels, getSendOptions, loadConfig, main, parseArgs, printHelp, resolveConfigPath, validateConfig, };