/** * Starts up a sharded cluster with the given specifications. The cluster will be fully operational * after the execution of this constructor function. * * In addition to its own methods, ShardingTest inherits all the functions from the 'sh' utility * with the db set as the first mongos instance in the test (i.e. s0). * * @param {Object} params Contains the key-value pairs for the cluster * configuration. Accepted keys are: * * { * name {string}: name for this test * verbose {number}: the verbosity for the mongos * chunkSize {number}: the chunk size to use as configuration for the cluster * * mongos {number|Object|Array.}: number of mongos or mongos * configuration object(s)(*). @see MongoRunner.runMongos * * rs {Object|Array.}: replica set configuration object. Can * contain: * { * nodes {number}: number of replica members. Defaults to 3. * protocolVersion {number}: protocol version of replset used by the * replset initiation. * For other options, @see ReplSetTest#initiate * } * * shards {number|Object|Array.}: number of shards or shard * configuration object(s)(*). @see MongoRunner.runMongod * * config {number|Object|Array.}: number of config server or * config server configuration object(s)(*). @see MongoRunner.runMongod * * (*) There are two ways For multiple configuration objects. * (1) Using the object format. Example: * * { d0: { verbose: 5 }, d1: { auth: '' }, rs2: { oplogsize: 10 }} * * In this format, d = mongod, s = mongos & c = config servers * * (2) Using the array format. Example: * * [{ verbose: 5 }, { auth: '' }] * * Note: you can only have single server shards for array format. * * Note: A special "bridgeOptions" property can be specified in both the object and array * formats to configure the options for the mongobridge corresponding to that node. These * options are merged with the params.bridgeOptions options, where the node-specific * options take precedence. 
* * other: { * rs: same as above * chunkSize: same as above * keyFile {string}: the location of the keyFile * shardAsReplicaSet {boolean}: if true, start shards as 2 node replica sets. default * is true. * * shardOptions {Object}: same as the shards property above. * Can be used to specify options that are common all shards. * * configOptions {Object}: same as the config property above. * Can be used to specify options that are common all config servers. * mongosOptions {Object}: same as the mongos property above. * Can be used to specify options that are common all mongos. * enableBalancer {boolean} : if true, enable the balancer * enableAutoSplit {boolean} : if true, enable autosplitting; else, default to the * enableBalancer setting * manualAddShard {boolean}: shards will not be added if true. * * migrationLockAcquisitionMaxWaitMS {number}: number of milliseconds to acquire the migration * lock. * * useBridge {boolean}: If true, then a mongobridge process is started for each node in the * sharded cluster. Defaults to false. * * causallyConsistent {boolean}: Specifies whether the connections to the replica set nodes * should be created with the 'causal consistency' flag enabled, which means they will * gossip the cluster time and add readConcern afterClusterTime where applicable. * Defaults to false. * * bridgeOptions {Object}: Options to apply to all mongobridge processes. Defaults to {}. * * // replica Set only: * rsOptions {Object}: same as the rs property above. Can be used to * specify options that are common all replica members. * useHostname {boolean}: if true, use hostname of machine, * otherwise use localhost * numReplicas {number} * * } * } * * Member variables: * s {Mongo} - connection to the first mongos * s0, s1, ... {Mongo} - connection to different mongos * rs0, rs1, ... {ReplSetTest} - test objects to replica sets * shard0, shard1, ... {Mongo} - connection to shards (not available for replica sets) * d0, d1, ... 
{Mongo} - same as shard0, shard1, ... * config0, config1, ... {Mongo} - connection to config servers * c0, c1, ... {Mongo} - same as config0, config1, ... * configRS - If the config servers are a replset, this will contain the config ReplSetTest object */ var ShardingTest = function(params) { if (!(this instanceof ShardingTest)) { return new ShardingTest(params); } // Capture the 'this' reference var self = this; // Used for counting the test duration var _startTime = new Date(); // Populated with the paths of all shard hosts (config servers + hosts) and is used for // cleaning up the data files on shutdown var _alldbpaths = []; // Timeout to be used for operations scheduled by the sharding test, which must wait for write // concern (5 minutes) var kDefaultWTimeoutMs = 5 * 60 * 1000; // Publicly exposed variables /** * Attempts to open a connection to the specified connection string or throws if unable to * connect. */ function _connectWithRetry(url) { var conn; assert.soon(function() { try { conn = new Mongo(url); return true; } catch (e) { print("Error connecting to " + url + ": " + e); return false; } }); return conn; } /** * Constructs a human-readable string representing a chunk's range. */ function _rangeToString(r) { return tojsononeline(r.min) + " -> " + tojsononeline(r.max); } /** * Checks whether the specified collection is sharded by consulting the config metadata. */ function _isSharded(collName) { var collName = "" + collName; var dbName; if (typeof collName.getCollectionNames == 'function') { dbName = "" + collName; collName = undefined; } if (dbName) { var x = self.config.databases.findOne({_id: dbname}); if (x) return x.partitioned; else return false; } if (collName) { var x = self.config.collections.findOne({_id: collName}); if (x) return true; else return false; } } /** * Extends the ShardingTest class with the methods exposed by the sh utility class. 
*/ function _extendWithShMethods() { Object.keys(sh).forEach(function(fn) { if (typeof sh[fn] !== 'function') { return; } assert.eq(undefined, self[fn], 'ShardingTest contains a method ' + fn + ' which duplicates a method with the same name on sh. ' + 'Please select a different function name.'); self[fn] = function() { if (typeof db == "undefined") { db = undefined; } var oldDb = db; db = self.getDB('test'); try { return sh[fn].apply(sh, arguments); } finally { db = oldDb; } }; }); } /** * Configures the cluster based on the specified parameters (balancer state, etc). */ function _configureCluster() { if (!otherParams.enableBalancer) { self.stopBalancer(); } if (!otherParams.enableAutoSplit) { self.disableAutoSplit(); } else if (!otherParams.enableBalancer) { // Turn on autoSplit since disabling balancer also turns auto split off. self.enableAutoSplit(); } } function connectionURLTheSame(a, b) { if (a == b) return true; if (!a || !b) return false; if (a.host) return connectionURLTheSame(a.host, b); if (b.host) return connectionURLTheSame(a, b.host); if (a.name) return connectionURLTheSame(a.name, b); if (b.name) return connectionURLTheSame(a, b.name); if (a.indexOf("/") < 0 && b.indexOf("/") < 0) { a = a.split(":"); b = b.split(":"); if (a.length != b.length) return false; if (a.length == 2 && a[1] != b[1]) return false; if (a[0] == "localhost" || a[0] == "127.0.0.1") a[0] = getHostName(); if (b[0] == "localhost" || b[0] == "127.0.0.1") b[0] = getHostName(); return a[0] == b[0]; } else { var a0 = a.split("/")[0]; var b0 = b.split("/")[0]; return a0 == b0; } } assert(connectionURLTheSame("foo", "foo")); assert(!connectionURLTheSame("foo", "bar")); assert(connectionURLTheSame("foo/a,b", "foo/b,a")); assert(!connectionURLTheSame("foo/a,b", "bar/a,b")); // ShardingTest API this.getDB = function(name) { return this.s.getDB(name); }; /** * Finds the _id of the primary shard for database 'dbname', e.g., 'test-rs0' */ this.getPrimaryShardIdForDatabase = function(dbname) { 
var x = this.config.databases.findOne({_id: "" + dbname}); if (x) { return x.primary; } var countDBsFound = 0; this.config.databases.find().forEach(function(db) { countDBsFound++; printjson(db); }); throw Error("couldn't find dbname: " + dbname + " in config.databases. Total DBs: " + countDBsFound); }; this.getNonPrimaries = function(dbname) { var x = this.config.databases.findOne({_id: dbname}); if (!x) { this.config.databases.find().forEach(printjson); throw Error("couldn't find dbname: " + dbname + " total: " + this.config.databases.count()); } return this.config.shards.find({_id: {$ne: x.primary}}).map(z => z._id); }; this.getConnNames = function() { var names = []; for (var i = 0; i < this._connections.length; i++) { names.push(this._connections[i].name); } return names; }; /** * Find the connection to the primary shard for database 'dbname'. */ this.getPrimaryShard = function(dbname) { var dbPrimaryShardId = this.getPrimaryShardIdForDatabase(dbname); var primaryShard = this.config.shards.findOne({_id: dbPrimaryShardId}); if (primaryShard) { shardConnectionString = primaryShard.host; var rsName = shardConnectionString.substring(0, shardConnectionString.indexOf("/")); for (var i = 0; i < this._connections.length; i++) { var c = this._connections[i]; if (connectionURLTheSame(shardConnectionString, c.name) || connectionURLTheSame(rsName, c.name)) return c; } } throw Error("can't find server connection for db '" + dbname + "'s primary shard: " + tojson(primaryShard)); }; this.normalize = function(x) { var z = this.config.shards.findOne({host: x}); if (z) return z._id; return x; }; /** * Find a different shard connection than the one given. 
*/
    this.getOther = function(one) {
        if (this._connections.length < 2) {
            throw Error("getOther only works with 2 shards");
        }

        // Accept an object wrapping a connection in its '_mongo' property.
        if (one._mongo) {
            one = one._mongo;
        }

        // Return the first shard connection that is not 'one'.
        for (var i = 0; i < this._connections.length; i++) {
            if (this._connections[i] != one) {
                return this._connections[i];
            }
        }

        return null;
    };

    /**
     * Returns the shard connection that follows 'one' in this._connections, wrapping around to the
     * first connection after the last. Returns undefined if 'one' is not a shard connection.
     */
    this.getAnother = function(one) {
        if (this._connections.length < 2) {
            throw Error("getAnother() only works with multiple servers");
        }

        // Accept an object wrapping a connection in its '_mongo' property.
        if (one._mongo) {
            one = one._mongo;
        }

        for (var i = 0; i < this._connections.length; i++) {
            if (this._connections[i] == one)
                return this._connections[(i + 1) % this._connections.length];
        }
    };

    /**
     * Stops every mongos in this._mongos, passing 'opts' through to stopMongos().
     */
    this.stopAllMongos = function(opts) {
        for (var i = 0; i < this._mongos.length; i++) {
            this.stopMongos(i, opts);
        }
    };

    /**
     * Shuts the cluster down: runs the consistency checks, stops all mongos processes, then the
     * shards, then the config servers, and finally resets the data paths unless
     * 'opts.noCleanData' is set.
     *
     * Throws if jsTestOptions().alwaysUseLogFiles is set while 'opts.noCleanData' is explicitly
     * false. Note that 'opts.noCleanData' is overwritten on the passed-in object in that mode.
     */
    this.stop = function(opts = {}) {
        // The checks run first, while the cluster is still up.
        this.checkUUIDsConsistentAcrossCluster();
        this.checkIndexesConsistentAcrossCluster();
        this.checkOrphansAreDeleted();

        if (jsTestOptions().alwaysUseLogFiles) {
            if (opts.noCleanData === false) {
                throw new Error("Always using log files, but received conflicting option.");
            }

            opts.noCleanData = true;
        }

        this.stopAllMongos(opts);

        let startTime = new Date();  // Measure the execution time of shutting down shards.
        for (var i = 0; i < this._connections.length; i++) {
            if (this._rs[i]) {
                this._rs[i].test.stopSet(15, undefined, opts);
            } else {
                this.stopMongod(i, opts);
            }
        }
        print("ShardingTest stopped all shards, took " + (new Date() - startTime) + "ms for " +
              this._connections.length + " shards.");

        if (this.configRS) {
            this.configRS.stopSet(undefined, undefined, opts);
        } else {
            // Old style config triplet
            for (var i = 0; i < this._configServers.length; i++) {
                this.stopConfigServer(i, opts);
            }
        }

        if (!opts.noCleanData) {
            print("ShardingTest stop deleting all dbpaths");
            for (var i = 0; i < _alldbpaths.length; i++) {
                resetDbpath(MongoRunner.dataPath + _alldbpaths[i]);
            }
        }

        var timeMillis = new Date().getTime() - _startTime.getTime();

        print('*** ShardingTest ' + this._testName + " completed successfully in " +
              (timeMillis / 1000) + " seconds ***");
    };

    /**
     * Returns every recorded dbpath prefixed with MongoRunner.dataPath.
     */
    this.getDBPaths = function() {
        return _alldbpaths.map((path) => {
            return MongoRunner.dataPath + path;
        });
    };

    /**
     * Runs 'cmd' against this.admin. Returns true when the response has ok == 1, otherwise throws
     * an error built by _getErrorWithCode() that includes the command and the response.
     */
    this.adminCommand = function(cmd) {
        var res = this.admin.runCommand(cmd);
        if (res && res.ok == 1)
            return true;

        throw _getErrorWithCode(res, "command " + tojson(cmd) + " failed: " + tojson(res));
    };

    /**
     * Calls fn(conn) once for every shard connection in this._connections.
     */
    this.forEachConnection = function(fn) {
        this._connections.forEach(function(conn) {
            fn(conn);
        });
    };

    /**
     * Prints one line per config.changelog entry, with extra range formatting for "split" and
     * "multi-split" events.
     */
    this.printChangeLog = function() {
        this.config.changelog.find().forEach(function(z) {
            var msg = z.server + "\t" + z.time + "\t" + z.what;
            // Pad the event name so that the following columns line up.
            for (var i = z.what.length; i < 15; i++)
                msg += " ";

            msg += " " + z.ns + "\t";
            if (z.what == "split") {
                msg += _rangeToString(z.details.before) + " -->> (" +
                    _rangeToString(z.details.left) + "), (" + _rangeToString(z.details.right) +
                    ")";
            } else if (z.what == "multi-split") {
                msg += _rangeToString(z.details.before) + " -->> (" + z.details.number + "/" +
                    z.details.of + " " + _rangeToString(z.details.chunk) + ")";
            } else {
                msg += tojsononeline(z.details);
            }

            print("ShardingTest " + msg);
        });
    };

    /**
     * Returns a multi-line description of the chunks in config.chunks, sorted by namespace and min
     * key. When 'ns' is given only the chunks of that namespace are listed.
     */
    this.getChunksString = function(ns) {
        var q = {};
        if (ns) {
            q.ns = ns;
        }

        var s = "";
        this.config.chunks.find(q).sort({ns: 1, min: 1}).forEach(function(z) {
            s += " " + z._id + "\t" + z.lastmod.t + "|" + z.lastmod.i + "\t" + tojson(z.min) +
                " -> " + tojson(z.max) + " " + z.shard + " " + z.ns + "\n";
        });

        return s;
    };

    /**
     * Prints the output of getChunksString(ns).
     */
    this.printChunks = function(ns) {
        print("ShardingTest " + this.getChunksString(ns));
    };

    /**
     * Prints the sharding status of the cluster via the shell's printShardingStatus().
     */
    this.printShardingStatus = function(verbose) {
        printShardingStatus(this.config, verbose);
    };

    /**
     * Prints the shard version reported for 'ns' by every shard connection and every mongos,
     * followed by the chunk listing for 'ns'. 'msg', when given, is printed first.
     */
    this.printCollectionInfo = function(ns, msg) {
        var out = "";
        if (msg) {
            out += msg + "\n";
        }
        out += "sharding collection info: " + ns + "\n";

        for (var i = 0; i < this._connections.length; i++) {
            var c = this._connections[i];
            out += " mongod " + c + " " +
                tojson(c.getCollection(ns).getShardVersion(), " ", true) + "\n";
        }

        for (var i = 0; i < this._mongos.length; i++) {
            var c = this._mongos[i];
            out += " mongos " + c + " " +
                tojson(c.getCollection(ns).getShardVersion(), " ", true) + "\n";
        }

        out += this.getChunksString(ns);

        print("ShardingTest " + out);
    };

    /**
     * Returns the number of shards which contain the given dbName.collName collection
     */
    this.onNumShards = function(dbName, collName) {
        return this.shardCounts(dbName, collName)
            .reduce((total, currentValue) => total + (currentValue > 0 ? 1 : 0), 0);
    };

    /**
     * Returns an array of the size of numShards where each element is the number of documents on
     * that particular shard
     */
    this.shardCounts = function(dbName, collName) {
        return this._connections.map((connection) =>
                                         connection.getDB(dbName).getCollection(collName).count());
    };

    /**
     * Returns an object mapping each shard _id to the number of chunks of dbName.collName that it
     * owns. 'dbName' defaults to "test". Every shard in config.shards starts at 0.
     */
    this.chunkCounts = function(collName, dbName) {
        dbName = dbName || "test";

        var x = {};
        this.config.shards.find().forEach(function(z) {
            x[z._id] = 0;
        });

        this.config.chunks.find({ns: dbName + "." + collName}).forEach(function(z) {
            if (x[z.shard])
                x[z.shard]++;
            else
                x[z.shard] = 1;
        });

        return x;
    };

    /**
     * Returns the difference between the largest and the smallest per-shard chunk count of
     * dbName.collName, and prints the counts.
     */
    this.chunkDiff = function(collName, dbName) {
        var c = this.chunkCounts(collName, dbName);

        // 'min' starts at a large sentinel so that the first count always replaces it.
        var min = 100000000;
        var max = 0;
        for (var s in c) {
            if (c[s] < min)
                min = c[s];
            if (c[s] > max)
                max = c[s];
        }

        print("ShardingTest input: " + tojson(c) + " min: " + min + " max: " + max);
        return max - min;
    };

    /**
     * Waits up to the specified timeout (with a default of 60s) for the balancer to execute one
     * round. If no round has been executed, throws an error.
     *
     * The mongosConnection parameter is optional and allows callers to specify a connection
     * different than the first mongos instance in the list.
     */
    this.awaitBalancerRound = function(timeoutMs, mongosConnection) {
        timeoutMs = timeoutMs || 60000;
        mongosConnection = mongosConnection || self.s0;

        // Runs the balancerStatus command through the mongos connection and returns its response;
        // throws unless the balancer mode is 'full'.
        function getBalancerStatus() {
            var balancerStatus =
                assert.commandWorked(mongosConnection.adminCommand({balancerStatus: 1}));
            if (balancerStatus.mode !== 'full') {
                throw Error('Balancer is not enabled');
            }

            return balancerStatus;
        }

        var initialStatus = getBalancerStatus();
        var currentStatus;
        assert.soon(
            function() {
                // A round has completed once the round counter differs from the initial value.
                currentStatus = getBalancerStatus();
                return (currentStatus.numBalancerRounds - initialStatus.numBalancerRounds) != 0;
            },
            function() {
                return 'Latest balancer status: ' + tojson(currentStatus);
            },
            timeoutMs);
    };

    /**
     * Waits up to one minute for the difference in chunks between the most loaded shard and
     * least loaded shard to be 0 or 1, indicating that the collection is well balanced. This should
     * only be called after creating a big enough chunk difference to trigger balancing.
*/
    this.awaitBalance = function(collName, dbName, timeToWait) {
        // One minute is only the default; callers may pass a different 'timeToWait' in ms.
        timeToWait = timeToWait || 60000;

        assert.soon(function() {
            var x = self.chunkDiff(collName, dbName);
            print("chunk diff: " + x);
            return x < 2;
        }, "no balance happened", timeToWait);
    };

    /**
     * Returns the single shard holding the documents that match 'query'; asserts that exactly one
     * shard is reported by getShardsForQuery().
     */
    this.getShard = function(coll, query, includeEmpty) {
        var shards = this.getShardsForQuery(coll, query, includeEmpty);
        assert.eq(shards.length, 1);
        return shards[0];
    };

    /**
     * Returns the shards on which documents matching a particular query reside.
     *
     * 'coll' may be a collection object or a namespace string (resolved through this.s). Shards
     * that returned no documents are included only when 'includeEmpty' is truthy. Each entry of
     * the result is the matching connection from this._connections, or the shard's connection
     * string when no connection matches.
     */
    this.getShardsForQuery = function(coll, query, includeEmpty) {
        if (!coll.getDB) {
            coll = this.s.getCollection(coll);
        }

        var explain = coll.find(query).explain("executionStats");
        var shards = [];

        var execStages = explain.executionStats.executionStages;
        var plannerShards = explain.queryPlanner.winningPlan.shards;

        if (execStages.shards) {
            for (var i = 0; i < execStages.shards.length; i++) {
                var hasResults = execStages.shards[i].executionStages.nReturned &&
                    execStages.shards[i].executionStages.nReturned > 0;
                if (includeEmpty || hasResults) {
                    // NOTE(review): assumes the planner and execution shard arrays share the same
                    // order - confirm.
                    shards.push(plannerShards[i].connectionString);
                }
            }
        }

        // Replace connection strings by the corresponding shard connection objects.
        for (var i = 0; i < shards.length; i++) {
            for (var j = 0; j < this._connections.length; j++) {
                if (connectionURLTheSame(this._connections[j], shards[i])) {
                    shards[i] = this._connections[j];
                    break;
                }
            }
        }

        return shards;
    };

    /**
     * Shards a collection and optionally splits it and moves one chunk to another shard.
     *
     * @param collName collection object, or collection name within 'dbName'
     * @param key shard key pattern
     * @param split split point; defaults to 'key' when omitted, pass false to skip the split
     * @param move document locating the chunk to move; defaults to 'split', pass false to skip
     * @param dbName database name, used only when 'collName' is a name; defaults to "test"
     * @param waitForDelete forwarded as _waitForDelete to the movechunk command
     */
    this.shardColl = function(collName, key, split, move, dbName, waitForDelete) {
        split = (split != false ? (split || key) : split);
        move = (split != false && move != false ? (move || split) : false);

        if (collName.getDB)
            dbName = "" + collName.getDB();
        else
            dbName = dbName || "test";

        var c = dbName + "." + collName;
        if (collName.getDB) {
            c = "" + collName;
        }

        // NOTE(review): 'dbName' is always a string at this point, and _isSharded() looks plain
        // strings up in config.collections, so this presumably always issues enableSharding -
        // confirm whether that is intended.
        if (!_isSharded(dbName)) {
            assert.commandWorked(this.s.adminCommand({enableSharding: dbName}));
        }

        var result = assert.commandWorked(this.s.adminCommand({shardcollection: c, key: key}));

        if (split == false) {
            return;
        }

        result = assert.commandWorked(this.s.adminCommand({split: c, middle: split}));

        if (move == false) {
            return;
        }

        // Redundant redeclaration of 'result' (already declared above); it has no effect.
        var result;
        // Retry the move up to five times, sleeping five seconds after each failed attempt.
        for (var i = 0; i < 5; i++) {
            var otherShard = this.getOther(this.getPrimaryShard(dbName)).name;
            result = this.s.adminCommand(
                {movechunk: c, find: move, to: otherShard, _waitForDelete: waitForDelete});
            if (result.ok)
                break;

            sleep(5 * 1000);
        }

        assert.commandWorked(result);
    };

    /**
     * Kills the mongos with index n.
     *
     * With useBridge the underlying (unbridged) mongos is stopped and then its bridge.
     */
    this.stopMongos = function(n, opts) {
        if (otherParams.useBridge) {
            MongoRunner.stopMongos(unbridgedMongos[n], undefined, opts);
            this["s" + n].stop();
        } else {
            MongoRunner.stopMongos(this["s" + n], undefined, opts);
        }
    };

    /**
     * Kills the shard mongod with index n.
     *
     * With useBridge the underlying (unbridged) mongod is stopped and then its bridge.
     */
    this.stopMongod = function(n, opts) {
        if (otherParams.useBridge) {
            MongoRunner.stopMongod(unbridgedConnections[n], undefined, opts);
            this["d" + n].stop();
        } else {
            MongoRunner.stopMongod(this["d" + n], undefined, opts);
        }
    };

    /**
     * Kills the config server mongod with index n.
     *
     * With useBridge the underlying (unbridged) mongod is stopped and then its bridge.
     */
    this.stopConfigServer = function(n, opts) {
        if (otherParams.useBridge) {
            MongoRunner.stopMongod(unbridgedConfigServers[n], undefined, opts);
            this._configServers[n].stop();
        } else {
            MongoRunner.stopMongod(this._configServers[n], undefined, opts);
        }
    };

    /**
     * Stops and restarts a mongos process.
     *
     * If 'opts' is not specified, starts the mongos with its previous parameters. If 'opts' is
     * specified and 'opts.restart' is false or missing, starts mongos with the parameters specified
     * in 'opts'. If opts is specified and 'opts.restart' is true, merges the previous options
     * with the options specified in 'opts', with the options in 'opts' taking precedence.
*
     * Warning: Overwrites the old s (if n = 0) admin, config, and sn member variables.
     */
    this.restartMongos = function(n, opts) {
        var mongos;

        // With a bridge in place the real process is tracked separately from the bridge object.
        if (otherParams.useBridge) {
            mongos = unbridgedMongos[n];
        } else {
            mongos = this["s" + n];
        }

        // When no options are given the old connection object itself is used as the options.
        // Note that 'opts.port' is filled in on the caller's object.
        opts = opts || mongos;
        opts.port = opts.port || mongos.port;

        this.stopMongos(n);

        if (otherParams.useBridge) {
            // Node-specific bridge options come from 'opts' when the caller supplied options and
            // from the previous process otherwise; they take precedence over the shared ones.
            var bridgeOptions =
                (opts !== mongos) ? opts.bridgeOptions : mongos.fullOptions.bridgeOptions;
            bridgeOptions = Object.merge(otherParams.bridgeOptions, bridgeOptions || {});
            bridgeOptions = Object.merge(bridgeOptions, {
                hostName: otherParams.useHostname ? hostName : "localhost",
                port: this._mongos[n].port,
                // The mongos processes identify themselves to mongobridge as host:port, where the
                // host is the actual hostname of the machine and not localhost.
                dest: hostName + ":" + opts.port,
            });

            this._mongos[n] = new MongoBridge(bridgeOptions);
        }

        if (opts.restart) {
            opts = Object.merge(mongos.fullOptions, opts);

            // If the mongos is being restarted with a newer version, make sure we remove any
            // options that no longer exist in the newer version.
            if (MongoRunner.areBinVersionsTheSame('latest', opts.binVersion)) {
                delete opts.noAutoSplit;
            }
        }

        var newConn = MongoRunner.runMongos(opts);
        if (!newConn) {
            throw new Error("Failed to restart mongos " + n);
        }

        if (otherParams.useBridge) {
            this._mongos[n].connectToBridge();
            unbridgedMongos[n] = newConn;
        } else {
            this._mongos[n] = newConn;
        }

        this['s' + n] = this._mongos[n];
        // The first mongos also backs the s, admin and config shortcuts.
        if (n == 0) {
            this.s = this._mongos[n];
            this.admin = this._mongos[n].getDB('admin');
            this.config = this._mongos[n].getDB('config');
        }
    };

    /**
     * Stops and restarts a shard mongod process.
     *
     * If opts is specified, the new mongod is started using those options. Otherwise, it is started
     * with its previous parameters. The 'beforeRestartCallback' parameter is an optional function
     * that will be run after the MongoD is stopped, but before it is restarted. The intended uses
     * of the callback are modifications to the dbpath of the mongod that must be made while it is
     * stopped.
     *
     * Warning: Overwrites the old dn/shardn member variables.
     */
    this.restartMongod = function(n, opts, beforeRestartCallback) {
        var mongod;

        // With a bridge in place the real process is tracked separately from the bridge object.
        if (otherParams.useBridge) {
            mongod = unbridgedConnections[n];
        } else {
            mongod = this["d" + n];
        }

        // When no options are given the old connection object itself is used as the options.
        // Note that 'opts.port' is filled in on the caller's object.
        opts = opts || mongod;
        opts.port = opts.port || mongod.port;

        this.stopMongod(n);

        if (otherParams.useBridge) {
            // Node-specific bridge options come from 'opts' when the caller supplied options and
            // from the previous process otherwise; they take precedence over the shared ones.
            var bridgeOptions =
                (opts !== mongod) ? opts.bridgeOptions : mongod.fullOptions.bridgeOptions;
            bridgeOptions = Object.merge(otherParams.bridgeOptions, bridgeOptions || {});
            bridgeOptions = Object.merge(bridgeOptions, {
                hostName: otherParams.useHostname ? hostName : "localhost",
                port: this._connections[n].port,
                // The mongod processes identify themselves to mongobridge as host:port, where the
                // host is the actual hostname of the machine and not localhost.
                dest: hostName + ":" + opts.port,
            });

            this._connections[n] = new MongoBridge(bridgeOptions);
        }

        // The callback is validated whenever a third argument was passed at all, so an explicit
        // non-function value (including undefined) is rejected.
        if (arguments.length >= 3) {
            if (typeof (beforeRestartCallback) !== "function") {
                throw new Error("beforeRestartCallback must be a function but was of type " +
                                typeof (beforeRestartCallback));
            }
            beforeRestartCallback();
        }

        // 'restart' is set on the options object itself, i.e. on the caller's object when one
        // was supplied.
        opts.restart = true;

        var newConn = MongoRunner.runMongod(opts);
        if (!newConn) {
            throw new Error("Failed to restart shard " + n);
        }

        if (otherParams.useBridge) {
            this._connections[n].connectToBridge();
            unbridgedConnections[n] = newConn;
        } else {
            this._connections[n] = newConn;
        }

        this["shard" + n] = this._connections[n];
        this["d" + n] = this._connections[n];
    };

    /**
     * Restarts each node in a particular shard replica set using the shard's original startup
     * options by default.
     *
     * Option { startClean : true } forces clearing the data directory.
     * Option { auth : Object } object that contains the auth details for admin credentials.
     *   Should contain the fields 'user' and 'pwd'
     *
     *
     * @param {int} shard server number (0, 1, 2, ...)
to be restarted */ this.restartShardRS = function(n, options, signal, wait) { for (let i = 0; i < this["rs" + n].nodeList().length; i++) { this["rs" + n].restart(i); } this["rs" + n].awaitSecondaryNodes(); this._connections[n] = new Mongo(this["rs" + n].getURL()); this["shard" + n] = this._connections[n]; }; /** * Stops and restarts a config server mongod process. * * If opts is specified, the new mongod is started using those options. Otherwise, it is * started * with its previous parameters. * * Warning: Overwrites the old cn/confign member variables. */ this.restartConfigServer = function(n) { var mongod; if (otherParams.useBridge) { mongod = unbridgedConfigServers[n]; } else { mongod = this["c" + n]; } this.stopConfigServer(n); if (otherParams.useBridge) { var bridgeOptions = Object.merge(otherParams.bridgeOptions, mongod.fullOptions.bridgeOptions || {}); bridgeOptions = Object.merge(bridgeOptions, { hostName: otherParams.useHostname ? hostName : "localhost", port: this._configServers[n].port, // The mongod processes identify themselves to mongobridge as host:port, where the // host is the actual hostname of the machine and not localhost. dest: hostName + ":" + mongod.port, }); this._configServers[n] = new MongoBridge(bridgeOptions); } mongod.restart = true; var newConn = MongoRunner.runMongod(mongod); if (!newConn) { throw new Error("Failed to restart config server " + n); } if (otherParams.useBridge) { this._configServers[n].connectToBridge(); unbridgedConfigServers[n] = newConn; } else { this._configServers[n] = newConn; } this["config" + n] = this._configServers[n]; this["c" + n] = this._configServers[n]; }; /** * Helper method for setting primary shard of a database and making sure that it was successful. * Note: first mongos needs to be up. 
*/ this.ensurePrimaryShard = function(dbName, shardName) { var db = this.s0.getDB('admin'); var res = db.adminCommand({movePrimary: dbName, to: shardName}); assert(res.ok || res.errmsg == "it is already the primary", tojson(res)); }; /** * Returns whether any settings to ShardingTest or jsTestOptions indicate this is a multiversion * cluster. * * Checks for 'last-stable' bin versions via: * jsTestOptions().shardMixedBinVersions, jsTestOptions().mongosBinVersion, * otherParams.configOptions.binVersion, otherParams.shardOptions.binVersion, * otherParams.mongosOptions.binVersion */ this.isMixedVersionCluster = function() { var lastStableBinVersion = MongoRunner.getBinVersionFor('last-stable'); // Must check shardMixedBinVersion because it causes shardOptions.binVersion to be an object // (versionIterator) rather than a version string. Must check mongosBinVersion, as well, // because it does not update mongosOptions.binVersion. if (jsTestOptions().shardMixedBinVersions || (jsTestOptions().mongosBinVersion && MongoRunner.areBinVersionsTheSame(lastStableBinVersion, jsTestOptions().mongosBinVersion))) { return true; } // Check for 'last-stable' config servers. if (otherParams.configOptions && otherParams.configOptions.binVersion && MongoRunner.areBinVersionsTheSame( lastStableBinVersion, MongoRunner.getBinVersionFor(otherParams.configOptions.binVersion))) { return true; } for (var i = 0; i < numConfigs; ++i) { if (otherParams['c' + i] && otherParams['c' + i].binVersion && MongoRunner.areBinVersionsTheSame( lastStableBinVersion, MongoRunner.getBinVersionFor(otherParams['c' + i].binVersion))) { return true; } } // Check for 'last-stable' mongod servers. 
if (otherParams.shardOptions && otherParams.shardOptions.binVersion && MongoRunner.areBinVersionsTheSame( lastStableBinVersion, MongoRunner.getBinVersionFor(otherParams.shardOptions.binVersion))) { return true; } for (var i = 0; i < numShards; ++i) { if (otherParams['d' + i] && otherParams['d' + i].binVersion && MongoRunner.areBinVersionsTheSame( lastStableBinVersion, MongoRunner.getBinVersionFor(otherParams['d' + i].binVersion))) { return true; } } // Check for 'last-stable' mongos servers. if (otherParams.mongosOptions && otherParams.mongosOptions.binVersion && MongoRunner.areBinVersionsTheSame( lastStableBinVersion, MongoRunner.getBinVersionFor(otherParams.mongosOptions.binVersion))) { return true; } for (var i = 0; i < numMongos; ++i) { if (otherParams['s' + i] && otherParams['s' + i].binVersion && MongoRunner.areBinVersionsTheSame( lastStableBinVersion, MongoRunner.getBinVersionFor(otherParams['s' + i].binVersion))) { return true; } } return false; }; /** * Runs a find on the namespace to force a refresh of the node's catalog cache. */ this.refreshCatalogCacheForNs = function(node, ns) { node.getCollection(ns).findOne(); }; /** * Returns if there is a new feature compatibility version for the "latest" version. This must * be manually changed if and when there is a new feature compatibility version. */ function _hasNewFeatureCompatibilityVersion() { return true; } /** * Returns the total number of mongod nodes across all shards, excluding config server nodes. * Used only for diagnostic logging. */ function totalNumShardNodes(shardsAsReplSets) { // Standalone mongod shards. if (!shardsAsReplSets) { return self._connections.length; } // Replica set shards. 
const numNodesPerReplSet = self._rs.map(r => r.test.nodes.length); return numNodesPerReplSet.reduce((a, b) => a + b, 0); } // ShardingTest initialization assert(isObject(params), 'ShardingTest configuration must be a JSON object'); var testName = params.name || jsTest.name(); var otherParams = Object.merge(params, params.other || {}); var numShards = otherParams.hasOwnProperty('shards') ? otherParams.shards : 2; var mongosVerboseLevel = otherParams.hasOwnProperty('verbose') ? otherParams.verbose : 1; var numMongos = otherParams.hasOwnProperty('mongos') ? otherParams.mongos : 1; var numConfigs = otherParams.hasOwnProperty('config') ? otherParams.config : 3; var startShardsAsRS = otherParams.hasOwnProperty('shardAsReplicaSet') ? otherParams.shardAsReplicaSet : true; // Default enableBalancer to false. otherParams.enableBalancer = ("enableBalancer" in otherParams) && (otherParams.enableBalancer === true); // Let autosplit behavior match that of the balancer if autosplit is not explicitly set. 
if (!("enableAutoSplit" in otherParams)) {
        otherParams.enableAutoSplit = otherParams.enableBalancer;
    }

    // Allow specifying mixed-type options like this:
    // { mongos : [ { bind_ip : "localhost" } ],
    //   config : [ { nojournal : "" } ],
    //   shards : { rs : true, d : true } }
    //
    // Array entries are stored as otherParams.d<i> / s<i> / c<i>; object entries are stored under
    // their own keys. In both cases the variable is then replaced by the number of entries.
    if (Array.isArray(numShards)) {
        for (var i = 0; i < numShards.length; i++) {
            otherParams["d" + i] = numShards[i];
        }

        numShards = numShards.length;
    } else if (isObject(numShards)) {
        var tempCount = 0;
        for (var i in numShards) {
            otherParams[i] = numShards[i];
            tempCount++;
        }

        numShards = tempCount;
    }

    if (Array.isArray(numMongos)) {
        for (var i = 0; i < numMongos.length; i++) {
            otherParams["s" + i] = numMongos[i];
        }

        numMongos = numMongos.length;
    } else if (isObject(numMongos)) {
        var tempCount = 0;
        for (var i in numMongos) {
            otherParams[i] = numMongos[i];
            tempCount++;
        }

        numMongos = tempCount;
    }

    if (Array.isArray(numConfigs)) {
        for (var i = 0; i < numConfigs.length; i++) {
            otherParams["c" + i] = numConfigs[i];
        }

        numConfigs = numConfigs.length;
    } else if (isObject(numConfigs)) {
        var tempCount = 0;
        for (var i in numConfigs) {
            otherParams[i] = numConfigs[i];
            tempCount++;
        }

        numConfigs = tempCount;
    }

    // Fill in the defaults of the remaining options: useHostname defaults to true, the others to
    // false / {}.
    otherParams.useHostname = otherParams.useHostname == undefined ? true : otherParams.useHostname;
    otherParams.useBridge = otherParams.useBridge || false;
    otherParams.bridgeOptions = otherParams.bridgeOptions || {};
    otherParams.causallyConsistent = otherParams.causallyConsistent || false;

    if (jsTestOptions().networkMessageCompressors) {
        otherParams.bridgeOptions["networkMessageCompressors"] =
            jsTestOptions().networkMessageCompressors;
    }

    this.keyFile = otherParams.keyFile;
    var hostName = otherParams.host === undefined ? getHostName() : otherParams.host;

    this._testName = testName;
    this._otherParams = otherParams;

    var pathOpts = {testName: testName};

    this._connections = [];
    this._rs = [];
    this._rsObjects = [];

    // Only assigned when useBridge is set.
    let unbridgedConnections;
    let unbridgedConfigServers;
    let unbridgedMongos;
    let _makeAllocatePortFn;
    let _allocatePortForMongos;
    let _allocatePortForBridgeForMongos;
    let _allocatePortForShard;
    let _allocatePortForBridgeForShard;

    if (otherParams.useBridge) {
        unbridgedConnections = [];
        unbridgedConfigServers = [];
        unbridgedMongos = [];

        // Builds a function that hands out 'preallocatedPorts' one at a time, in order, and throws
        // an error with the message produced by 'errorMessage' once they are exhausted.
        _makeAllocatePortFn = (preallocatedPorts, errorMessage) => {
            let idxNextNodePort = 0;

            return function() {
                if (idxNextNodePort >= preallocatedPorts.length) {
                    throw new Error(errorMessage(preallocatedPorts.length));
                }

                const nextPort = preallocatedPorts[idxNextNodePort];
                ++idxNextNodePort;
                return nextPort;
            };
        };

        // Separate port pools are allocated for the bridges and for the processes behind them.
        let errorMessage = (length) =>
            "Cannot use more than " + length + " mongos processes when useBridge=true";
        _allocatePortForBridgeForMongos =
            _makeAllocatePortFn(allocatePorts(MongoBridge.kBridgeOffset), errorMessage);
        _allocatePortForMongos =
            _makeAllocatePortFn(allocatePorts(MongoBridge.kBridgeOffset), errorMessage);

        errorMessage = (length) =>
            "Cannot use more than " + length + " stand-alone shards when useBridge=true";
        _allocatePortForBridgeForShard =
            _makeAllocatePortFn(allocatePorts(MongoBridge.kBridgeOffset), errorMessage);
        _allocatePortForShard =
            _makeAllocatePortFn(allocatePorts(MongoBridge.kBridgeOffset), errorMessage);
    } else {
        // Without a bridge, asking for a bridge port is an error and regular ports come straight
        // from allocatePort().
        _allocatePortForBridgeForShard = _allocatePortForBridgeForMongos = function() {
            throw new Error("Using mongobridge isn't enabled for this sharded cluster");
        };

        _allocatePortForShard = _allocatePortForMongos = allocatePort;
    }

    otherParams.migrationLockAcquisitionMaxWaitMS =
        otherParams.migrationLockAcquisitionMaxWaitMS || 30000;

    let randomSeedAlreadySet = false;

    if (jsTest.options().randomBinVersions) {
        // We avoid setting the random seed unequivocally to avoid unexpected behavior in tests
        // that already make use of Random.setRandomSeed(). This conditional can be removed if
        // it becomes the standard to always be generating the seed through ShardingTest.
        Random.setRandomSeed(jsTest.options().seed);
        randomSeedAlreadySet = true;
    }

    // Should we start up shards as replica sets.
    // NOTE(review): 'i' is read here before the loop below initialises it. Because it is declared
    // with 'var', it still holds whatever the option-normalisation loops above left in it (or
    // undefined), so otherParams["rs" + i] is presumably not the per-shard check it looks like -
    // confirm the intent.
    const shardsAsReplSets = (otherParams.rs || otherParams["rs" + i] || startShardsAsRS);

    //
    // Start each shard replica set or standalone mongod.
    //
    let startTime = new Date();  // Measure the execution time of startup and initiate.
    for (var i = 0; i < numShards; i++) {
        if (shardsAsReplSets) {
            var setName = testName + "-rs" + i;

            var rsDefaults = {
                useHostname: otherParams.useHostname,
                oplogSize: 16,
                shardsvr: '',
                pathOpts: Object.merge(pathOpts, {shard: i}),
            };

            // Explicit replica set configuration: the shared 'rs' options are applied first, then
            // the per-shard 'rs<i>' options, then 'rsOptions'.
            if (otherParams.rs || otherParams["rs" + i]) {
                if (otherParams.rs) {
                    rsDefaults = Object.merge(rsDefaults, otherParams.rs);
                }
                if (otherParams["rs" + i]) {
                    rsDefaults = Object.merge(rsDefaults, otherParams["rs" + i]);
                }
                rsDefaults = Object.merge(rsDefaults, otherParams.rsOptions);
                rsDefaults.nodes = rsDefaults.nodes || otherParams.numReplicas;
            }

            // Shards started as replica sets by default take their options from 'd<i>' and
            // 'shardOptions' instead.
            if (startShardsAsRS && !(otherParams.rs || otherParams["rs" + i])) {
                if (jsTestOptions().shardMixedBinVersions) {
                    if (!otherParams.shardOptions) {
                        otherParams.shardOptions = {};
                    }
                    // If the test doesn't depend on specific shard binVersions, create a mixed
                    // version shard cluster that randomly assigns shard binVersions, half "latest"
                    // and half "last-stable".
                    if (!otherParams.shardOptions.binVersion) {
                        Random.setRandomSeed();
                        otherParams.shardOptions.binVersion =
                            MongoRunner.versionIterator(["latest", "last-stable"], true);
                    }
                }

                if (otherParams.shardOptions && otherParams.shardOptions.binVersion) {
                    otherParams.shardOptions.binVersion =
                        MongoRunner.versionIterator(otherParams.shardOptions.binVersion);
                }

                rsDefaults = Object.merge(rsDefaults, otherParams["d" + i]);
                rsDefaults = Object.merge(rsDefaults, otherParams.shardOptions);
            }

            rsDefaults.setParameter = rsDefaults.setParameter || {};
            rsDefaults.setParameter.migrationLockAcquisitionMaxWaitMS =
                otherParams.migrationLockAcquisitionMaxWaitMS;

            // 'settings', 'nodes' and 'protocolVersion' are pulled out of rsDefaults and handed to
            // the ReplSetTest constructor separately.
            var rsSettings = rsDefaults.settings;
            delete rsDefaults.settings;

            // If both rs and startShardsAsRS are specified, the number of nodes
            // in the rs field should take priority.
            // NOTE(review): numReplicas keeps its previous value (undefined on the first pass) if
            // neither branch is taken - confirm that cannot happen.
            if (otherParams.rs || otherParams["rs" + i]) {
                var numReplicas = rsDefaults.nodes || 3;
            } else if (startShardsAsRS) {
                var numReplicas = 1;
            }
            delete rsDefaults.nodes;

            var protocolVersion = rsDefaults.protocolVersion;
            delete rsDefaults.protocolVersion;

            var rs = new ReplSetTest({
                name: setName,
                nodes: numReplicas,
                host: hostName,
                useHostName: otherParams.useHostname,
                useBridge: otherParams.useBridge,
                bridgeOptions: otherParams.bridgeOptions,
                keyFile: this.keyFile,
                protocolVersion: protocolVersion,
                waitForKeys: false,
                settings: rsSettings,
                seedRandomNumberGenerator: !randomSeedAlreadySet,
            });

            print("ShardingTest starting replica set for shard: " + setName);

            // Start up the replica set but don't wait for it to complete. This allows the startup
            // of each shard to proceed in parallel.
this._rs[i] = {setName: setName, test: rs, nodes: rs.startSetAsync(rsDefaults), url: rs.getURL()}; } else { var options = { useHostname: otherParams.useHostname, pathOpts: Object.merge(pathOpts, {shard: i}), dbpath: "$testName$shard", shardsvr: '', keyFile: this.keyFile }; options.setParameter = options.setParameter || {}; options.setParameter.migrationLockAcquisitionMaxWaitMS = otherParams.migrationLockAcquisitionMaxWaitMS; if (jsTestOptions().shardMixedBinVersions) { if (!otherParams.shardOptions) { otherParams.shardOptions = {}; } // If the test doesn't depend on specific shard binVersions, create a mixed version // shard cluster that randomly assigns shard binVersions, half "latest" and half // "last-stable". if (!otherParams.shardOptions.binVersion) { Random.setRandomSeed(); otherParams.shardOptions.binVersion = MongoRunner.versionIterator(["latest", "last-stable"], true); } } if (otherParams.shardOptions && otherParams.shardOptions.binVersion) { otherParams.shardOptions.binVersion = MongoRunner.versionIterator(otherParams.shardOptions.binVersion); } options = Object.merge(options, otherParams.shardOptions); options = Object.merge(options, otherParams["d" + i]); options.port = options.port || _allocatePortForShard(); if (otherParams.useBridge) { var bridgeOptions = Object.merge(otherParams.bridgeOptions, options.bridgeOptions || {}); bridgeOptions = Object.merge(bridgeOptions, { hostName: otherParams.useHostname ? hostName : "localhost", port: _allocatePortForBridgeForShard(), // The mongod processes identify themselves to mongobridge as host:port, where // the host is the actual hostname of the machine and not localhost. 
dest: hostName + ":" + options.port, }); var bridge = new MongoBridge(bridgeOptions); } var conn = MongoRunner.runMongod(options); if (!conn) { throw new Error("Failed to start shard " + i); } if (otherParams.useBridge) { bridge.connectToBridge(); this._connections.push(bridge); unbridgedConnections.push(conn); } else { this._connections.push(conn); } _alldbpaths.push(testName + i); this["shard" + i] = this._connections[i]; this["d" + i] = this._connections[i]; this._rs[i] = null; this._rsObjects[i] = null; } } // // Start up the config server replica set. // this._configServers = []; // Using replica set for config servers var rstOptions = { useHostName: otherParams.useHostname, host: hostName, useBridge: otherParams.useBridge, bridgeOptions: otherParams.bridgeOptions, keyFile: this.keyFile, waitForKeys: false, name: testName + "-configRS", seedRandomNumberGenerator: !randomSeedAlreadySet, isConfigServer: true, }; // when using CSRS, always use wiredTiger as the storage engine var startOptions = { pathOpts: pathOpts, // Ensure that journaling is always enabled for config servers. journal: "", configsvr: "", storageEngine: "wiredTiger", }; if (otherParams.configOptions && otherParams.configOptions.binVersion) { otherParams.configOptions.binVersion = MongoRunner.versionIterator(otherParams.configOptions.binVersion); } startOptions = Object.merge(startOptions, otherParams.configOptions); rstOptions = Object.merge(rstOptions, otherParams.configReplSetTestOptions); var nodeOptions = []; for (var i = 0; i < numConfigs; ++i) { nodeOptions.push(otherParams["c" + i] || {}); } rstOptions.nodes = nodeOptions; // Start the config server's replica set without waiting for it to complete. This allows it // to proceed in parallel with the startup of each shard. this.configRS = new ReplSetTest(rstOptions); this.configRS.startSetAsync(startOptions); // // Wait for each shard replica set to finish starting up. 
// if (shardsAsReplSets) { for (let i = 0; i < numShards; i++) { print("Waiting for shard " + this._rs[i].setName + " to finish starting up."); this._rs[i].test.startSetAwait(); } } // // Wait for the config server to finish starting up. // print("Waiting for the config server to finish starting up."); this.configRS.startSetAwait(); var config = this.configRS.getReplSetConfig(); config.configsvr = true; config.settings = config.settings || {}; print("ShardingTest startup for all nodes took " + (new Date() - startTime) + "ms with " + this.configRS.nodeList().length + " config server nodes and " + totalNumShardNodes(shardsAsReplSets) + " total shard nodes."); // // Initiate each shard replica set. // if (shardsAsReplSets) { for (var i = 0; i < numShards; i++) { print("ShardingTest initiating replica set for shard: " + this._rs[i].setName); // ReplSetTest.initiate() requires all nodes to be to be authorized to run // replSetGetStatus. // TODO(SERVER-14017): Remove this in favor of using initiate() everywhere. this._rs[i].test.initiateWithAnyNodeAsPrimary(); this["rs" + i] = this._rs[i].test; this._rsObjects[i] = this._rs[i].test; _alldbpaths.push(null); this._connections.push(null); if (otherParams.useBridge) { unbridgedConnections.push(null); } } } // Do replication on replica sets if required for (var i = 0; i < numShards; i++) { if (!shardsAsReplSets) { continue; } var rs = this._rs[i].test; rs.getPrimary().getDB("admin").foo.save({x: 1}); if (this.keyFile) { authutil.asCluster(rs.nodes, this.keyFile, function() { rs.awaitReplication(); }); } rs.awaitSecondaryNodes(); var rsConn = new Mongo(rs.getURL()); rsConn.name = rs.getURL(); this._connections[i] = rsConn; this["shard" + i] = rsConn; rsConn.rs = rs; } // ReplSetTest.initiate() requires all nodes to be to be authorized to run replSetGetStatus. // TODO(SERVER-14017): Remove this in favor of using initiate() everywhere. 
this.configRS.initiateWithAnyNodeAsPrimary(config); // Wait for master to be elected before starting mongos var csrsPrimary = this.configRS.getPrimary(); print("ShardingTest startup and initiation for all nodes took " + (new Date() - startTime) + "ms with " + this.configRS.nodeList().length + " config server nodes and " + totalNumShardNodes(shardsAsReplSets) + " total shard nodes."); // If 'otherParams.mongosOptions.binVersion' is an array value, then we'll end up constructing a // version iterator. const mongosOptions = []; for (var i = 0; i < numMongos; ++i) { let options = { useHostname: otherParams.useHostname, pathOpts: Object.merge(pathOpts, {mongos: i}), verbose: mongosVerboseLevel, keyFile: this.keyFile, }; if (otherParams.mongosOptions && otherParams.mongosOptions.binVersion) { otherParams.mongosOptions.binVersion = MongoRunner.versionIterator(otherParams.mongosOptions.binVersion); } options = Object.merge(options, otherParams.mongosOptions); options = Object.merge(options, otherParams["s" + i]); options.port = options.port || _allocatePortForMongos(); mongosOptions.push(options); } const configRS = this.configRS; if (_hasNewFeatureCompatibilityVersion() && this.isMixedVersionCluster()) { function setFeatureCompatibilityVersion() { assert.commandWorked( csrsPrimary.adminCommand({setFeatureCompatibilityVersion: lastStableFCV})); // Wait for the new featureCompatibilityVersion to propagate to all nodes in the CSRS // to ensure that older versions of mongos can successfully connect. 
configRS.awaitReplication(); } if (this.keyFile) { authutil.asCluster(this.configRS.nodes, this.keyFile, setFeatureCompatibilityVersion); } else { setFeatureCompatibilityVersion(); } } // If chunkSize has been requested for this test, write the configuration if (otherParams.chunkSize) { function setChunkSize() { assert.commandWorked(csrsPrimary.getDB('config').settings.update( {_id: 'chunksize'}, {$set: {value: otherParams.chunkSize}}, {upsert: true, writeConcern: {w: 'majority', wtimeout: kDefaultWTimeoutMs}})); configRS.awaitLastOpCommitted(); } if (this.keyFile) { authutil.asCluster(csrsPrimary, this.keyFile, setChunkSize); } else { setChunkSize(); } } this._configDB = this.configRS.getURL(); this._configServers = this.configRS.nodes; for (var i = 0; i < numConfigs; ++i) { var conn = this._configServers[i]; this["config" + i] = conn; this["c" + i] = conn; } printjson('Config servers: ' + this._configDB); var configConnection = _connectWithRetry(this._configDB); print("ShardingTest " + this._testName + " :\n" + tojson({config: this._configDB, shards: this._connections})); this._mongos = []; // Start the MongoS servers for (var i = 0; i < numMongos; i++) { const options = mongosOptions[i]; options.configdb = this._configDB; if (otherParams.useBridge) { var bridgeOptions = Object.merge(otherParams.bridgeOptions, options.bridgeOptions || {}); bridgeOptions = Object.merge(bridgeOptions, { hostName: otherParams.useHostname ? hostName : "localhost", port: _allocatePortForBridgeForMongos(), // The mongos processes identify themselves to mongobridge as host:port, where the // host is the actual hostname of the machine and not localhost. 
dest: hostName + ":" + options.port, }); var bridge = new MongoBridge(bridgeOptions); } var conn = MongoRunner.runMongos(options); if (!conn) { throw new Error("Failed to start mongos " + i); } if (otherParams.causallyConsistent) { conn.setCausalConsistency(true); } if (otherParams.useBridge) { bridge.connectToBridge(); this._mongos.push(bridge); unbridgedMongos.push(conn); } else { this._mongos.push(conn); } if (i === 0) { this.s = this._mongos[i]; this.admin = this._mongos[i].getDB('admin'); this.config = this._mongos[i].getDB('config'); } this["s" + i] = this._mongos[i]; } _extendWithShMethods(); // If auth is enabled for the test, login the mongos connections as system in order to configure // the instances and then log them out again. if (this.keyFile) { authutil.asCluster(this._mongos, this.keyFile, _configureCluster); } else if (mongosOptions[0] && mongosOptions[0].keyFile) { authutil.asCluster(this._mongos, mongosOptions[0].keyFile, _configureCluster); } else { _configureCluster(); // Ensure that all config server nodes are up to date with any changes made to balancer // settings before adding shards to the cluster. This prevents shards, which read // config.settings with readPreference 'nearest', from accidentally fetching stale values // from secondaries that aren't up-to-date. this.configRS.awaitLastOpCommitted(); } try { if (!otherParams.manualAddShard) { var testName = this._testName; var admin = this.admin; this._connections.forEach(function(z) { var n = z.name || z.host || z; print("ShardingTest " + testName + " going to add shard : " + n); var result = assert.commandWorked(admin.runCommand({addshard: n}), "Failed to add shard " + n); z.shardName = result.shardAdded; }); } } catch (e) { // Clean up the running procceses on failure print("Failed to add shards, stopping cluster."); this.stop(); throw e; } // Ensure that all CSRS nodes are up to date. This is strictly needed for tests that use // multiple mongoses. 
In those cases, the first mongos initializes the contents of the 'config' // database, but without waiting for those writes to replicate to all the config servers then // the secondary mongoses risk reading from a stale config server and seeing an empty config // database. this.configRS.awaitLastOpCommitted(); if (jsTestOptions().keyFile) { jsTest.authenticate(configConnection); jsTest.authenticateNodes(this._configServers); jsTest.authenticateNodes(this._mongos); } // Ensure that the sessions collection exists so jstests can run things with // logical sessions and test them. We do this by forcing an immediate cache refresh // on the config server, which auto-shards the collection for the cluster. var lastStableBinVersion = MongoRunner.getBinVersionFor('last-stable'); if ((!otherParams.configOptions) || (otherParams.configOptions && !otherParams.configOptions.binVersion) || (otherParams.configOptions && otherParams.configOptions.binVersion && MongoRunner.areBinVersionsTheSame( lastStableBinVersion, MongoRunner.getBinVersionFor(otherParams.configOptions.binVersion)))) { this.configRS.getPrimary().getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1}); const x509AuthRequired = (mongosOptions[0] && mongosOptions[0].clusterAuthMode && mongosOptions[0].clusterAuthMode === "x509"); // Flushes the routing table cache on connection 'conn'. If 'keyFileLocal' is defined, // authenticates the keyfile user on 'authConn' - a connection or set of connections for // the shard - before executing the flush. const flushRT = function flushRoutingTableAndHandleAuth(conn, authConn, keyFileLocal) { // Invokes the actual execution of cache refresh. 
const execFlushRT = (conn) => {
    // Runs _flushRoutingTableCacheUpdates for "config.system.sessions" against the
    // admin database of 'conn' and asserts that the command succeeded.
    assert.commandWorked(conn.getDB("admin").runCommand(
        {_flushRoutingTableCacheUpdates: "config.system.sessions"}));
};

