diff options
Diffstat (limited to 'src/mongo/db/repl/master_slave.cpp')
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 2280 |
1 files changed, 1129 insertions, 1151 deletions
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index a1a58527b62..1e1bd428d39 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -82,1374 +82,1352 @@ using std::vector; namespace mongo { namespace repl { - void pretouchOperation(OperationContext* txn, const BSONObj& op); - void pretouchN(vector<BSONObj>&, unsigned a, unsigned b); +void pretouchOperation(OperationContext* txn, const BSONObj& op); +void pretouchN(vector<BSONObj>&, unsigned a, unsigned b); - /* if 1 sync() is running */ - volatile int syncing = 0; - volatile int relinquishSyncingSome = 0; +/* if 1 sync() is running */ +volatile int syncing = 0; +volatile int relinquishSyncingSome = 0; - static time_t lastForcedResync = 0; +static time_t lastForcedResync = 0; - /* output by the web console */ - const char *replInfo = ""; - struct ReplInfo { - ReplInfo(const char *msg) { - replInfo = msg; - } - ~ReplInfo() { - replInfo = "?"; - } - }; - - - ReplSource::ReplSource(OperationContext* txn) { - nClonedThisPass = 0; - ensureMe(txn); - } - - ReplSource::ReplSource(OperationContext* txn, BSONObj o) : nClonedThisPass(0) { - only = o.getStringField("only"); - hostName = o.getStringField("host"); - _sourceName = o.getStringField("source"); - uassert( 10118 , "'host' field not set in sources collection object", !hostName.empty() ); - uassert( 10119 , "only source='main' allowed for now with replication", sourceName() == "main" ); - BSONElement e = o.getField("syncedTo"); - if ( !e.eoo() ) { - uassert(10120, "bad sources 'syncedTo' field value", - e.type() == Date || e.type() == bsonTimestamp); - Timestamp tmp( e.date() ); - syncedTo = tmp; - } - - BSONObj dbsObj = o.getObjectField("dbsNextPass"); - if ( !dbsObj.isEmpty() ) { - BSONObjIterator i(dbsObj); - while ( 1 ) { - BSONElement e = i.next(); - if ( e.eoo() ) - break; - addDbNextPass.insert( e.fieldName() ); - } - } - - dbsObj = o.getObjectField("incompleteCloneDbs"); - if ( !dbsObj.isEmpty() ) { - BSONObjIterator i(dbsObj); - while ( 1 ) { - BSONElement e = i.next(); - if ( e.eoo() ) - break; - incompleteCloneDbs.insert( e.fieldName() ); - } - } - ensureMe(txn); +/* output by the web console */ +const char* replInfo = ""; +struct ReplInfo { + ReplInfo(const char* msg) { + replInfo = msg; + } + ~ReplInfo() { + replInfo = "?"; + } +}; + + +ReplSource::ReplSource(OperationContext* txn) { + nClonedThisPass = 0; + ensureMe(txn); +} + +ReplSource::ReplSource(OperationContext* txn, BSONObj o) : nClonedThisPass(0) { + only = o.getStringField("only"); + hostName = o.getStringField("host"); + _sourceName = o.getStringField("source"); + uassert(10118, "'host' field not set in sources collection object", !hostName.empty()); + uassert(10119, "only source='main' allowed for now with replication", sourceName() == "main"); + BSONElement e = o.getField("syncedTo"); + if (!e.eoo()) { + uassert(10120, + "bad sources 'syncedTo' field value", + e.type() == Date || e.type() == bsonTimestamp); + Timestamp tmp(e.date()); + syncedTo = tmp; } - /* Turn our C++ Source object into a BSONObj */ - BSONObj ReplSource::jsobj() { - BSONObjBuilder b; - b.append("host", hostName); - b.append("source", sourceName()); - if ( !only.empty() ) - b.append("only", only); - if ( !syncedTo.isNull() ) - b.append("syncedTo", syncedTo); - - BSONObjBuilder dbsNextPassBuilder; - int n = 0; - for ( set<string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++ ) { - n++; - dbsNextPassBuilder.appendBool(*i, 1); + BSONObj dbsObj = o.getObjectField("dbsNextPass"); + if (!dbsObj.isEmpty()) { + BSONObjIterator i(dbsObj); + while (1) { + BSONElement e = i.next(); + if (e.eoo()) + break; + addDbNextPass.insert(e.fieldName()); } - if ( n ) - b.append("dbsNextPass", dbsNextPassBuilder.done()); + } - BSONObjBuilder incompleteCloneDbsBuilder; - n = 0; - for ( set<string>::iterator i = incompleteCloneDbs.begin(); i != incompleteCloneDbs.end(); i++ ) { - n++; - incompleteCloneDbsBuilder.appendBool(*i, 1); + dbsObj = o.getObjectField("incompleteCloneDbs"); + if (!dbsObj.isEmpty()) { + BSONObjIterator i(dbsObj); + while (1) { + BSONElement e = i.next(); + if (e.eoo()) + break; + incompleteCloneDbs.insert(e.fieldName()); } - if ( n ) - b.append("incompleteCloneDbs", incompleteCloneDbsBuilder.done()); - - return b.obj(); } + ensureMe(txn); +} + +/* Turn our C++ Source object into a BSONObj */ +BSONObj ReplSource::jsobj() { + BSONObjBuilder b; + b.append("host", hostName); + b.append("source", sourceName()); + if (!only.empty()) + b.append("only", only); + if (!syncedTo.isNull()) + b.append("syncedTo", syncedTo); + + BSONObjBuilder dbsNextPassBuilder; + int n = 0; + for (set<string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++) { + n++; + dbsNextPassBuilder.appendBool(*i, 1); + } + if (n) + b.append("dbsNextPass", dbsNextPassBuilder.done()); + + BSONObjBuilder incompleteCloneDbsBuilder; + n = 0; + for (set<string>::iterator i = incompleteCloneDbs.begin(); i != incompleteCloneDbs.end(); i++) { + n++; + incompleteCloneDbsBuilder.appendBool(*i, 1); + } + if (n) + b.append("incompleteCloneDbs", incompleteCloneDbsBuilder.done()); - void ReplSource::ensureMe(OperationContext* txn) { - string myname = getHostName(); + return b.obj(); +} - // local.me is an identifier for a server for getLastError w:2+ - bool exists = Helpers::getSingleton(txn, "local.me", _me); +void ReplSource::ensureMe(OperationContext* txn) { + string myname = getHostName(); - if (!exists || !_me.hasField("host") || _me["host"].String() != myname) { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dblk(txn->lockState(), "local", MODE_X); - WriteUnitOfWork wunit(txn); - // clean out local.me - Helpers::emptyCollection(txn, "local.me"); + // local.me is an identifier for a server for getLastError w:2+ + bool exists = Helpers::getSingleton(txn, "local.me", _me); - // repopulate - BSONObjBuilder b; - b.appendOID("_id", 0, true); - b.append("host", myname); - _me = b.obj(); - Helpers::putSingleton(txn, "local.me", _me); - wunit.commit(); - } - _me = _me.getOwned(); - } + if (!exists || !_me.hasField("host") || _me["host"].String() != myname) { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dblk(txn->lockState(), "local", MODE_X); + WriteUnitOfWork wunit(txn); + // clean out local.me + Helpers::emptyCollection(txn, "local.me"); - void ReplSource::save(OperationContext* txn) { + // repopulate BSONObjBuilder b; - verify( !hostName.empty() ); - b.append("host", hostName); - // todo: finish allowing multiple source configs. - // this line doesn't work right when source is null, if that is allowed as it is now: - //b.append("source", _sourceName); - BSONObj pattern = b.done(); + b.appendOID("_id", 0, true); + b.append("host", myname); + _me = b.obj(); + Helpers::putSingleton(txn, "local.me", _me); + wunit.commit(); + } + _me = _me.getOwned(); +} - BSONObj o = jsobj(); - LOG( 1 ) << "Saving repl source: " << o << endl; +void ReplSource::save(OperationContext* txn) { + BSONObjBuilder b; + verify(!hostName.empty()); + b.append("host", hostName); + // todo: finish allowing multiple source configs. + // this line doesn't work right when source is null, if that is allowed as it is now: + // b.append("source", _sourceName); + BSONObj pattern = b.done(); - { - OpDebug debug; + BSONObj o = jsobj(); + LOG(1) << "Saving repl source: " << o << endl; - OldClientContext ctx(txn, "local.sources"); + { + OpDebug debug; - const NamespaceString requestNs("local.sources"); - UpdateRequest request(requestNs); + OldClientContext ctx(txn, "local.sources"); - request.setQuery(pattern); - request.setUpdates(o); - request.setUpsert(); + const NamespaceString requestNs("local.sources"); + UpdateRequest request(requestNs); - UpdateResult res = update(txn, ctx.db(), request, &debug); + request.setQuery(pattern); + request.setUpdates(o); + request.setUpsert(); - verify( ! res.modifiers ); - verify( res.numMatched == 1 ); - } - } + UpdateResult res = update(txn, ctx.db(), request, &debug); - static void addSourceToList(OperationContext* txn, - ReplSource::SourceVector &v, - ReplSource& s, - ReplSource::SourceVector &old) { - if ( !s.syncedTo.isNull() ) { // Don't reuse old ReplSource if there was a forced resync. - for ( ReplSource::SourceVector::iterator i = old.begin(); i != old.end(); ) { - if ( s == **i ) { - v.push_back(*i); - old.erase(i); - return; - } - i++; + verify(!res.modifiers); + verify(res.numMatched == 1); + } +} + +static void addSourceToList(OperationContext* txn, + ReplSource::SourceVector& v, + ReplSource& s, + ReplSource::SourceVector& old) { + if (!s.syncedTo.isNull()) { // Don't reuse old ReplSource if there was a forced resync. + for (ReplSource::SourceVector::iterator i = old.begin(); i != old.end();) { + if (s == **i) { + v.push_back(*i); + old.erase(i); + return; } + i++; } - - v.push_back( std::shared_ptr< ReplSource >( new ReplSource( s ) ) ); } - /* we reuse our existing objects so that we can keep our existing connection - and cursor in effect. - */ - void ReplSource::loadAll(OperationContext* txn, SourceVector &v) { - const char* localSources = "local.sources"; - OldClientContext ctx(txn, localSources); - SourceVector old = v; - v.clear(); - - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); - if (!replSettings.source.empty()) { - // --source <host> specified. - // check that no items are in sources other than that - // add if missing - int n = 0; - unique_ptr<PlanExecutor> exec( - InternalPlanner::collectionScan(txn, - localSources, - ctx.db()->getCollection(localSources))); - BSONObj obj; - PlanExecutor::ExecState state; - while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { - n++; - ReplSource tmp(txn, obj); - if (tmp.hostName != replSettings.source) { - log() << "--source " << replSettings.source << " != " << tmp.hostName - << " from local.sources collection" << endl; - log() << "for instructions on changing this slave's source, see:" << endl; - log() << "http://dochub.mongodb.org/core/masterslave" << endl; - log() << "terminating mongod after 30 seconds" << endl; - sleepsecs(30); - dbexit( EXIT_REPLICATION_ERROR ); - } - if (tmp.only != replSettings.only) { - log() << "--only " << replSettings.only << " != " << tmp.only - << " from local.sources collection" << endl; - log() << "terminating after 30 seconds" << endl; - sleepsecs(30); - dbexit( EXIT_REPLICATION_ERROR ); - } - } - uassert(17065, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state); - uassert( 10002 , "local.sources collection corrupt?", n<2 ); - if ( n == 0 ) { - // source missing. add. - ReplSource s(txn); - s.hostName = replSettings.source; - s.only = replSettings.only; - s.save(txn); - } - } - else { - try { - massert(10384 , "--only requires use of --source", replSettings.only.empty()); - } - catch ( ... ) { - dbexit( EXIT_BADOPTIONS ); - } - } + v.push_back(std::shared_ptr<ReplSource>(new ReplSource(s))); +} - unique_ptr<PlanExecutor> exec( - InternalPlanner::collectionScan(txn, - localSources, - ctx.db()->getCollection(localSources))); +/* we reuse our existing objects so that we can keep our existing connection + and cursor in effect. +*/ +void ReplSource::loadAll(OperationContext* txn, SourceVector& v) { + const char* localSources = "local.sources"; + OldClientContext ctx(txn, localSources); + SourceVector old = v; + v.clear(); + + const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + if (!replSettings.source.empty()) { + // --source <host> specified. + // check that no items are in sources other than that + // add if missing + int n = 0; + unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( + txn, localSources, ctx.db()->getCollection(localSources))); BSONObj obj; PlanExecutor::ExecState state; while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { + n++; ReplSource tmp(txn, obj); - if ( tmp.syncedTo.isNull() ) { - DBDirectClient c(txn); - BSONObj op = c.findOne( "local.oplog.$main", QUERY( "op" << NE << "n" ).sort( BSON( "$natural" << -1 ) ) ); - if ( !op.isEmpty() ) { - tmp.syncedTo = op[ "ts" ].timestamp(); - } + if (tmp.hostName != replSettings.source) { + log() << "--source " << replSettings.source << " != " << tmp.hostName + << " from local.sources collection" << endl; + log() << "for instructions on changing this slave's source, see:" << endl; + log() << "http://dochub.mongodb.org/core/masterslave" << endl; + log() << "terminating mongod after 30 seconds" << endl; + sleepsecs(30); + dbexit(EXIT_REPLICATION_ERROR); } - addSourceToList(txn, v, tmp, old); + if (tmp.only != replSettings.only) { + log() << "--only " << replSettings.only << " != " << tmp.only + << " from local.sources collection" << endl; + log() << "terminating after 30 seconds" << endl; + sleepsecs(30); + dbexit(EXIT_REPLICATION_ERROR); + } + } + uassert(17065, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state); + uassert(10002, "local.sources collection corrupt?", n < 2); + if (n == 0) { + // source missing. add. + ReplSource s(txn); + s.hostName = replSettings.source; + s.only = replSettings.only; + s.save(txn); + } + } else { + try { + massert(10384, "--only requires use of --source", replSettings.only.empty()); + } catch (...) { + dbexit(EXIT_BADOPTIONS); } - uassert(17066, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state); } - bool ReplSource::throttledForceResyncDead( OperationContext* txn, const char *requester ) { - if ( time( 0 ) - lastForcedResync > 600 ) { - forceResyncDead( txn, requester ); - lastForcedResync = time( 0 ); - return true; + unique_ptr<PlanExecutor> exec( + InternalPlanner::collectionScan(txn, localSources, ctx.db()->getCollection(localSources))); + BSONObj obj; + PlanExecutor::ExecState state; + while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { + ReplSource tmp(txn, obj); + if (tmp.syncedTo.isNull()) { + DBDirectClient c(txn); + BSONObj op = c.findOne("local.oplog.$main", + QUERY("op" << NE << "n").sort(BSON("$natural" << -1))); + if (!op.isEmpty()) { + tmp.syncedTo = op["ts"].timestamp(); + } } + addSourceToList(txn, v, tmp, old); + } + uassert(17066, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state); +} + +bool ReplSource::throttledForceResyncDead(OperationContext* txn, const char* requester) { + if (time(0) - lastForcedResync > 600) { + forceResyncDead(txn, requester); + lastForcedResync = time(0); + return true; + } + return false; +} + +void ReplSource::forceResyncDead(OperationContext* txn, const char* requester) { + if (!replAllDead) + return; + SourceVector sources; + ReplSource::loadAll(txn, sources); + for (SourceVector::iterator i = sources.begin(); i != sources.end(); ++i) { + log() << requester << " forcing resync from " << (*i)->hostName << endl; + (*i)->forceResync(txn, requester); + } + replAllDead = 0; +} + +class HandshakeCmd : public Command { +public: + void help(stringstream& h) const { + h << "internal"; + } + HandshakeCmd() : Command("handshake") {} + virtual bool isWriteCommandForConfigServer() const { + return false; + } + virtual bool slaveOk() const { + return true; + } + virtual bool adminOnly() const { return false; } + virtual void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) { + ActionSet actions; + actions.addAction(ActionType::internal); + out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + } - void ReplSource::forceResyncDead( OperationContext* txn, const char *requester ) { - if ( !replAllDead ) - return; - SourceVector sources; - ReplSource::loadAll(txn, sources); - for( SourceVector::iterator i = sources.begin(); i != sources.end(); ++i ) { - log() << requester << " forcing resync from " << (*i)->hostName << endl; - (*i)->forceResync( txn, requester ); - } - replAllDead = 0; - } - - class HandshakeCmd : public Command { - public: - void help(stringstream& h) const { h << "internal"; } - HandshakeCmd() : Command("handshake") {} - virtual bool isWriteCommandForConfigServer() const { return false; } - virtual bool slaveOk() const { return true; } - virtual bool adminOnly() const { return false; } - virtual void addRequiredPrivileges(const std::string& dbname, - const BSONObj& cmdObj, - std::vector<Privilege>* out) { - ActionSet actions; - actions.addAction(ActionType::internal); - out->push_back(Privilege(ResourcePattern::forClusterResource(), actions)); + virtual bool run(OperationContext* txn, + const string& ns, + BSONObj& cmdObj, + int options, + string& errmsg, + BSONObjBuilder& result) { + HandshakeArgs handshake; + Status status = handshake.initialize(cmdObj); + if (!status.isOK()) { + return appendCommandStatus(result, status); } - virtual bool run(OperationContext* txn, - const string& ns, - BSONObj& cmdObj, - int options, - string& errmsg, - BSONObjBuilder& result) { - - HandshakeArgs handshake; - Status status = handshake.initialize(cmdObj); - if (!status.isOK()) { - return appendCommandStatus(result, status); - } + ReplClientInfo::forClient(txn->getClient()).setRemoteID(handshake.getRid()); - ReplClientInfo::forClient(txn->getClient()).setRemoteID(handshake.getRid()); + status = getGlobalReplicationCoordinator()->processHandshake(txn, handshake); + return appendCommandStatus(result, status); + } - status = getGlobalReplicationCoordinator()->processHandshake(txn, handshake); - return appendCommandStatus(result, status); - } +} handshakeCmd; - } handshakeCmd; +bool replHandshake(DBClientConnection* conn, const OID& myRID) { + string myname = getHostName(); - bool replHandshake(DBClientConnection *conn, const OID& myRID) { - string myname = getHostName(); + BSONObjBuilder cmd; + cmd.append("handshake", myRID); - BSONObjBuilder cmd; - cmd.append("handshake", myRID); + BSONObj res; + bool ok = conn->runCommand("admin", cmd.obj(), res); + // ignoring for now on purpose for older versions + LOG(ok ? 1 : 0) << "replHandshake result: " << res << endl; + return true; +} - BSONObj res; - bool ok = conn->runCommand( "admin" , cmd.obj() , res ); - // ignoring for now on purpose for older versions - LOG( ok ? 1 : 0 ) << "replHandshake result: " << res << endl; +bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OID& myRID) { + if (reader->conn()) { return true; } - bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OID& myRID) { - if (reader->conn()) { - return true; - } - - if (!reader->connect(host)) { - return false; - } - - if (!replHandshake(reader->conn(), myRID)) { - return false; - } + if (!reader->connect(host)) { + return false; + } - return true; + if (!replHandshake(reader->conn(), myRID)) { + return false; } + return true; +} - void ReplSource::forceResync( OperationContext* txn, const char *requester ) { - BSONObj info; - { - // This is always a GlobalWrite lock (so no ns/db used from the context) - invariant(txn->lockState()->isW()); - Lock::TempRelease tempRelease(txn->lockState()); - if (!_connect(&oplogReader, HostAndPort(hostName), - getGlobalReplicationCoordinator()->getMyRID())) { - msgassertedNoTrace( 14051 , "unable to connect to resync"); - } - /* todo use getDatabaseNames() method here */ - bool ok = oplogReader.conn()->runCommand("admin", - BSON("listDatabases" << 1), - info, - QueryOption_SlaveOk); - massert( 10385 , "Unable to get database list", ok ); +void ReplSource::forceResync(OperationContext* txn, const char* requester) { + BSONObj info; + { + // This is always a GlobalWrite lock (so no ns/db used from the context) + invariant(txn->lockState()->isW()); + Lock::TempRelease tempRelease(txn->lockState()); + + if (!_connect(&oplogReader, + HostAndPort(hostName), + getGlobalReplicationCoordinator()->getMyRID())) { + msgassertedNoTrace(14051, "unable to connect to resync"); } + /* todo use getDatabaseNames() method here */ + bool ok = oplogReader.conn()->runCommand( + "admin", BSON("listDatabases" << 1), info, QueryOption_SlaveOk); + massert(10385, "Unable to get database list", ok); + } - BSONObjIterator i( info.getField( "databases" ).embeddedObject() ); - while( i.moreWithEOO() ) { - BSONElement e = i.next(); - if ( e.eoo() ) - break; - string name = e.embeddedObject().getField( "name" ).valuestr(); - if ( !e.embeddedObject().getBoolField( "empty" ) ) { - if ( name != "local" ) { - if ( only.empty() || only == name ) { - resyncDrop( txn, name ); - } + BSONObjIterator i(info.getField("databases").embeddedObject()); + while (i.moreWithEOO()) { + BSONElement e = i.next(); + if (e.eoo()) + break; + string name = e.embeddedObject().getField("name").valuestr(); + if (!e.embeddedObject().getBoolField("empty")) { + if (name != "local") { + if (only.empty() || only == name) { + resyncDrop(txn, name); } } } - syncedTo = Timestamp(); - addDbNextPass.clear(); - save(txn); - } - - void ReplSource::resyncDrop( OperationContext* txn, const string& db ) { - log() << "resync: dropping database " << db; - OldClientContext ctx(txn, db); - dropDatabase(txn, ctx.db()); } - - /* grab initial copy of a database from the master */ - void ReplSource::resync(OperationContext* txn, const std::string& dbName) { - const std::string db(dbName); // need local copy of the name, we're dropping the original - resyncDrop( txn, db ); - - { - log() << "resync: cloning database " << db << " to get an initial copy" << endl; - ReplInfo r("resync: cloning a database"); - - CloneOptions cloneOptions; - cloneOptions.fromDB = db; - cloneOptions.slaveOk = true; - cloneOptions.useReplAuth = true; - cloneOptions.snapshot = true; - cloneOptions.mayYield = true; - cloneOptions.mayBeInterrupted = false; - - Cloner cloner; - Status status = cloner.copyDb(txn, - db, - hostName.c_str(), - cloneOptions, - NULL); - - if (!status.isOK()) { - if (status.code() == ErrorCodes::DatabaseDifferCase) { - resyncDrop( txn, db ); - log() << "resync: database " << db - << " not valid on the master due to a name conflict, dropping."; - return; - } - else { - log() << "resync of " << db << " from " << hostName - << " failed due to: " << status.toString(); - throw SyncException(); - } + syncedTo = Timestamp(); + addDbNextPass.clear(); + save(txn); +} + +void ReplSource::resyncDrop(OperationContext* txn, const string& db) { + log() << "resync: dropping database " << db; + OldClientContext ctx(txn, db); + dropDatabase(txn, ctx.db()); +} + +/* grab initial copy of a database from the master */ +void ReplSource::resync(OperationContext* txn, const std::string& dbName) { + const std::string db(dbName); // need local copy of the name, we're dropping the original + resyncDrop(txn, db); + + { + log() << "resync: cloning database " << db << " to get an initial copy" << endl; + ReplInfo r("resync: cloning a database"); + + CloneOptions cloneOptions; + cloneOptions.fromDB = db; + cloneOptions.slaveOk = true; + cloneOptions.useReplAuth = true; + cloneOptions.snapshot = true; + cloneOptions.mayYield = true; + cloneOptions.mayBeInterrupted = false; + + Cloner cloner; + Status status = cloner.copyDb(txn, db, hostName.c_str(), cloneOptions, NULL); + + if (!status.isOK()) { + if (status.code() == ErrorCodes::DatabaseDifferCase) { + resyncDrop(txn, db); + log() << "resync: database " << db + << " not valid on the master due to a name conflict, dropping."; + return; + } else { + log() << "resync of " << db << " from " << hostName + << " failed due to: " << status.toString(); + throw SyncException(); } } - - log() << "resync: done with initial clone for db: " << db << endl; } - static DatabaseIgnorer ___databaseIgnorer; + log() << "resync: done with initial clone for db: " << db << endl; +} - void DatabaseIgnorer::doIgnoreUntilAfter( const string &db, const Timestamp &futureOplogTime ) { - if ( futureOplogTime > _ignores[ db ] ) { - _ignores[ db ] = futureOplogTime; - } +static DatabaseIgnorer ___databaseIgnorer; + +void DatabaseIgnorer::doIgnoreUntilAfter(const string& db, const Timestamp& futureOplogTime) { + if (futureOplogTime > _ignores[db]) { + _ignores[db] = futureOplogTime; } +} - bool DatabaseIgnorer::ignoreAt( const string &db, const Timestamp ¤tOplogTime ) { - if ( _ignores[ db ].isNull() ) { - return false; - } - if ( _ignores[ db ] >= currentOplogTime ) { - return true; - } else { - // The ignore state has expired, so clear it. - _ignores.erase( db ); - return false; - } +bool DatabaseIgnorer::ignoreAt(const string& db, const Timestamp& currentOplogTime) { + if (_ignores[db].isNull()) { + return false; + } + if (_ignores[db] >= currentOplogTime) { + return true; + } else { + // The ignore state has expired, so clear it. + _ignores.erase(db); + return false; + } +} + +bool ReplSource::handleDuplicateDbName(OperationContext* txn, + const BSONObj& op, + const char* ns, + const char* db) { + // We are already locked at this point + if (dbHolder().get(txn, ns) != NULL) { + // Database is already present. + return true; + } + BSONElement ts = op.getField("ts"); + if ((ts.type() == Date || ts.type() == bsonTimestamp) && + ___databaseIgnorer.ignoreAt(db, ts.timestamp())) { + // Database is ignored due to a previous indication that it is + // missing from master after optime "ts". + return false; + } + if (Database::duplicateUncasedName(db).empty()) { + // No duplicate database names are present. + return true; } - bool ReplSource::handleDuplicateDbName( OperationContext* txn, - const BSONObj &op, - const char* ns, - const char* db ) { - // We are already locked at this point - if (dbHolder().get(txn, ns) != NULL) { - // Database is already present. - return true; - } - BSONElement ts = op.getField( "ts" ); - if ( ( ts.type() == Date || ts.type() == bsonTimestamp ) && ___databaseIgnorer.ignoreAt( db, ts.timestamp() ) ) { - // Database is ignored due to a previous indication that it is - // missing from master after optime "ts". - return false; - } - if (Database::duplicateUncasedName(db).empty()) { - // No duplicate database names are present. - return true; + Timestamp lastTime; + bool dbOk = false; + { + // This is always a GlobalWrite lock (so no ns/db used from the context) + invariant(txn->lockState()->isW()); + Lock::TempRelease(txn->lockState()); + + // We always log an operation after executing it (never before), so + // a database list will always be valid as of an oplog entry generated + // before it was retrieved. + + BSONObj last = + oplogReader.findOne(this->ns().c_str(), Query().sort(BSON("$natural" << -1))); + if (!last.isEmpty()) { + BSONElement ts = last.getField("ts"); + massert(14032, + "Invalid 'ts' in remote log", + ts.type() == Date || ts.type() == bsonTimestamp); + lastTime = Timestamp(ts.date()); } - Timestamp lastTime; - bool dbOk = false; - { - // This is always a GlobalWrite lock (so no ns/db used from the context) - invariant(txn->lockState()->isW()); - Lock::TempRelease(txn->lockState()); - - // We always log an operation after executing it (never before), so - // a database list will always be valid as of an oplog entry generated - // before it was retrieved. - - BSONObj last = oplogReader.findOne( this->ns().c_str(), Query().sort( BSON( "$natural" << -1 ) ) ); - if ( !last.isEmpty() ) { - BSONElement ts = last.getField( "ts" ); - massert(14032, "Invalid 'ts' in remote log", - ts.type() == Date || ts.type() == bsonTimestamp); - lastTime = Timestamp( ts.date() ); - } - - BSONObj info; - bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info ); - massert( 14033, "Unable to get database list", ok ); - BSONObjIterator i( info.getField( "databases" ).embeddedObject() ); - while( i.more() ) { - BSONElement e = i.next(); - - const char * name = e.embeddedObject().getField( "name" ).valuestr(); - if ( strcasecmp( name, db ) != 0 ) - continue; + BSONObj info; + bool ok = oplogReader.conn()->runCommand("admin", BSON("listDatabases" << 1), info); + massert(14033, "Unable to get database list", ok); + BSONObjIterator i(info.getField("databases").embeddedObject()); + while (i.more()) { + BSONElement e = i.next(); - if ( strcmp( name, db ) == 0 ) { - // The db exists on master, still need to check that no conflicts exist there. - dbOk = true; - continue; - } + const char* name = e.embeddedObject().getField("name").valuestr(); + if (strcasecmp(name, db) != 0) + continue; - // The master has a db name that conflicts with the requested name. - dbOk = false; - break; + if (strcmp(name, db) == 0) { + // The db exists on master, still need to check that no conflicts exist there. + dbOk = true; + continue; } - } - if ( !dbOk ) { - ___databaseIgnorer.doIgnoreUntilAfter( db, lastTime ); - incompleteCloneDbs.erase(db); - addDbNextPass.erase(db); - return false; + // The master has a db name that conflicts with the requested name. + dbOk = false; + break; } + } - // Check for duplicates again, since we released the lock above. - set< string > duplicates; - Database::duplicateUncasedName(db, &duplicates); + if (!dbOk) { + ___databaseIgnorer.doIgnoreUntilAfter(db, lastTime); + incompleteCloneDbs.erase(db); + addDbNextPass.erase(db); + return false; + } - // The database is present on the master and no conflicting databases - // are present on the master. Drop any local conflicts. - for( set< string >::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i ) { - ___databaseIgnorer.doIgnoreUntilAfter( *i, lastTime ); - incompleteCloneDbs.erase(*i); - addDbNextPass.erase(*i); + // Check for duplicates again, since we released the lock above. + set<string> duplicates; + Database::duplicateUncasedName(db, &duplicates); - OldClientContext ctx(txn, *i); - dropDatabase(txn, ctx.db()); - } + // The database is present on the master and no conflicting databases + // are present on the master. Drop any local conflicts. + for (set<string>::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i) { + ___databaseIgnorer.doIgnoreUntilAfter(*i, lastTime); + incompleteCloneDbs.erase(*i); + addDbNextPass.erase(*i); - massert(14034, "Duplicate database names present after attempting to delete duplicates", - Database::duplicateUncasedName(db).empty()); - return true; + OldClientContext ctx(txn, *i); + dropDatabase(txn, ctx.db()); } - void ReplSource::applyCommand(OperationContext* txn, const BSONObj& op) { - try { - Status status = applyCommand_inlock(txn, op); - if (!status.isOK()) { - SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc()); - sync.setHostname(hostName); - if (sync.shouldRetry(txn, op)) { - uassert(28639, - "Failure retrying initial sync update", - applyCommand_inlock(txn, op).isOK()); - } + massert(14034, + "Duplicate database names present after attempting to delete duplicates", + Database::duplicateUncasedName(db).empty()); + return true; +} + +void ReplSource::applyCommand(OperationContext* txn, const BSONObj& op) { + try { + Status status = applyCommand_inlock(txn, op); + if (!status.isOK()) { + SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc()); + sync.setHostname(hostName); + if (sync.shouldRetry(txn, op)) { + uassert(28639, + "Failure retrying initial sync update", + applyCommand_inlock(txn, op).isOK()); } } - catch ( UserException& e ) { - log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;; - } - catch ( DBException& e ) { - log() << "sync: caught db exception " << e << " while applying op: " << op << endl;; - } - + } catch (UserException& e) { + log() << "sync: caught user assertion " << e << " while applying op: " << op << endl; + ; + } catch (DBException& e) { + log() << "sync: caught db exception " << e << " while applying op: " << op << endl; + ; } - - void ReplSource::applyOperation(OperationContext* txn, Database* db, const BSONObj& op) { - try { - Status status = applyOperation_inlock( txn, db, op ); - if (!status.isOK()) { - SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc()); - sync.setHostname(hostName); - if (sync.shouldRetry(txn, op)) { - uassert(15914, - "Failure retrying initial sync update", - applyOperation_inlock(txn, db, op).isOK()); - } +} + +void ReplSource::applyOperation(OperationContext* txn, Database* db, const BSONObj& op) { + try { + Status status = applyOperation_inlock(txn, db, op); + if (!status.isOK()) { + SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc()); + sync.setHostname(hostName); + if (sync.shouldRetry(txn, op)) { + uassert(15914, + "Failure retrying initial sync update", + applyOperation_inlock(txn, db, op).isOK()); } } - catch ( UserException& e ) { - log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;; - } - catch ( DBException& e ) { - log() << "sync: caught db exception " << e << " while applying op: " << op << endl;; - } - + } catch (UserException& e) { + log() << "sync: caught user assertion " << e << " while applying op: " << op << endl; + ; + } catch (DBException& e) { + log() << "sync: caught db exception " << e << " while applying op: " << op << endl; + ; } +} - /* local.$oplog.main is of the form: - { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> } - ... - see logOp() comments. +/* local.$oplog.main is of the form: + { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> } + ... + see logOp() comments. - @param alreadyLocked caller already put us in write lock if true - */ - void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, BSONObj& op, bool alreadyLocked) { - LOG(6) << "processing op: " << op << endl; - - if( op.getStringField("op")[0] == 'n' ) - return; - - char clientName[MaxDatabaseNameLen]; - const char *ns = op.getStringField("ns"); - nsToDatabase(ns, clientName); - - if ( *ns == '.' ) { - log() << "skipping bad op in oplog: " << op.toString() << endl; - return; - } - else if ( *ns == 0 ) { - /*if( op.getStringField("op")[0] != 'n' )*/ { - log() << "halting replication, bad op in oplog:\n " << op.toString() << endl; - replAllDead = "bad object in oplog"; - throw SyncException(); - } - //ns = "local.system.x"; - //nsToDatabase(ns, clientName); + @param alreadyLocked caller already put us in write lock if true +*/ +void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, + BSONObj& op, + bool alreadyLocked) { + LOG(6) << "processing op: " << op << endl; + + if (op.getStringField("op")[0] == 'n') + return; + + char clientName[MaxDatabaseNameLen]; + const char* ns = op.getStringField("ns"); + nsToDatabase(ns, clientName); + + if (*ns == '.') { + log() << "skipping bad op in oplog: " << op.toString() << endl; + return; + } else if (*ns == 0) { + /*if( op.getStringField("op")[0] != 'n' )*/ { + log() << "halting replication, bad op in oplog:\n " << op.toString() << endl; + replAllDead = "bad object in oplog"; + throw SyncException(); } + // ns = "local.system.x"; + // nsToDatabase(ns, clientName); + } - if ( !only.empty() && only != clientName ) - return; - - // Push the CurOp stack for "txn" so each individual oplog entry application is separately - // reported. - CurOp individualOp(txn); - txn->setReplicatedWrites(false); - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); - if (replSettings.pretouch && - !alreadyLocked/*doesn't make sense if in write lock already*/) { - if (replSettings.pretouch > 1) { - /* note: this is bad - should be put in ReplSource. but this is first test... */ - static int countdown; - verify( countdown >= 0 ); - if( countdown > 0 ) { - countdown--; // was pretouched on a prev pass + if (!only.empty() && only != clientName) + return; + + // Push the CurOp stack for "txn" so each individual oplog entry application is separately + // reported. + CurOp individualOp(txn); + txn->setReplicatedWrites(false); + const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + if (replSettings.pretouch && !alreadyLocked /*doesn't make sense if in write lock already*/) { + if (replSettings.pretouch > 1) { + /* note: this is bad - should be put in ReplSource. but this is first test... */ + static int countdown; + verify(countdown >= 0); + if (countdown > 0) { + countdown--; // was pretouched on a prev pass + } else { + const int m = 4; + if (tp.get() == 0) { + int nthr = min(8, replSettings.pretouch); + nthr = max(nthr, 1); + tp.reset(new OldThreadPool(nthr)); } - else { - const int m = 4; - if( tp.get() == 0 ) { - int nthr = min(8, replSettings.pretouch); - nthr = max(nthr, 1); - tp.reset( new OldThreadPool(nthr) ); - } - vector<BSONObj> v; - oplogReader.peek(v, replSettings.pretouch); - unsigned a = 0; - while( 1 ) { - if( a >= v.size() ) break; - unsigned b = a + m - 1; // v[a..b] - if( b >= v.size() ) b = v.size() - 1; - tp->schedule(pretouchN, v, a, b); - DEV cout << "pretouch task: " << a << ".." << b << endl; - a += m; - } - // we do one too... - pretouchOperation(txn, op); - tp->join(); - countdown = v.size(); + vector<BSONObj> v; + oplogReader.peek(v, replSettings.pretouch); + unsigned a = 0; + while (1) { + if (a >= v.size()) + break; + unsigned b = a + m - 1; // v[a..b] + if (b >= v.size()) + b = v.size() - 1; + tp->schedule(pretouchN, v, a, b); + DEV cout << "pretouch task: " << a << ".." << b << endl; + a += m; } - } - else { + // we do one too... pretouchOperation(txn, op); + tp->join(); + countdown = v.size(); } + } else { + pretouchOperation(txn, op); } + } - unique_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(txn->lockState())); + unique_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(txn->lockState())); - if ( replAllDead ) { - // hmmm why is this check here and not at top of this function? does it get set between top and here? - log() << "replAllDead, throwing SyncException: " << replAllDead << endl; - throw SyncException(); - } + if (replAllDead) { + // hmmm why is this check here and not at top of this function? does it get set between top and here? + log() << "replAllDead, throwing SyncException: " << replAllDead << endl; + throw SyncException(); + } - if (!handleDuplicateDbName(txn, op, ns, clientName)) { - return; - } + if (!handleDuplicateDbName(txn, op, ns, clientName)) { + return; + } - // special case apply for commands to avoid implicit database creation - if (*op.getStringField("op") == 'c') { - applyCommand(txn, op); - return; - } + // special case apply for commands to avoid implicit database creation + if (*op.getStringField("op") == 'c') { + applyCommand(txn, op); + return; + } - // This code executes on the slaves only, so it doesn't need to be sharding-aware since - // mongos will not send requests there. That's why the last argument is false (do not do - // version checking). - OldClientContext ctx(txn, ns, false); - - bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData(); - bool incompleteClone = incompleteCloneDbs.count( clientName ) != 0; - - LOG(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty << ", incompleteClone: " << incompleteClone << endl; - - if ( ctx.justCreated() || empty || incompleteClone ) { - // we must add to incomplete list now that setClient has been called - incompleteCloneDbs.insert( clientName ); - if ( nClonedThisPass ) { - /* we only clone one database per pass, even if a lot need done. This helps us - avoid overflowing the master's transaction log by doing too much work before going - back to read more transactions. (Imagine a scenario of slave startup where we try to - clone 100 databases in one pass.) - */ - addDbNextPass.insert( clientName ); - } - else { - if ( incompleteClone ) { - log() << "An earlier initial clone of '" << clientName << "' did not complete, now resyncing." << endl; - } - save(txn); - OldClientContext ctx(txn, ns); - nClonedThisPass++; - resync(txn, ctx.db()->name()); - addDbNextPass.erase(clientName); - incompleteCloneDbs.erase( clientName ); + // This code executes on the slaves only, so it doesn't need to be sharding-aware since + // mongos will not send requests there. That's why the last argument is false (do not do + // version checking). + OldClientContext ctx(txn, ns, false); + + bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData(); + bool incompleteClone = incompleteCloneDbs.count(clientName) != 0; + + LOG(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty + << ", incompleteClone: " << incompleteClone << endl; + + if (ctx.justCreated() || empty || incompleteClone) { + // we must add to incomplete list now that setClient has been called + incompleteCloneDbs.insert(clientName); + if (nClonedThisPass) { + /* we only clone one database per pass, even if a lot need done. This helps us + avoid overflowing the master's transaction log by doing too much work before going + back to read more transactions. (Imagine a scenario of slave startup where we try to + clone 100 databases in one pass.) + */ + addDbNextPass.insert(clientName); + } else { + if (incompleteClone) { + log() << "An earlier initial clone of '" << clientName + << "' did not complete, now resyncing." << endl; } save(txn); + OldClientContext ctx(txn, ns); + nClonedThisPass++; + resync(txn, ctx.db()->name()); + addDbNextPass.erase(clientName); + incompleteCloneDbs.erase(clientName); } - else { - applyOperation(txn, ctx.db(), op); - addDbNextPass.erase( clientName ); - } + save(txn); + } else { + applyOperation(txn, ctx.db(), op); + addDbNextPass.erase(clientName); } +} - void ReplSource::syncToTailOfRemoteLog() { - string _ns = ns(); - BSONObjBuilder b; - if ( !only.empty() ) { - b.appendRegex("ns", string("^") + pcrecpp::RE::QuoteMeta( only )); - } - BSONObj last = oplogReader.findOne( _ns.c_str(), Query( b.done() ).sort( BSON( "$natural" << -1 ) ) ); - if ( !last.isEmpty() ) { - BSONElement ts = last.getField( "ts" ); - massert(10386, "non Date ts found: " + last.toString(), - ts.type() == Date || ts.type() == bsonTimestamp); - syncedTo = Timestamp( ts.date() ); - } +void ReplSource::syncToTailOfRemoteLog() { + string _ns = ns(); + BSONObjBuilder b; + if (!only.empty()) { + b.appendRegex("ns", string("^") + pcrecpp::RE::QuoteMeta(only)); + } + BSONObj last = oplogReader.findOne(_ns.c_str(), Query(b.done()).sort(BSON("$natural" << -1))); + if (!last.isEmpty()) { + BSONElement ts = last.getField("ts"); + massert(10386, + "non Date ts found: " + last.toString(), + ts.type() == Date || ts.type() == bsonTimestamp); + syncedTo = Timestamp(ts.date()); } +} - class ReplApplyBatchSize : public ServerParameter { - public: - ReplApplyBatchSize() - : ServerParameter( ServerParameterSet::getGlobal(), "replApplyBatchSize" ), - _value( 1 ) { - } +class ReplApplyBatchSize : public ServerParameter { +public: + ReplApplyBatchSize() + : ServerParameter(ServerParameterSet::getGlobal(), "replApplyBatchSize"), _value(1) {} - int get() const { return _value; } + int get() const { + return _value; + } + + virtual void append(OperationContext* txn, BSONObjBuilder& b, const string& name) { + b.append(name, _value); + } - virtual void append(OperationContext* txn, BSONObjBuilder& b, const string& name) { - b.append( name, _value ); + virtual Status set(const BSONElement& newValuElement) { + return set(newValuElement.numberInt()); + } + + virtual Status set(int b) { + if (b < 1 || b > 1024) { + return Status(ErrorCodes::BadValue, "replApplyBatchSize has to be >= 1 and < 1024"); } - virtual Status set( const BSONElement& newValuElement ) { - return set( newValuElement.numberInt() ); + const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + if (replSettings.slavedelay != 0 && b > 1) { + return Status(ErrorCodes::BadValue, "can't use a batch size > 1 with slavedelay"); + } + if (!replSettings.slave) { + return Status(ErrorCodes::BadValue, + "can't set replApplyBatchSize on a non-slave machine"); } - virtual Status set( int b ) { - if( b < 1 || b > 1024 ) { - return Status( ErrorCodes::BadValue, - "replApplyBatchSize has to be >= 1 and < 1024" ); - } + _value = b; + return Status::OK(); + } - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); - if ( replSettings.slavedelay != 0 && b > 1 ) { - return Status( ErrorCodes::BadValue, - "can't use a batch size > 1 with slavedelay" ); - } - if ( ! replSettings.slave ) { - return Status( ErrorCodes::BadValue, - "can't set replApplyBatchSize on a non-slave machine" ); - } + virtual Status setFromString(const string& str) { + return set(atoi(str.c_str())); + } - _value = b; - return Status::OK(); - } + int _value; - virtual Status setFromString( const string& str ) { - return set( atoi( str.c_str() ) ); - } +} replApplyBatchSize; - int _value; - - } replApplyBatchSize; - - /* slave: pull some data from the master's oplog - note: not yet in db mutex at this point. - @return -1 error - 0 ok, don't sleep - 1 ok, sleep - */ - int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) { - int okResultCode = 1; - string ns = string("local.oplog.$") + sourceName(); - LOG(2) << "sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n'; - - bool tailing = true; - oplogReader.tailCheck(); - - bool initial = syncedTo.isNull(); - - if ( !oplogReader.haveCursor() || initial ) { - if ( initial ) { - // Important to grab last oplog timestamp before listing databases. - syncToTailOfRemoteLog(); - BSONObj info; - bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info ); - massert( 10389 , "Unable to get database list", ok ); - BSONObjIterator i( info.getField( "databases" ).embeddedObject() ); - while( i.moreWithEOO() ) { - BSONElement e = i.next(); - if ( e.eoo() ) - break; - string name = e.embeddedObject().getField( "name" ).valuestr(); - if ( !e.embeddedObject().getBoolField( "empty" ) ) { - if ( name != "local" ) { - if ( only.empty() || only == name ) { - LOG( 2 ) << "adding to 'addDbNextPass': " << name << endl; - addDbNextPass.insert( name ); - } +/* slave: pull some data from the master's oplog + note: not yet in db mutex at this point. + @return -1 error + 0 ok, don't sleep + 1 ok, sleep +*/ +int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) { + int okResultCode = 1; + string ns = string("local.oplog.$") + sourceName(); + LOG(2) << "sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n'; + + bool tailing = true; + oplogReader.tailCheck(); + + bool initial = syncedTo.isNull(); + + if (!oplogReader.haveCursor() || initial) { + if (initial) { + // Important to grab last oplog timestamp before listing databases. + syncToTailOfRemoteLog(); + BSONObj info; + bool ok = oplogReader.conn()->runCommand("admin", BSON("listDatabases" << 1), info); + massert(10389, "Unable to get database list", ok); + BSONObjIterator i(info.getField("databases").embeddedObject()); + while (i.moreWithEOO()) { + BSONElement e = i.next(); + if (e.eoo()) + break; + string name = e.embeddedObject().getField("name").valuestr(); + if (!e.embeddedObject().getBoolField("empty")) { + if (name != "local") { + if (only.empty() || only == name) { + LOG(2) << "adding to 'addDbNextPass': " << name << endl; + addDbNextPass.insert(name); } } } - // obviously global isn't ideal, but non-repl set is old so - // keeping it simple - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); - save(txn); } + // obviously global isn't ideal, but non-repl set is old so + // keeping it simple + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + save(txn); + } - BSONObjBuilder gte; - gte.append("$gte", syncedTo); - BSONObjBuilder query; - query.append("ts", gte.done()); - if ( !only.empty() ) { - // note we may here skip a LOT of data table scanning, a lot of work for the master. - // maybe append "\\." here? - query.appendRegex("ns", string("^") + pcrecpp::RE::QuoteMeta( only )); - } - BSONObj queryObj = query.done(); - // e.g. queryObj = { ts: { $gte: syncedTo } } + BSONObjBuilder gte; + gte.append("$gte", syncedTo); + BSONObjBuilder query; + query.append("ts", gte.done()); + if (!only.empty()) { + // note we may here skip a LOT of data table scanning, a lot of work for the master. + // maybe append "\\." here? + query.appendRegex("ns", string("^") + pcrecpp::RE::QuoteMeta(only)); + } + BSONObj queryObj = query.done(); + // e.g. queryObj = { ts: { $gte: syncedTo } } - oplogReader.tailingQuery(ns.c_str(), queryObj); - tailing = false; + oplogReader.tailingQuery(ns.c_str(), queryObj); + tailing = false; + } else { + LOG(2) << "tailing=true\n"; + } + + if (!oplogReader.haveCursor()) { + log() << "dbclient::query returns null (conn closed?)" << endl; + oplogReader.resetConnection(); + return -1; + } + + // show any deferred database creates from a previous pass + { + set<string>::iterator i = addDbNextPass.begin(); + if (i != addDbNextPass.end()) { + BSONObjBuilder b; + b.append("ns", *i + '.'); + b.append("op", "db"); + BSONObj op = b.done(); + _sync_pullOpLog_applyOperation(txn, op, false); + } + } + + if (!oplogReader.more()) { + if (tailing) { + LOG(2) << "tailing & no new activity\n"; + okResultCode = 0; // don't sleep + + } else { + log() << ns << " oplog is empty" << endl; } - else { - LOG(2) << "tailing=true\n"; + { + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + save(txn); } + return okResultCode; + } - if( !oplogReader.haveCursor() ) { - log() << "dbclient::query returns null (conn closed?)" << endl; - oplogReader.resetConnection(); - return -1; + Timestamp nextOpTime; + { + BSONObj op = oplogReader.next(); + BSONElement ts = op.getField("ts"); + if (ts.type() != Date && ts.type() != bsonTimestamp) { + string err = op.getStringField("$err"); + if (!err.empty()) { + // 13051 is "tailable cursor requested on non capped collection" + if (op.getIntField("code") == 13051) { + log() << "trying to slave off of a non-master" << '\n'; + massert(13344, "trying to slave off of a non-master", false); + } else { + error() << "$err reading remote oplog: " + err << '\n'; + massert(10390, "got $err reading remote oplog", false); + } + } else { + error() << "bad object read from remote oplog: " << op.toString() << '\n'; + massert(10391, "bad object read from remote oplog", false); + } } - // show any deferred database creates from a previous pass - { - set<string>::iterator i = addDbNextPass.begin(); - if ( i != addDbNextPass.end() ) { - BSONObjBuilder b; - b.append("ns", *i + '.'); - b.append("op", "db"); - BSONObj op = b.done(); - _sync_pullOpLog_applyOperation(txn, op, false); + nextOpTime = Timestamp(ts.date()); + LOG(2) << "first op time received: " << nextOpTime.toString() << '\n'; + if (initial) { + LOG(1) << "initial run\n"; + } + if (tailing) { + if (!(syncedTo < nextOpTime)) { + warning() << "ASSERTION failed : syncedTo < nextOpTime" << endl; + log() << "syncTo: " << syncedTo.toStringLong() << endl; + log() << "nextOpTime: " << nextOpTime.toStringLong() << endl; + verify(false); } + oplogReader.putBack(op); // op will be processed in the loop below + nextOpTime = Timestamp(); // will reread the op below + } else if (nextOpTime != syncedTo) { // didn't get what we queried for - error + log() << "nextOpTime " << nextOpTime.toStringLong() << ' ' + << ((nextOpTime < syncedTo) ? "<??" : ">") << " syncedTo " + << syncedTo.toStringLong() << '\n' + << "time diff: " << (nextOpTime.getSecs() - syncedTo.getSecs()) << "sec\n" + << "tailing: " << tailing << '\n' << "data too stale, halting replication" + << endl; + replInfo = replAllDead = "data too stale halted replication"; + verify(syncedTo < nextOpTime); + throw SyncException(); + } else { + /* t == syncedTo, so the first op was applied previously or it is the first op of initial query and need not be applied. */ } + } - if ( !oplogReader.more() ) { - if ( tailing ) { - LOG(2) << "tailing & no new activity\n"; - okResultCode = 0; // don't sleep + // apply operations + { + int n = 0; + time_t saveLast = time(0); + while (1) { + // we need "&& n" to assure we actually process at least one op to get a sync + // point recorded in the first place. + const bool moreInitialSyncsPending = !addDbNextPass.empty() && n; - } - else { - log() << ns << " oplog is empty" << endl; - } - { + if (moreInitialSyncsPending || !oplogReader.more()) { ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite lk(txn->lockState()); - save(txn); - } - return okResultCode; - } - Timestamp nextOpTime; - { - BSONObj op = oplogReader.next(); - BSONElement ts = op.getField("ts"); - if ( ts.type() != Date && ts.type() != bsonTimestamp ) { - string err = op.getStringField("$err"); - if ( !err.empty() ) { - // 13051 is "tailable cursor requested on non capped collection" - if (op.getIntField("code") == 13051) { - log() << "trying to slave off of a non-master" << '\n'; - massert( 13344 , "trying to slave off of a non-master", false ); - } - else { - error() << "$err reading remote oplog: " + err << '\n'; - massert( 10390 , "got $err reading remote oplog", false ); - } + if (tailing) { + okResultCode = 0; // don't sleep } - else { - error() << "bad object read from remote oplog: " << op.toString() << '\n'; - massert( 10391 , "bad object read from remote oplog", false); - } - } - nextOpTime = Timestamp( ts.date() ); - LOG(2) << "first op time received: " << nextOpTime.toString() << '\n'; - if ( initial ) { - LOG(1) << "initial run\n"; - } - if( tailing ) { - if( !( syncedTo < nextOpTime ) ) { - warning() << "ASSERTION failed : syncedTo < nextOpTime" << endl; - log() << "syncTo: " << syncedTo.toStringLong() << endl; - log() << "nextOpTime: " << nextOpTime.toStringLong() << endl; - verify(false); - } - oplogReader.putBack( op ); // op will be processed in the loop below - nextOpTime = Timestamp(); // will reread the op below - } - else if ( nextOpTime != syncedTo ) { // didn't get what we queried for - error - log() - << "nextOpTime " << nextOpTime.toStringLong() << ' ' - << ((nextOpTime < syncedTo) ? "<??" : ">") - << " syncedTo " << syncedTo.toStringLong() << '\n' - << "time diff: " << (nextOpTime.getSecs() - syncedTo.getSecs()) - << "sec\n" - << "tailing: " << tailing << '\n' - << "data too stale, halting replication" << endl; - replInfo = replAllDead = "data too stale halted replication"; - verify( syncedTo < nextOpTime ); - throw SyncException(); - } - else { - /* t == syncedTo, so the first op was applied previously or it is the first op of initial query and need not be applied. */ + syncedTo = nextOpTime; + save(txn); // note how far we are synced up to now + nApplied = n; + break; } - } - // apply operations - { - int n = 0; - time_t saveLast = time(0); - while ( 1 ) { - // we need "&& n" to assure we actually process at least one op to get a sync - // point recorded in the first place. - const bool moreInitialSyncsPending = !addDbNextPass.empty() && n; - - if ( moreInitialSyncsPending || !oplogReader.more() ) { - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); + OCCASIONALLY if (n > 0 && (n > 100000 || time(0) - saveLast > 60)) { + // periodically note our progress, in case we are doing a lot of work and crash + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + syncedTo = nextOpTime; + // can't update local log ts since there are pending operations from our peer + save(txn); + log() << "checkpoint applied " << n << " operations" << endl; + log() << "syncedTo: " << syncedTo.toStringLong() << endl; + saveLast = time(0); + n = 0; + } - if (tailing) { - okResultCode = 0; // don't sleep - } + BSONObj op = oplogReader.next(); - syncedTo = nextOpTime; - save(txn); // note how far we are synced up to now - nApplied = n; - break; + int b = replApplyBatchSize.get(); + bool justOne = b == 1; + unique_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(txn->lockState())); + while (1) { + BSONElement ts = op.getField("ts"); + if (!(ts.type() == Date || ts.type() == bsonTimestamp)) { + log() << "sync error: problem querying remote oplog record" << endl; + log() << "op: " << op.toString() << endl; + log() << "halting replication" << endl; + replInfo = replAllDead = "sync error: no ts found querying remote oplog record"; + throw SyncException(); } - - OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) { - // periodically note our progress, in case we are doing a lot of work and crash + Timestamp last = nextOpTime; + nextOpTime = Timestamp(ts.date()); + if (!(last < nextOpTime)) { + log() << "sync error: last applied optime at slave >= nextOpTime from master" + << endl; + log() << " last: " << last.toStringLong() << endl; + log() << " nextOpTime: " << nextOpTime.toStringLong() << endl; + log() << " halting replication" << endl; + replInfo = replAllDead = "sync error last >= nextOpTime"; + uassert( + 10123, + "replication error last applied optime at slave >= nextOpTime from master", + false); + } + const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + if (replSettings.slavedelay && + (unsigned(time(0)) < nextOpTime.getSecs() + replSettings.slavedelay)) { + verify(justOne); + oplogReader.putBack(op); + _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1; ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite lk(txn->lockState()); - syncedTo = nextOpTime; - // can't update local log ts since there are pending operations from our peer - save(txn); - log() << "checkpoint applied " << n << " operations" << endl; + if (n > 0) { + syncedTo = last; + save(txn); + } + log() << "applied " << n << " operations" << endl; log() << "syncedTo: " << syncedTo.toStringLong() << endl; - saveLast = time(0); - n = 0; + log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl; + return okResultCode; } - BSONObj op = oplogReader.next(); - - int b = replApplyBatchSize.get(); - bool justOne = b == 1; - unique_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(txn->lockState())); - while( 1 ) { - - BSONElement ts = op.getField("ts"); - if( !( ts.type() == Date || ts.type() == bsonTimestamp ) ) { - log() << "sync error: problem querying remote oplog record" << endl; - log() << "op: " << op.toString() << endl; - log() << "halting replication" << endl; - replInfo = replAllDead = "sync error: no ts found querying remote oplog record"; - throw SyncException(); - } - Timestamp last = nextOpTime; - nextOpTime = Timestamp( ts.date() ); - if ( !( last < nextOpTime ) ) { - log() << "sync error: last applied optime at slave >= nextOpTime from master" << endl; - log() << " last: " << last.toStringLong() << endl; - log() << " nextOpTime: " << nextOpTime.toStringLong() << endl; - log() << " halting replication" << endl; - replInfo = replAllDead = "sync error last >= nextOpTime"; - uassert( 10123 , "replication error last applied optime at slave >= nextOpTime from master", false); - } - const ReplSettings& replSettings = - getGlobalReplicationCoordinator()->getSettings(); - if ( replSettings.slavedelay && ( unsigned( time( 0 ) ) < nextOpTime.getSecs() + replSettings.slavedelay ) ) { - verify( justOne ); - oplogReader.putBack( op ); - _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1; - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); - if ( n > 0 ) { - syncedTo = last; - save(txn); - } - log() << "applied " << n << " operations" << endl; - log() << "syncedTo: " << syncedTo.toStringLong() << endl; - log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl; - return okResultCode; - } - - _sync_pullOpLog_applyOperation(txn, op, !justOne); - n++; + _sync_pullOpLog_applyOperation(txn, op, !justOne); + n++; - if( --b == 0 ) - break; - // if to here, we are doing mulpile applications in a singel write lock acquisition - if( !oplogReader.moreInCurrentBatch() ) { - // break if no more in batch so we release lock while reading from the master - break; - } - op = oplogReader.next(); + if (--b == 0) + break; + // if to here, we are doing mulpile applications in a singel write lock acquisition + if (!oplogReader.moreInCurrentBatch()) { + // break if no more in batch so we release lock while reading from the master + break; } + op = oplogReader.next(); } } - - return okResultCode; } + return okResultCode; +} - /* note: not yet in mutex at this point. - returns >= 0 if ok. return -1 if you want to reconnect. - return value of zero indicates no sleep necessary before next call - */ - int ReplSource::sync(OperationContext* txn, int& nApplied) { - _sleepAdviceTime = 0; - ReplInfo r("sync"); - if (!serverGlobalParams.quiet) { - LogstreamBuilder l = log(); - l << "syncing from "; - if( sourceName() != "main" ) { - l << "source:" << sourceName() << ' '; - } - l << "host:" << hostName << endl; - } - nClonedThisPass = 0; - - // FIXME Handle cases where this db isn't on default port, or default port is spec'd in hostName. - if ((string("localhost") == hostName || string("127.0.0.1") == hostName) && - serverGlobalParams.port == ServerGlobalParams::DefaultDBPort) { - log() << "can't sync from self (localhost). sources configuration may be wrong." << endl; - sleepsecs(5); - return -1; - } - if ( !_connect(&oplogReader, - HostAndPort(hostName), - getGlobalReplicationCoordinator()->getMyRID()) ) { - LOG(4) << "can't connect to sync source" << endl; - return -1; - } +/* note: not yet in mutex at this point. + returns >= 0 if ok. return -1 if you want to reconnect. + return value of zero indicates no sleep necessary before next call +*/ +int ReplSource::sync(OperationContext* txn, int& nApplied) { + _sleepAdviceTime = 0; + ReplInfo r("sync"); + if (!serverGlobalParams.quiet) { + LogstreamBuilder l = log(); + l << "syncing from "; + if (sourceName() != "main") { + l << "source:" << sourceName() << ' '; + } + l << "host:" << hostName << endl; + } + nClonedThisPass = 0; + + // FIXME Handle cases where this db isn't on default port, or default port is spec'd in hostName. + if ((string("localhost") == hostName || string("127.0.0.1") == hostName) && + serverGlobalParams.port == ServerGlobalParams::DefaultDBPort) { + log() << "can't sync from self (localhost). sources configuration may be wrong." << endl; + sleepsecs(5); + return -1; + } - return _sync_pullOpLog(txn, nApplied); + if (!_connect( + &oplogReader, HostAndPort(hostName), getGlobalReplicationCoordinator()->getMyRID())) { + LOG(4) << "can't connect to sync source" << endl; + return -1; } - /* --------------------------------------------------------------*/ + return _sync_pullOpLog(txn, nApplied); +} - static bool _replMainStarted = false; +/* --------------------------------------------------------------*/ - /* - TODO: - _ source has autoptr to the cursor - _ reuse that cursor when we can - */ +static bool _replMainStarted = false; - /* returns: # of seconds to sleep before next pass - 0 = no sleep recommended - 1 = special sentinel indicating adaptive sleep recommended - */ - int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nApplied) { - { - ReplInfo r("replMain load sources"); - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); - ReplSource::loadAll(txn, sources); +/* +TODO: +_ source has autoptr to the cursor +_ reuse that cursor when we can +*/ - // only need this param for initial reset - _replMainStarted = true; - } +/* returns: # of seconds to sleep before next pass + 0 = no sleep recommended + 1 = special sentinel indicating adaptive sleep recommended +*/ +int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nApplied) { + { + ReplInfo r("replMain load sources"); + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + ReplSource::loadAll(txn, sources); - if ( sources.empty() ) { - /* replication is not configured yet (for --slave) in local.sources. Poll for config it - every 20 seconds. - */ - log() << "no source given, add a master to local.sources to start replication" << endl; - return 20; - } + // only need this param for initial reset + _replMainStarted = true; + } - int sleepAdvice = 1; - for ( ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++ ) { - ReplSource *s = i->get(); - int res = -1; - try { - res = s->sync(txn, nApplied); - bool moreToSync = s->haveMoreDbsToSync(); - if( res < 0 ) { - sleepAdvice = 3; - } - else if( moreToSync ) { - sleepAdvice = 0; - } - else if ( s->sleepAdvice() ) { - sleepAdvice = s->sleepAdvice(); - } - else - sleepAdvice = res; - } - catch ( const SyncException& ) { - log() << "caught SyncException" << endl; - return 10; - } - catch ( AssertionException& e ) { - if ( e.severe() ) { - log() << "replMain AssertionException " << e.what() << endl; - return 60; - } - else { - log() << "AssertionException " << e.what() << endl; - } - replInfo = "replMain caught AssertionException"; - } - catch ( const DBException& e ) { - log() << "DBException " << e.what() << endl; - replInfo = "replMain caught DBException"; - } - catch ( const std::exception &e ) { - log() << "std::exception " << e.what() << endl; - replInfo = "replMain caught std::exception"; - } - catch ( ... ) { - log() << "unexpected exception during replication. replication will halt" << endl; - replAllDead = "caught unexpected exception during replication"; - } - if ( res < 0 ) - s->oplogReader.resetConnection(); - } - return sleepAdvice; + if (sources.empty()) { + /* replication is not configured yet (for --slave) in local.sources. Poll for config it + every 20 seconds. + */ + log() << "no source given, add a master to local.sources to start replication" << endl; + return 20; } - static void replMain(OperationContext* txn) { - ReplSource::SourceVector sources; - while ( 1 ) { - int s = 0; - { - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); - if ( replAllDead ) { - // throttledForceResyncDead can throw - if ( !getGlobalReplicationCoordinator()->getSettings().autoresync || - !ReplSource::throttledForceResyncDead( txn, "auto" ) ) { - log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" << endl; - break; - } - } - verify( syncing == 0 ); // i.e., there is only one sync thread running. we will want to change/fix this. - syncing++; + int sleepAdvice = 1; + for (ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++) { + ReplSource* s = i->get(); + int res = -1; + try { + res = s->sync(txn, nApplied); + bool moreToSync = s->haveMoreDbsToSync(); + if (res < 0) { + sleepAdvice = 3; + } else if (moreToSync) { + sleepAdvice = 0; + } else if (s->sleepAdvice()) { + sleepAdvice = s->sleepAdvice(); + } else + sleepAdvice = res; + } catch (const SyncException&) { + log() << "caught SyncException" << endl; + return 10; + } catch (AssertionException& e) { + if (e.severe()) { + log() << "replMain AssertionException " << e.what() << endl; + return 60; + } else { + log() << "AssertionException " << e.what() << endl; } + replInfo = "replMain caught AssertionException"; + } catch (const DBException& e) { + log() << "DBException " << e.what() << endl; + replInfo = "replMain caught DBException"; + } catch (const std::exception& e) { + log() << "std::exception " << e.what() << endl; + replInfo = "replMain caught std::exception"; + } catch (...) { + log() << "unexpected exception during replication. replication will halt" << endl; + replAllDead = "caught unexpected exception during replication"; + } + if (res < 0) + s->oplogReader.resetConnection(); + } + return sleepAdvice; +} - try { - int nApplied = 0; - s = _replMain(txn, sources, nApplied); - if( s == 1 ) { - if( nApplied == 0 ) s = 2; - else if( nApplied > 100 ) { - // sleep very little - just enough that we aren't truly hammering master - sleepmillis(75); - s = 0; - } +static void replMain(OperationContext* txn) { + ReplSource::SourceVector sources; + while (1) { + int s = 0; + { + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + if (replAllDead) { + // throttledForceResyncDead can throw + if (!getGlobalReplicationCoordinator()->getSettings().autoresync || + !ReplSource::throttledForceResyncDead(txn, "auto")) { + log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" + << endl; + break; } } - catch (...) { - log() << "caught exception in _replMain" << endl; - s = 4; - } - - { - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite lk(txn->lockState()); - verify( syncing == 1 ); - syncing--; - } - - if( relinquishSyncingSome ) { - relinquishSyncingSome = 0; - s = 1; // sleep before going back in to syncing=1 - } + verify( + syncing == + 0); // i.e., there is only one sync thread running. we will want to change/fix this. + syncing++; + } - if ( s ) { - stringstream ss; - ss << "sleep " << s << " sec before next pass"; - string msg = ss.str(); - if (!serverGlobalParams.quiet) - log() << msg << endl; - ReplInfo r(msg.c_str()); - sleepsecs(s); + try { + int nApplied = 0; + s = _replMain(txn, sources, nApplied); + if (s == 1) { + if (nApplied == 0) + s = 2; + else if (nApplied > 100) { + // sleep very little - just enough that we aren't truly hammering master + sleepmillis(75); + s = 0; + } } + } catch (...) { + log() << "caught exception in _replMain" << endl; + s = 4; } - } - - static void replMasterThread() { - sleepsecs(4); - Client::initThread("replmaster"); - int toSleep = 10; - while( 1 ) { - sleepsecs(toSleep); - // Write a keep-alive like entry to the log. This will make things like - // printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date even - // when things are idle. - OperationContextImpl txn; - AuthorizationSession::get(txn.getClient())->grantInternalAuthorization(); + { + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite lk(txn->lockState()); + verify(syncing == 1); + syncing--; + } - Lock::GlobalWrite globalWrite(txn.lockState(), 1); - if (globalWrite.isLocked()) { - toSleep = 10; + if (relinquishSyncingSome) { + relinquishSyncingSome = 0; + s = 1; // sleep before going back in to syncing=1 + } - try { - WriteUnitOfWork wuow(&txn); - getGlobalServiceContext()->getOpObserver()->onOpMessage(&txn, BSONObj()); - wuow.commit(); - } - catch (...) { - log() << "caught exception in replMasterThread()" << endl; - } - } - else { - LOG(5) << "couldn't logKeepalive" << endl; - toSleep = 1; - } + if (s) { + stringstream ss; + ss << "sleep " << s << " sec before next pass"; + string msg = ss.str(); + if (!serverGlobalParams.quiet) + log() << msg << endl; + ReplInfo r(msg.c_str()); + sleepsecs(s); } } - - static void replSlaveThread() { - sleepsecs(1); - Client::initThread("replslave"); - +} + +static void replMasterThread() { + sleepsecs(4); + Client::initThread("replmaster"); + int toSleep = 10; + while (1) { + sleepsecs(toSleep); + + // Write a keep-alive like entry to the log. This will make things like + // printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date even + // when things are idle. OperationContextImpl txn; AuthorizationSession::get(txn.getClient())->grantInternalAuthorization(); - DisableDocumentValidation validationDisabler(&txn); - while ( 1 ) { + Lock::GlobalWrite globalWrite(txn.lockState(), 1); + if (globalWrite.isLocked()) { + toSleep = 10; + try { - replMain(&txn); - sleepsecs(5); - } - catch ( AssertionException& ) { - ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry"); - log() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl; - sleepsecs(300); - } - catch ( DBException& e ) { - log() << "exception in replSlaveThread(): " << e.what() - << ", sleeping 5 minutes before retry" << endl; - sleepsecs(300); - } - catch ( ... ) { - log() << "error in replSlaveThread(): sleeping 5 minutes before retry" << endl; - sleepsecs(300); + WriteUnitOfWork wuow(&txn); + getGlobalServiceContext()->getOpObserver()->onOpMessage(&txn, BSONObj()); + wuow.commit(); + } catch (...) { + log() << "caught exception in replMasterThread()" << endl; } + } else { + LOG(5) << "couldn't logKeepalive" << endl; + toSleep = 1; } } +} - void startMasterSlave(OperationContext* txn) { - - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); - if( !replSettings.slave && !replSettings.master ) - return; +static void replSlaveThread() { + sleepsecs(1); + Client::initThread("replslave"); - AuthorizationSession::get(txn->getClient())->grantInternalAuthorization(); + OperationContextImpl txn; + AuthorizationSession::get(txn.getClient())->grantInternalAuthorization(); + DisableDocumentValidation validationDisabler(&txn); - { - ReplSource temp(txn); // Ensures local.me is populated + while (1) { + try { + replMain(&txn); + sleepsecs(5); + } catch (AssertionException&) { + ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry"); + log() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl; + sleepsecs(300); + } catch (DBException& e) { + log() << "exception in replSlaveThread(): " << e.what() + << ", sleeping 5 minutes before retry" << endl; + sleepsecs(300); + } catch (...) { + log() << "error in replSlaveThread(): sleeping 5 minutes before retry" << endl; + sleepsecs(300); } + } +} - if ( replSettings.slave ) { - verify( replSettings.slave == SimpleSlave ); - LOG(1) << "slave=true" << endl; - stdx::thread repl_thread(replSlaveThread); - } +void startMasterSlave(OperationContext* txn) { + const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); + if (!replSettings.slave && !replSettings.master) + return; - if ( replSettings.master ) { - LOG(1) << "master=true" << endl; - createOplog(txn); - stdx::thread t(replMasterThread); - } + AuthorizationSession::get(txn->getClient())->grantInternalAuthorization(); - if (replSettings.fastsync) { - while(!_replMainStarted) // don't allow writes until we've set up from log - sleepmillis( 50 ); - } + { + ReplSource temp(txn); // Ensures local.me is populated } - int _dummy_z; - - void pretouchN(vector<BSONObj>& v, unsigned a, unsigned b) { - Client::initThreadIfNotAlready("pretouchN"); - OperationContextImpl txn; // XXX - ScopedTransaction transaction(&txn, MODE_S); - Lock::GlobalRead lk(txn.lockState()); + if (replSettings.slave) { + verify(replSettings.slave == SimpleSlave); + LOG(1) << "slave=true" << endl; + stdx::thread repl_thread(replSlaveThread); + } - for( unsigned i = a; i <= b; i++ ) { - const BSONObj& op = v[i]; - const char *which = "o"; - const char *opType = op.getStringField("op"); - if ( *opType == 'i' ) - ; - else if( *opType == 'u' ) - which = "o2"; - else - continue; - /* todo : other operations */ + if (replSettings.master) { + LOG(1) << "master=true" << endl; + createOplog(txn); + stdx::thread t(replMasterThread); + } - try { - BSONObj o = op.getObjectField(which); - BSONElement _id; - if( o.getObjectID(_id) ) { - const char *ns = op.getStringField("ns"); - BSONObjBuilder b; - b.append(_id); - BSONObj result; - OldClientContext ctx(&txn, ns); - if( Helpers::findById(&txn, ctx.db(), ns, b.done(), result) ) - _dummy_z += result.objsize(); // touch - } - } - catch( DBException& e ) { - log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' ' << e.toString() << endl; - } - } + if (replSettings.fastsync) { + while (!_replMainStarted) // don't allow writes until we've set up from log + sleepmillis(50); } +} +int _dummy_z; - void pretouchOperation(OperationContext* txn, const BSONObj& op) { +void pretouchN(vector<BSONObj>& v, unsigned a, unsigned b) { + Client::initThreadIfNotAlready("pretouchN"); - if (txn->lockState()->isWriteLocked()) { - return; // no point pretouching if write locked. not sure if this will ever fire, but just in case. - } + OperationContextImpl txn; // XXX + ScopedTransaction transaction(&txn, MODE_S); + Lock::GlobalRead lk(txn.lockState()); - const char *which = "o"; - const char *opType = op.getStringField("op"); - if ( *opType == 'i' ) + for (unsigned i = a; i <= b; i++) { + const BSONObj& op = v[i]; + const char* which = "o"; + const char* opType = op.getStringField("op"); + if (*opType == 'i') ; - else if( *opType == 'u' ) + else if (*opType == 'u') which = "o2"; else - return; + continue; /* todo : other operations */ try { BSONObj o = op.getObjectField(which); BSONElement _id; - if( o.getObjectID(_id) ) { - const char *ns = op.getStringField("ns"); + if (o.getObjectID(_id)) { + const char* ns = op.getStringField("ns"); BSONObjBuilder b; b.append(_id); BSONObj result; - AutoGetCollectionForRead ctx(txn, ns ); - if (Helpers::findById(txn, ctx.getDb(), ns, b.done(), result)) { - _dummy_z += result.objsize(); // touch - } + OldClientContext ctx(&txn, ns); + if (Helpers::findById(&txn, ctx.db(), ns, b.done(), result)) + _dummy_z += result.objsize(); // touch } + } catch (DBException& e) { + log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' ' + << e.toString() << endl; } - catch( DBException& ) { - log() << "ignoring assertion in pretouchOperation()" << endl; + } +} + +void pretouchOperation(OperationContext* txn, const BSONObj& op) { + if (txn->lockState()->isWriteLocked()) { + return; // no point pretouching if write locked. not sure if this will ever fire, but just in case. + } + + const char* which = "o"; + const char* opType = op.getStringField("op"); + if (*opType == 'i') + ; + else if (*opType == 'u') + which = "o2"; + else + return; + /* todo : other operations */ + + try { + BSONObj o = op.getObjectField(which); + BSONElement _id; + if (o.getObjectID(_id)) { + const char* ns = op.getStringField("ns"); + BSONObjBuilder b; + b.append(_id); + BSONObj result; + AutoGetCollectionForRead ctx(txn, ns); + if (Helpers::findById(txn, ctx.getDb(), ns, b.done(), result)) { + _dummy_z += result.objsize(); // touch + } } + } catch (DBException&) { + log() << "ignoring assertion in pretouchOperation()" << endl; } +} -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |