'use strict';
|
|
const MONGODB_ERROR_CODES = require('../../error_codes').MONGODB_ERROR_CODES;
|
const ReadPreference = require('./read_preference');
|
const TopologyType = require('../sdam/common').TopologyType;
|
const MongoError = require('../error').MongoError;
|
const isRetryableWriteError = require('../error').isRetryableWriteError;
|
const maxWireVersion = require('../utils').maxWireVersion;
|
const MongoNetworkError = require('../error').MongoNetworkError;
|
|
// Error code (IllegalOperation) that getMMAPError compares against, together with a
// 'Transaction numbers' message, before rewriting the error for retryable writes.
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
|
|
/**
 * Publish an event on `self`, but only when at least one listener is registered for it.
 *
 * @method
 * @param {object} self - emitter exposing `listeners(event)` and `emit(event, payload)`
 * @param {string} event - name of the event to publish
 * @param {object} description - payload handed to the listeners
 */
function emitSDAMEvent(self, event, description) {
  const hasListeners = self.listeners(event).length > 0;
  if (!hasListeners) {
    return;
  }

  self.emit(event, description);
}
|
|
/**
 * Validate and return the compressors requested through `options.compression`.
 *
 * @param {object} options - options that may carry `compression.compressors`
 * @returns {string[]} the supplied compressors array (same reference), or a fresh
 *   empty array when no compressors were configured
 * @throws {Error} when any entry is something other than 'snappy' or 'zlib'
 */
function createCompressionInfo(options) {
  const compression = options.compression;
  if (!compression || !compression.compressors) {
    return [];
  }

  const compressors = compression.compressors;

  // Reject the whole list as soon as one unsupported compressor is seen
  compressors.forEach(name => {
    const supported = name === 'snappy' || name === 'zlib';
    if (!supported) {
      throw new Error('compressors must be at least one of snappy or zlib');
    }
  });

  return compressors;
}
|
|
/**
 * Deep-copy a value by serializing it to JSON and parsing it back.
 * Anything JSON cannot represent (e.g. `undefined` properties, functions) is lost.
 *
 * @param {*} object - JSON-serializable value to copy
 * @returns {*} an independent copy of `object`
 */
function clone(object) {
  const serialized = JSON.stringify(object);
  return JSON.parse(serialized);
}
|
|
/**
 * Return the server description cached on `self.s`, seeding it with an
 * 'Unknown' placeholder the first time it is requested.
 *
 * @param {object} self - server instance exposing `name` and the state bag `s`
 * @returns {object} the cached (or newly seeded) server description
 */
function getPreviousDescription(self) {
  const state = self.s;

  if (!state.serverDescription) {
    state.serverDescription = {
      address: self.name,
      arbiters: [],
      hosts: [],
      passives: [],
      type: 'Unknown'
    };
  }

  return state.serverDescription;
}
|
|
/**
 * Emit 'serverDescriptionChanged' and remember `description` as the current
 * server description. Nothing is emitted or stored when no listener is registered.
 *
 * @param {object} self - server instance (emitter) exposing `name`, `id` and the state bag `s`
 * @param {object} description - the new server description
 */
function emitServerDescriptionChanged(self, description) {
  const listenerCount = self.listeners('serverDescriptionChanged').length;
  if (!(listenerCount > 0)) {
    return;
  }

  const event = {
    // Use the owning topology's id when one was assigned, otherwise the server's own id
    topologyId: self.s.topologyId !== -1 ? self.s.topologyId : self.id,
    address: self.name,
    previousDescription: getPreviousDescription(self),
    newDescription: description
  };
  self.emit('serverDescriptionChanged', event);

  self.s.serverDescription = description;
}
|
|
/**
 * Return the topology description cached on `self.s`, seeding it with an
 * 'Unknown' placeholder (containing a single 'Unknown' server) on first use.
 *
 * @param {object} self - server instance exposing `name` and the state bag `s`
 * @returns {object} the cached (or newly seeded) topology description
 */
var getPreviousTopologyDescription = function(self) {
  if (!self.s.topologyDescription) {
    self.s.topologyDescription = {
      topologyType: 'Unknown',
      servers: [
        {
          address: self.name,
          arbiters: [],
          hosts: [],
          passives: [],
          type: 'Unknown'
        }
      ]
    };
  }

  return self.s.topologyDescription;
};

/**
 * Emit 'topologyDescriptionChanged' and remember `description` as the current
 * topology description. Nothing is emitted or stored when no listener is registered.
 *
 * @param {object} self - server instance (emitter) exposing `name`, `id` and the state bag `s`
 * @param {object} description - the new topology description
 */