// With a keyfile, run the flush inside authutil.asCluster() on 'authConn'; without
// one, run it directly on 'conn'.
if (keyFileLocal) {
    authutil.asCluster(authConn, keyFileLocal, () => execFlushRT(conn));
} else {
    execFlushRT(conn);
}
};  // End of flushRT.

// TODO SERVER-45108: Enable support for x509 auth for _flushRoutingTableCacheUpdates.
// The per-shard flush is skipped when shards are added manually or when the first
// mongos uses x509 cluster auth.
if (!otherParams.manualAddShard && !x509AuthRequired) {
    for (let i = 0; i < numShards; i++) {
        // Prefer a keyFile given for this particular shard; otherwise fall back to the
        // cluster-wide keyFile (which may be undefined).
        const keyFileLocal = (otherParams.shards && otherParams.shards[i] &&
                              otherParams.shards[i].keyFile)
            ? otherParams.shards[i].keyFile
            : this.keyFile;

        if (otherParams.rs || otherParams["rs" + i] || startShardsAsRS) {
            // Replica set shard: flush on the primary, authenticating against all of
            // the set's nodes.
            const rs = this._rs[i].test;
            flushRT(rs.getPrimary(), rs.nodes, keyFileLocal);
        } else {
            // If specified, use the keyFile for the standalone shard.
            flushRT(this["shard" + i], this["shard" + i], keyFileLocal);
        }
    }
}
}  // End of the config-server binVersion check guarding the sessions refresh.
};  // Closes the enclosing function (presumably the ShardingTest constructor, whose
    // opening is outside this view).

// Stub for a hook to check that collection UUIDs are consistent across shards and the config
// server. The default implementation does nothing.
ShardingTest.prototype.checkUUIDsConsistentAcrossCluster = function() {};

// Stub for a hook to check that indexes are consistent across shards. The default
// implementation does nothing.
ShardingTest.prototype.checkIndexesConsistentAcrossCluster = function() {};

// Stub for what is presumably a hook to check that orphaned documents are deleted (purpose
// inferred from the name; confirm against the hook that replaces it). Unlike the two stubs
// above, the default implementation prints a message instead of doing nothing.
ShardingTest.prototype.checkOrphansAreDeleted = function() {
    print("Unhooked function");
};