path: root/src/mongo/db/repl.cpp
diff options
Diffstat (limited to 'src/mongo/db/repl.cpp')
1 files changed, 1516 insertions, 0 deletions
diff --git a/src/mongo/db/repl.cpp b/src/mongo/db/repl.cpp
new file mode 100644
index 00000000000..25ecb6b455f
--- /dev/null
+++ b/src/mongo/db/repl.cpp
@@ -0,0 +1,1516 @@
+// repl.cpp
+/* TODO
+ _ on a syncexception, don't allow going back to master state?
+*/
+/**
+* Copyright (C) 2008 10gen Inc.
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+/* Collections we use:
+ local.sources - indicates what sources we pull from as a "slave", and the last update of each
+ local.oplog.$main - our op log as "master"
+ local.dbinfo.<dbname> - no longer used???
+ local.pair.startup - [deprecated] can contain a special value indicating for a pair that we have the master copy.
+ used when replacing the other half of the pair which has permanently failed.
+ local.pair.sync - [deprecated] { initialsynccomplete: 1 }
+*/
+#include "pch.h"
+#include "jsobj.h"
+#include "../util/goodies.h"
+#include "repl.h"
+#include "../util/net/message.h"
+#include "../util/background.h"
+#include "../client/dbclient.h"
+#include "../client/connpool.h"
+#include "pdfile.h"
+#include "ops/query.h"
+#include "db.h"
+#include "commands.h"
+#include "security.h"
+#include "cmdline.h"
+#include "repl_block.h"
+#include "repl/rs.h"
+#include "replutil.h"
+#include "repl/connections.h"
+#include "ops/update.h"
+namespace mongo {
+ // Replication settings for this process (our config from command line etc.).
+ ReplSettings replSettings;
+ /* if 1 sync() is running; CmdResync::waitForSyncToFinish polls this until it reads 0 */
+ volatile int syncing = 0;
+ static volatile int relinquishSyncingSome = 0; // set to 1 by CmdResync::waitForSyncToFinish on every wait iteration; its reader is not in this part of the file
+ /* "dead" means something really bad happened like replication falling completely out of sync.
+ when non-null, we are dead and the string is informational.
+ ReplSource::forceResyncDead resets it to 0 after forcing a resync of all sources.
+ */
+ const char *replAllDead = 0;
+ time_t lastForcedResync = 0; // time() recorded by throttledForceResyncDead when it last forced a resync
+} // namespace mongo
+namespace mongo {
+ /* Current replication status text (output by the web console). */
+ const char *replInfo = "";
+ /* Scoped status marker: publishes a message through replInfo for the lifetime
+    of the object and puts "?" there when the object is destroyed. */
+ struct ReplInfo {
+     ReplInfo(const char *status) { replInfo = status; }
+     ~ReplInfo() { replInfo = "?"; }
+ };
+ /* Operator-requested resynchronization of replication, run on the slave: { resync : 1 }.
+    With { force : true } replication is first marked dead so the resync always proceeds. */
+ class CmdResync : public Command {
+ public:
+     CmdResync() : Command("resync") { }
+     virtual bool slaveOk() const { return true; }
+     virtual bool adminOnly() const { return true; }
+     virtual bool logTheOp() { return false; }
+     virtual LockType locktype() const { return WRITE; }
+     void help(stringstream&h) const { h << "resync (from scratch) an out of date replica slave.\n"; }
+     /* Fails when replica sets are in use, when replication is not dead (and not forced),
+        or when the sync thread does not go idle in time; otherwise resyncs every source. */
+     virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+         if( cmdLine.usingReplSets() ) {
+             errmsg = "resync command not currently supported with replica sets. See RS102 info in the mongodb documentations";
+             result.append("info", "");
+             return false;
+         }
+         const bool forced = cmdObj.getBoolField( "force" );
+         if ( forced ) {
+             // let the running sync pass finish before declaring replication dead
+             if ( !waitForSyncToFinish( errmsg ) )
+                 return false;
+             replAllDead = "resync forced";
+         }
+         if ( !replAllDead ) {
+             errmsg = "not dead, no need to resync";
+             return false;
+         }
+         if ( !waitForSyncToFinish( errmsg ) )
+             return false;
+         ReplSource::forceResyncDead( "client" );
+         result.append( "info", "triggered resync for all sources" );
+         return true;
+     }
+     /* Wait (up to 30 seconds) for the slave thread to finish syncing, so sources will be
+        reloaded with new saved state on the next pass.
+        @return false, with errmsg set, if sync() is still running after the timeout */
+     bool waitForSyncToFinish( string &errmsg ) const {
+         Timer elapsed;
+         while ( syncing != 0 && elapsed.millis() <= 30000 ) {
+             // drop the db lock while we nap so the sync thread can make progress
+             dbtemprelease unlocked;
+             relinquishSyncingSome = 1;
+             sleepmillis(1);
+         }
+         if ( syncing ) {
+             errmsg = "timeout waiting for sync() to finish";
+             return false;
+         }
+         return true;
+     }
+ } cmdResync;
+ /* True when this process is a slave, a master, or has a replica set object. */
+ bool anyReplEnabled() {
+     if ( replSettings.slave )
+         return true;
+     if ( replSettings.master )
+         return true;
+     return theReplSet != 0;
+ }
+ bool replAuthenticate(DBClientBase *conn);
+ /* Append this server's replication status to result (used by the isMaster command).
+    Replica set mode: reports the startup status message while theReplSet is still null,
+    otherwise delegates to ReplSet::fillIsMaster, and returns.
+    Master/slave mode: appends "ismaster" (plus a "dead: ..." info string when replAllDead is set).
+    @param authed passed to the Client::Context opened on local.sources
+    @param level  0: nothing more; nonzero: also append a "sources" array built from local.sources;
+                  greater than 1: additionally contact each source host and report its first/last
+                  oplog times and our lag behind it
+ */
+ void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level ) {
+     if ( replSet ) {
+         if( theReplSet == 0 ) {
+             // no ReplSet object yet: report the startup status instead
+             result.append("ismaster", false);
+             result.append("secondary", false);
+             result.append("info", ReplSet::startupStatusMsg.get());
+             result.append( "isreplicaset" , true );
+             return;
+         }
+         theReplSet->fillIsMaster(result);
+         return;
+     }
+     if ( replAllDead ) {
+         result.append("ismaster", 0);
+         string s = string("dead: ") + replAllDead;
+         result.append("info", s);
+     }
+     else {
+         result.appendBool("ismaster", _isMaster() );
+     }
+     // The replica set case has already returned above, so only master/slave
+     // detail remains (the former "level && replSet" branch could never run).
+     if ( !level )
+         return;
+     BSONObjBuilder sources( result.subarrayStart( "sources" ) );
+     readlock lk( "local.sources" );
+     Client::Context ctx( "local.sources", dbpath, authed );
+     shared_ptr<Cursor> c = findTableScan("local.sources", BSONObj());
+     int n = 0;
+     while ( c->ok() ) {
+         BSONObj s = c->current();
+         BSONObjBuilder bb;
+         bb.append( s["host"] );
+         string sourcename = s["source"].valuestr();
+         if ( sourcename != "main" )
+             bb.append( s["source"] );
+         {
+             BSONElement e = s["syncedTo"];
+             BSONObjBuilder t( bb.subobjStart( "syncedTo" ) );
+             t.appendDate( "time" , e.timestampTime() );
+             t.append( "inc" , e.timestampInc() );
+             t.done();
+         }
+         if ( level > 1 ) {
+             // talk to the source without holding our lock
+             dbtemprelease unlock;
+             // note: there is no so-style timeout on this connection; perhaps we should have one.
+             ScopedDbConnection conn( s["host"].valuestr() );
+             DBClientConnection *cliConn = dynamic_cast< DBClientConnection* >( &conn.conn() );
+             if ( cliConn && replAuthenticate( cliConn ) ) {
+                 BSONObj first = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << 1 ) ) );
+                 BSONObj last = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << -1 ) ) );
+                 bb.appendDate( "masterFirst" , first["ts"].timestampTime() );
+                 bb.appendDate( "masterLast" , last["ts"].timestampTime() );
+                 double lag = (double) (last["ts"].timestampTime() - s["syncedTo"].timestampTime());
+                 bb.append( "lagSeconds" , lag / 1000 );
+             }
+             conn.done();
+         }
+         sources.append( BSONObjBuilder::numStr( n++ ) , bb.obj() );
+         c->advance();
+     }
+     sources.done();
+ }
+ /* { isMaster : 1 } - reports whether this server is primary/master. Needs no
+    authentication and takes no lock. */
+ class CmdIsMaster : public Command {
+ public:
+     CmdIsMaster() : Command("isMaster", true, "ismaster") { }
+     virtual bool requiresAuth() { return false; }
+     virtual bool slaveOk() const { return true; }
+     virtual LockType locktype() const { return NONE; }
+     virtual void help( stringstream &help ) const {
+         help << "Check if this server is primary for a replica pair/set; also if it is --master or --slave in simple master/slave setups.\n";
+         help << "{ isMaster : 1 }";
+     }
+     virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) {
+         /* A request to an arbiter is currently (somewhat arbitrarily) an unauthenticated
+            ismaster request. Unauthenticated ismaster is allowed, but to be safe the reply
+            is less verbose when the caller is not authenticated for the admin db.
+         */
+         const bool adminReader = cc().getAuthenticationInfo()->isAuthorizedReads("admin");
+         appendReplicationInfo( result , adminReader );
+         result.appendNumber("maxBsonObjectSize", BSONObjMaxUserSize);
+         return true;
+     }
+ } cmdismaster;
+ /* Empty source: nothing has been cloned in the current pass. */
+ ReplSource::ReplSource() : nClonedThisPass(0) {
+ }
+ /* Build a ReplSource from a document of the local.sources collection.
+    Reads "only", "host", "source", the optional "syncedTo" timestamp, and the field
+    names of the optional "dbsNextPass" / "incompleteCloneDbs" sub-objects.
+    uasserts when "host" is empty, "source" is not "main", or "syncedTo" is neither
+    a Date nor a Timestamp.
+ */
+ ReplSource::ReplSource(BSONObj o) : nClonedThisPass(0) {
+     only = o.getStringField("only");
+     hostName = o.getStringField("host");
+     _sourceName = o.getStringField("source");
+     uassert( 10118 , "'host' field not set in sources collection object", !hostName.empty() );
+     uassert( 10119 , "only source='main' allowed for now with replication", sourceName() == "main" );
+     BSONElement e = o.getField("syncedTo");
+     if ( !e.eoo() ) {
+         uassert( 10120 , "bad sources 'syncedTo' field value", e.type() == Date || e.type() == Timestamp );
+         // construct from the stored value; "OpTime tmp( );" would merely declare a function
+         OpTime tmp( e.date() );
+         syncedTo = tmp;
+     }
+     BSONObj dbsObj = o.getObjectField("dbsNextPass");
+     if ( !dbsObj.isEmpty() ) {
+         BSONObjIterator i(dbsObj);
+         while ( 1 ) {
+             BSONElement e = i.next();
+             if ( e.eoo() )
+                 break;
+             // only the field name (the database name) matters
+             addDbNextPass.insert( e.fieldName() );
+         }
+     }
+     dbsObj = o.getObjectField("incompleteCloneDbs");
+     if ( !dbsObj.isEmpty() ) {
+         BSONObjIterator i(dbsObj);
+         while ( 1 ) {
+             BSONElement e = i.next();
+             if ( e.eoo() )
+                 break;
+             incompleteCloneDbs.insert( e.fieldName() );
+         }
+     }
+ }
+ /* Serialize this source as a BSONObj: host, source, and - only when set / non-empty -
+    only, syncedTo, dbsNextPass and incompleteCloneDbs (each db name mapped to true). */
+ BSONObj ReplSource::jsobj() {
+     BSONObjBuilder doc;
+     doc.append("host", hostName);
+     doc.append("source", sourceName());
+     if ( !only.empty() )
+         doc.append("only", only);
+     if ( !syncedTo.isNull() )
+         doc.appendTimestamp("syncedTo", syncedTo.asDate());
+     BSONObjBuilder nextPass;
+     for ( set<string>::iterator it = addDbNextPass.begin(); it != addDbNextPass.end(); ++it )
+         nextPass.appendBool(*it, 1);
+     if ( !addDbNextPass.empty() )
+         doc.append("dbsNextPass", nextPass.done());
+     BSONObjBuilder incomplete;
+     for ( set<string>::iterator it = incompleteCloneDbs.begin(); it != incompleteCloneDbs.end(); ++it )
+         incomplete.appendBool(*it, 1);
+     if ( !incompleteCloneDbs.empty() )
+         doc.append("incompleteCloneDbs", incomplete.done());
+     return doc.obj();
+ }
+ /* Upsert this source into local.sources, matching on host only.
+    Asserts that exactly one document was affected and that it was not an in-place modification. */
+ void ReplSource::save() {
+     assert( !hostName.empty() );
+     BSONObjBuilder key;
+     key.append("host", hostName);
+     // todo: finish allowing multiple source configs.
+     // this line doesn't work right when source is null, if that is allowed as it is now:
+     //key.append("source", _sourceName);
+     BSONObj pattern = key.done();
+     BSONObj doc = jsobj();
+     log( 1 ) << "Saving repl source: " << doc << endl;
+     {
+         OpDebug debug;
+         Client::Context ctx("local.sources");
+         UpdateResult outcome = updateObjects("local.sources", doc, pattern, true/*upsert for pair feature*/, false,false,debug);
+         assert( ! outcome.mod );
+         assert( outcome.num == 1 );
+     }
+ }
+ /* Append s to v. If an equal source exists in old it is moved over (removed from old)
+    so its live state is kept; otherwise a fresh copy of s is appended. A source with a
+    null syncedTo is never matched against old (there was a forced resync). */
+ static void addSourceToList(ReplSource::SourceVector &v, ReplSource& s, ReplSource::SourceVector &old) {
+     const bool mayReuse = !s.syncedTo.isNull();
+     if ( mayReuse ) {
+         ReplSource::SourceVector::iterator it = old.begin();
+         while ( it != old.end() ) {
+             if ( s == **it ) {
+                 v.push_back(*it);
+                 old.erase(it);
+                 return;
+             }
+             ++it;
+         }
+     }
+     shared_ptr< ReplSource > fresh( new ReplSource( s ) );
+     v.push_back( fresh );
+ }
+ /* Reload the list of sources from local.sources into v.
+    We reuse our existing objects (those already in v) so that we can keep our existing
+    connection and cursor in effect.
+    With --source: every stored source must match --source / --only, otherwise the process
+    logs the mismatch, waits 30 seconds and exits; if nothing is stored, the command line
+    source is saved. Without --source, --only is rejected (exit with EXIT_BADOPTIONS).
+    A stored source with a null syncedTo is seeded from the last non-noop entry of our
+    own local.oplog.$main, when that exists.
+ */
+ void ReplSource::loadAll(SourceVector &v) {
+     Client::Context ctx("local.sources");
+     SourceVector old = v;
+     v.clear();
+     if ( !cmdLine.source.empty() ) {
+         // --source <host> specified.
+         // check that no items are in sources other than that
+         // add if missing
+         shared_ptr<Cursor> c = findTableScan("local.sources", BSONObj());
+         int n = 0;
+         while ( c->ok() ) {
+             n++;
+             ReplSource tmp(c->current());
+             if ( tmp.hostName != cmdLine.source ) {
+                 log() << "repl: --source " << cmdLine.source << " != " << tmp.hostName << " from local.sources collection" << endl;
+                 log() << "repl: for instructions on changing this slave's source, see:" << endl;
+                 // NOTE(review): the documentation link that should follow "see:" is empty here - restore it.
+                 log() << "" << endl;
+                 log() << "repl: terminating mongod after 30 seconds" << endl;
+                 sleepsecs(30);
+                 // Actually terminate, as the message above promises.
+                 // NOTE(review): switch to a replication-specific exit code if the project defines one.
+                 dbexit( EXIT_BADOPTIONS );
+             }
+             if ( tmp.only != cmdLine.only ) {
+                 log() << "--only " << cmdLine.only << " != " << tmp.only << " from local.sources collection" << endl;
+                 log() << "terminating after 30 seconds" << endl;
+                 sleepsecs(30);
+                 dbexit( EXIT_BADOPTIONS );
+             }
+             c->advance();
+         }
+         uassert( 10002 , "local.sources collection corrupt?", n<2 );
+         if ( n == 0 ) {
+             // source missing. add.
+             ReplSource s;
+             s.hostName = cmdLine.source;
+             s.only = cmdLine.only;
+             // persist it, otherwise the scan below finds nothing and s is just discarded
+             s.save();
+         }
+     }
+     else {
+         try {
+             massert( 10384 , "--only requires use of --source", cmdLine.only.empty());
+         }
+         catch ( ... ) {
+             dbexit( EXIT_BADOPTIONS );
+         }
+     }
+     shared_ptr<Cursor> c = findTableScan("local.sources", BSONObj());
+     while ( c->ok() ) {
+         ReplSource tmp(c->current());
+         if ( tmp.syncedTo.isNull() ) {
+             // never synced: start from the newest real (non 'n') op in our own oplog, if we have one
+             DBDirectClient c;
+             if ( c.exists( "local.oplog.$main" ) ) {
+                 BSONObj op = c.findOne( "local.oplog.$main", QUERY( "op" << NE << "n" ).sort( BSON( "$natural" << -1 ) ) );
+                 if ( !op.isEmpty() ) {
+                     tmp.syncedTo = op[ "ts" ].date();
+                 }
+             }
+         }
+         addSourceToList(v, tmp, old);
+         c->advance();
+     }
+ }
+ BSONObj opTimeQuery = fromjson("{\"getoptime\":1}");
+ /* forceResyncDead(), but only when more than 600 seconds have passed since the last
+    throttled resync.
+    @return true if a resync was forced by this call */
+ bool ReplSource::throttledForceResyncDead( const char *requester ) {
+     if ( time( 0 ) - lastForcedResync <= 600 )
+         return false;
+     forceResyncDead( requester );
+     lastForcedResync = time( 0 );
+     return true;
+ }
+ /* When replication is dead, force a resync of every source in local.sources and
+    then clear the dead flag. Does nothing when replication is not dead. */
+ void ReplSource::forceResyncDead( const char *requester ) {
+     if ( replAllDead ) {
+         SourceVector sources;
+         ReplSource::loadAll(sources);
+         for( SourceVector::iterator it = sources.begin(); it != sources.end(); ++it ) {
+             ReplSource &src = **it;
+             log() << requester << " forcing resync from " << src.hostName << endl;
+             src.forceResync( requester );
+         }
+         replAllDead = 0;
+     }
+ }
+ /* Drop every non-empty database the master lists (except "local", and limited to
+    `only` when that is set), then reset syncedTo and addDbNextPass and save.
+    Throws via msgassertedNoTrace(14051) if the master cannot be reached and
+    massert(10385) if listDatabases fails.
+ */
+ void ReplSource::forceResync( const char *requester ) {
+     BSONObj info;
+     {
+         // network round trips happen without our lock
+         dbtemprelease t;
+         if (!oplogReader.connect(hostName)) {
+             msgassertedNoTrace( 14051 , "unable to connect to resync");
+         }
+         /* todo use getDatabaseNames() method here */
+         bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info );
+         massert( 10385 , "Unable to get database list", ok );
+     }
+     BSONObjIterator i( info.getField( "databases" ).embeddedObject() );
+     while( i.moreWithEOO() ) {
+         BSONElement e = i.next();
+         if ( e.eoo() )
+             break;
+         string name = e.embeddedObject().getField( "name" ).valuestr();
+         if ( !e.embeddedObject().getBoolField( "empty" ) ) {
+             if ( name != "local" ) {
+                 if ( only.empty() || only == name ) {
+                     resyncDrop( name.c_str(), requester );
+                 }
+             }
+         }
+     }
+     // a null syncedTo makes the next sync pass start from scratch
+     syncedTo = OpTime();
+     addDbNextPass.clear();
+     save();
+ }
+ /* Log and drop the local database db.
+    @param requester not used by this function
+    @return the database name as a string */
+ string ReplSource::resyncDrop( const char *db, const char *requester ) {
+     log() << "resync: dropping database " << db << endl;
+     Client::Context ctx(db);
+     dropDatabase(db);
+     return string( db );
+ }
+ /* Grab an initial copy of database db from the master: drop the local copy, then clone.
+    If the clone fails because of a database name case conflict the database is dropped
+    again and we return quietly; any other clone failure throws SyncException. */
+ void ReplSource::resync(string db) {
+     string dummyNs = resyncDrop( db.c_str(), "internal" );
+     Client::Context ctx( dummyNs );
+     {
+         log() << "resync: cloning database " << db << " to get an initial copy" << endl;
+         ReplInfo r("resync: cloning a database");
+         string errmsg;
+         int errCode = 0;
+         const bool cloned = cloneFrom(hostName.c_str(), errmsg, cc().database()->name, false, /*slaveOk*/ true, /*replauth*/ true, /*snapshot*/false, /*mayYield*/true, /*mayBeInterrupted*/false, &errCode);
+         if ( !cloned ) {
+             if ( errCode != DatabaseDifferCaseCode ) {
+                 problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl;
+                 throw SyncException();
+             }
+             resyncDrop( db.c_str(), "internal" );
+             log() << "resync: database " << db << " not valid on the master due to a name conflict, dropping." << endl;
+             return;
+         }
+     }
+     log() << "resync: done with initial clone for db: " << db << endl;
+ }
+ DatabaseIgnorer ___databaseIgnorer;
+ /* Ignore db up to futureOplogTime; an existing later deadline is kept. */
+ void DatabaseIgnorer::doIgnoreUntilAfter( const string &db, const OpTime &futureOplogTime ) {
+     OpTime &deadline = _ignores[ db ];
+     if ( futureOplogTime > deadline )
+         deadline = futureOplogTime;
+ }
+ /* @return true while db is still being ignored at currentOplogTime. Once the time has
+    passed the deadline, the ignore entry is removed and false is returned. */
+ bool DatabaseIgnorer::ignoreAt( const string &db, const OpTime &currentOplogTime ) {
+     if ( _ignores[ db ].isNull() )
+         return false;
+     if ( _ignores[ db ] >= currentOplogTime )
+         return true;
+     // The ignore state has expired, so clear it.
+     _ignores.erase( db );
+     return false;
+ }
+ /* Decide whether an op for database db may be applied when database names that differ
+    only by case may exist locally.
+    @return true  the op may be applied (db already loaded, no case-conflicting names, or
+                  the local conflicting databases were dropped)
+            false the op must be skipped (db is currently ignored, or it is missing /
+                  conflicting on the master, in which case it is ignored up to the
+                  master's latest oplog time)
+    Temporarily releases the lock while querying the master.
+ */
+ bool ReplSource::handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db ) {
+     if ( dbHolder()._isLoaded( ns, dbpath ) ) {
+         // Database is already present.
+         return true;
+     }
+     BSONElement ts = op.getField( "ts" );
+     if ( ( ts.type() == Date || ts.type() == Timestamp ) && ___databaseIgnorer.ignoreAt( db, ts.date() ) ) {
+         // Database is ignored due to a previous indication that it is
+         // missing from master after optime "ts".
+         return false;
+     }
+     if ( Database::duplicateUncasedName( false, db, dbpath ).empty() ) {
+         // No duplicate database names are present.
+         return true;
+     }
+     OpTime lastTime;
+     bool dbOk = false;
+     {
+         dbtemprelease release;
+         // We always log an operation after executing it (never before), so
+         // a database list will always be valid as of an oplog entry generated
+         // before it was retrieved.
+         BSONObj last = oplogReader.findOne( this->ns().c_str(), Query().sort( BSON( "$natural" << -1 ) ) );
+         if ( !last.isEmpty() ) {
+             BSONElement ts = last.getField( "ts" );
+             massert( 14032, "Invalid 'ts' in remote log", ts.type() == Date || ts.type() == Timestamp );
+             // remember the master's newest optime; it bounds how long we ignore a db below
+             lastTime = OpTime( ts.date() );
+         }
+         BSONObj info;
+         bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info );
+         massert( 14033, "Unable to get database list", ok );
+         BSONObjIterator i( info.getField( "databases" ).embeddedObject() );
+         while( i.more() ) {
+             BSONElement e = i.next();
+             const char * name = e.embeddedObject().getField( "name" ).valuestr();
+             if ( strcasecmp( name, db ) != 0 )
+                 continue;
+             if ( strcmp( name, db ) == 0 ) {
+                 // The db exists on master, still need to check that no conflicts exist there.
+                 dbOk = true;
+                 continue;
+             }
+             // The master has a db name that conflicts with the requested name.
+             dbOk = false;
+             break;
+         }
+     }
+     if ( !dbOk ) {
+         ___databaseIgnorer.doIgnoreUntilAfter( db, lastTime );
+         incompleteCloneDbs.erase(db);
+         addDbNextPass.erase(db);
+         return false;
+     }
+     // Check for duplicates again, since we released the lock above.
+     set< string > duplicates;
+     Database::duplicateUncasedName( false, db, dbpath, &duplicates );
+     // The database is present on the master and no conflicting databases
+     // are present on the master. Drop any local conflicts.
+     for( set< string >::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i ) {
+         ___databaseIgnorer.doIgnoreUntilAfter( *i, lastTime );
+         incompleteCloneDbs.erase(*i);
+         addDbNextPass.erase(*i);
+         Client::Context ctx(*i);
+         dropDatabase(*i);
+     }
+     massert( 14034, "Duplicate database names present after attempting to delete duplicates",
+              Database::duplicateUncasedName( false, db, dbpath ).empty() );
+     return true;
+ }
+ /* Apply one oplog entry. When applyOperation_inlock reports a failed update and
+    Sync::shouldRetry agrees, the op is applied a second time and a second failure
+    raises uassert 15914. User and DB exceptions are logged and swallowed. */
+ void ReplSource::applyOperation(const BSONObj& op) {
+     try {
+         if ( applyOperation_inlock( op ) ) {
+             Sync sync(hostName);
+             if ( sync.shouldRetry(op) ) {
+                 const bool failedAgain = applyOperation_inlock(op);
+                 uassert(15914, "Failure retrying initial sync update", !failedAgain);
+             }
+         }
+     }
+     catch ( UserException& e ) {
+         log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;
+     }
+     catch ( DBException& e ) {
+         log() << "sync: caught db exception " << e << " while applying op: " << op << endl;
+     }
+ }
+ /* local.$oplog.main is of the form:
+ { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
+ ...
+ see logOp() comments.
+ Handles one entry pulled from the master's oplog:
+ - 'n' entries are skipped; an ns starting with '.' is logged and skipped; an empty ns
+ marks replication dead and throws SyncException
+ - entries for a database other than `only` (when set) are skipped
+ - optionally pretouches (cmdLine.pretouch), then takes the write lock unless alreadyLocked
+ - throws SyncException when replAllDead is set
+ - entries rejected by handleDuplicateDbName are skipped
+ - 'c' entries on the admin db are always applied
+ - for a just created, empty or incompletely cloned db: clones it via resync(), but at most
+ one db per pass; further dbs are deferred through addDbNextPass
+ - anything else is applied with applyOperation()
+ @param alreadyLocked caller already put us in write lock if true
+ */
+ void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, bool alreadyLocked) {
+ if( logLevel >= 6 ) // op.tostring is expensive so doing this check explicitly
+ log(6) << "processing op: " << op << endl;
+ if( op.getStringField("op")[0] == 'n' )
+ return;
+ char clientName[MaxDatabaseNameLen];
+ const char *ns = op.getStringField("ns");
+ nsToDatabase(ns, clientName); // clientName receives the database part of ns
+ if ( *ns == '.' ) {
+ problem() << "skipping bad op in oplog: " << op.toString() << endl;
+ return;
+ }
+ else if ( *ns == 0 ) {
+ /*if( op.getStringField("op")[0] != 'n' )*/ {
+ problem() << "halting replication, bad op in oplog:\n " << op.toString() << endl;
+ replAllDead = "bad object in oplog";
+ throw SyncException();
+ }
+ //ns = "local.system.x";
+ //nsToDatabase(ns, clientName);
+ }
+ if ( !only.empty() && only != clientName )
+ return;
+ if( cmdLine.pretouch && !alreadyLocked/*doesn't make sense if in write lock already*/ ) {
+ if( cmdLine.pretouch > 1 ) {
+ /* note: this is bad - should be put in ReplSource. but this is first test... */
+ static int countdown; // ops still covered by the previous pretouch batch
+ assert( countdown >= 0 );
+ if( countdown > 0 ) {
+ countdown--; // was pretouched on a prev pass
+ }
+ else {
+ const int m = 4; // ops per scheduled pretouch task
+ if( tp.get() == 0 ) {
+ int nthr = min(8, cmdLine.pretouch);
+ nthr = max(nthr, 1);
+ tp.reset( new ThreadPool(nthr) ); // created on first use, between 1 and 8 threads
+ }
+ vector<BSONObj> v;
+ oplogReader.peek(v, cmdLine.pretouch);
+ unsigned a = 0;
+ while( 1 ) {
+ if( a >= v.size() ) break;
+ unsigned b = a + m - 1; // v[a..b]
+ if( b >= v.size() ) b = v.size() - 1;
+ tp->schedule(pretouchN, v, a, b);
+ DEV cout << "pretouch task: " << a << ".." << b << endl;
+ a += m;
+ }
+ // we do one too...
+ pretouchOperation(op);
+ tp->join();
+ countdown = v.size();
+ }
+ }
+ else {
+ pretouchOperation(op);
+ }
+ }
+ scoped_ptr<writelock> lk( alreadyLocked ? 0 : new writelock() ); // take the write lock only if the caller has not
+ if ( replAllDead ) {
+ // hmmm why is this check here and not at top of this function? does it get set between top and here?
+ log() << "replAllDead, throwing SyncException: " << replAllDead << endl;
+ throw SyncException();
+ }
+ if ( !handleDuplicateDbName( op, ns, clientName ) ) {
+ return;
+ }
+ Client::Context ctx( ns );
+ ctx.getClient()->curop()->reset();
+ bool empty = ctx.db()->isEmpty();
+ bool incompleteClone = incompleteCloneDbs.count( clientName ) != 0;
+ if( logLevel >= 6 )
+ log(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty << ", incompleteClone: " << incompleteClone << endl;
+ // always apply admin command command
+ // this is a bit hacky -- the semantics of replication/commands aren't well specified
+ if ( strcmp( clientName, "admin" ) == 0 && *op.getStringField( "op" ) == 'c' ) {
+ applyOperation( op );
+ return;
+ }
+ if ( ctx.justCreated() || empty || incompleteClone ) {
+ // we must add to incomplete list now that setClient has been called
+ incompleteCloneDbs.insert( clientName );
+ if ( nClonedThisPass ) {
+ /* we only clone one database per pass, even if a lot need done. This helps us
+ avoid overflowing the master's transaction log by doing too much work before going
+ back to read more transactions. (Imagine a scenario of slave startup where we try to
+ clone 100 databases in one pass.)
+ */
+ addDbNextPass.insert( clientName );
+ }
+ else {
+ if ( incompleteClone ) {
+ log() << "An earlier initial clone of '" << clientName << "' did not complete, now resyncing." << endl;
+ }
+ save(); // persist the incomplete-clone marker before starting the clone
+ Client::Context ctx(ns);
+ nClonedThisPass++;
+ resync(ctx.db()->name);
+ addDbNextPass.erase(clientName);
+ incompleteCloneDbs.erase( clientName );
+ }
+ save();
+ }
+ else {
+ applyOperation( op );
+ addDbNextPass.erase( clientName );
+ }
+ }
+ /* Set syncedTo to the "ts" of the newest entry in the master's oplog (restricted to
+    namespaces starting with `only` when that is set). syncedTo is left untouched when no
+    entry is found. massert(10386) if the entry's ts is neither a Date nor a Timestamp.
+ */
+ void ReplSource::syncToTailOfRemoteLog() {
+     string _ns = ns();
+     BSONObjBuilder b;
+     if ( !only.empty() ) {
+         b.appendRegex("ns", string("^") + only);
+     }
+     BSONObj last = oplogReader.findOne( _ns.c_str(), Query( b.done() ).sort( BSON( "$natural" << -1 ) ) );
+     if ( !last.isEmpty() ) {
+         BSONElement ts = last.getField( "ts" );
+         massert( 10386 , "non Date ts found: " + last.toString(), ts.type() == Date || ts.type() == Timestamp );
+         // record the timestamp we just validated (an argument-less OpTime would be null)
+         syncedTo = OpTime( ts.date() );
+     }
+ }
+ extern unsigned replApplyBatchSize;
+ /* slave: pull some data from the master's oplog
+    note: not yet in db mutex at this point.
+    On the first run (syncedTo is null) the master's oplog tail is recorded and every
+    non-empty master database is queued in addDbNextPass for cloning. Otherwise the
+    oplog is queried / tailed from syncedTo and operations are applied in batches of up
+    to replApplyBatchSize per write lock, checkpointing syncedTo along the way.
+    @param nApplied set, when the pass runs to completion, to the number of operations
+                    applied since the last checkpoint
+    @return -1 error
+             0 ok, don't sleep
+             1 ok, sleep
+    Throws SyncException (and sets replAllDead) when the data is too stale or an oplog
+    record has no usable ts.
+ */
+ int ReplSource::sync_pullOpLog(int& nApplied) {
+     int okResultCode = 1;
+     string ns = string("local.oplog.$") + sourceName();
+     log(2) << "repl: sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n';
+     bool tailing = true;
+     oplogReader.tailCheck();
+     bool initial = syncedTo.isNull();
+     if ( !oplogReader.haveCursor() || initial ) {
+         if ( initial ) {
+             // Important to grab last oplog timestamp before listing databases.
+             syncToTailOfRemoteLog();
+             BSONObj info;
+             bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info );
+             massert( 10389 , "Unable to get database list", ok );
+             BSONObjIterator i( info.getField( "databases" ).embeddedObject() );
+             while( i.moreWithEOO() ) {
+                 BSONElement e = i.next();
+                 if ( e.eoo() )
+                     break;
+                 string name = e.embeddedObject().getField( "name" ).valuestr();
+                 if ( !e.embeddedObject().getBoolField( "empty" ) ) {
+                     if ( name != "local" ) {
+                         if ( only.empty() || only == name ) {
+                             log( 2 ) << "adding to 'addDbNextPass': " << name << endl;
+                             addDbNextPass.insert( name );
+                         }
+                     }
+                 }
+             }
+             dblock lk;
+             save();
+         }
+         BSONObjBuilder q;
+         q.appendDate("$gte", syncedTo.asDate());
+         BSONObjBuilder query;
+         query.append("ts", q.done());
+         if ( !only.empty() ) {
+             // note we may here skip a LOT of data table scanning, a lot of work for the master.
+             query.appendRegex("ns", string("^") + only); // maybe append "\\." here?
+         }
+         BSONObj queryObj = query.done();
+         // e.g. queryObj = { ts: { $gte: syncedTo } }
+         oplogReader.tailingQuery(ns.c_str(), queryObj);
+         tailing = false;
+     }
+     else {
+         log(2) << "repl: tailing=true\n";
+     }
+     if( !oplogReader.haveCursor() ) {
+         problem() << "repl: dbclient::query returns null (conn closed?)" << endl;
+         oplogReader.resetConnection();
+         return -1;
+     }
+     // show any deferred database creates from a previous pass
+     {
+         set<string>::iterator i = addDbNextPass.begin();
+         if ( i != addDbNextPass.end() ) {
+             // synthetic op whose only purpose is to trigger the clone of that database
+             BSONObjBuilder b;
+             b.append("ns", *i + '.');
+             b.append("op", "db");
+             BSONObj op = b.done();
+             sync_pullOpLog_applyOperation(op, false);
+         }
+     }
+     if ( !oplogReader.more() ) {
+         if ( tailing ) {
+             log(2) << "repl: tailing & no new activity\n";
+             if( oplogReader.awaitCapable() )
+                 okResultCode = 0; // don't sleep
+         }
+         else {
+             log() << "repl: " << ns << " oplog is empty\n";
+         }
+         {
+             dblock lk;
+             save();
+         }
+         return okResultCode;
+     }
+     OpTime nextOpTime;
+     {
+         BSONObj op = oplogReader.next();
+         BSONElement ts = op.getField("ts");
+         if ( ts.type() != Date && ts.type() != Timestamp ) {
+             string err = op.getStringField("$err");
+             if ( !err.empty() ) {
+                 // 13051 is "tailable cursor requested on non capped collection"
+                 if (op.getIntField("code") == 13051) {
+                     problem() << "trying to slave off of a non-master" << '\n';
+                     massert( 13344 , "trying to slave off of a non-master", false );
+                 }
+                 else {
+                     problem() << "repl: $err reading remote oplog: " + err << '\n';
+                     massert( 10390 , "got $err reading remote oplog", false );
+                 }
+             }
+             else {
+                 problem() << "repl: bad object read from remote oplog: " << op.toString() << '\n';
+                 massert( 10391 , "repl: bad object read from remote oplog", false);
+             }
+         }
+         nextOpTime = OpTime( ts.date() );
+         log(2) << "repl: first op time received: " << nextOpTime.toString() << '\n';
+         if ( initial ) {
+             log(1) << "repl: initial run\n";
+         }
+         if( tailing ) {
+             if( !( syncedTo < nextOpTime ) ) {
+                 log() << "repl ASSERTION failed : syncedTo < nextOpTime" << endl;
+                 log() << "repl syncTo: " << syncedTo.toStringLong() << endl;
+                 log() << "repl nextOpTime: " << nextOpTime.toStringLong() << endl;
+                 assert(false);
+             }
+             oplogReader.putBack( op ); // op will be processed in the loop below
+             nextOpTime = OpTime(); // will reread the op below
+         }
+         else if ( nextOpTime != syncedTo ) { // didn't get what we queried for - error
+             Nullstream& l = log();
+             l << "repl: nextOpTime " << nextOpTime.toStringLong() << ' ';
+             if ( nextOpTime < syncedTo )
+                 l << "<??";
+             else
+                 l << ">";
+             l << " syncedTo " << syncedTo.toStringLong() << '\n';
+             log() << "repl: time diff: " << (nextOpTime.getSecs() - syncedTo.getSecs()) << "sec\n";
+             log() << "repl: tailing: " << tailing << '\n';
+             log() << "repl: data too stale, halting replication" << endl;
+             replInfo = replAllDead = "data too stale halted replication";
+             assert( syncedTo < nextOpTime );
+             throw SyncException();
+         }
+         else {
+             /* t == syncedTo, so the first op was applied previously or it is the first op of initial query and need not be applied. */
+         }
+     }
+     // apply operations
+     {
+         int n = 0;
+         time_t saveLast = time(0);
+         while ( 1 ) {
+             bool moreInitialSyncsPending = !addDbNextPass.empty() && n; // we need "&& n" to assure we actually process at least one op to get a sync point recorded in the first place.
+             if ( moreInitialSyncsPending || !oplogReader.more() ) {
+                 dblock lk;
+                 // NOTE aaron 2011-03-29 This block may be unnecessary, but I'm leaving it in place to avoid changing timing behavior.
+                 {
+                     dbtemprelease t;
+                     if ( !moreInitialSyncsPending && oplogReader.more() ) {
+                         continue;
+                     }
+                     // otherwise, break out of loop so we can set to completed or clone more dbs
+                 }
+                 if( oplogReader.awaitCapable() && tailing )
+                     okResultCode = 0; // don't sleep
+                 syncedTo = nextOpTime;
+                 save(); // note how far we are synced up to now
+                 log() << "repl: applied " << n << " operations" << endl;
+                 nApplied = n;
+                 log() << "repl: end sync_pullOpLog syncedTo: " << syncedTo.toStringLong() << endl;
+                 break;
+             }
+             OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) {
+                 // periodically note our progress, in case we are doing a lot of work and crash
+                 dblock lk;
+                 syncedTo = nextOpTime;
+                 // can't update local log ts since there are pending operations from our peer
+                 save();
+                 log() << "repl: checkpoint applied " << n << " operations" << endl;
+                 log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl;
+                 saveLast = time(0);
+                 n = 0;
+             }
+             BSONObj op = oplogReader.next();
+             unsigned b = replApplyBatchSize;
+             bool justOne = b == 1;
+             // with a batch size above one, the whole batch is applied under a single write lock
+             scoped_ptr<writelock> lk( justOne ? 0 : new writelock() );
+             while( 1 ) {
+                 BSONElement ts = op.getField("ts");
+                 if( !( ts.type() == Date || ts.type() == Timestamp ) ) {
+                     log() << "sync error: problem querying remote oplog record" << endl;
+                     log() << "op: " << op.toString() << endl;
+                     log() << "halting replication" << endl;
+                     replInfo = replAllDead = "sync error: no ts found querying remote oplog record";
+                     throw SyncException();
+                 }
+                 OpTime last = nextOpTime;
+                 nextOpTime = OpTime( ts.date() );
+                 if ( !( last < nextOpTime ) ) {
+                     log() << "sync error: last applied optime at slave >= nextOpTime from master" << endl;
+                     log() << " last: " << last.toStringLong() << endl;
+                     log() << " nextOpTime: " << nextOpTime.toStringLong() << endl;
+                     log() << " halting replication" << endl;
+                     replInfo = replAllDead = "sync error last >= nextOpTime";
+                     uassert( 10123 , "replication error last applied optime at slave >= nextOpTime from master", false);
+                 }
+                 if ( replSettings.slavedelay && ( unsigned( time( 0 ) ) < nextOpTime.getSecs() + replSettings.slavedelay ) ) {
+                     // op is newer than the configured delay allows: put it back and come again later
+                     assert( justOne );
+                     oplogReader.putBack( op );
+                     _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1;
+                     dblock lk;
+                     if ( n > 0 ) {
+                         syncedTo = last;
+                         save();
+                     }
+                     log() << "repl: applied " << n << " operations" << endl;
+                     log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl;
+                     log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl;
+                     return okResultCode;
+                 }
+                 sync_pullOpLog_applyOperation(op, !justOne);
+                 n++;
+                 if( --b == 0 )
+                     break;
+                 // if to here, we are doing multiple applications in a single write lock acquisition
+                 if( !oplogReader.moreInCurrentBatch() ) {
+                     // break if no more in batch so we release lock while reading from the master
+                     break;
+                 }
+                 op = oplogReader.next();
+                 getDur().commitIfNeeded();
+             }
+         }
+     }
+     return okResultCode;
+ }
+ BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}");
+ /* Authenticate conn against the "local" db of the master.
+    Credentials: the internalSecurity user/pwd when a pwd is set; otherwise the "repl"
+    user from local.system.users, falling back to the first user stored there.
+    @return true when auth is disabled (noauth) or authentication succeeded;
+            false when the current client is not admin, no user could be found, or
+            the master rejected the credentials
+    massert(10392 / 10393) if the stored user object has an empty user or pwd.
+ */
+ bool replAuthenticate(DBClientBase *conn) {
+     if( noauth ) {
+         return true;
+     }
+     if( ! cc().isAdmin() ) {
+         log() << "replauthenticate: requires admin permissions, failing\n";
+         return false;
+     }
+     string u;
+     string p;
+     if (internalSecurity.pwd.length() > 0) {
+         u = internalSecurity.user;
+         p = internalSecurity.pwd;
+     }
+     else {
+         BSONObj user;
+         {
+             dblock lk;
+             Client::Context ctxt("local.");
+             // Give up only when BOTH lookups fail. With "||" a missing "repl" user
+             // skipped the fallback entirely, and a found one was overwritten by
+             // whatever user happens to come first in the collection.
+             if( !Helpers::findOne("local.system.users", userReplQuery, user) &&
+                     // try the first user in local
+                     !Helpers::getSingleton("local.system.users", user) ) {
+                 log() << "replauthenticate: no user in local.system.users to use for authentication\n";
+                 return false;
+             }
+         }
+         u = user.getStringField("user");
+         p = user.getStringField("pwd");
+         massert( 10392 , "bad user object? [1]", !u.empty());
+         massert( 10393 , "bad user object? [2]", !p.empty());
+     }
+     string err;
+     if( !conn->auth("local", u.c_str(), p.c_str(), err, false) ) {
+         log() << "replauthenticate: can't authenticate to master server, user:" << u << endl;
+         return false;
+     }
+     return true;
+ }
+ /* Announce this server to the sync source over conn.
+    Loads a singleton document and, when it is missing, has no "host" field, or records a
+    host other than getHostName(), replaces it with a new { _id: <OID>, host: <myname> }.
+    The document's _id is then sent to the remote "admin" database as the "handshake"
+    field of a command, together with member: theReplSet->selfId() when theReplSet is set.
+    Always returns true: the command outcome is only logged, never acted upon.
+    NOTE(review): the collection-name literals passed to Helpers below are empty strings in
+    this copy of the file -- looks like extraction damage; confirm against the real source.
+ */
+ bool replHandshake(DBClientConnection *conn) {
+ string myname = getHostName();
+ BSONObj me;
+ {
+ dblock l;
+ // is an identifier for a server for getLastError w:2+
+ if ( ! Helpers::getSingleton( "" , me ) ||
+ ! me.hasField("host") ||
+ me["host"].String() != myname ) {
+ // clean out
+ Helpers::emptyCollection("");
+ // repopulate
+ BSONObjBuilder b;
+ b.appendOID( "_id" , 0 , true );
+ b.append( "host", myname );
+ me = b.obj();
+ Helpers::putSingleton( "" , me );
+ }
+ }
+ // build { handshake: <our _id> [, member: <replica set self id>] }
+ BSONObjBuilder cmd;
+ cmd.appendAs( me["_id"] , "handshake" );
+ if (theReplSet) {
+ cmd.append("member", theReplSet->selfId());
+ }
+ BSONObj res;
+ bool ok = conn->runCommand( "admin" , cmd.obj() , res );
+ // ignoring for now on purpose for older versions
+ // NOTE(review): ok is passed as the argument of log(); presumably that is the verbosity
+ // level, so a failed handshake (ok == false) logs at level 0 -- confirm.
+ log(ok) << "replHandshake res not: " << ok << " res: " << res << endl;
+ return true;
+ }
+ /* Shared connection setup for both connect() overloads.
+    No-op returning true when a connection already exists. Otherwise creates a new
+    DBClientConnection, connects it to hostName and, unless noauth is set, authenticates
+    via replAuthenticate(). On any failure the connection is reset, "repl: <errmsg>" is
+    logged and false is returned.
+ */
+ bool OplogReader::commonConnect(const string& hostName) {
+     if( conn() != 0 )
+         return true;
+     _conn = shared_ptr<DBClientConnection>(new DBClientConnection( false, 0, 0 /* tcp timeout */));
+     string errmsg;
+     ReplInfo r("trying to connect to sync source");
+     bool established = _conn->connect(hostName.c_str(), errmsg);
+     if ( established && !noauth ) {
+         // authentication is only attempted once the socket-level connect succeeded
+         established = replAuthenticate(_conn.get());
+     }
+     if ( established )
+         return true;
+     resetConnection();
+     log() << "repl: " << errmsg << endl;
+     return false;
+ }
+ /* Connect to hostName and perform the regular replHandshake().
+    Returns true straight away (without a new handshake) when already connected;
+    returns false when commonConnect() fails; otherwise returns replHandshake()'s result.
+ */
+ bool OplogReader::connect(string hostName) {
+     if (conn() != 0)
+         return true;
+     return commonConnect(hostName) ? replHandshake(_conn.get()) : false;
+ }
+ /* Connect to host `to` and handshake on behalf of member `from`, identified by rid.
+    Returns true straight away when already connected, false when commonConnect() fails,
+    otherwise the result of passthroughHandshake(rid, from).
+ */
+ bool OplogReader::connect(const BSONObj& rid, const int from, const string& to) {
+     if (conn() != 0)
+         return true;
+     if (!commonConnect(to))
+         return false;
+     log() << "handshake between " << from << " and " << to << endl;
+     return passthroughHandshake(rid, from);
+ }
+ /* Send { handshake: rid["_id"], member: f } to the "admin" database of the current
+    connection and return whether runCommand() reported success. The reply is discarded.
+ */
+ bool OplogReader::passthroughHandshake(const BSONObj& rid, const int f) {
+     BSONObjBuilder handshakeCmd;
+     handshakeCmd.appendAs( rid["_id"], "handshake" );
+     handshakeCmd.append( "member" , f );
+     BSONObj reply;
+     const bool accepted = conn()->runCommand( "admin" , handshakeCmd.obj() , reply );
+     return accepted;
+ }
+ /* note: not yet in mutex at this point.
+ returns >= 0 if ok. return -1 if you want to reconnect.
+ return value of zero indicates no sleep necessary before next call
+ */
+ // One sync pass against this source: clears _sleepAdviceTime and nClonedThisPass, logs
+ // the source being synced (unless cmdLine.quiet), refuses to sync from ourselves,
+ // connects oplogReader to hostName and then delegates to sync_pullOpLog(nApplied).
+ // nApplied is passed through to sync_pullOpLog().
+ // Returns -1 when the self-sync check trips or the connect fails; otherwise whatever
+ // sync_pullOpLog() returns.
+ int ReplSource::sync(int& nApplied) {
+ _sleepAdviceTime = 0;
+ ReplInfo r("sync");
+ if ( !cmdLine.quiet ) {
+ Nullstream& l = log();
+ l << "repl: syncing from ";
+ if( sourceName() != "main" ) {
+ l << "source:" << sourceName() << ' ';
+ }
+ l << "host:" << hostName << endl;
+ }
+ nClonedThisPass = 0;
+ // FIXME Handle cases where this db isn't on default port, or default port is spec'd in hostName.
+ // NOTE(review): the second comparison is against an empty string in this copy of the
+ // file; possibly a literal lost in extraction -- confirm against the real source.
+ if ( (string("localhost") == hostName || string("") == hostName) && cmdLine.port == CmdLine::DefaultDBPort ) {
+ log() << "repl: can't sync from self (localhost). sources configuration may be wrong." << endl;
+ sleepsecs(5);
+ return -1;
+ }
+ if ( !oplogReader.connect(hostName) ) {
+ log(4) << "repl: can't connect to sync source" << endl;
+ return -1;
+ }
+ /*
+ // get current mtime at the server.
+ BSONObj o = conn->findOne("admin.$cmd", opTimeQuery);
+ BSONElement e = o.getField("optime");
+ if( e.eoo() ) {
+ log() << "repl: failed to get cur optime from master" << endl;
+ log() << " " << o.toString() << endl;
+ return false;
+ }
+ uassert( 10124 , e.type() == Date );
+ OpTime serverCurTime;
+ serverCurTime.asDate() =;
+ */
+ return sync_pullOpLog(nApplied);
+ }
+ /* --------------------------------------------------------------*/
+ /*
+ _ source has autoptr to the cursor
+ _ reuse that cursor when we can
+ */
+ /* returns: # of seconds to sleep before next pass
+ 0 = no sleep recommended
+ 1 = special sentinel indicating adaptive sleep recommended
+ */
+ /* Run one replication pass over every configured source.
+    (Re)loads `sources` under a dblock, then calls sync(nApplied) on each in turn.
+    Returns the number of seconds the caller should sleep:
+      20 - no sources are configured yet
+      10 - a SyncException was caught (pass aborted)
+      60 - a severe AssertionException was caught (pass aborted)
+      otherwise the advice left by the last source that completed sync() without throwing:
+        3 if sync() returned < 0, 0 if that source has more dbs to sync,
+        the source's own sleepAdvice() when non-zero, else sync()'s return value.
+      The starting value is 1 (the adaptive-sleep sentinel).
+    Any source whose sync() result is < 0 (including when it threw before assigning a
+    result) gets its oplogReader connection reset.
+ */
+ int _replMain(ReplSource::SourceVector& sources, int& nApplied) {
+     {
+         ReplInfo r("replMain load sources");
+         dblock lk;
+         ReplSource::loadAll(sources);
+         // only need this param for initial reset
+         replSettings.fastsync = false;
+     }
+     if ( sources.empty() ) {
+         /* replication is not configured yet (for --slave) in local.sources.
+            Poll for the configuration every 20 seconds.
+         */
+         log() << "no source given, add a master to local.sources to start replication" << endl;
+         return 20;
+     }
+     int advisedSleep = 1;
+     ReplSource::SourceVector::iterator it = sources.begin();
+     while ( it != sources.end() ) {
+         ReplSource *source = it->get();
+         int syncResult = -1;   // stays negative if sync() throws
+         try {
+             syncResult = source->sync(nApplied);
+             const bool moreDbsPending = source->haveMoreDbsToSync();
+             if( syncResult < 0 ) {
+                 advisedSleep = 3;
+             }
+             else if( moreDbsPending ) {
+                 advisedSleep = 0;
+             }
+             else if ( source->sleepAdvice() ) {
+                 advisedSleep = source->sleepAdvice();
+             }
+             else {
+                 advisedSleep = syncResult;
+             }
+         }
+         catch ( const SyncException& ) {
+             log() << "caught SyncException" << endl;
+             return 10;
+         }
+         catch ( AssertionException& e ) {
+             if ( e.severe() ) {
+                 log() << "replMain AssertionException " << e.what() << endl;
+                 return 60;
+             }
+             log() << "repl: AssertionException " << e.what() << '\n';
+             replInfo = "replMain caught AssertionException";
+         }
+         catch ( const DBException& e ) {
+             log() << "repl: DBException " << e.what() << endl;
+             replInfo = "replMain caught DBException";
+         }
+         catch ( const std::exception &e ) {
+             log() << "repl: std::exception " << e.what() << endl;
+             replInfo = "replMain caught std::exception";
+         }
+         catch ( ... ) {
+             log() << "unexpected exception during replication. replication will halt" << endl;
+             replAllDead = "caught unexpected exception during replication";
+         }
+         if ( syncResult < 0 ) {
+             source->oplogReader.resetConnection();
+         }
+         it++;
+     }
+     return advisedSleep;
+ }
+ /* Slave-side replication loop.
+    Each iteration: under a dblock, bail out of the loop if replAllDead is set and an
+    automatic resync is either disabled or refused; mark `syncing`; run _replMain();
+    unmark `syncing`; then sleep for the advised number of seconds.
+    Sleep adjustments: an advice of 1 becomes 2 when nothing was applied, or a 75ms nap
+    followed by no sleep when more than 100 operations were applied. An exception escaping
+    _replMain() yields a 4 second sleep. A pending relinquishSyncingSome request forces 1.
+    Returns only through the replAllDead break.
+ */
+ void replMain() {
+     ReplSource::SourceVector sources;
+     for ( ;; ) {
+         int sleepSecs = 0;
+         {
+             dblock lk;
+             if ( replAllDead ) {
+                 // throttledForceResyncDead can throw
+                 const bool resyncStarted =
+                     replSettings.autoresync && ReplSource::throttledForceResyncDead( "auto" );
+                 if ( !resyncStarted ) {
+                     log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" << endl;
+                     break;
+                 }
+             }
+             // i.e., there is only one sync thread running. we will want to change/fix this.
+             assert( syncing == 0 );
+             syncing++;
+         }
+         try {
+             int appliedCount = 0;
+             sleepSecs = _replMain(sources, appliedCount);
+             if( sleepSecs == 1 ) {
+                 if( appliedCount == 0 ) {
+                     sleepSecs = 2;
+                 }
+                 else if( appliedCount > 100 ) {
+                     // sleep very little - just enough that we aren't truly hammering master
+                     sleepmillis(75);
+                     sleepSecs = 0;
+                 }
+             }
+         }
+         catch (...) {
+             out() << "caught exception in _replMain" << endl;
+             sleepSecs = 4;
+         }
+         {
+             dblock lk;
+             assert( syncing == 1 );
+             syncing--;
+         }
+         if( relinquishSyncingSome ) {
+             relinquishSyncingSome = 0;
+             sleepSecs = 1; // sleep before going back in to syncing=1
+         }
+         if ( sleepSecs == 0 )
+             continue;
+         stringstream text;
+         text << "repl: sleep " << sleepSecs << " sec before next pass";
+         const string sleepMsg = text.str();
+         if ( ! cmdLine.quiet )
+             log() << sleepMsg << endl;
+         ReplInfo r(sleepMsg.c_str());
+         sleepsecs(sleepSecs);
+     }
+ }
+ // Thread body started by startReplication() when replSettings.master is set.
+ // After an initial 4 second sleep it registers itself as client "replmaster" and loops
+ // forever: sleep toSleep seconds, try to take a write lock, and when the branch below is
+ // taken call replLocalAuth() and logKeepalive(). The other branch logs at level 5 and
+ // shortens the next sleep to 1 second; the success branch restores it to 10.
+ static void replMasterThread() {
+ sleepsecs(4);
+ Client::initThread("replmaster");
+ int toSleep = 10;
+ while( 1 ) {
+ sleepsecs( toSleep );
+ /* write a keep-alive like entry to the log. this will make things like
+ printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date
+ even when things are idle.
+ */
+ {
+ writelocktry lk("",1);
+ // NOTE(review): the condition is missing in this copy of the file; presumably it
+ // tests whether the writelocktry above obtained the lock -- confirm against the
+ // real source.
+ if ( ) {
+ toSleep = 10;
+ replLocalAuth();
+ try {
+ logKeepalive();
+ }
+ catch(...) {
+ // keep the thread alive no matter what logKeepalive() throws
+ log() << "caught exception in replMasterThread()" << endl;
+ }
+ }
+ else {
+ log(5) << "couldn't logKeepalive" << endl;
+ toSleep = 1;
+ }
+ }
+ }
+ }
+ /* Thread body started by startReplication() when replSettings.slave is set.
+    Sleeps 1 second, registers as client "replslave", flags the client as the sync
+    thread, performs replLocalAuth() under a dblock, then runs replMain() forever:
+    5 seconds between normal returns, 5 minutes after any exception.
+ */
+ void replSlaveThread() {
+     sleepsecs(1);
+     Client::initThread("replslave");
+     cc().iAmSyncThread();
+     {
+         dblock lk;
+         replLocalAuth();
+     }
+     const int retryDelaySecs = 300;   // the "5 minutes" quoted in the messages below
+     for ( ;; ) {
+         try {
+             replMain();
+             sleepsecs(5);
+         }
+         catch ( const AssertionException& ) {
+             const char * const assertMsg =
+                 "Assertion in replSlaveThread(): sleeping 5 minutes before retry";
+             ReplInfo r(assertMsg);
+             problem() << assertMsg << endl;
+             sleepsecs(retryDelaySecs);
+         }
+         catch ( const DBException& e ) {
+             problem() << "exception in replSlaveThread(): " << e.what()
+                       << ", sleeping 5 minutes before retry" << endl;
+             sleepsecs(retryDelaySecs);
+         }
+         catch ( ... ) {
+             problem() << "error in replSlaveThread(): sleeping 5 minutes before retry" << endl;
+             sleepsecs(retryDelaySecs);
+         }
+     }
+ }
+ // Debug-only helper: loops forever, writing a line to out() every 100 milliseconds.
+ // Its only reference in this file is the commented-out thread start in startReplication().
+ // NOTE(review): the streamed operand is missing in this copy of the file (extraction
+ // damage); confirm what was printed against the real source.
+ void tempThread() {
+ while ( 1 ) {
+ out() << << endl;
+ sleepmillis(100);
+ }
+ }
+ // Forward declarations of functions defined elsewhere and used by startReplication():
+ // newRepl() is called on the replica-set path, oldRepl() on the master/slave path, and
+ // startReplSets() is the entry point of the thread launched for replica sets.
+ void newRepl();
+ void oldRepl();
+ void startReplSets(ReplSetCmdline*);
+ /* Start whichever form of replication the command line asked for.
+    With --replSet: warn if --slave/--master were also given, call newRepl(), set replSet
+    and launch startReplSets() on its own thread; nothing else is started.
+    Otherwise: call oldRepl(), and if neither slave nor master is configured return.
+    Else perform replLocalAuth() under a dblock, start replSlaveThread and/or (after
+    createOplog()) replMasterThread, and finally block until replSettings.fastsync has
+    been cleared.
+ */
+ void startReplication() {
+     const bool wantSlave = replSettings.slave;
+     const bool wantMaster = replSettings.master;
+     /* if we are going to be a replica set, we aren't doing other forms of replication. */
+     if( !cmdLine._replSet.empty() ) {
+         if( wantSlave || wantMaster ) {
+             log() << "***" << endl;
+             log() << "ERROR: can't use --slave or --master replication options with --replSet" << endl;
+             log() << "***" << endl;
+         }
+         newRepl();
+         replSet = true;
+         ReplSetCmdline *replSetCmdline = new ReplSetCmdline(cmdLine._replSet);
+         boost::thread t( boost::bind( &startReplSets, replSetCmdline) );
+         return;
+     }
+     oldRepl();
+     /* this was just to see if anything locks for longer than it should -- we need to be careful
+        not to be locked when trying to connect() or query() the other side.
+     */
+     //boost::thread tempt(tempThread);
+     if( !( wantSlave || wantMaster ) )
+         return;
+     {
+         dblock lk;
+         replLocalAuth();
+     }
+     if ( wantSlave ) {
+         assert( replSettings.slave == SimpleSlave );
+         log(1) << "slave=true" << endl;
+         boost::thread repl_thread(replSlaveThread);
+     }
+     if ( wantMaster ) {
+         log(1) << "master=true" << endl;
+         replSettings.master = true;
+         createOplog();
+         boost::thread t(replMasterThread);
+     }
+     // don't allow writes until we've set up from log
+     while( replSettings.fastsync ) {
+         sleepmillis( 50 );
+     }
+ }
+ // Test driver for pretouchN: builds three copies of one insert-style op document and
+ // schedules pretouchN on a ThreadPool over consecutive index ranges v[a..b], then waits
+ // for all tasks with tp.join().
+ // With the constants below nthr is always 8 and m is 1, so each task covers one element.
+ // NOTE(review): the "ns" value is an empty string in this copy of the file; possibly a
+ // namespace literal lost in extraction -- confirm against the real source.
+ void testPretouch() {
+ int nthr = min(8, 8);
+ nthr = max(nthr, 1);
+ int m = 8 / nthr;
+ ThreadPool tp(nthr);
+ vector<BSONObj> v;
+ BSONObj x = BSON( "ns" << "" << "o" << BSON( "_id" << 1 ) << "op" << "i" );
+ v.push_back(x);
+ v.push_back(x);
+ v.push_back(x);
+ unsigned a = 0;
+ while( 1 ) {
+ if( a >= v.size() ) break;
+ unsigned b = a + m - 1; // v[a..b]
+ // clamp the inclusive upper bound to the last valid index
+ if( b >= v.size() ) b = v.size() - 1;
+ tp.schedule(pretouchN, v, a, b);
+ DEV cout << "pretouch task: " << a << ".." << b << endl;
+ a += m;
+ }
+ tp.join();
+ }
+ /** Validator for the "replApplyBatchSize" parameter.
+  *  A value is accepted only when all of the following hold:
+  *    - it is within [1, 1024] (both bounds inclusive),
+  *    - it is 1 whenever replSettings.slavedelay is non-zero,
+  *    - this server is configured as a slave (replSettings.slave).
+  *  The single instance replApplyBatchSizeValidator is defined right after the class.
+  */
+ class ReplApplyBatchSizeValidator : public ParameterValidator {
+ public:
+     ReplApplyBatchSizeValidator() : ParameterValidator( "replApplyBatchSize" ) {}
+     /** @param e      element carrying the requested batch size (read via numberInt())
+      *  @param errmsg set to the reason when the value is rejected
+      *  @return true when the value is acceptable, false otherwise
+      */
+     virtual bool isValid( BSONElement e , string& errmsg ) const {
+         int b = e.numberInt();
+         if( b < 1 || b > 1024 ) {
+             // the range check accepts 1024, so the message must say "<=" rather than "<"
+             errmsg = "replApplyBatchSize has to be >= 1 and <= 1024";
+             return false;
+         }
+         if ( replSettings.slavedelay != 0 && b > 1 ) {
+             errmsg = "can't use a batch size > 1 with slavedelay";
+             return false;
+         }
+         if ( ! replSettings.slave ) {
+             errmsg = "can't set replApplyBatchSize on a non-slave machine";
+             return false;
+         }
+         return true;
+     }
+ } replApplyBatchSizeValidator;
+} // namespace mongo