chenyc
2026-05-20 c8ba0f92b3f84273a78f06de25359db20c1b2a4d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
#!/usr/bin/env node
 
const fs = require('fs');
const path = require('path');
const { createLogger, normalizeLogLevel } = require('./logger');
const { getDefaultConfigPath } = require('./runtime-paths');
const { TcpService } = require('./tcp-service');
const { StateCache } = require('./state-cache');
const { MqttService } = require('./mqtt-service');
const { AliyunService } = require('./aliyun-service');
const { DashboardService, buildDashboardSnapshot } = require('./dashboard-service');
 
function parseArgs(argv) {
  const options = {
    configPath: getDefaultConfigPath(),
  };
 
  for (let index = 2; index < argv.length; index += 1) {
    const arg = argv[index];
    const value = argv[index + 1];
 
    if (arg === '--config' && value) {
      options.configPath = path.resolve(value);
      index += 1;
    } else if (arg === '--help' || arg === '-h') {
      options.help = true;
    }
  }
 
  return options;
}
 
function printHelp() {
  console.log(`JH2028 TCP 网关服务
 
用法:
  node app.js
  node app.js --config ./config.json
  jh2028-service.exe --config .\\runtime\\config.json
  ./jh2028-service --config ./runtime/config.json
 
参数:
  --config <path>    指定配置文件路径
  --help, -h         显示帮助
`);
}
 
function loadConfig(configPath) {
  if (!fs.existsSync(configPath)) {
    throw new Error(`未找到配置文件: ${configPath}`);
  }
 
  const content = fs.readFileSync(configPath, 'utf8');
  const config = JSON.parse(content);
  validateConfig(config);
  return config;
}
 
function resolveConfigPath(configFilePath, targetPath) {
  if (path.isAbsolute(targetPath)) {
    return targetPath;
  }
 
  return path.join(path.dirname(configFilePath), targetPath);
}
 
function getSendChannels(config) {
  const channels = Array.isArray(config.send && config.send.channels)
    ? config.send.channels
    : ['mqtt'];
 
  return Array.from(new Set(channels
    .map((item) => String(item || '').trim().toLowerCase())
    .filter(Boolean)));
}
 
function getSendOptions(config) {
  const send = config.send || {};
  return {
    includeDeviceIdField: send.includeDeviceIdField !== false,
    deviceIdField: send.deviceIdField || 'n',
  };
}
 
function validateConfig(config) {
  if (!config.tcp || !Array.isArray(config.devices)) {
    throw new Error('config.json 必须包含 tcp 和 devices');
  }
 
  if (!config.tcp.host || !config.tcp.port) {
    throw new Error('config.json 必须包含 tcp.host 和 tcp.port');
  }
 
  if (!config.protocol || !config.protocol.alModelPath) {
    throw new Error('config.json 必须包含 protocol.alModelPath');
  }
 
  const channels = getSendChannels(config);
  if (channels.length === 0) {
    throw new Error('config.json 的 send.channels 至少要启用一个通道');
  }
 
  if (channels.includes('mqtt')) {
    if (!config.mqtt) {
      throw new Error('已启用 MQTT 通道,但缺少 mqtt 配置');
    }
 
    const hasBrokerUrl = Boolean(config.mqtt.brokerUrl);
    const hasHostMode = Boolean(config.mqtt.protocol && config.mqtt.host && config.mqtt.port);
 
    if (!hasBrokerUrl && !hasHostMode) {
      throw new Error('mqtt 必须配置 brokerUrl 或 protocol/host/port');
    }
 
    if (!config.mqtt.topicTemplate && !config.mqtt.defaultTopicPrefix) {
      throw new Error('mqtt 必须配置 topicTemplate 或 defaultTopicPrefix');
    }
  }
 
  if (channels.includes('aliyun')) {
    if (!config.aliyun) {
      throw new Error('已启用阿里云通道,但缺少 aliyun 配置');
    }
 
    if (!config.aliyun.tupleApiBaseUrl && !config.aliyun.tupleApiUrl) {
      throw new Error('aliyun 必须配置 tupleApiBaseUrl 或 tupleApiUrl');
    }
  }
}
 
