const net = require('net');
|
const { Jh2028Decoder, bytesToHex } = require('./decoder');
|
|
/**
 * Normalizes an IP address so socket addresses and configured addresses can be
 * compared as plain strings.
 *
 * An IPv4-mapped IPv6 address such as `::ffff:192.168.1.10` is reduced to its
 * part after the `::ffff:` prefix (`192.168.1.10`). The prefix is matched
 * case-insensitively so that an upper-case spelling (`::FFFF:...`) is
 * unwrapped as well. Any other address is returned unchanged.
 *
 * @param {string|null|undefined} address - Address to normalize.
 * @returns {string} The normalized address, or `''` when `address` is falsy.
 */
function normalizeIp(address) {
  if (!address) {
    return '';
  }

  // Hex digits in IPv6 text are case-insensitive, so accept ::FFFF: too.
  return address.replace(/^::ffff:/i, '');
}
|
|
/**
 * Formats a point in time as `YYYY-MM-DD HH:mm:ss` using the process's local
 * time zone. Month, day, hour, minute and second are zero-padded to two
 * digits; the year is emitted as-is.
 *
 * @param {Date|number} [date=new Date()] - Moment to format: a Date, or a
 *   number that is interpreted as milliseconds since the Unix epoch. Defaults
 *   to the current time.
 * @returns {string} The formatted local timestamp.
 * @throws {RangeError} If the value is an Invalid Date (or a non-finite
 *   number), instead of returning a `NaN-NaN-NaN NaN:NaN:NaN` string.
 */
function formatReceivedTimestamp(date = new Date()) {
  const when = typeof date === 'number' ? new Date(date) : date;
  const year = when.getFullYear();

  // Every getter of an Invalid Date returns NaN; checking one is enough.
  if (Number.isNaN(year)) {
    throw new RangeError(`Invalid date: ${String(date)}`);
  }

  const pad = (value) => String(value).padStart(2, '0');
  const datePart = [year, pad(when.getMonth() + 1), pad(when.getDate())].join('-');
  const timePart = [when.getHours(), when.getMinutes(), when.getSeconds()].map(pad).join(':');

  return `${datePart} ${timePart}`;
}
|
|
/**
 * TCP server that accepts connections from a configured set of devices,
 * feeds the received bytes into a per-connection Jh2028Decoder and forwards
 * the decoded metrics to a callback.
 *
 * Devices are identified by their remote IP address; connections from
 * addresses that are not configured are closed immediately.
 */
class TcpService {
  /**
   * @param {object} [options]
   * @param {object} [options.tcpConfig] - Listener/socket settings. Fields read
   *   by this class: host, port, backlog, maxConnections, maxBufferBytes,
   *   keepAlive, keepAliveDelayMs, noDelay, socketTimeoutMs.
   * @param {Array<object>} [options.devices] - Device entries; each needs a
   *   truthy `ip` and `deviceId`, and may carry a `name`.
   * @param {*} [options.alModelPath] - Passed through unchanged to every decoder.
   * @param {object} [options.logger=console] - Must provide info/warn/error
   *   (and debug when `logRawHex` is enabled).
   * @param {boolean} [options.logRawHex] - Log each received chunk as hex.
   * @param {Function} [options.onMetric] - Called as
   *   `onMetric(device, metric, result)` for every published result; may be
   *   sync or async.
   * @param {Function} [options.onConnectionChange] - Called as
   *   `onConnectionChange(device, snapshot)` after a device connects or
   *   disconnects; may be sync or async.
   */
  constructor(options = {}) {
    this.tcpConfig = options.tcpConfig || {};
    this.devices = options.devices || [];
    this.alModelPath = options.alModelPath;
    this.logger = options.logger || console;
    this.logRawHex = Boolean(options.logRawHex);
    this.onMetric = typeof options.onMetric === 'function' ? options.onMetric : async () => {};
    this.onConnectionChange = typeof options.onConnectionChange === 'function' ? options.onConnectionChange : () => {};
    this.server = null;
    // socket -> session state (device, decoder, timestamps).
    this.sessions = new Map();
    // deviceId -> the socket currently considered active for that device.
    this.socketsByDeviceId = new Map();
    this.deviceMap = this.buildDeviceMap(this.devices);
  }

  /**
   * Builds the lookup table from normalized IP address to device entry.
   * Entries that are falsy or lack `ip` / `deviceId` are skipped. When two
   * entries normalize to the same IP, the later one wins.
   *
   * @param {Array<object>} devices - Device entries as passed to the constructor.
   * @returns {Map<string, object>} Copy of each device keyed by normalized IP,
   *   with `name` defaulting to `deviceId`.
   */
  buildDeviceMap(devices) {
    const map = new Map();

    for (const device of devices) {
      if (!device || !device.ip || !device.deviceId) {
        continue;
      }

      map.set(normalizeIp(device.ip), {
        ...device,
        name: device.name || device.deviceId,
      });
    }

    return map;
  }

