/**
 * Sets up a replica set. To make the set running, call {@link #startSet},
 * followed by {@link #initiate} (and optionally,
 * {@link #awaitSecondaryNodes} to block till the set is fully operational).
 * Note that some of the replica start up parameters are not passed here,
 * but to the #startSet method.
 *
 * @param {Object} [opts] May be omitted; every option has a default.
 *
 *   {
 *     name {string}: name of this replica set. Default: 'testReplSet'
 *     host {string}: name of the host machine. Hostname will be used
 *        if not specified. Ignored when useHostName is false.
 *     useHostName {boolean}: if true, use hostname of machine,
 *        otherwise use localhost. Default: true
 *     nodes {number|Object|Array.<Object>}: number of replicas. Default: 0.
 *        Can also be an Object (or Array).
 *        Format for Object:
 *          {
 *            <any key>: replica member option Object. @see MongoRunner.runMongod
 *            <another key>: and so on...
 *          }
 *          A member whose key starts with "a" is flagged as an arbiter.
 *
 *        Format for Array:
 *           An array of replica member option Object. @see MongoRunner.runMongod
 *
 *        Note: For both formats, a special boolean property 'arbiter' can be
 *          specified to denote a member is an arbiter.
 *
 *     nodeOptions {Object}: Options to apply to all nodes in the replica set.
 *        Format for Object:
 *          { cmdline-param-with-no-arg : "",
 *            param-with-arg : arg }
 *        This turns into "mongod --cmdline-param-with-no-arg --param-with-arg arg"
 *
 *     oplogSize {number}: Default: 40
 *     useSeedList {boolean}: Use the connection string format of this set
 *        as the replica set name (overrides the name property). Default: false
 *     keyFile {string}
 *     shardSvr {boolean}: Default: false
 *     startPort {number}: port offset to be used for each replica. Default: 31000
 *   }
 *
 * Member variables:
 *  numNodes {number} - number of nodes
 *  nodes {Array} - connection to replica set members (empty until nodes are started)
 */
ReplSetTest = function( opts ){
    // Tolerate a missing options object instead of throwing on the first property read.
    opts = opts || {};

    this.name = opts.name || "testReplSet";
    this.useHostName = opts.useHostName == undefined ? true : opts.useHostName;
    this.host = this.useHostName ? (opts.host || getHostName()) : 'localhost';
    this.oplogSize = opts.oplogSize || 40;
    this.useSeedList = opts.useSeedList || false;
    this.keyFile = opts.keyFile;
    this.shardSvr = opts.shardSvr || false;
    this.startPort = opts.startPort || 31000;

    // Per-node option objects, keyed "n0", "n1", ...
    this.nodeOptions = {};

    // Documented default is 0 replicas; without this, an unset opts.nodes left
    // numNodes undefined and passed undefined on to allocatePorts().
    var nodes = opts.nodes || 0;
    var i;

    // Arrays are tested first in case isObject() also accepts arrays, which would
    // otherwise make this branch unreachable.
    if( Array.isArray( nodes ) ){
        for( i = 0; i < nodes.length; i++ ){
            this.nodeOptions[ "n" + i ] = Object.merge( opts.nodeOptions, nodes[i] );
        }
        this.numNodes = nodes.length;
    }
    else if( isObject( nodes ) ){
        var len = 0;
        for( var key in nodes ){
            var options = this.nodeOptions[ "n" + len ] =
                Object.merge( opts.nodeOptions, nodes[key] );
            // Keys beginning with "a" denote arbiters.
            if( key.startsWith( "a" ) ) options.arbiter = true;
            len++;
        }
        this.numNodes = len;
    }
    else {
        // Plain count: every node gets the set-wide nodeOptions (same object for all).
        for( i = 0; i < nodes; i++ ){
            this.nodeOptions[ "n" + i ] = opts.nodeOptions;
        }
        this.numNodes = nodes;
    }

    this.ports = allocatePorts( this.numNodes , this.startPort );
    this.nodes = [];
    this.initLiveNodes();

    // Copy the Health and State constants onto the instance. State is applied last,
    // so a name present in both (DOWN) presumably ends up with the State value.
    Object.extend( this, ReplSetTest.Health );
    Object.extend( this, ReplSetTest.State );
}