function createOnMetricHandler({ logger, cache, mqttService, aliyunService, dashboardService }) {
  return async function onMetric(device, metric, result) {
    const payload = cache.update(device, metric, result);
    logger.info(`[APP] 缓存已更新 设备=${device.deviceId} 类型=${result.messageType} 数据=${JSON.stringify(payload)}`);
 
    if (dashboardService) {
      dashboardService.broadcastSnapshot();
    }
 
    const tasks = [];
 
    if (mqttService) {
      tasks.push(mqttService.publish(device, payload).catch((error) => {
        logger.error(`[APP] MQTT 上报失败 设备=${device.deviceId}: ${error.message}`);
        return { ok: false, reason: error.message };
      }));
    }
 
    if (aliyunService) {
      tasks.push(aliyunService.publish(device, payload).catch((error) => {
        logger.error(`[APP] 阿里云上报失败 设备=${device.deviceId}: ${error.message}`);
        return { ok: false, reason: error.message };
      }));
    }
 
    await Promise.all(tasks);
  };
}
 
async function main() {
  const options = parseArgs(process.argv);
 
  if (options.help) {
    printHelp();
    return;
  }
 
  const config = loadConfig(options.configPath);
  const logDir = resolveConfigPath(options.configPath, (config.logging && config.logging.dir) || './logs');
  const logger = createLogger({
    ...(config.logging || {}),
    dir: logDir,
  });
  const sendChannels = getSendChannels(config);
  const sendOptions = getSendOptions(config);
  const alModelPath = resolveConfigPath(options.configPath, config.protocol.alModelPath);
 
  logger.info(`[APP] 配置加载完成 TCP=${config.tcp.host}:${config.tcp.port} 设备数量=${config.devices.length} 上报通道=${sendChannels.join(',')} 物模型=${alModelPath} 日志级别=${normalizeLogLevel(config.logging && config.logging.level)}`);
 
  const cache = new StateCache(sendOptions);
  const mqttService = sendChannels.includes('mqtt') ? new MqttService(config.mqtt, logger) : null;
  const aliyunService = sendChannels.includes('aliyun') ? new AliyunService(config.aliyun, logger) : null;
  let tcpService = null;
  const dashboardService = new DashboardService({
    config: config.dashboard || {},
    logger,
    getSnapshot: () => buildDashboardSnapshot({
      devices: config.devices,
      cache,
      tcpService,
      config: config.dashboard || {},
    }),
  });
 
  if (mqttService) {
    mqttService.start();
  }
 
  if (aliyunService) {
    aliyunService.start();
  }
 
  tcpService = new TcpService({
    tcpConfig: config.tcp,
    devices: config.devices,
    alModelPath,
    logger,
    logRawHex: config.logging && config.logging.logRawHex,
    onConnectionChange: () => {
      dashboardService.broadcastSnapshot();
    },
    onMetric: createOnMetricHandler({
      logger,
      cache,
      mqttService,
      aliyunService,
      dashboardService,
    }),
  });
 
  await dashboardService.start();
  await tcpService.start();
 
  let shuttingDown = false;
 
  async function shutdown(signal) {
    if (shuttingDown) {
      return;
    }
 
    shuttingDown = true;
    logger.warn(`[APP] 收到关闭信号 信号=${signal}`);
    await tcpService.stop();
    await dashboardService.stop();
 
    if (mqttService) {
      await mqttService.stop();
    }
 
    if (aliyunService) {
      await aliyunService.stop();
    }
 
    logger.info('[APP] 服务已停止');
    await logger.close();
    process.exit(0);
  }
 
  process.on('SIGINT', () => {
    shutdown('SIGINT');
  });
 
  process.on('SIGTERM', () => {
    shutdown('SIGTERM');
  });
 
  process.on('uncaughtException', (error) => {
    logger.error(`[APP] 未捕获异常: ${error.stack || error.message}`);
  });
 
  process.on('unhandledRejection', (reason) => {
    logger.error(`[APP] 未处理的 Promise 拒绝: ${reason}`);
  });
}
 
if (require.main === module) {
  main().catch((error) => {
    console.error(error.message);
    process.exit(1);
  });
}
 
module.exports = {
  createOnMetricHandler,
  getSendChannels,
  getSendOptions,
  loadConfig,
  main,
  parseArgs,
  printHelp,
  resolveConfigPath,
  validateConfig,
};