  /**
   * Creates the server and starts listening on `tcpConfig.host` /
   * `tcpConfig.port`.
   *
   * Falsy `maxConnections` and `backlog` values (including 0) fall back to
   * 100 and 128 respectively.
   *
   * @returns {Promise<void>} Resolves once the server emits 'listening';
   *   rejects with the first 'error' emitted before that.
   */
  async start() {
    const server = net.createServer((socket) => {
      this.handleConnection(socket);
    });
    this.server = server;

    server.maxConnections = this.tcpConfig.maxConnections || 100;
    server.on('error', (error) => {
      this.logger.error(`[TCP] 服务异常: ${error.message}`);
    });

    await new Promise((resolve, reject) => {
      server.once('listening', resolve);
      server.once('error', reject);
      server.listen({
        host: this.tcpConfig.host,
        port: this.tcpConfig.port,
        backlog: this.tcpConfig.backlog || 128,
      });
    });

    this.logger.info(`[TCP] 已开始监听 ${this.tcpConfig.host}:${this.tcpConfig.port} 设备数量=${this.deviceMap.size}`);
  }

  /**
   * Registers a newly accepted socket.
   *
   * The socket is destroyed when its remote IP is not a configured device. If
   * the device already has an active socket, that older socket is destroyed
   * and replaced. Otherwise a decoder and session are created, socket options
   * are applied and the data/timeout/error/close handlers are attached.
   *
   * Falsy `maxBufferBytes`, `keepAliveDelayMs` and `socketTimeoutMs` values
   * (including 0) fall back to 8192, 10000 and 120000 respectively.
   *
   * @param {net.Socket} socket - Socket handed over by the server.
   * @returns {void}
   */
  handleConnection(socket) {
    const clientIp = normalizeIp(socket.remoteAddress);
    const device = this.deviceMap.get(clientIp);

    if (!device) {
      this.logger.warn(`[TCP] 未配置设备接入 IP=${clientIp} 端口=${socket.remotePort},连接将关闭`);
      socket.destroy();
      return;
    }

    const oldSocket = this.socketsByDeviceId.get(device.deviceId);
    if (oldSocket && oldSocket !== socket) {
      this.logger.warn(`[TCP] 同设备新连接接入,将关闭旧连接 设备=${device.deviceId} IP=${clientIp}`);
      oldSocket.destroy();
      // The old socket's 'close' event fires asynchronously. Drop its session
      // now so the snapshot reported below does not list the device twice.
      this.sessions.delete(oldSocket);
    }

    const decoder = new Jh2028Decoder({
      alModelPath: this.alModelPath,
      maxBufferBytes: this.tcpConfig.maxBufferBytes || 8192,
    });

    this.sessions.set(socket, {
      clientIp,
      device,
      decoder,
      connectedAt: Date.now(),
      lastDataAt: 0,
      remotePort: socket.remotePort,
    });
    this.socketsByDeviceId.set(device.deviceId, socket);

    if (this.tcpConfig.keepAlive) {
      socket.setKeepAlive(true, this.tcpConfig.keepAliveDelayMs || 10000);
    }

    socket.setNoDelay(Boolean(this.tcpConfig.noDelay));
    socket.setTimeout(this.tcpConfig.socketTimeoutMs || 120000);

    this.logger.info(`[TCP] 设备已连接 设备=${device.deviceId} IP=${clientIp} 端口=${socket.remotePort}`);
    this.notifyConnectionChange(device);

    socket.on('data', (chunk) => {
      this.handleData(socket, chunk);
    });

    // An idle timeout only emits an event; the socket must be closed explicitly.
    socket.on('timeout', () => {
      this.logger.warn(`[TCP] 连接超时 设备=${device.deviceId} IP=${clientIp}`);
      socket.destroy();
    });

    socket.on('error', (error) => {
      this.logger.error(`[TCP] 连接异常 设备=${device.deviceId} IP=${clientIp}: ${error.message}`);
    });

    socket.on('close', () => {
      this.sessions.delete(socket);
      // Only unregister the device if this socket is still its active one; a
      // newer connection may already have replaced it.
      if (this.socketsByDeviceId.get(device.deviceId) === socket) {
        this.socketsByDeviceId.delete(device.deviceId);
      }
      this.logger.info(`[TCP] 设备已断开 设备=${device.deviceId} IP=${clientIp}`);
      this.notifyConnectionChange(device);
    });
  }

  /**
   * Feeds a received chunk into the session's decoder and dispatches the
   * decoded results.
   *
   * Results whose `publish` flag is falsy are logged and skipped. For every
   * other result a shallow copy of `result.metric` is passed to `onMetric`;
   * for `messageType === 'blood-pressure'` the copy's `M` field is set to the
   * local receive time first. Failures of `onMetric` (synchronous throw or
   * rejected promise) are logged and do not propagate.
   *
   * @param {net.Socket} socket - Socket the chunk arrived on; ignored when it
   *   has no session.
   * @param {Buffer} chunk - Raw bytes received.
   * @returns {void}
   */
  handleData(socket, chunk) {
    const session = this.sessions.get(socket);
    if (!session) {
      return;
    }

    this.logger.info(`[TCP] 收到原始数据 设备=${session.device.deviceId} 字节数=${chunk.length}`);
    session.lastDataAt = Date.now();
    if (this.logRawHex) {
      this.logger.debug(`[TCP] 原始报文 设备=${session.device.deviceId} ${bytesToHex(chunk)}`);
    }

    const results = session.decoder.push(chunk);
    if (results.length === 0) {
      this.logger.warn(`[TCP] 当前数据片段尚未组成完整报文 设备=${session.device.deviceId} 字节数=${chunk.length}`);
      return;
    }

    const reportMetricFailure = (error) => {
      this.logger.error(`[TCP] 指标处理失败 设备=${session.device.deviceId}: ${error.message}`);
    };

    for (const result of results) {
      if (!result.publish) {
        this.logger.warn(`[TCP] 跳过未发布报文 设备=${session.device.deviceId} 原因=${result.reason} 原始报文=${result.rawHex || ''}`);
        continue;
      }

      const metric = { ...result.metric };
      if (result.messageType === 'blood-pressure') {
        metric.M = formatReceivedTimestamp();
      }

      this.logger.info(`[TCP] 解析到指标 设备=${session.device.deviceId} 类型=${result.messageType} 指标=${JSON.stringify(metric)}`);

      // onMetric may be a plain function; a synchronous throw must not escape
      // the socket 'data' handler, so treat it like a rejected promise.
      try {
        Promise.resolve(this.onMetric(session.device, metric, result)).catch(reportMetricFailure);
      } catch (error) {
        reportMetricFailure(error);
      }
    }
  }

  /**
   * Destroys every tracked socket, clears the session tables and closes the
   * server if one was created.
   *
   * @returns {Promise<void>} Resolves when the server's close callback has
   *   run, or right after the sockets are destroyed when there is no server.
   */
  async stop() {
    this.logger.info(`[TCP] 正在停止 TCP 服务 当前连接数=${this.sessions.size}`);

    for (const socket of this.sessions.keys()) {
      socket.destroy();
    }

    this.sessions.clear();
    this.socketsByDeviceId.clear();

    if (!this.server) {
      return;
    }

    await new Promise((resolve) => {
      this.server.close(resolve);
    });

    this.server = null;
    this.logger.info('[TCP] TCP 服务已停止');
  }

  /**
   * Invokes `onConnectionChange` with the affected device and the current
   * connection snapshot. Failures of the callback (synchronous throw or
   * rejected promise) are logged and do not propagate.
   *
   * @param {object} device - Device whose connection state changed.
   * @returns {void}
   */
  notifyConnectionChange(device) {
    const reportFailure = (error) => {
      this.logger.error(`[TCP] 连接状态通知失败 设备=${device.deviceId}: ${error.message}`);
    };

    // The callback may be synchronous; keep a throw from escaping the socket
    // event handlers that call this method.
    try {
      Promise.resolve(this.onConnectionChange(device, this.getConnectionSnapshot())).catch(reportFailure);
    } catch (error) {
      reportFailure(error);
    }
  }

  /**
   * Describes every session currently tracked.
   *
   * @returns {Array<object>} One entry per session with deviceId, ip, name
   *   (falling back to deviceId), remotePort, connectedAt, lastDataAt (0 until
   *   data has been received) and `online: true`.
   */
  getConnectionSnapshot() {
    return Array.from(this.sessions.values()).map((session) => ({
      deviceId: session.device.deviceId,
      ip: session.clientIp,
      name: session.device.name || session.device.deviceId,
      remotePort: session.remotePort,
      connectedAt: session.connectedAt,
      lastDataAt: session.lastDataAt,
      online: true,
    }));
  }
}
|
|
// Public CommonJS surface of this module: the service class and the two
// helper functions defined above.
module.exports = { TcpService, formatReceivedTimestamp, normalizeIp };
|