// List of nodes as host:port strings.
ReplSetTest.prototype.nodeList = function() { var list = []; for(var i=0; i configVersion && slaveConfigVersion === masterVersion) { configVersion = slaveConfigVersion; print ("changing expected config version # since secondary # " + "was higher and the master # now matches it"); } return false; } // Continue if we're connected to an arbiter if (res = slave.getDB("admin").runCommand({replSetGetStatus: 1})) { if (res.myState == 7) { continue; } } ++secondaryCount; var name = slave.toString().substr(14); // strip "connection to " print("ReplSetTest awaitReplication: checking secondary #" + secondaryCount + ": " + name); slave.getDB("admin").getMongo().setSlaveOk(); var log = slave.getDB("local")['oplog.rs']; if (log.find({}).sort({'$natural': -1}).limit(1).hasNext()) { var entry = log.find({}).sort({'$natural': -1}).limit(1).next(); var ts = entry['ts']; if (self.latest.t < ts.t || (self.latest.t == ts.t && self.latest.i < ts.i)) { self.latest = self.liveNodes.master.getDB("local")['oplog.rs']. find({}). sort({'$natural': -1}). limit(1). 
next()['ts']; print("ReplSetTest awaitReplication: timestamp for " + name + " is newer, resetting latest to " + tojson(self.latest)); return false; } if (!friendlyEqual(self.latest, ts)) { print("ReplSetTest awaitReplication: timestamp for secondary #" + secondaryCount + ", " + name + ", is " + tojson(ts) + " but latest is " + tojson(self.latest)); print("ReplSetTest awaitReplication: last oplog entry (of " + log.count() + ") for secondary #" + secondaryCount + ", " + name + ", is " + tojsononeline(entry)); print("ReplSetTest awaitReplication: secondary #" + secondaryCount + ", " + name + ", is NOT synced"); return false; } print("ReplSetTest awaitReplication: secondary #" + secondaryCount + ", " + name + ", is synced"); } else { print("ReplSetTest awaitReplication: waiting for secondary #" + secondaryCount + ", " + name + ", to have an oplog built"); return false; } } print("ReplSetTest awaitReplication: finished: all " + secondaryCount + " secondaries synced at timestamp " + tojson(self.latest)); return true; } catch (e) { print("ReplSetTest awaitReplication: caught exception: " + e); // we might have a new master now self.getLastOpTimeWritten(); print("ReplSetTest awaitReplication: resetting: timestamp for primary " + self.liveNodes.master + " is " + tojson(self.latest)); return false; } }, "awaiting replication", timeout); } ReplSetTest.prototype.getHashes = function( db ){ this.getMaster(); var res = {}; res.master = this.liveNodes.master.getDB( db ).runCommand( "dbhash" ) res.slaves = this.liveNodes.slaves.map( function(z){ return z.getDB( db ).runCommand( "dbhash" ); } ) return res; } /** * Starts up a server. Options are saved by default for subsequent starts. * * * Options { remember : true } re-applies the saved options from a prior start. * Options { noRemember : true } ignores the current properties. * Options { appendOptions : true } appends the current options to those remembered. 
* Options { startClean : true } clears the data directory before starting. * * @param {int|conn|[int|conn]} n array or single server number (0, 1, 2, ...) or conn * @param {object} [options] * @param {boolean} [restart] If false, the data directory will be cleared * before the server starts. Default: false. * */ ReplSetTest.prototype.start = function( n , options , restart , wait ){ if( n.length ){ var nodes = n var started = [] for( var i = 0; i < nodes.length; i++ ){ if( this.start( nodes[i], Object.merge({}, options), restart, wait ) ){ started.push( nodes[i] ) } } return started } print( "ReplSetTest n is : " + n ) defaults = { useHostName : this.useHostName, oplogSize : this.oplogSize, keyFile : this.keyFile, port : this.getPort( n ), noprealloc : "", smallfiles : "", rest : "", replSet : this.useSeedList ? this.getURL() : this.name, dbpath : "$set-$node" } defaults = Object.merge( defaults, ReplSetTest.nodeOptions || {} ) // TODO : should we do something special if we don't currently know about this node? n = this.getNodeId( n ) // // Note : this replaces the binVersion of the shared startSet() options the first time // through, so the full set is guaranteed to have different versions if size > 1. If using // start() independently, independent version choices will be made // if( options && options.binVersion ){ options.binVersion = MongoRunner.versionIterator( options.binVersion ) } options = Object.merge( defaults, options ) options = Object.merge( options, this.nodeOptions[ "n" + n ] ) options.restart = options.restart || restart var pathOpts = { node : n, set : this.name } options.pathOpts = Object.merge( options.pathOpts || {}, pathOpts ) if( tojson(options) != tojson({}) ) printjson(options) // make sure to call getPath, otherwise folders wont be cleaned this.getPath(n); print("ReplSetTest " + (restart ? "(Re)" : "") + "Starting...."); var rval = this.nodes[n] = MongoRunner.runMongod( options ) if( ! 
rval ) return rval // Add replica set specific attributes this.nodes[n].nodeId = n printjson( this.nodes ) wait = wait || false if( ! wait.toFixed ){ if( wait ) wait = 0 else wait = -1 } if( wait < 0 ) return rval // Wait for startup this.waitForHealth( rval, this.UP, wait ) return rval } /** * Restarts a db without clearing the data directory by default. If the server is not * stopped first, this function will not work. * * Option { startClean : true } forces clearing the data directory. * Option { auth : Object } object that contains the auth details for admin credentials. * Should contain the fields 'user' and 'pwd' * * @param {int|conn|[int|conn]} n array or single server number (0, 1, 2, ...) or conn */ ReplSetTest.prototype.restart = function( n , options, signal, wait ){ // Can specify wait as third parameter, if using default signal if( signal == true || signal == false ){ wait = signal signal = undefined } this.stop(n, signal, options); started = this.start( n , options , true, wait ); if (jsTestOptions().keyFile) { if (started.length) { // if n was an array of conns, start will return an array of connections for (var i = 0; i < started.length; i++) { jsTest.authenticate(started[i]); } } else { jsTest.authenticate(started); } } return started; } ReplSetTest.prototype.stopMaster = function(signal, opts) { var master = this.getMaster(); var master_id = this.getNodeId( master ); return this.stop(master_id, signal, opts); } /** * Stops a particular node or nodes, specified by conn or id * * @param {number|Mongo} n the index or connection object of the replica set member to stop. 
* @param {number} signal the signal number to use for killing * @param {Object} opts @see MongoRunner.stopMongod */ ReplSetTest.prototype.stop = function(n, signal, opts) { // Flatten array of nodes to stop if( n.length ){ nodes = n var stopped = [] for( var i = 0; i < nodes.length; i++ ){ if (this.stop(nodes[i], signal, opts)) stopped.push( nodes[i] ) } return stopped } // Can specify wait as second parameter, if using default signal if( signal == true || signal == false ){ signal = undefined } var port = this.getPort( n ); print('ReplSetTest stop *** Shutting down mongod in port ' + port + ' ***'); var ret = MongoRunner.stopMongod( port , signal, opts ); print('ReplSetTest stop *** Mongod in port ' + port + ' shutdown with code (' + ret + ') ***'); return ret; }; /** * Kill all members of this replica set. * * @param {number} signal The signal number to use for killing the members * @param {boolean} forRestart will not cleanup data directory or teardown * bridges if set to true. * @param {Object} opts @see MongoRunner.stopMongod */ ReplSetTest.prototype.stopSet = function( signal , forRestart, opts ) { for(var i=0; i < this.ports.length; i++) { this.stop(i, signal, opts); } if ( forRestart ) { return; } if ( this._alldbpaths ){ print("ReplSetTest stopSet deleting all dbpaths"); for( i=0; i 1) { var readers = []; var largestTS = null; var nodes = this.nodes; var rsSize = nodes.length; for (var i = 0; i < rsSize; i++) { readers[i] = new OplogReader(nodes[i]); var currTS = readers[i].getFirstDoc()["ts"]; if (currTS.t > largestTS.t || (currTS.t == largestTS.t && currTS.i > largestTS.i) ) { largestTS = currTS; } } // start all oplogReaders at the same place. 
for (var i = 0; i < rsSize; i++) { readers[i].query(largestTS); } var firstReader = readers[0]; while (firstReader.hasNext()) { var ts = firstReader.next()["ts"]; for(var i = 1; i < rsSize; i++) { assert.eq(ts, readers[i].next()["ts"], " non-matching ts for node: " + readers[i].mongo); } } // ensure no other node has more oplog for (var i = 1; i < rsSize; i++) { assert.eq(false, readers[i].hasNext(), "" + readers[i] + " shouldn't have more oplog."); } } } /** * Waits until there is a master node */ ReplSetTest.prototype.waitForMaster = function( timeout ){ var master = undefined var self = this; assert.soon(function() { return ( master = self.getMaster() ); }, "waiting for master", timeout); return master } /** * Wait for a health indicator to go to a particular state or states. * * @param node is a single node or list of nodes, by id or conn * @param state is a single state or list of states. ReplSetTest.Health.DOWN can * only be used in cases when there is a primary available or slave[0] can * respond to the isMaster command. */ ReplSetTest.prototype.waitForHealth = function( node, state, timeout ){ this.waitForIndicator( node, state, "health", timeout ) } /** * Wait for a state indicator to go to a particular state or states. * * @param node is a single node or list of nodes, by id or conn * @param state is a single state or list of states * */ ReplSetTest.prototype.waitForState = function( node, state, timeout ){ this.waitForIndicator( node, state, "state", timeout ) } /** * Wait for a rs indicator to go to a particular state or states. 
* * @param node is a single node or list of nodes, by id or conn * @param states is a single state or list of states * @param ind is the indicator specified * */ ReplSetTest.prototype.waitForIndicator = function( node, states, ind, timeout ){ if( node.length ){ var nodes = node for( var i = 0; i < nodes.length; i++ ){ if( states.length ) this.waitForIndicator( nodes[i], states[i], ind, timeout ) else this.waitForIndicator( nodes[i], states, ind, timeout ) } return; } timeout = timeout || 30000; if( ! node.getDB ){ node = this.nodes[node] } if( ! states.length ) states = [ states ] print( "ReplSetTest waitForIndicator " + ind + " on " + node ) printjson( states ) print( "ReplSetTest waitForIndicator from node " + node ) var lastTime = null var currTime = new Date().getTime() var status = undefined; var self = this; assert.soon(function() { try { var conn = self.callIsMaster(); if (!conn) conn = self.liveNodes.slaves[0]; if (!conn) return false; // Try again to load connection var getStatusFunc = function() { status = conn.getDB('admin').runCommand({replSetGetStatus: 1}); }; if (self.keyFile) { // Authenticate connection used for running replSetGetStatus if needed. 
authutil.asCluster(conn, self.keyFile, getStatusFunc); } else { getStatusFunc(); } } catch ( ex ) { print( "ReplSetTest waitForIndicator could not get status: " + tojson( ex ) ); return false; } var printStatus = false if( lastTime == null || ( currTime = new Date().getTime() ) - (1000 * 5) > lastTime ) { if( lastTime == null ) { print( "ReplSetTest waitForIndicator Initial status ( timeout : " + timeout + " ) :" ); } printjson( status ); lastTime = new Date().getTime(); printStatus = true; } if (typeof status.members == 'undefined') { return false; } for( var i = 0; i < status.members.length; i++ ) { if( printStatus ) { print( "Status for : " + status.members[i].name + ", checking " + node.host + "/" + node.name ); } if( status.members[i].name == node.host || status.members[i].name == node.name ) { for( var j = 0; j < states.length; j++ ) { if( printStatus ) { print( "Status " + " : " + status.members[i][ind] + " target state : " + states[j] ); } if( status.members[i][ind] == states[j] ) { return true; } } } } return false; }, "waiting for state indicator " + ind + " for " + timeout + "ms", timeout); print( "ReplSetTest waitForIndicator final status:" ) printjson( status ) }; ReplSetTest.Health = {}; ReplSetTest.Health.UP = 1; ReplSetTest.Health.DOWN = 0; ReplSetTest.State = {}; ReplSetTest.State.PRIMARY = 1; ReplSetTest.State.SECONDARY = 2; ReplSetTest.State.RECOVERING = 3; // Note there is no state 4. ReplSetTest.State.STARTUP_2 = 5; ReplSetTest.State.UNKNOWN = 6; ReplSetTest.State.ARBITER = 7; ReplSetTest.State.DOWN = 8; ReplSetTest.State.ROLLBACK = 9; ReplSetTest.State.REMOVED = 10; /** * Overflows a replica set secondary or secondaries, specified by id or conn. 
*/ ReplSetTest.prototype.overflow = function( secondaries ){ // Create a new collection to overflow, allow secondaries to replicate var master = this.getMaster() var overflowColl = master.getCollection( "_overflow.coll" ) overflowColl.insert({ replicated : "value" }) this.awaitReplication() this.stop(secondaries); var count = master.getDB("local").oplog.rs.count(); var prevCount = -1; // Keep inserting till we hit our capped coll limits while (count != prevCount) { print("ReplSetTest overflow inserting 10000"); var bulk = overflowColl.initializeUnorderedBulkOp(); for (var i = 0; i < 10000; i++) { bulk.insert({ overflow : "value" }); } bulk.execute(); prevCount = count; this.awaitReplication(); count = master.getDB("local").oplog.rs.count(); print( "ReplSetTest overflow count : " + count + " prev : " + prevCount ); } // Restart all our secondaries and wait for recovery state this.start( secondaries, { remember : true }, true, true ) this.waitForState( secondaries, this.RECOVERING, 5 * 60 * 1000 ) } /** * Bridging allows you to test network partitioning. For example, you can set * up a replica set, run bridge(), then kill the connection between any two * nodes x and y with partition(x, y). * * Once you have called bridging, you cannot reconfigure the replica set. */ ReplSetTest.prototype.bridge = function( opts ) { if (this.bridges) { print("ReplSetTest bridge bridges have already been created!"); return; } var n = this.nodes.length; // create bridges this.bridges = []; for (var i=0; i1, 0->2, 1->0, 1->2, 2->0, 2->1. We can kill * the connection between nodes 0 and 2 by calling replTest.partition(0,2) or * replTest.partition(2,0) (either way is identical). Then the replica set would * have the following bridges: 0->1, 1->0, 1->2, 2->1. * * The bidirectional parameter, which defaults to true, determines whether * replTest.partition(0,2) will stop the bridges for 0->2 and 2->0 (true), or * just 0->2 (false). 
*/ ReplSetTest.prototype.partition = function(from, to, bidirectional) { bidirectional = typeof bidirectional !== 'undefined' ? bidirectional : true; this.bridges[from][to].stop(); if (bidirectional) { this.bridges[to][from].stop(); } }; /** * This reverses a partition created by partition() above. */ ReplSetTest.prototype.unPartition = function(from, to, bidirectional) { bidirectional = typeof bidirectional !== 'undefined' ? bidirectional : true; this.bridges[from][to].start(); if (bidirectional) { this.bridges[to][from].start(); } }; /** * Helpers for partitioning in only one direction so that the test files are more clear to readers. */ ReplSetTest.prototype.partitionOneWay = function(from, to) { this.partition(from, to, false); }; ReplSetTest.prototype.unPartitionOneWay = function(from, to) { this.unPartition(from, to, false); }; /** * Helpers for adding/removing delays from a partition. */ ReplSetTest.prototype.addPartitionDelay = function(from, to, delay, bidirectional) { bidirectional = typeof bidirectional !== 'undefined' ? bidirectional : true; this.bridges[from][to].setDelay(delay); if (bidirectional) { this.bridges[to][from].setDelay(delay); } }; ReplSetTest.prototype.removePartitionDelay = function(from, to, bidirectional) { this.addPartitionDelay(from, to, 0, bidirectional); }; ReplSetTest.prototype.addOneWayPartitionDelay = function(from, to, delay) { this.addPartitionDelay(from, to, delay, false); }; ReplSetTest.prototype.removeOneWayPartitionDelay = function(from, to) { this.addPartitionDelay(from, to, 0, false); };