chenyc
2025-12-09 65e034683b28d799e73c7d7e5e4769fab5b9bc9c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.BufferedDuplex = void 0;
exports.writev = writev;
const readable_stream_1 = require("readable-stream");
const buffer_1 = require("buffer");
/**
 * Batched-write handler meant to be bound to a stream and installed as its
 * `_writev`. Every pending chunk is turned into a Buffer, the Buffers are
 * concatenated, and the result is handed to `this._write` in one call.
 *
 * String chunks are decoded with the encoding recorded next to them. When
 * that encoding is missing or not a real character encoding (for example the
 * `'buffer'` placeholder), UTF-8 is used.
 *
 * @this {{ _write: (chunk: Buffer, encoding: string, cb: Function) => void }}
 * @param {Array<{ chunk: (Buffer|string), encoding?: string }>} chunks
 *   Pending writes, in the order they were issued.
 * @param {Function} cb Completion callback, forwarded untouched to `_write`.
 * @returns {void}
 */
function writev(chunks, cb) {
    const buffers = chunks.map(({ chunk, encoding }) => {
        if (typeof chunk !== 'string') {
            // Non-string chunks are passed to Buffer.concat as they are.
            return chunk;
        }
        // Honour the encoding the string was written with; hard-coding UTF-8
        // would corrupt strings written as e.g. 'hex' or 'latin1'.
        const stringEncoding = buffer_1.Buffer.isEncoding(encoding) ? encoding : 'utf8';
        return buffer_1.Buffer.from(chunk, stringEncoding);
    });
    this._write(buffer_1.Buffer.concat(buffers), 'binary', cb);
}
/**
 * Duplex stream layered over a `proxy` stream.
 *
 * Writes that arrive before `socketReady()` has been called are parked in
 * `writeQueue`; afterwards they go straight to the proxy. Every `'data'`
 * event from the proxy is pushed onto this stream's readable side.
 */
class BufferedDuplex extends readable_stream_1.Duplex {
    /** Socket handed to the constructor; this class only stores it. */
    socket;
    /** Stream that receives the writes and provides the readable data. */
    proxy;
    /** `false` until `socketReady()` runs; decides whether `_write` queues. */
    isSocketOpen;
    /** Parked `{ chunk, encoding, cb }` records awaiting `socketReady()`. */
    writeQueue;
    /**
     * @param {object} opts Options; only `opts.objectMode` is read here.
     * @param {object} proxy Stream-like object used for read/write/end/destroy.
     * @param {*} socket Stored on the instance as `socket`.
     */
    constructor(opts, proxy, socket) {
        super({ objectMode: true });
        this.proxy = proxy;
        this.socket = socket;
        this.writeQueue = [];
        // The stream itself always runs in object mode; when the caller did not
        // ask for object mode, batched writes are routed through `writev`.
        const wantsBatchedWrites = !opts.objectMode;
        if (wantsBatchedWrites) {
            this._writev = writev.bind(this);
        }
        this.isSocketOpen = false;
        this.proxy.on('data', (chunk) => {
            // Ignore proxy data once this stream can no longer accept it.
            if (this.destroyed || !this.readable) {
                return;
            }
            this.push(chunk);
        });
    }
    /**
     * Forwards the read request to the proxy.
     * @param {number} size Requested amount, passed through to `proxy.read`.
     */
    _read(size) {
        this.proxy.read(size);
    }
    /**
     * Sends the chunk to the proxy when the socket is open, otherwise parks it
     * (with its encoding and callback) until `socketReady()` is called.
     */
    _write(chunk, encoding, cb) {
        if (this.isSocketOpen) {
            this.writeToProxy(chunk, encoding, cb);
            return;
        }
        this.writeQueue.push({ chunk, encoding, cb });
    }
    /**
     * Drops anything still parked and ends the proxy, handing it `callback`.
     */
    _final(callback) {
        this.writeQueue = [];
        this.proxy.end(callback);
    }
    /**
     * Drops anything still parked, destroys the proxy and reports `err`
     * through `callback`.
     */
    _destroy(err, callback) {
        this.writeQueue = [];
        this.proxy.destroy();
        callback(err);
    }
    /**
     * Announces `'connect'`, marks the socket as open and then flushes the
     * parked writes. The event is emitted before the flag changes.
     */
    socketReady() {
        this.emit('connect');
        this.isSocketOpen = true;
        this.processWriteQueue();
    }
    /**
     * Writes one chunk to the proxy. `cb` runs immediately unless
     * `proxy.write` returned `false`, in which case it is deferred to the
     * proxy's next `'drain'` event.
     */
    writeToProxy(chunk, encoding, cb) {
        const accepted = this.proxy.write(chunk, encoding);
        if (accepted !== false) {
            cb();
            return;
        }
        this.proxy.once('drain', cb);
    }
    /**
     * Sends parked writes to the proxy in arrival order until none remain.
     */
    processWriteQueue() {
        // `this.writeQueue` is re-read on every pass because callbacks invoked
        // by writeToProxy may replace the array.
        while (this.writeQueue.length !== 0) {
            const pending = this.writeQueue.shift();
            this.writeToProxy(pending.chunk, pending.encoding, pending.cb);
        }
    }
}
exports.BufferedDuplex = BufferedDuplex;
//# sourceMappingURL=BufferedDuplex.js.map