'use strict';
|
const net = require('net');
|
const tls = require('tls');
|
const Connection = require('./connection');
|
const MongoError = require('../error').MongoError;
|
const MongoNetworkError = require('../error').MongoNetworkError;
|
const MongoNetworkTimeoutError = require('../error').MongoNetworkTimeoutError;
|
const defaultAuthProviders = require('../auth/defaultAuthProviders').defaultAuthProviders;
|
const AuthContext = require('../auth/auth_provider').AuthContext;
|
const WIRE_CONSTANTS = require('../wireprotocol/constants');
|
const makeClientMetadata = require('../utils').makeClientMetadata;
|
const MAX_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_WIRE_VERSION;
|
const MAX_SUPPORTED_SERVER_VERSION = WIRE_CONSTANTS.MAX_SUPPORTED_SERVER_VERSION;
|
const MIN_SUPPORTED_WIRE_VERSION = WIRE_CONSTANTS.MIN_SUPPORTED_WIRE_VERSION;
|
const MIN_SUPPORTED_SERVER_VERSION = WIRE_CONSTANTS.MIN_SUPPORTED_SERVER_VERSION;
|
let AUTH_PROVIDERS;
|
|
function connect(options, cancellationToken, callback) {
|
if (typeof cancellationToken === 'function') {
|
callback = cancellationToken;
|
cancellationToken = undefined;
|
}
|
|
const ConnectionType = options && options.connectionType ? options.connectionType : Connection;
|
if (AUTH_PROVIDERS == null) {
|
AUTH_PROVIDERS = defaultAuthProviders(options.bson);
|
}
|
|
const family = options.family !== void 0 ? options.family : 0;
|
makeConnection(family, options, cancellationToken, (err, socket) => {
|
if (err) {
|
callback(err, socket); // in the error case, `socket` is the originating error event name
|
return;
|
}
|
|
performInitialHandshake(new ConnectionType(socket, options), options, callback);
|
});
|
}
|
|
function isModernConnectionType(conn) {
|
return !(conn instanceof Connection);
|
}
|
|
function checkSupportedServer(ismaster, options) {
|
const serverVersionHighEnough =
|
ismaster &&
|
typeof ismaster.maxWireVersion === 'number' &&
|
ismaster.maxWireVersion >= MIN_SUPPORTED_WIRE_VERSION;
|
const serverVersionLowEnough =
|
ismaster &&
|
typeof ismaster.minWireVersion === 'number' &&
|
ismaster.minWireVersion <= MAX_SUPPORTED_WIRE_VERSION;
|
|
if (serverVersionHighEnough) {
|
if (serverVersionLowEnough) {
|
return null;
|
}
|
|
const message = `Server at ${options.host}:${options.port} reports minimum wire version ${ismaster.minWireVersion}, but this version of the Node.js Driver requires at most ${MAX_SUPPORTED_WIRE_VERSION} (MongoDB ${MAX_SUPPORTED_SERVER_VERSION})`;
|
return new MongoError(message);
|
}
|
|
const message = `Server at ${options.host}:${
|
options.port
|
} reports maximum wire version ${ismaster.maxWireVersion ||
|
0}, but this version of the Node.js Driver requires at least ${MIN_SUPPORTED_WIRE_VERSION} (MongoDB ${MIN_SUPPORTED_SERVER_VERSION})`;
|
return new MongoError(message);
|
}
|
|
function performInitialHandshake(conn, options, _callback) {
|
const callback = function(err, ret) {
|
if (err && conn) {
|
conn.destroy();
|
}
|
_callback(err, ret);
|
};
|
|
const credentials = options.credentials;
|
if (credentials) {
|
if (!credentials.mechanism.match(/DEFAULT/i) && !AUTH_PROVIDERS[credentials.mechanism]) {
|
callback(new MongoError(`authMechanism '${credentials.mechanism}' not supported`));
|
return;
|
}
|
}
|
|
const authContext = new AuthContext(conn, credentials, options);
|
prepareHandshakeDocument(authContext, (err, handshakeDoc) => {
|
if (err) {
|
return callback(err);
|
}
|
|
const handshakeOptions = Object.assign({}, options);
|
if (options.connectTimeoutMS || options.connectionTimeout) {
|
// The handshake technically is a monitoring check, so its socket timeout should be connectTimeoutMS
|
handshakeOptions.socketTimeout = options.connectTimeoutMS || options.connectionTimeout;
|
}
|
|
handshakeDoc.helloOk = !!options.useUnifiedTopology;
|
|
const start = new Date().getTime();
|
conn.command('admin.$cmd', handshakeDoc, handshakeOptions, (err, result) => {
|
if (err) {
|
callback(err);
|
return;
|
}
|
|
const response = result.result;
|
if (response.ok === 0) {
|
callback(new MongoError(response));
|
return;
|
}
|
|
if ('isWritablePrimary' in response) {
|
// Provide pre-hello-style response document.
|
response.ismaster = response.isWritablePrimary;
|
}
|
|
if (options.useUnifiedTopology && response.helloOk) {
|
conn.helloOk = true;
|
}
|
|
const supportedServerErr = checkSupportedServer(response, options);
|
if (supportedServerErr) {
|
callback(supportedServerErr);
|
return;
|
}
|
|
if (!isModernConnectionType(conn)) {
|
// resolve compression
|
if (response.compression) {
|
const agreedCompressors = handshakeDoc.compression.filter(
|
compressor => response.compression.indexOf(compressor) !== -1
|
);
|
|
if (agreedCompressors.length) {
|
conn.agreedCompressor = agreedCompressors[0];
|
}
|
|
if (options.compression && options.compression.zlibCompressionLevel) {
|
conn.zlibCompressionLevel = options.compression.zlibCompressionLevel;
|
}
|
}
|
}
|
|
// NOTE: This is metadata attached to the connection while porting away from
|
// handshake being done in the `Server` class. Likely, it should be
|
// relocated, or at very least restructured.
|
conn.ismaster = response;
|
conn.lastIsMasterMS = new Date().getTime() - start;
|
|
if (!response.arbiterOnly && credentials) {
|
// store the response on auth context
|
Object.assign(authContext, { response });
|
|
const resolvedCredentials = credentials.resolveAuthMechanism(response);
|
const authProvider = AUTH_PROVIDERS[resolvedCredentials.mechanism];
|
authProvider.auth(authContext, err => {
|
if (err) return callback(err);
|
callback(undefined, conn);
|
});
|
|
return;
|
}
|
|
callback(undefined, conn);
|
});
|
});
|
}
|
|
function prepareHandshakeDocument(authContext, callback) {
|
const options = authContext.options;
|
const serverApi = authContext.connection.serverApi;
|
const compressors =
|
options.compression && options.compression.compressors ? options.compression.compressors : [];
|
|
const handshakeDoc = {
|
[serverApi ? 'hello' : 'ismaster']: true,
|
client: options.metadata || makeClientMetadata(options),
|
compression: compressors
|
};
|
|
const credentials = authContext.credentials;
|
if (credentials) {
|
if (credentials.mechanism.match(/DEFAULT/i) && credentials.username) {
|
Object.assign(handshakeDoc, {
|
saslSupportedMechs: `${credentials.source}.${credentials.username}`
|
});
|
|
AUTH_PROVIDERS['scram-sha-256'].prepare(handshakeDoc, authContext, callback);
|
return;
|
}
|
|
const authProvider = AUTH_PROVIDERS[credentials.mechanism];
|
if (authProvider == null) {
|
return callback(new MongoError(`No AuthProvider for ${credentials.mechanism} defined.`));
|
}
|
authProvider.prepare(handshakeDoc, authContext, callback);
|
return;
|
}
|
|
callback(undefined, handshakeDoc);
|
}
|
|
const LEGAL_SSL_SOCKET_OPTIONS = [
|
'pfx',
|
'key',
|
'passphrase',
|
'cert',
|
'ca',
|
'ciphers',
|
'NPNProtocols',
|
'ALPNProtocols',
|
'servername',
|
'ecdhCurve',
|
'secureProtocol',
|
'secureContext',
|
'session',
|
'minDHSize',
|
'crl',
|
'rejectUnauthorized'
|
];
|
|
function parseConnectOptions(family, options) {
|
const host = typeof options.host === 'string' ? options.host : 'localhost';
|
if (host.indexOf('/') !== -1) {
|
return { path: host };
|
}
|
|
const result = {
|
family,
|
host,
|
port: typeof options.port === 'number' ? options.port : 27017,
|
rejectUnauthorized: false
|
};
|
|
return result;
|
}
|
|
function parseSslOptions(family, options) {
|
const result = parseConnectOptions(family, options);
|
|
// Merge in valid SSL options
|
for (const name in options) {
|
if (options[name] != null && LEGAL_SSL_SOCKET_OPTIONS.indexOf(name) !== -1) {
|
result[name] = options[name];
|
}
|
}
|
|
// Override checkServerIdentity behavior
|
if (options.checkServerIdentity === false) {
|
// Skip the identiy check by retuning undefined as per node documents
|
// https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
|
result.checkServerIdentity = function() {
|
return undefined;
|
};
|
} else if (typeof options.checkServerIdentity === 'function') {
|
result.checkServerIdentity = options.checkServerIdentity;
|
}
|
|
// Set default sni servername to be the same as host
|
if (result.servername == null && !net.isIP(result.host)) {
|
result.servername = result.host;
|
}
|
|
return result;
|
}
|
|
const SOCKET_ERROR_EVENTS = new Set(['error', 'close', 'timeout', 'parseError']);
|
function makeConnection(family, options, cancellationToken, _callback) {
|
const useSsl = typeof options.ssl === 'boolean' ? options.ssl : false;
|
const keepAlive = typeof options.keepAlive === 'boolean' ? options.keepAlive : true;
|
let keepAliveInitialDelay =
|
typeof options.keepAliveInitialDelay === 'number' ? options.keepAliveInitialDelay : 120000;
|
const noDelay = typeof options.noDelay === 'boolean' ? options.noDelay : true;
|
const connectionTimeout =
|
typeof options.connectionTimeout === 'number'
|
? options.connectionTimeout
|
: typeof options.connectTimeoutMS === 'number'
|
? options.connectTimeoutMS
|
: 30000;
|
const socketTimeoutMS =
|
typeof options.socketTimeoutMS === 'number'
|
? options.socketTimeoutMS
|
: typeof options.socketTimeout === 'number'
|
? options.socketTimeout
|
: 0;
|
const rejectUnauthorized =
|
typeof options.rejectUnauthorized === 'boolean' ? options.rejectUnauthorized : true;
|
|
if (keepAliveInitialDelay > socketTimeoutMS) {
|
keepAliveInitialDelay = Math.round(socketTimeoutMS / 2);
|
}
|
|
let socket;
|
const callback = function(err, ret) {
|
if (err && socket) {
|
socket.destroy();
|
}
|
|
_callback(err, ret);
|
};
|
|
try {
|
if (useSsl) {
|
socket = tls.connect(parseSslOptions(family, options));
|
if (typeof socket.disableRenegotiation === 'function') {
|
socket.disableRenegotiation();
|
}
|
} else {
|
socket = net.createConnection(parseConnectOptions(family, options));
|
}
|
} catch (err) {
|
return callback(err);
|
}
|
|
socket.setKeepAlive(keepAlive, keepAliveInitialDelay);
|
socket.setTimeout(connectionTimeout);
|
socket.setNoDelay(noDelay);
|
|
const connectEvent = useSsl ? 'secureConnect' : 'connect';
|
let cancellationHandler;
|
function errorHandler(eventName) {
|
return err => {
|
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
|
if (cancellationHandler) {
|
cancellationToken.removeListener('cancel', cancellationHandler);
|
}
|
|
socket.removeListener(connectEvent, connectHandler);
|
callback(connectionFailureError(eventName, err));
|
};
|
}
|
|
function connectHandler() {
|
SOCKET_ERROR_EVENTS.forEach(event => socket.removeAllListeners(event));
|
if (cancellationHandler) {
|
cancellationToken.removeListener('cancel', cancellationHandler);
|
}
|
|
if (socket.authorizationError && rejectUnauthorized) {
|
return callback(socket.authorizationError);
|
}
|
|
socket.setTimeout(socketTimeoutMS);
|
callback(null, socket);
|
}
|
|
SOCKET_ERROR_EVENTS.forEach(event => socket.once(event, errorHandler(event)));
|
if (cancellationToken) {
|
cancellationHandler = errorHandler('cancel');
|
cancellationToken.once('cancel', cancellationHandler);
|
}
|
|
socket.once(connectEvent, connectHandler);
|
}
|
|
function connectionFailureError(type, err) {
|
switch (type) {
|
case 'error':
|
return new MongoNetworkError(err);
|
case 'timeout':
|
return new MongoNetworkTimeoutError(`connection timed out`);
|
case 'close':
|
return new MongoNetworkError(`connection closed`);
|
case 'cancel':
|
return new MongoNetworkError(`connection establishment was cancelled`);
|
default:
|
return new MongoNetworkError(`unknown network error`);
|
}
|
}
|
|
module.exports = connect;
|