var emitTopologyDescriptionChanged = function(self, description) {
  if (self.listeners('topologyDescriptionChanged').length > 0) {
    // Emit the topology description changed event
    self.emit('topologyDescriptionChanged', {
      // Use the owning topology's id when one was assigned, otherwise the server's own id
      topologyId: self.s.topologyId !== -1 ? self.s.topologyId : self.id,
      address: self.name,
      previousDescription: getPreviousTopologyDescription(self),
      newDescription: description
    });

    // Cache under topologyDescription, the slot getPreviousTopologyDescription reads;
    // writing to serverDescription would clobber the server-level cache and leave
    // previousDescription stuck on the initial placeholder.
    self.s.topologyDescription = description;
  }
};
|
|
/**
 * Tell whether two ismaster replies map to different server types.
 *
 * @param {object} self - server instance, passed through to getTopologyType
 * @param {object} currentIsmaster - the previously stored ismaster reply
 * @param {object} ismaster - the newly received ismaster reply
 * @returns {boolean} true when the derived type differs between the two replies
 */
function changedIsMaster(self, currentIsmaster, ismaster) {
  const previousType = getTopologyType(self, currentIsmaster);
  const nextType = getTopologyType(self, ismaster);
  return nextType !== previousType;
}
|
|
/**
 * Derive the server type name from an ismaster reply.
 *
 * @param {object} self - server instance; `self.ismaster` is used when no reply is passed
 * @param {object} [ismaster] - ismaster reply to classify
 * @returns {string} one of 'Mongos', 'Standalone', 'RSPrimary', 'RSSecondary',
 *   'RSArbiter' or 'Unknown'
 */
function getTopologyType(self, ismaster) {
  const reply = ismaster || self.ismaster;
  if (!reply) {
    return 'Unknown';
  }

  if (reply.ismaster) {
    if (reply.msg === 'isdbgrid') {
      return 'Mongos';
    }

    // A writable node without a host list is not part of a replica set
    return reply.hosts ? 'RSPrimary' : 'Standalone';
  }

  if (reply.secondary) {
    return 'RSSecondary';
  }

  if (reply.arbiterOnly) {
    return 'RSArbiter';
  }

  return 'Unknown';
}
|
|
/**
 * Build a heartbeat function for `self`. Each invocation runs an `ismaster`
 * command against the server, emits the heartbeat started/succeeded/failed
 * events, and records the reply and its latency on `self.s`.
 *
 * When the returned function is called with a callback, the callback receives
 * `(err, reply)` and no further heartbeat is scheduled. Without a callback the
 * next heartbeat is scheduled after `self.s.haInterval` milliseconds.
 *
 * @param {object} self - server instance exposing `command`, `emit`, `name` and the state bag `s`
 * @returns {function(function=): void} the heartbeat function
 */
var inquireServerState = function(self) {
  return function(callback) {
    // A destroyed server is never probed; the callback is not invoked in that case
    if (self.s.state === 'destroyed') return;

    // Record the start time so the round trip can be measured
    var start = Date.now();

    emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: self.name });

    // Attempt to execute the ismaster command
    self.command('admin.$cmd', { ismaster: true }, { monitoring: true }, function(err, r) {
      // Measure the round trip before branching so that both the succeeded and
      // the failed heartbeat events carry a real duration.
      var latencyMS = Date.now() - start;

      if (!err) {
        // Legacy event sender
        self.emit('ismaster', r, self);

        // Server heartbeat event
        emitSDAMEvent(self, 'serverHeartbeatSucceeded', {
          durationMS: latencyMS,
          reply: r.result,
          connectionId: self.name
        });

        // Did the server change
        if (changedIsMaster(self, self.s.ismaster, r.result)) {
          // Emit server description changed if something is listening
          emitServerDescriptionChanged(self, {
            address: self.name,
            arbiters: [],
            hosts: [],
            passives: [],
            type: !self.s.inTopology ? 'Standalone' : getTopologyType(self)
          });
        }

        // Update the ismaster view
        self.s.ismaster = r.result;

        // Set the server response time
        self.s.isMasterLatencyMS = latencyMS;
      } else {
        emitSDAMEvent(self, 'serverHeartbeatFailed', {
          durationMS: latencyMS,
          failure: err,
          connectionId: self.name
        });
      }

      // Performing an ismaster monitoring callback operation
      if (typeof callback === 'function') {
        return callback(err, r);
      }

      // Perform another sweep
      self.s.inquireServerStateTimeout = setTimeout(inquireServerState(self), self.s.haInterval);
    });
  };
};
|
|
//
// Shallow-copy every enumerable property of the options (inherited ones
// included) onto a new plain object
function cloneOptions(options) {
  const copy = {};

  for (const key in options) {
    copy[key] = options[key];
  }

  return copy;
}
|
|
/**
 * Restartable wrapper around `setInterval`.
 *
 * @constructor
 * @param {function} fn - function invoked on every tick
 * @param {number} time - delay between ticks, in milliseconds
 */
function Interval(fn, time) {
  // Holds the active interval handle; `false` means "not running"
  let handle = false;

  // Begin ticking unless already running; chainable
  this.start = function() {
    if (this.isRunning()) {
      return this;
    }

    handle = setInterval(fn, time);
    return this;
  };

  // Cancel the interval (safe to call when idle); chainable
  this.stop = function() {
    clearInterval(handle);
    handle = false;
    return this;
  };

  this.isRunning = function() {
    return handle !== false;
  };
}
|
|
/**
 * Restartable wrapper around `setTimeout`. Once the timer fires it is marked
 * as stopped before `fn` runs, so `fn` may call `start()` again.
 *
 * @constructor
 * @param {function} fn - function invoked when the timer elapses
 * @param {number} time - delay in milliseconds
 */
function Timeout(fn, time) {
  // Holds the pending timer handle; `false` means "not running"
  let handle = false;

  const fire = () => {
    // Ignore a tick that arrives after the timer was stopped
    if (!handle) {
      return;
    }

    clearTimeout(handle);
    handle = false;

    fn();
  };

  // Arm the timer unless one is already pending; chainable
  this.start = function() {
    if (this.isRunning()) {
      return this;
    }

    handle = setTimeout(fire, time);
    return this;
  };

  // Cancel a pending timer (safe to call when idle); chainable
  this.stop = function() {
    clearTimeout(handle);
    handle = false;
    return this;
  };

  this.isRunning = function() {
    return handle !== false;
  };
}
|
|
/**
 * Compute the per-server type transitions between two topology descriptions.
 * Addresses are compared case-insensitively. Entries are reported in this order:
 * servers that disappeared (`to: 'Unknown'`), servers that appeared
 * (`from: 'Unknown'`), then servers present in both whose type changed.
 *
 * @param {object} [previous] - earlier description with a `servers` array; treated as empty when falsy
 * @param {object} current - later description with a `servers` array
 * @returns {{servers: Array<{address: string, from: string, to: string}>}} the difference document
 */
function diff(previous, current) {
  const changes = { servers: [] };
  const before = previous || { servers: [] };

  const sameAddress = (left, right) =>
    left.address.toLowerCase() === right.address.toLowerCase();

  // Servers that were known before but are missing now
  before.servers
    .filter(prev => !current.servers.some(curr => sameAddress(curr, prev)))
    .forEach(prev => {
      changes.servers.push({ address: prev.address, from: prev.type, to: 'Unknown' });
    });

  // Servers that are present now but were not known before
  current.servers
    .filter(curr => !before.servers.some(prev => sameAddress(prev, curr)))
    .forEach(curr => {
      changes.servers.push({ address: curr.address, from: 'Unknown', to: curr.type });
    });

  // Servers present on both sides whose type changed
  for (const prev of before.servers) {
    for (const curr of current.servers) {
      if (sameAddress(prev, curr) && prev.type !== curr.type) {
        changes.servers.push({ address: prev.address, from: prev.type, to: curr.type });
      }
    }
  }

  return changes;
}
|
|
/**
 * Shared function that keeps `topology.clusterTime` pointing at the most
 * recent cluster time seen: the incoming value is stored when none is known
 * yet, or when it is greater than the stored one.
 *
 * @param {*} topology - object whose `clusterTime` property is updated in place
 * @param {*} $clusterTime - incoming document whose `clusterTime` exposes `greaterThan()`
 */
function resolveClusterTime(topology, $clusterTime) {
  const known = topology.clusterTime;
  const shouldAdvance =
    known == null || $clusterTime.clusterTime.greaterThan(known.clusterTime);

  if (shouldAdvance) {
    topology.clusterTime = $clusterTime;
  }
}
|
|
// NOTE: this is a temporary move until the topologies can be more formally refactored
//       to share code.
const SessionMixins = {
  /**
   * Send an `endSessions` command for the given session(s). The command's
   * outcome is deliberately ignored; `callback` (if any) is called with no arguments.
   *
   * @param {*} sessions - one session id or an array of them
   * @param {function} [callback] - invoked once the command has completed
   */
  endSessions: function(sessions, callback) {
    const sessionList = Array.isArray(sessions) ? sessions : [sessions];

    const done = () => {
      // intentionally ignored, per spec
      if (typeof callback === 'function') callback();
    };

    // TODO:
    //   When connected to a sharded cluster the endSessions command
    //   can be sent to any mongos. When connected to a replica set the
    //   endSessions command MUST be sent to the primary if the primary
    //   is available, otherwise it MUST be sent to any available secondary.
    //   Is it enough to use: ReadPreference.primaryPreferred ?
    this.command(
      'admin.$cmd',
      { endSessions: sessionList },
      { readPreference: ReadPreference.primaryPreferred },
      done
    );
  }
};
|
|
/**
 * Resolve the topology type of `topology`: the type from its description
 * when it has one, otherwise a type derived from its legacy `type` string.
 *
 * @param {object} topology - topology exposing either `description.type` or `type`
 * @returns {string} the description's type, or `TopologyType.Sharded` for 'mongos',
 *   `TopologyType.ReplicaSetWithPrimary` for 'replset', else `TopologyType.Single`
 */
function topologyType(topology) {
  const description = topology.description;
  if (description) {
    return description.type;
  }

  switch (topology.type) {
    case 'mongos':
      return TopologyType.Sharded;
    case 'replset':
      return TopologyType.ReplicaSetWithPrimary;
    default:
      return TopologyType.Single;
  }
}
|
|
// Lowest maxWireVersion for which retryable writes are considered supported
const RETRYABLE_WIRE_VERSION = 6;

/**
 * Determines whether the provided topology supports retryable writes
 *
 * @param {Mongos|Replset} topology
 * @returns {boolean} false when the last ismaster reports a wire version below
 *   RETRYABLE_WIRE_VERSION, when `logicalSessionTimeoutMinutes` is falsy, or when
 *   the topology type is Single; true otherwise
 */
function isRetryableWritesSupported(topology) {
  const serverWireVersion = topology.lastIsMaster().maxWireVersion;
  if (serverWireVersion < RETRYABLE_WIRE_VERSION) {
    return false;
  }

  if (!topology.logicalSessionTimeoutMinutes) {
    return false;
  }

  return topologyType(topology) !== TopologyType.Single;
}
|
|
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
  'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';

/**
 * Replace the error raised for an unsupported retryable write with one that
 * carries actionable guidance. Any other error is returned untouched.
 *
 * @param {object} err - error to inspect; only `code` and `errmsg` are read
 * @returns {object} a new MongoError wrapping `err` (as `originalError`) when `err`
 *   has the MMAPv1 retry-writes code and an `errmsg` mentioning 'Transaction numbers';
 *   otherwise `err` itself
 */
function getMMAPError(err) {
  const isMMAPv1RetryError =
    err.code === MMAPv1_RETRY_WRITES_ERROR_CODE &&
    // An error with the matching code but no textual errmsg cannot be the MMAPv1
    // case; guard so it is passed through rather than raising a TypeError.
    typeof err.errmsg === 'string' &&
    err.errmsg.includes('Transaction numbers');

  if (!isMMAPv1RetryError) {
    return err;
  }

  // According to the retryable writes spec, we must replace the error message in this case.
  // We need to replace err.message so the thrown message is correct and we need to replace
  // err.errmsg to meet the spec requirement.
  return new MongoError({
    message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
    errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
    originalError: err
  });
}
|
|
// NOTE: only used for legacy topology types
/**
 * Decide whether a failed write may be retried, labelling the error with
 * 'RetryableWriteError' when this function determines it is retryable.
 *
 * @param {*} err - the error produced by the write
 * @param {object} topology - topology the write was sent through
 * @returns {boolean} false for anything that is not a MongoError; otherwise whether
 *   the error carries the 'RetryableWriteError' label after the check
 */
function legacyIsRetryableWriteError(err, topology) {
  if (!(err instanceof MongoError)) {
    return false;
  }

  if (isRetryableWritesSupported(topology)) {
    // if pre-4.4 server, then add error label if its a retryable write error
    const shouldLabel =
      err instanceof MongoNetworkError ||
      (maxWireVersion(topology) < 9 && isRetryableWriteError(err));

    if (shouldLabel) {
      err.addErrorLabel('RetryableWriteError');
    }
  }

  return err.hasErrorLabel('RetryableWriteError');
}
|
|
module.exports = {
|
SessionMixins,
|
resolveClusterTime,
|
inquireServerState,
|
getTopologyType,
|
emitServerDescriptionChanged,
|
emitTopologyDescriptionChanged,
|
cloneOptions,
|
createCompressionInfo,
|
clone,
|
diff,
|
Interval,
|
Timeout,
|
isRetryableWritesSupported,
|
getMMAPError,
|
topologyType,
|
legacyIsRetryableWriteError
|
};
|