summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/master_slave.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/master_slave.cpp')
-rw-r--r--src/mongo/db/repl/master_slave.cpp2280
1 files changed, 1129 insertions, 1151 deletions
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index a1a58527b62..1e1bd428d39 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -82,1374 +82,1352 @@ using std::vector;
namespace mongo {
namespace repl {
- void pretouchOperation(OperationContext* txn, const BSONObj& op);
- void pretouchN(vector<BSONObj>&, unsigned a, unsigned b);
+void pretouchOperation(OperationContext* txn, const BSONObj& op);
+void pretouchN(vector<BSONObj>&, unsigned a, unsigned b);
- /* if 1 sync() is running */
- volatile int syncing = 0;
- volatile int relinquishSyncingSome = 0;
+/* if 1 sync() is running */
+volatile int syncing = 0;
+volatile int relinquishSyncingSome = 0;
- static time_t lastForcedResync = 0;
+static time_t lastForcedResync = 0;
- /* output by the web console */
- const char *replInfo = "";
- struct ReplInfo {
- ReplInfo(const char *msg) {
- replInfo = msg;
- }
- ~ReplInfo() {
- replInfo = "?";
- }
- };
-
-
- ReplSource::ReplSource(OperationContext* txn) {
- nClonedThisPass = 0;
- ensureMe(txn);
- }
-
- ReplSource::ReplSource(OperationContext* txn, BSONObj o) : nClonedThisPass(0) {
- only = o.getStringField("only");
- hostName = o.getStringField("host");
- _sourceName = o.getStringField("source");
- uassert( 10118 , "'host' field not set in sources collection object", !hostName.empty() );
- uassert( 10119 , "only source='main' allowed for now with replication", sourceName() == "main" );
- BSONElement e = o.getField("syncedTo");
- if ( !e.eoo() ) {
- uassert(10120, "bad sources 'syncedTo' field value",
- e.type() == Date || e.type() == bsonTimestamp);
- Timestamp tmp( e.date() );
- syncedTo = tmp;
- }
-
- BSONObj dbsObj = o.getObjectField("dbsNextPass");
- if ( !dbsObj.isEmpty() ) {
- BSONObjIterator i(dbsObj);
- while ( 1 ) {
- BSONElement e = i.next();
- if ( e.eoo() )
- break;
- addDbNextPass.insert( e.fieldName() );
- }
- }
-
- dbsObj = o.getObjectField("incompleteCloneDbs");
- if ( !dbsObj.isEmpty() ) {
- BSONObjIterator i(dbsObj);
- while ( 1 ) {
- BSONElement e = i.next();
- if ( e.eoo() )
- break;
- incompleteCloneDbs.insert( e.fieldName() );
- }
- }
- ensureMe(txn);
+/* output by the web console */
+const char* replInfo = "";
+struct ReplInfo {
+ ReplInfo(const char* msg) {
+ replInfo = msg;
+ }
+ ~ReplInfo() {
+ replInfo = "?";
+ }
+};
+
+
+ReplSource::ReplSource(OperationContext* txn) {
+ nClonedThisPass = 0;
+ ensureMe(txn);
+}
+
+ReplSource::ReplSource(OperationContext* txn, BSONObj o) : nClonedThisPass(0) {
+ only = o.getStringField("only");
+ hostName = o.getStringField("host");
+ _sourceName = o.getStringField("source");
+ uassert(10118, "'host' field not set in sources collection object", !hostName.empty());
+ uassert(10119, "only source='main' allowed for now with replication", sourceName() == "main");
+ BSONElement e = o.getField("syncedTo");
+ if (!e.eoo()) {
+ uassert(10120,
+ "bad sources 'syncedTo' field value",
+ e.type() == Date || e.type() == bsonTimestamp);
+ Timestamp tmp(e.date());
+ syncedTo = tmp;
}
- /* Turn our C++ Source object into a BSONObj */
- BSONObj ReplSource::jsobj() {
- BSONObjBuilder b;
- b.append("host", hostName);
- b.append("source", sourceName());
- if ( !only.empty() )
- b.append("only", only);
- if ( !syncedTo.isNull() )
- b.append("syncedTo", syncedTo);
-
- BSONObjBuilder dbsNextPassBuilder;
- int n = 0;
- for ( set<string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++ ) {
- n++;
- dbsNextPassBuilder.appendBool(*i, 1);
+ BSONObj dbsObj = o.getObjectField("dbsNextPass");
+ if (!dbsObj.isEmpty()) {
+ BSONObjIterator i(dbsObj);
+ while (1) {
+ BSONElement e = i.next();
+ if (e.eoo())
+ break;
+ addDbNextPass.insert(e.fieldName());
}
- if ( n )
- b.append("dbsNextPass", dbsNextPassBuilder.done());
+ }
- BSONObjBuilder incompleteCloneDbsBuilder;
- n = 0;
- for ( set<string>::iterator i = incompleteCloneDbs.begin(); i != incompleteCloneDbs.end(); i++ ) {
- n++;
- incompleteCloneDbsBuilder.appendBool(*i, 1);
+ dbsObj = o.getObjectField("incompleteCloneDbs");
+ if (!dbsObj.isEmpty()) {
+ BSONObjIterator i(dbsObj);
+ while (1) {
+ BSONElement e = i.next();
+ if (e.eoo())
+ break;
+ incompleteCloneDbs.insert(e.fieldName());
}
- if ( n )
- b.append("incompleteCloneDbs", incompleteCloneDbsBuilder.done());
-
- return b.obj();
}
+ ensureMe(txn);
+}
+
+/* Turn our C++ Source object into a BSONObj */
+BSONObj ReplSource::jsobj() {
+ BSONObjBuilder b;
+ b.append("host", hostName);
+ b.append("source", sourceName());
+ if (!only.empty())
+ b.append("only", only);
+ if (!syncedTo.isNull())
+ b.append("syncedTo", syncedTo);
+
+ BSONObjBuilder dbsNextPassBuilder;
+ int n = 0;
+ for (set<string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++) {
+ n++;
+ dbsNextPassBuilder.appendBool(*i, 1);
+ }
+ if (n)
+ b.append("dbsNextPass", dbsNextPassBuilder.done());
+
+ BSONObjBuilder incompleteCloneDbsBuilder;
+ n = 0;
+ for (set<string>::iterator i = incompleteCloneDbs.begin(); i != incompleteCloneDbs.end(); i++) {
+ n++;
+ incompleteCloneDbsBuilder.appendBool(*i, 1);
+ }
+ if (n)
+ b.append("incompleteCloneDbs", incompleteCloneDbsBuilder.done());
- void ReplSource::ensureMe(OperationContext* txn) {
- string myname = getHostName();
+ return b.obj();
+}
- // local.me is an identifier for a server for getLastError w:2+
- bool exists = Helpers::getSingleton(txn, "local.me", _me);
+void ReplSource::ensureMe(OperationContext* txn) {
+ string myname = getHostName();
- if (!exists || !_me.hasField("host") || _me["host"].String() != myname) {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dblk(txn->lockState(), "local", MODE_X);
- WriteUnitOfWork wunit(txn);
- // clean out local.me
- Helpers::emptyCollection(txn, "local.me");
+ // local.me is an identifier for a server for getLastError w:2+
+ bool exists = Helpers::getSingleton(txn, "local.me", _me);
- // repopulate
- BSONObjBuilder b;
- b.appendOID("_id", 0, true);
- b.append("host", myname);
- _me = b.obj();
- Helpers::putSingleton(txn, "local.me", _me);
- wunit.commit();
- }
- _me = _me.getOwned();
- }
+ if (!exists || !_me.hasField("host") || _me["host"].String() != myname) {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dblk(txn->lockState(), "local", MODE_X);
+ WriteUnitOfWork wunit(txn);
+ // clean out local.me
+ Helpers::emptyCollection(txn, "local.me");
- void ReplSource::save(OperationContext* txn) {
+ // repopulate
BSONObjBuilder b;
- verify( !hostName.empty() );
- b.append("host", hostName);
- // todo: finish allowing multiple source configs.
- // this line doesn't work right when source is null, if that is allowed as it is now:
- //b.append("source", _sourceName);
- BSONObj pattern = b.done();
+ b.appendOID("_id", 0, true);
+ b.append("host", myname);
+ _me = b.obj();
+ Helpers::putSingleton(txn, "local.me", _me);
+ wunit.commit();
+ }
+ _me = _me.getOwned();
+}
- BSONObj o = jsobj();
- LOG( 1 ) << "Saving repl source: " << o << endl;
+void ReplSource::save(OperationContext* txn) {
+ BSONObjBuilder b;
+ verify(!hostName.empty());
+ b.append("host", hostName);
+ // todo: finish allowing multiple source configs.
+ // this line doesn't work right when source is null, if that is allowed as it is now:
+ // b.append("source", _sourceName);
+ BSONObj pattern = b.done();
- {
- OpDebug debug;
+ BSONObj o = jsobj();
+ LOG(1) << "Saving repl source: " << o << endl;
- OldClientContext ctx(txn, "local.sources");
+ {
+ OpDebug debug;
- const NamespaceString requestNs("local.sources");
- UpdateRequest request(requestNs);
+ OldClientContext ctx(txn, "local.sources");
- request.setQuery(pattern);
- request.setUpdates(o);
- request.setUpsert();
+ const NamespaceString requestNs("local.sources");
+ UpdateRequest request(requestNs);
- UpdateResult res = update(txn, ctx.db(), request, &debug);
+ request.setQuery(pattern);
+ request.setUpdates(o);
+ request.setUpsert();
- verify( ! res.modifiers );
- verify( res.numMatched == 1 );
- }
- }
+ UpdateResult res = update(txn, ctx.db(), request, &debug);
- static void addSourceToList(OperationContext* txn,
- ReplSource::SourceVector &v,
- ReplSource& s,
- ReplSource::SourceVector &old) {
- if ( !s.syncedTo.isNull() ) { // Don't reuse old ReplSource if there was a forced resync.
- for ( ReplSource::SourceVector::iterator i = old.begin(); i != old.end(); ) {
- if ( s == **i ) {
- v.push_back(*i);
- old.erase(i);
- return;
- }
- i++;
+ verify(!res.modifiers);
+ verify(res.numMatched == 1);
+ }
+}
+
+static void addSourceToList(OperationContext* txn,
+ ReplSource::SourceVector& v,
+ ReplSource& s,
+ ReplSource::SourceVector& old) {
+ if (!s.syncedTo.isNull()) { // Don't reuse old ReplSource if there was a forced resync.
+ for (ReplSource::SourceVector::iterator i = old.begin(); i != old.end();) {
+ if (s == **i) {
+ v.push_back(*i);
+ old.erase(i);
+ return;
}
+ i++;
}
-
- v.push_back( std::shared_ptr< ReplSource >( new ReplSource( s ) ) );
}
- /* we reuse our existing objects so that we can keep our existing connection
- and cursor in effect.
- */
- void ReplSource::loadAll(OperationContext* txn, SourceVector &v) {
- const char* localSources = "local.sources";
- OldClientContext ctx(txn, localSources);
- SourceVector old = v;
- v.clear();
-
- const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
- if (!replSettings.source.empty()) {
- // --source <host> specified.
- // check that no items are in sources other than that
- // add if missing
- int n = 0;
- unique_ptr<PlanExecutor> exec(
- InternalPlanner::collectionScan(txn,
- localSources,
- ctx.db()->getCollection(localSources)));
- BSONObj obj;
- PlanExecutor::ExecState state;
- while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
- n++;
- ReplSource tmp(txn, obj);
- if (tmp.hostName != replSettings.source) {
- log() << "--source " << replSettings.source << " != " << tmp.hostName
- << " from local.sources collection" << endl;
- log() << "for instructions on changing this slave's source, see:" << endl;
- log() << "http://dochub.mongodb.org/core/masterslave" << endl;
- log() << "terminating mongod after 30 seconds" << endl;
- sleepsecs(30);
- dbexit( EXIT_REPLICATION_ERROR );
- }
- if (tmp.only != replSettings.only) {
- log() << "--only " << replSettings.only << " != " << tmp.only
- << " from local.sources collection" << endl;
- log() << "terminating after 30 seconds" << endl;
- sleepsecs(30);
- dbexit( EXIT_REPLICATION_ERROR );
- }
- }
- uassert(17065, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state);
- uassert( 10002 , "local.sources collection corrupt?", n<2 );
- if ( n == 0 ) {
- // source missing. add.
- ReplSource s(txn);
- s.hostName = replSettings.source;
- s.only = replSettings.only;
- s.save(txn);
- }
- }
- else {
- try {
- massert(10384 , "--only requires use of --source", replSettings.only.empty());
- }
- catch ( ... ) {
- dbexit( EXIT_BADOPTIONS );
- }
- }
+ v.push_back(std::shared_ptr<ReplSource>(new ReplSource(s)));
+}
- unique_ptr<PlanExecutor> exec(
- InternalPlanner::collectionScan(txn,
- localSources,
- ctx.db()->getCollection(localSources)));
+/* we reuse our existing objects so that we can keep our existing connection
+ and cursor in effect.
+*/
+void ReplSource::loadAll(OperationContext* txn, SourceVector& v) {
+ const char* localSources = "local.sources";
+ OldClientContext ctx(txn, localSources);
+ SourceVector old = v;
+ v.clear();
+
+ const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
+ if (!replSettings.source.empty()) {
+ // --source <host> specified.
+ // check that no items are in sources other than that
+ // add if missing
+ int n = 0;
+ unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
+ txn, localSources, ctx.db()->getCollection(localSources)));
BSONObj obj;
PlanExecutor::ExecState state;
while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
+ n++;
ReplSource tmp(txn, obj);
- if ( tmp.syncedTo.isNull() ) {
- DBDirectClient c(txn);
- BSONObj op = c.findOne( "local.oplog.$main", QUERY( "op" << NE << "n" ).sort( BSON( "$natural" << -1 ) ) );
- if ( !op.isEmpty() ) {
- tmp.syncedTo = op[ "ts" ].timestamp();
- }
+ if (tmp.hostName != replSettings.source) {
+ log() << "--source " << replSettings.source << " != " << tmp.hostName
+ << " from local.sources collection" << endl;
+ log() << "for instructions on changing this slave's source, see:" << endl;
+ log() << "http://dochub.mongodb.org/core/masterslave" << endl;
+ log() << "terminating mongod after 30 seconds" << endl;
+ sleepsecs(30);
+ dbexit(EXIT_REPLICATION_ERROR);
}
- addSourceToList(txn, v, tmp, old);
+ if (tmp.only != replSettings.only) {
+ log() << "--only " << replSettings.only << " != " << tmp.only
+ << " from local.sources collection" << endl;
+ log() << "terminating after 30 seconds" << endl;
+ sleepsecs(30);
+ dbexit(EXIT_REPLICATION_ERROR);
+ }
+ }
+ uassert(17065, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state);
+ uassert(10002, "local.sources collection corrupt?", n < 2);
+ if (n == 0) {
+ // source missing. add.
+ ReplSource s(txn);
+ s.hostName = replSettings.source;
+ s.only = replSettings.only;
+ s.save(txn);
+ }
+ } else {
+ try {
+ massert(10384, "--only requires use of --source", replSettings.only.empty());
+ } catch (...) {
+ dbexit(EXIT_BADOPTIONS);
}
- uassert(17066, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state);
}
- bool ReplSource::throttledForceResyncDead( OperationContext* txn, const char *requester ) {
- if ( time( 0 ) - lastForcedResync > 600 ) {
- forceResyncDead( txn, requester );
- lastForcedResync = time( 0 );
- return true;
+ unique_ptr<PlanExecutor> exec(
+ InternalPlanner::collectionScan(txn, localSources, ctx.db()->getCollection(localSources)));
+ BSONObj obj;
+ PlanExecutor::ExecState state;
+ while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
+ ReplSource tmp(txn, obj);
+ if (tmp.syncedTo.isNull()) {
+ DBDirectClient c(txn);
+ BSONObj op = c.findOne("local.oplog.$main",
+ QUERY("op" << NE << "n").sort(BSON("$natural" << -1)));
+ if (!op.isEmpty()) {
+ tmp.syncedTo = op["ts"].timestamp();
+ }
}
+ addSourceToList(txn, v, tmp, old);
+ }
+ uassert(17066, "Internal error reading from local.sources", PlanExecutor::IS_EOF == state);
+}
+
+bool ReplSource::throttledForceResyncDead(OperationContext* txn, const char* requester) {
+ if (time(0) - lastForcedResync > 600) {
+ forceResyncDead(txn, requester);
+ lastForcedResync = time(0);
+ return true;
+ }
+ return false;
+}
+
+void ReplSource::forceResyncDead(OperationContext* txn, const char* requester) {
+ if (!replAllDead)
+ return;
+ SourceVector sources;
+ ReplSource::loadAll(txn, sources);
+ for (SourceVector::iterator i = sources.begin(); i != sources.end(); ++i) {
+ log() << requester << " forcing resync from " << (*i)->hostName << endl;
+ (*i)->forceResync(txn, requester);
+ }
+ replAllDead = 0;
+}
+
+class HandshakeCmd : public Command {
+public:
+ void help(stringstream& h) const {
+ h << "internal";
+ }
+ HandshakeCmd() : Command("handshake") {}
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+ virtual bool slaveOk() const {
+ return true;
+ }
+ virtual bool adminOnly() const {
return false;
}
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
- void ReplSource::forceResyncDead( OperationContext* txn, const char *requester ) {
- if ( !replAllDead )
- return;
- SourceVector sources;
- ReplSource::loadAll(txn, sources);
- for( SourceVector::iterator i = sources.begin(); i != sources.end(); ++i ) {
- log() << requester << " forcing resync from " << (*i)->hostName << endl;
- (*i)->forceResync( txn, requester );
- }
- replAllDead = 0;
- }
-
- class HandshakeCmd : public Command {
- public:
- void help(stringstream& h) const { h << "internal"; }
- HandshakeCmd() : Command("handshake") {}
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual bool slaveOk() const { return true; }
- virtual bool adminOnly() const { return false; }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ virtual bool run(OperationContext* txn,
+ const string& ns,
+ BSONObj& cmdObj,
+ int options,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ HandshakeArgs handshake;
+ Status status = handshake.initialize(cmdObj);
+ if (!status.isOK()) {
+ return appendCommandStatus(result, status);
}
- virtual bool run(OperationContext* txn,
- const string& ns,
- BSONObj& cmdObj,
- int options,
- string& errmsg,
- BSONObjBuilder& result) {
-
- HandshakeArgs handshake;
- Status status = handshake.initialize(cmdObj);
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
+ ReplClientInfo::forClient(txn->getClient()).setRemoteID(handshake.getRid());
- ReplClientInfo::forClient(txn->getClient()).setRemoteID(handshake.getRid());
+ status = getGlobalReplicationCoordinator()->processHandshake(txn, handshake);
+ return appendCommandStatus(result, status);
+ }
- status = getGlobalReplicationCoordinator()->processHandshake(txn, handshake);
- return appendCommandStatus(result, status);
- }
+} handshakeCmd;
- } handshakeCmd;
+bool replHandshake(DBClientConnection* conn, const OID& myRID) {
+ string myname = getHostName();
- bool replHandshake(DBClientConnection *conn, const OID& myRID) {
- string myname = getHostName();
+ BSONObjBuilder cmd;
+ cmd.append("handshake", myRID);
- BSONObjBuilder cmd;
- cmd.append("handshake", myRID);
+ BSONObj res;
+ bool ok = conn->runCommand("admin", cmd.obj(), res);
+ // ignoring for now on purpose for older versions
+ LOG(ok ? 1 : 0) << "replHandshake result: " << res << endl;
+ return true;
+}
- BSONObj res;
- bool ok = conn->runCommand( "admin" , cmd.obj() , res );
- // ignoring for now on purpose for older versions
- LOG( ok ? 1 : 0 ) << "replHandshake result: " << res << endl;
+bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OID& myRID) {
+ if (reader->conn()) {
return true;
}
- bool ReplSource::_connect(OplogReader* reader, const HostAndPort& host, const OID& myRID) {
- if (reader->conn()) {
- return true;
- }
-
- if (!reader->connect(host)) {
- return false;
- }
-
- if (!replHandshake(reader->conn(), myRID)) {
- return false;
- }
+ if (!reader->connect(host)) {
+ return false;
+ }
- return true;
+ if (!replHandshake(reader->conn(), myRID)) {
+ return false;
}
+ return true;
+}
- void ReplSource::forceResync( OperationContext* txn, const char *requester ) {
- BSONObj info;
- {
- // This is always a GlobalWrite lock (so no ns/db used from the context)
- invariant(txn->lockState()->isW());
- Lock::TempRelease tempRelease(txn->lockState());
- if (!_connect(&oplogReader, HostAndPort(hostName),
- getGlobalReplicationCoordinator()->getMyRID())) {
- msgassertedNoTrace( 14051 , "unable to connect to resync");
- }
- /* todo use getDatabaseNames() method here */
- bool ok = oplogReader.conn()->runCommand("admin",
- BSON("listDatabases" << 1),
- info,
- QueryOption_SlaveOk);
- massert( 10385 , "Unable to get database list", ok );
+void ReplSource::forceResync(OperationContext* txn, const char* requester) {
+ BSONObj info;
+ {
+ // This is always a GlobalWrite lock (so no ns/db used from the context)
+ invariant(txn->lockState()->isW());
+ Lock::TempRelease tempRelease(txn->lockState());
+
+ if (!_connect(&oplogReader,
+ HostAndPort(hostName),
+ getGlobalReplicationCoordinator()->getMyRID())) {
+ msgassertedNoTrace(14051, "unable to connect to resync");
}
+ /* todo use getDatabaseNames() method here */
+ bool ok = oplogReader.conn()->runCommand(
+ "admin", BSON("listDatabases" << 1), info, QueryOption_SlaveOk);
+ massert(10385, "Unable to get database list", ok);
+ }
- BSONObjIterator i( info.getField( "databases" ).embeddedObject() );
- while( i.moreWithEOO() ) {
- BSONElement e = i.next();
- if ( e.eoo() )
- break;
- string name = e.embeddedObject().getField( "name" ).valuestr();
- if ( !e.embeddedObject().getBoolField( "empty" ) ) {
- if ( name != "local" ) {
- if ( only.empty() || only == name ) {
- resyncDrop( txn, name );
- }
+ BSONObjIterator i(info.getField("databases").embeddedObject());
+ while (i.moreWithEOO()) {
+ BSONElement e = i.next();
+ if (e.eoo())
+ break;
+ string name = e.embeddedObject().getField("name").valuestr();
+ if (!e.embeddedObject().getBoolField("empty")) {
+ if (name != "local") {
+ if (only.empty() || only == name) {
+ resyncDrop(txn, name);
}
}
}
- syncedTo = Timestamp();
- addDbNextPass.clear();
- save(txn);
- }
-
- void ReplSource::resyncDrop( OperationContext* txn, const string& db ) {
- log() << "resync: dropping database " << db;
- OldClientContext ctx(txn, db);
- dropDatabase(txn, ctx.db());
}
-
- /* grab initial copy of a database from the master */
- void ReplSource::resync(OperationContext* txn, const std::string& dbName) {
- const std::string db(dbName); // need local copy of the name, we're dropping the original
- resyncDrop( txn, db );
-
- {
- log() << "resync: cloning database " << db << " to get an initial copy" << endl;
- ReplInfo r("resync: cloning a database");
-
- CloneOptions cloneOptions;
- cloneOptions.fromDB = db;
- cloneOptions.slaveOk = true;
- cloneOptions.useReplAuth = true;
- cloneOptions.snapshot = true;
- cloneOptions.mayYield = true;
- cloneOptions.mayBeInterrupted = false;
-
- Cloner cloner;
- Status status = cloner.copyDb(txn,
- db,
- hostName.c_str(),
- cloneOptions,
- NULL);
-
- if (!status.isOK()) {
- if (status.code() == ErrorCodes::DatabaseDifferCase) {
- resyncDrop( txn, db );
- log() << "resync: database " << db
- << " not valid on the master due to a name conflict, dropping.";
- return;
- }
- else {
- log() << "resync of " << db << " from " << hostName
- << " failed due to: " << status.toString();
- throw SyncException();
- }
+ syncedTo = Timestamp();
+ addDbNextPass.clear();
+ save(txn);
+}
+
+void ReplSource::resyncDrop(OperationContext* txn, const string& db) {
+ log() << "resync: dropping database " << db;
+ OldClientContext ctx(txn, db);
+ dropDatabase(txn, ctx.db());
+}
+
+/* grab initial copy of a database from the master */
+void ReplSource::resync(OperationContext* txn, const std::string& dbName) {
+ const std::string db(dbName); // need local copy of the name, we're dropping the original
+ resyncDrop(txn, db);
+
+ {
+ log() << "resync: cloning database " << db << " to get an initial copy" << endl;
+ ReplInfo r("resync: cloning a database");
+
+ CloneOptions cloneOptions;
+ cloneOptions.fromDB = db;
+ cloneOptions.slaveOk = true;
+ cloneOptions.useReplAuth = true;
+ cloneOptions.snapshot = true;
+ cloneOptions.mayYield = true;
+ cloneOptions.mayBeInterrupted = false;
+
+ Cloner cloner;
+ Status status = cloner.copyDb(txn, db, hostName.c_str(), cloneOptions, NULL);
+
+ if (!status.isOK()) {
+ if (status.code() == ErrorCodes::DatabaseDifferCase) {
+ resyncDrop(txn, db);
+ log() << "resync: database " << db
+ << " not valid on the master due to a name conflict, dropping.";
+ return;
+ } else {
+ log() << "resync of " << db << " from " << hostName
+ << " failed due to: " << status.toString();
+ throw SyncException();
}
}
-
- log() << "resync: done with initial clone for db: " << db << endl;
}
- static DatabaseIgnorer ___databaseIgnorer;
+ log() << "resync: done with initial clone for db: " << db << endl;
+}
- void DatabaseIgnorer::doIgnoreUntilAfter( const string &db, const Timestamp &futureOplogTime ) {
- if ( futureOplogTime > _ignores[ db ] ) {
- _ignores[ db ] = futureOplogTime;
- }
+static DatabaseIgnorer ___databaseIgnorer;
+
+void DatabaseIgnorer::doIgnoreUntilAfter(const string& db, const Timestamp& futureOplogTime) {
+ if (futureOplogTime > _ignores[db]) {
+ _ignores[db] = futureOplogTime;
}
+}
- bool DatabaseIgnorer::ignoreAt( const string &db, const Timestamp &currentOplogTime ) {
- if ( _ignores[ db ].isNull() ) {
- return false;
- }
- if ( _ignores[ db ] >= currentOplogTime ) {
- return true;
- } else {
- // The ignore state has expired, so clear it.
- _ignores.erase( db );
- return false;
- }
+bool DatabaseIgnorer::ignoreAt(const string& db, const Timestamp& currentOplogTime) {
+ if (_ignores[db].isNull()) {
+ return false;
+ }
+ if (_ignores[db] >= currentOplogTime) {
+ return true;
+ } else {
+ // The ignore state has expired, so clear it.
+ _ignores.erase(db);
+ return false;
+ }
+}
+
+bool ReplSource::handleDuplicateDbName(OperationContext* txn,
+ const BSONObj& op,
+ const char* ns,
+ const char* db) {
+ // We are already locked at this point
+ if (dbHolder().get(txn, ns) != NULL) {
+ // Database is already present.
+ return true;
+ }
+ BSONElement ts = op.getField("ts");
+ if ((ts.type() == Date || ts.type() == bsonTimestamp) &&
+ ___databaseIgnorer.ignoreAt(db, ts.timestamp())) {
+ // Database is ignored due to a previous indication that it is
+ // missing from master after optime "ts".
+ return false;
+ }
+ if (Database::duplicateUncasedName(db).empty()) {
+ // No duplicate database names are present.
+ return true;
}
- bool ReplSource::handleDuplicateDbName( OperationContext* txn,
- const BSONObj &op,
- const char* ns,
- const char* db ) {
- // We are already locked at this point
- if (dbHolder().get(txn, ns) != NULL) {
- // Database is already present.
- return true;
- }
- BSONElement ts = op.getField( "ts" );
- if ( ( ts.type() == Date || ts.type() == bsonTimestamp ) && ___databaseIgnorer.ignoreAt( db, ts.timestamp() ) ) {
- // Database is ignored due to a previous indication that it is
- // missing from master after optime "ts".
- return false;
- }
- if (Database::duplicateUncasedName(db).empty()) {
- // No duplicate database names are present.
- return true;
+ Timestamp lastTime;
+ bool dbOk = false;
+ {
+ // This is always a GlobalWrite lock (so no ns/db used from the context)
+ invariant(txn->lockState()->isW());
+ Lock::TempRelease(txn->lockState());
+
+ // We always log an operation after executing it (never before), so
+ // a database list will always be valid as of an oplog entry generated
+ // before it was retrieved.
+
+ BSONObj last =
+ oplogReader.findOne(this->ns().c_str(), Query().sort(BSON("$natural" << -1)));
+ if (!last.isEmpty()) {
+ BSONElement ts = last.getField("ts");
+ massert(14032,
+ "Invalid 'ts' in remote log",
+ ts.type() == Date || ts.type() == bsonTimestamp);
+ lastTime = Timestamp(ts.date());
}
- Timestamp lastTime;
- bool dbOk = false;
- {
- // This is always a GlobalWrite lock (so no ns/db used from the context)
- invariant(txn->lockState()->isW());
- Lock::TempRelease(txn->lockState());
-
- // We always log an operation after executing it (never before), so
- // a database list will always be valid as of an oplog entry generated
- // before it was retrieved.
-
- BSONObj last = oplogReader.findOne( this->ns().c_str(), Query().sort( BSON( "$natural" << -1 ) ) );
- if ( !last.isEmpty() ) {
- BSONElement ts = last.getField( "ts" );
- massert(14032, "Invalid 'ts' in remote log",
- ts.type() == Date || ts.type() == bsonTimestamp);
- lastTime = Timestamp( ts.date() );
- }
-
- BSONObj info;
- bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info );
- massert( 14033, "Unable to get database list", ok );
- BSONObjIterator i( info.getField( "databases" ).embeddedObject() );
- while( i.more() ) {
- BSONElement e = i.next();
-
- const char * name = e.embeddedObject().getField( "name" ).valuestr();
- if ( strcasecmp( name, db ) != 0 )
- continue;
+ BSONObj info;
+ bool ok = oplogReader.conn()->runCommand("admin", BSON("listDatabases" << 1), info);
+ massert(14033, "Unable to get database list", ok);
+ BSONObjIterator i(info.getField("databases").embeddedObject());
+ while (i.more()) {
+ BSONElement e = i.next();
- if ( strcmp( name, db ) == 0 ) {
- // The db exists on master, still need to check that no conflicts exist there.
- dbOk = true;
- continue;
- }
+ const char* name = e.embeddedObject().getField("name").valuestr();
+ if (strcasecmp(name, db) != 0)
+ continue;
- // The master has a db name that conflicts with the requested name.
- dbOk = false;
- break;
+ if (strcmp(name, db) == 0) {
+ // The db exists on master, still need to check that no conflicts exist there.
+ dbOk = true;
+ continue;
}
- }
- if ( !dbOk ) {
- ___databaseIgnorer.doIgnoreUntilAfter( db, lastTime );
- incompleteCloneDbs.erase(db);
- addDbNextPass.erase(db);
- return false;
+ // The master has a db name that conflicts with the requested name.
+ dbOk = false;
+ break;
}
+ }
- // Check for duplicates again, since we released the lock above.
- set< string > duplicates;
- Database::duplicateUncasedName(db, &duplicates);
+ if (!dbOk) {
+ ___databaseIgnorer.doIgnoreUntilAfter(db, lastTime);
+ incompleteCloneDbs.erase(db);
+ addDbNextPass.erase(db);
+ return false;
+ }
- // The database is present on the master and no conflicting databases
- // are present on the master. Drop any local conflicts.
- for( set< string >::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i ) {
- ___databaseIgnorer.doIgnoreUntilAfter( *i, lastTime );
- incompleteCloneDbs.erase(*i);
- addDbNextPass.erase(*i);
+ // Check for duplicates again, since we released the lock above.
+ set<string> duplicates;
+ Database::duplicateUncasedName(db, &duplicates);
- OldClientContext ctx(txn, *i);
- dropDatabase(txn, ctx.db());
- }
+ // The database is present on the master and no conflicting databases
+ // are present on the master. Drop any local conflicts.
+ for (set<string>::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i) {
+ ___databaseIgnorer.doIgnoreUntilAfter(*i, lastTime);
+ incompleteCloneDbs.erase(*i);
+ addDbNextPass.erase(*i);
- massert(14034, "Duplicate database names present after attempting to delete duplicates",
- Database::duplicateUncasedName(db).empty());
- return true;
+ OldClientContext ctx(txn, *i);
+ dropDatabase(txn, ctx.db());
}
- void ReplSource::applyCommand(OperationContext* txn, const BSONObj& op) {
- try {
- Status status = applyCommand_inlock(txn, op);
- if (!status.isOK()) {
- SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc());
- sync.setHostname(hostName);
- if (sync.shouldRetry(txn, op)) {
- uassert(28639,
- "Failure retrying initial sync update",
- applyCommand_inlock(txn, op).isOK());
- }
+ massert(14034,
+ "Duplicate database names present after attempting to delete duplicates",
+ Database::duplicateUncasedName(db).empty());
+ return true;
+}
+
+void ReplSource::applyCommand(OperationContext* txn, const BSONObj& op) {
+ try {
+ Status status = applyCommand_inlock(txn, op);
+ if (!status.isOK()) {
+ SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc());
+ sync.setHostname(hostName);
+ if (sync.shouldRetry(txn, op)) {
+ uassert(28639,
+ "Failure retrying initial sync update",
+ applyCommand_inlock(txn, op).isOK());
}
}
- catch ( UserException& e ) {
- log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;;
- }
- catch ( DBException& e ) {
- log() << "sync: caught db exception " << e << " while applying op: " << op << endl;;
- }
-
+ } catch (UserException& e) {
+ log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;
+ ;
+ } catch (DBException& e) {
+ log() << "sync: caught db exception " << e << " while applying op: " << op << endl;
+ ;
}
-
- void ReplSource::applyOperation(OperationContext* txn, Database* db, const BSONObj& op) {
- try {
- Status status = applyOperation_inlock( txn, db, op );
- if (!status.isOK()) {
- SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc());
- sync.setHostname(hostName);
- if (sync.shouldRetry(txn, op)) {
- uassert(15914,
- "Failure retrying initial sync update",
- applyOperation_inlock(txn, db, op).isOK());
- }
+}
+
+void ReplSource::applyOperation(OperationContext* txn, Database* db, const BSONObj& op) {
+ try {
+ Status status = applyOperation_inlock(txn, db, op);
+ if (!status.isOK()) {
+ SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc());
+ sync.setHostname(hostName);
+ if (sync.shouldRetry(txn, op)) {
+ uassert(15914,
+ "Failure retrying initial sync update",
+ applyOperation_inlock(txn, db, op).isOK());
}
}
- catch ( UserException& e ) {
- log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;;
- }
- catch ( DBException& e ) {
- log() << "sync: caught db exception " << e << " while applying op: " << op << endl;;
- }
-
+ } catch (UserException& e) {
+ log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;
+ ;
+ } catch (DBException& e) {
+ log() << "sync: caught db exception " << e << " while applying op: " << op << endl;
+ ;
}
+}
- /* local.$oplog.main is of the form:
- { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
- ...
- see logOp() comments.
+/* local.$oplog.main is of the form:
+ { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
+ ...
+ see logOp() comments.
- @param alreadyLocked caller already put us in write lock if true
- */
- void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn, BSONObj& op, bool alreadyLocked) {
- LOG(6) << "processing op: " << op << endl;
-
- if( op.getStringField("op")[0] == 'n' )
- return;
-
- char clientName[MaxDatabaseNameLen];
- const char *ns = op.getStringField("ns");
- nsToDatabase(ns, clientName);
-
- if ( *ns == '.' ) {
- log() << "skipping bad op in oplog: " << op.toString() << endl;
- return;
- }
- else if ( *ns == 0 ) {
- /*if( op.getStringField("op")[0] != 'n' )*/ {
- log() << "halting replication, bad op in oplog:\n " << op.toString() << endl;
- replAllDead = "bad object in oplog";
- throw SyncException();
- }
- //ns = "local.system.x";
- //nsToDatabase(ns, clientName);
+ @param alreadyLocked caller already put us in write lock if true
+*/
+void ReplSource::_sync_pullOpLog_applyOperation(OperationContext* txn,
+ BSONObj& op,
+ bool alreadyLocked) {
+ LOG(6) << "processing op: " << op << endl;
+
+ if (op.getStringField("op")[0] == 'n')
+ return;
+
+ char clientName[MaxDatabaseNameLen];
+ const char* ns = op.getStringField("ns");
+ nsToDatabase(ns, clientName);
+
+ if (*ns == '.') {
+ log() << "skipping bad op in oplog: " << op.toString() << endl;
+ return;
+ } else if (*ns == 0) {
+ /*if( op.getStringField("op")[0] != 'n' )*/ {
+ log() << "halting replication, bad op in oplog:\n " << op.toString() << endl;
+ replAllDead = "bad object in oplog";
+ throw SyncException();
}
+ // ns = "local.system.x";
+ // nsToDatabase(ns, clientName);
+ }
- if ( !only.empty() && only != clientName )
- return;
-
- // Push the CurOp stack for "txn" so each individual oplog entry application is separately
- // reported.
- CurOp individualOp(txn);
- txn->setReplicatedWrites(false);
- const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
- if (replSettings.pretouch &&
- !alreadyLocked/*doesn't make sense if in write lock already*/) {
- if (replSettings.pretouch > 1) {
- /* note: this is bad - should be put in ReplSource. but this is first test... */
- static int countdown;
- verify( countdown >= 0 );
- if( countdown > 0 ) {
- countdown--; // was pretouched on a prev pass
+ if (!only.empty() && only != clientName)
+ return;
+
+ // Push the CurOp stack for "txn" so each individual oplog entry application is separately
+ // reported.
+ CurOp individualOp(txn);
+ txn->setReplicatedWrites(false);
+ const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
+ if (replSettings.pretouch && !alreadyLocked /*doesn't make sense if in write lock already*/) {
+ if (replSettings.pretouch > 1) {
+ /* note: this is bad - should be put in ReplSource. but this is first test... */
+ static int countdown;
+ verify(countdown >= 0);
+ if (countdown > 0) {
+ countdown--; // was pretouched on a prev pass
+ } else {
+ const int m = 4;
+ if (tp.get() == 0) {
+ int nthr = min(8, replSettings.pretouch);
+ nthr = max(nthr, 1);
+ tp.reset(new OldThreadPool(nthr));
}
- else {
- const int m = 4;
- if( tp.get() == 0 ) {
- int nthr = min(8, replSettings.pretouch);
- nthr = max(nthr, 1);
- tp.reset( new OldThreadPool(nthr) );
- }
- vector<BSONObj> v;
- oplogReader.peek(v, replSettings.pretouch);
- unsigned a = 0;
- while( 1 ) {
- if( a >= v.size() ) break;
- unsigned b = a + m - 1; // v[a..b]
- if( b >= v.size() ) b = v.size() - 1;
- tp->schedule(pretouchN, v, a, b);
- DEV cout << "pretouch task: " << a << ".." << b << endl;
- a += m;
- }
- // we do one too...
- pretouchOperation(txn, op);
- tp->join();
- countdown = v.size();
+ vector<BSONObj> v;
+ oplogReader.peek(v, replSettings.pretouch);
+ unsigned a = 0;
+ while (1) {
+ if (a >= v.size())
+ break;
+ unsigned b = a + m - 1; // v[a..b]
+ if (b >= v.size())
+ b = v.size() - 1;
+ tp->schedule(pretouchN, v, a, b);
+ DEV cout << "pretouch task: " << a << ".." << b << endl;
+ a += m;
}
- }
- else {
+ // we do one too...
pretouchOperation(txn, op);
+ tp->join();
+ countdown = v.size();
}
+ } else {
+ pretouchOperation(txn, op);
}
+ }
- unique_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(txn->lockState()));
+ unique_ptr<Lock::GlobalWrite> lk(alreadyLocked ? 0 : new Lock::GlobalWrite(txn->lockState()));
- if ( replAllDead ) {
- // hmmm why is this check here and not at top of this function? does it get set between top and here?
- log() << "replAllDead, throwing SyncException: " << replAllDead << endl;
- throw SyncException();
- }
+ if (replAllDead) {
+ // hmmm why is this check here and not at top of this function? does it get set between top and here?
+ log() << "replAllDead, throwing SyncException: " << replAllDead << endl;
+ throw SyncException();
+ }
- if (!handleDuplicateDbName(txn, op, ns, clientName)) {
- return;
- }
+ if (!handleDuplicateDbName(txn, op, ns, clientName)) {
+ return;
+ }
- // special case apply for commands to avoid implicit database creation
- if (*op.getStringField("op") == 'c') {
- applyCommand(txn, op);
- return;
- }
+ // special case apply for commands to avoid implicit database creation
+ if (*op.getStringField("op") == 'c') {
+ applyCommand(txn, op);
+ return;
+ }
- // This code executes on the slaves only, so it doesn't need to be sharding-aware since
- // mongos will not send requests there. That's why the last argument is false (do not do
- // version checking).
- OldClientContext ctx(txn, ns, false);
-
- bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData();
- bool incompleteClone = incompleteCloneDbs.count( clientName ) != 0;
-
- LOG(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty << ", incompleteClone: " << incompleteClone << endl;
-
- if ( ctx.justCreated() || empty || incompleteClone ) {
- // we must add to incomplete list now that setClient has been called
- incompleteCloneDbs.insert( clientName );
- if ( nClonedThisPass ) {
- /* we only clone one database per pass, even if a lot need done. This helps us
- avoid overflowing the master's transaction log by doing too much work before going
- back to read more transactions. (Imagine a scenario of slave startup where we try to
- clone 100 databases in one pass.)
- */
- addDbNextPass.insert( clientName );
- }
- else {
- if ( incompleteClone ) {
- log() << "An earlier initial clone of '" << clientName << "' did not complete, now resyncing." << endl;
- }
- save(txn);
- OldClientContext ctx(txn, ns);
- nClonedThisPass++;
- resync(txn, ctx.db()->name());
- addDbNextPass.erase(clientName);
- incompleteCloneDbs.erase( clientName );
+ // This code executes on the slaves only, so it doesn't need to be sharding-aware since
+ // mongos will not send requests there. That's why the last argument is false (do not do
+ // version checking).
+ OldClientContext ctx(txn, ns, false);
+
+ bool empty = !ctx.db()->getDatabaseCatalogEntry()->hasUserData();
+ bool incompleteClone = incompleteCloneDbs.count(clientName) != 0;
+
+ LOG(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty
+ << ", incompleteClone: " << incompleteClone << endl;
+
+ if (ctx.justCreated() || empty || incompleteClone) {
+ // we must add to incomplete list now that setClient has been called
+ incompleteCloneDbs.insert(clientName);
+ if (nClonedThisPass) {
+ /* we only clone one database per pass, even if a lot need done. This helps us
+ avoid overflowing the master's transaction log by doing too much work before going
+ back to read more transactions. (Imagine a scenario of slave startup where we try to
+ clone 100 databases in one pass.)
+ */
+ addDbNextPass.insert(clientName);
+ } else {
+ if (incompleteClone) {
+ log() << "An earlier initial clone of '" << clientName
+ << "' did not complete, now resyncing." << endl;
}
save(txn);
+ OldClientContext ctx(txn, ns);
+ nClonedThisPass++;
+ resync(txn, ctx.db()->name());
+ addDbNextPass.erase(clientName);
+ incompleteCloneDbs.erase(clientName);
}
- else {
- applyOperation(txn, ctx.db(), op);
- addDbNextPass.erase( clientName );
- }
+ save(txn);
+ } else {
+ applyOperation(txn, ctx.db(), op);
+ addDbNextPass.erase(clientName);
}
+}
- void ReplSource::syncToTailOfRemoteLog() {
- string _ns = ns();
- BSONObjBuilder b;
- if ( !only.empty() ) {
- b.appendRegex("ns", string("^") + pcrecpp::RE::QuoteMeta( only ));
- }
- BSONObj last = oplogReader.findOne( _ns.c_str(), Query( b.done() ).sort( BSON( "$natural" << -1 ) ) );
- if ( !last.isEmpty() ) {
- BSONElement ts = last.getField( "ts" );
- massert(10386, "non Date ts found: " + last.toString(),
- ts.type() == Date || ts.type() == bsonTimestamp);
- syncedTo = Timestamp( ts.date() );
- }
+void ReplSource::syncToTailOfRemoteLog() {
+ string _ns = ns();
+ BSONObjBuilder b;
+ if (!only.empty()) {
+ b.appendRegex("ns", string("^") + pcrecpp::RE::QuoteMeta(only));
+ }
+ BSONObj last = oplogReader.findOne(_ns.c_str(), Query(b.done()).sort(BSON("$natural" << -1)));
+ if (!last.isEmpty()) {
+ BSONElement ts = last.getField("ts");
+ massert(10386,
+ "non Date ts found: " + last.toString(),
+ ts.type() == Date || ts.type() == bsonTimestamp);
+ syncedTo = Timestamp(ts.date());
}
+}
- class ReplApplyBatchSize : public ServerParameter {
- public:
- ReplApplyBatchSize()
- : ServerParameter( ServerParameterSet::getGlobal(), "replApplyBatchSize" ),
- _value( 1 ) {
- }
+class ReplApplyBatchSize : public ServerParameter {
+public:
+ ReplApplyBatchSize()
+ : ServerParameter(ServerParameterSet::getGlobal(), "replApplyBatchSize"), _value(1) {}
- int get() const { return _value; }
+ int get() const {
+ return _value;
+ }
+
+ virtual void append(OperationContext* txn, BSONObjBuilder& b, const string& name) {
+ b.append(name, _value);
+ }
- virtual void append(OperationContext* txn, BSONObjBuilder& b, const string& name) {
- b.append( name, _value );
+ virtual Status set(const BSONElement& newValuElement) {
+ return set(newValuElement.numberInt());
+ }
+
+ virtual Status set(int b) {
+ if (b < 1 || b > 1024) {
+ return Status(ErrorCodes::BadValue, "replApplyBatchSize has to be >= 1 and < 1024");
}
- virtual Status set( const BSONElement& newValuElement ) {
- return set( newValuElement.numberInt() );
+ const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
+ if (replSettings.slavedelay != 0 && b > 1) {
+ return Status(ErrorCodes::BadValue, "can't use a batch size > 1 with slavedelay");
+ }
+ if (!replSettings.slave) {
+ return Status(ErrorCodes::BadValue,
+ "can't set replApplyBatchSize on a non-slave machine");
}
- virtual Status set( int b ) {
- if( b < 1 || b > 1024 ) {
- return Status( ErrorCodes::BadValue,
- "replApplyBatchSize has to be >= 1 and < 1024" );
- }
+ _value = b;
+ return Status::OK();
+ }
- const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
- if ( replSettings.slavedelay != 0 && b > 1 ) {
- return Status( ErrorCodes::BadValue,
- "can't use a batch size > 1 with slavedelay" );
- }
- if ( ! replSettings.slave ) {
- return Status( ErrorCodes::BadValue,
- "can't set replApplyBatchSize on a non-slave machine" );
- }
+ virtual Status setFromString(const string& str) {
+ return set(atoi(str.c_str()));
+ }
- _value = b;
- return Status::OK();
- }
+ int _value;
- virtual Status setFromString( const string& str ) {
- return set( atoi( str.c_str() ) );
- }
+} replApplyBatchSize;
- int _value;
-
- } replApplyBatchSize;
-
- /* slave: pull some data from the master's oplog
- note: not yet in db mutex at this point.
- @return -1 error
- 0 ok, don't sleep
- 1 ok, sleep
- */
- int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) {
- int okResultCode = 1;
- string ns = string("local.oplog.$") + sourceName();
- LOG(2) << "sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n';
-
- bool tailing = true;
- oplogReader.tailCheck();
-
- bool initial = syncedTo.isNull();
-
- if ( !oplogReader.haveCursor() || initial ) {
- if ( initial ) {
- // Important to grab last oplog timestamp before listing databases.
- syncToTailOfRemoteLog();
- BSONObj info;
- bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info );
- massert( 10389 , "Unable to get database list", ok );
- BSONObjIterator i( info.getField( "databases" ).embeddedObject() );
- while( i.moreWithEOO() ) {
- BSONElement e = i.next();
- if ( e.eoo() )
- break;
- string name = e.embeddedObject().getField( "name" ).valuestr();
- if ( !e.embeddedObject().getBoolField( "empty" ) ) {
- if ( name != "local" ) {
- if ( only.empty() || only == name ) {
- LOG( 2 ) << "adding to 'addDbNextPass': " << name << endl;
- addDbNextPass.insert( name );
- }
+/* slave: pull some data from the master's oplog
+ note: not yet in db mutex at this point.
+ @return -1 error
+ 0 ok, don't sleep
+ 1 ok, sleep
+*/
+int ReplSource::_sync_pullOpLog(OperationContext* txn, int& nApplied) {
+ int okResultCode = 1;
+ string ns = string("local.oplog.$") + sourceName();
+ LOG(2) << "sync_pullOpLog " << ns << " syncedTo:" << syncedTo.toStringLong() << '\n';
+
+ bool tailing = true;
+ oplogReader.tailCheck();
+
+ bool initial = syncedTo.isNull();
+
+ if (!oplogReader.haveCursor() || initial) {
+ if (initial) {
+ // Important to grab last oplog timestamp before listing databases.
+ syncToTailOfRemoteLog();
+ BSONObj info;
+ bool ok = oplogReader.conn()->runCommand("admin", BSON("listDatabases" << 1), info);
+ massert(10389, "Unable to get database list", ok);
+ BSONObjIterator i(info.getField("databases").embeddedObject());
+ while (i.moreWithEOO()) {
+ BSONElement e = i.next();
+ if (e.eoo())
+ break;
+ string name = e.embeddedObject().getField("name").valuestr();
+ if (!e.embeddedObject().getBoolField("empty")) {
+ if (name != "local") {
+ if (only.empty() || only == name) {
+ LOG(2) << "adding to 'addDbNextPass': " << name << endl;
+ addDbNextPass.insert(name);
}
}
}
- // obviously global isn't ideal, but non-repl set is old so
- // keeping it simple
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lk(txn->lockState());
- save(txn);
}
+ // obviously global isn't ideal, but non-repl set is old so
+ // keeping it simple
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ save(txn);
+ }
- BSONObjBuilder gte;
- gte.append("$gte", syncedTo);
- BSONObjBuilder query;
- query.append("ts", gte.done());
- if ( !only.empty() ) {
- // note we may here skip a LOT of data table scanning, a lot of work for the master.
- // maybe append "\\." here?
- query.appendRegex("ns", string("^") + pcrecpp::RE::QuoteMeta( only ));
- }
- BSONObj queryObj = query.done();
- // e.g. queryObj = { ts: { $gte: syncedTo } }
+ BSONObjBuilder gte;
+ gte.append("$gte", syncedTo);
+ BSONObjBuilder query;
+ query.append("ts", gte.done());
+ if (!only.empty()) {
+ // note we may here skip a LOT of data table scanning, a lot of work for the master.
+ // maybe append "\\." here?
+ query.appendRegex("ns", string("^") + pcrecpp::RE::QuoteMeta(only));
+ }
+ BSONObj queryObj = query.done();
+ // e.g. queryObj = { ts: { $gte: syncedTo } }
- oplogReader.tailingQuery(ns.c_str(), queryObj);
- tailing = false;
+ oplogReader.tailingQuery(ns.c_str(), queryObj);
+ tailing = false;
+ } else {
+ LOG(2) << "tailing=true\n";
+ }
+
+ if (!oplogReader.haveCursor()) {
+ log() << "dbclient::query returns null (conn closed?)" << endl;
+ oplogReader.resetConnection();
+ return -1;
+ }
+
+ // show any deferred database creates from a previous pass
+ {
+ set<string>::iterator i = addDbNextPass.begin();
+ if (i != addDbNextPass.end()) {
+ BSONObjBuilder b;
+ b.append("ns", *i + '.');
+ b.append("op", "db");
+ BSONObj op = b.done();
+ _sync_pullOpLog_applyOperation(txn, op, false);
+ }
+ }
+
+ if (!oplogReader.more()) {
+ if (tailing) {
+ LOG(2) << "tailing & no new activity\n";
+ okResultCode = 0; // don't sleep
+
+ } else {
+ log() << ns << " oplog is empty" << endl;
}
- else {
- LOG(2) << "tailing=true\n";
+ {
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ save(txn);
}
+ return okResultCode;
+ }
- if( !oplogReader.haveCursor() ) {
- log() << "dbclient::query returns null (conn closed?)" << endl;
- oplogReader.resetConnection();
- return -1;
+ Timestamp nextOpTime;
+ {
+ BSONObj op = oplogReader.next();
+ BSONElement ts = op.getField("ts");
+ if (ts.type() != Date && ts.type() != bsonTimestamp) {
+ string err = op.getStringField("$err");
+ if (!err.empty()) {
+ // 13051 is "tailable cursor requested on non capped collection"
+ if (op.getIntField("code") == 13051) {
+ log() << "trying to slave off of a non-master" << '\n';
+ massert(13344, "trying to slave off of a non-master", false);
+ } else {
+ error() << "$err reading remote oplog: " + err << '\n';
+ massert(10390, "got $err reading remote oplog", false);
+ }
+ } else {
+ error() << "bad object read from remote oplog: " << op.toString() << '\n';
+ massert(10391, "bad object read from remote oplog", false);
+ }
}
- // show any deferred database creates from a previous pass
- {
- set<string>::iterator i = addDbNextPass.begin();
- if ( i != addDbNextPass.end() ) {
- BSONObjBuilder b;
- b.append("ns", *i + '.');
- b.append("op", "db");
- BSONObj op = b.done();
- _sync_pullOpLog_applyOperation(txn, op, false);
+ nextOpTime = Timestamp(ts.date());
+ LOG(2) << "first op time received: " << nextOpTime.toString() << '\n';
+ if (initial) {
+ LOG(1) << "initial run\n";
+ }
+ if (tailing) {
+ if (!(syncedTo < nextOpTime)) {
+ warning() << "ASSERTION failed : syncedTo < nextOpTime" << endl;
+ log() << "syncTo: " << syncedTo.toStringLong() << endl;
+ log() << "nextOpTime: " << nextOpTime.toStringLong() << endl;
+ verify(false);
}
+ oplogReader.putBack(op); // op will be processed in the loop below
+ nextOpTime = Timestamp(); // will reread the op below
+ } else if (nextOpTime != syncedTo) { // didn't get what we queried for - error
+ log() << "nextOpTime " << nextOpTime.toStringLong() << ' '
+ << ((nextOpTime < syncedTo) ? "<??" : ">") << " syncedTo "
+ << syncedTo.toStringLong() << '\n'
+ << "time diff: " << (nextOpTime.getSecs() - syncedTo.getSecs()) << "sec\n"
+ << "tailing: " << tailing << '\n' << "data too stale, halting replication"
+ << endl;
+ replInfo = replAllDead = "data too stale halted replication";
+ verify(syncedTo < nextOpTime);
+ throw SyncException();
+ } else {
+ /* t == syncedTo, so the first op was applied previously or it is the first op of initial query and need not be applied. */
}
+ }
- if ( !oplogReader.more() ) {
- if ( tailing ) {
- LOG(2) << "tailing & no new activity\n";
- okResultCode = 0; // don't sleep
+ // apply operations
+ {
+ int n = 0;
+ time_t saveLast = time(0);
+ while (1) {
+ // we need "&& n" to assure we actually process at least one op to get a sync
+ // point recorded in the first place.
+ const bool moreInitialSyncsPending = !addDbNextPass.empty() && n;
- }
- else {
- log() << ns << " oplog is empty" << endl;
- }
- {
+ if (moreInitialSyncsPending || !oplogReader.more()) {
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite lk(txn->lockState());
- save(txn);
- }
- return okResultCode;
- }
- Timestamp nextOpTime;
- {
- BSONObj op = oplogReader.next();
- BSONElement ts = op.getField("ts");
- if ( ts.type() != Date && ts.type() != bsonTimestamp ) {
- string err = op.getStringField("$err");
- if ( !err.empty() ) {
- // 13051 is "tailable cursor requested on non capped collection"
- if (op.getIntField("code") == 13051) {
- log() << "trying to slave off of a non-master" << '\n';
- massert( 13344 , "trying to slave off of a non-master", false );
- }
- else {
- error() << "$err reading remote oplog: " + err << '\n';
- massert( 10390 , "got $err reading remote oplog", false );
- }
+ if (tailing) {
+ okResultCode = 0; // don't sleep
}
- else {
- error() << "bad object read from remote oplog: " << op.toString() << '\n';
- massert( 10391 , "bad object read from remote oplog", false);
- }
- }
- nextOpTime = Timestamp( ts.date() );
- LOG(2) << "first op time received: " << nextOpTime.toString() << '\n';
- if ( initial ) {
- LOG(1) << "initial run\n";
- }
- if( tailing ) {
- if( !( syncedTo < nextOpTime ) ) {
- warning() << "ASSERTION failed : syncedTo < nextOpTime" << endl;
- log() << "syncTo: " << syncedTo.toStringLong() << endl;
- log() << "nextOpTime: " << nextOpTime.toStringLong() << endl;
- verify(false);
- }
- oplogReader.putBack( op ); // op will be processed in the loop below
- nextOpTime = Timestamp(); // will reread the op below
- }
- else if ( nextOpTime != syncedTo ) { // didn't get what we queried for - error
- log()
- << "nextOpTime " << nextOpTime.toStringLong() << ' '
- << ((nextOpTime < syncedTo) ? "<??" : ">")
- << " syncedTo " << syncedTo.toStringLong() << '\n'
- << "time diff: " << (nextOpTime.getSecs() - syncedTo.getSecs())
- << "sec\n"
- << "tailing: " << tailing << '\n'
- << "data too stale, halting replication" << endl;
- replInfo = replAllDead = "data too stale halted replication";
- verify( syncedTo < nextOpTime );
- throw SyncException();
- }
- else {
- /* t == syncedTo, so the first op was applied previously or it is the first op of initial query and need not be applied. */
+ syncedTo = nextOpTime;
+ save(txn); // note how far we are synced up to now
+ nApplied = n;
+ break;
}
- }
- // apply operations
- {
- int n = 0;
- time_t saveLast = time(0);
- while ( 1 ) {
- // we need "&& n" to assure we actually process at least one op to get a sync
- // point recorded in the first place.
- const bool moreInitialSyncsPending = !addDbNextPass.empty() && n;
-
- if ( moreInitialSyncsPending || !oplogReader.more() ) {
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lk(txn->lockState());
+ OCCASIONALLY if (n > 0 && (n > 100000 || time(0) - saveLast > 60)) {
+ // periodically note our progress, in case we are doing a lot of work and crash
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ syncedTo = nextOpTime;
+ // can't update local log ts since there are pending operations from our peer
+ save(txn);
+ log() << "checkpoint applied " << n << " operations" << endl;
+ log() << "syncedTo: " << syncedTo.toStringLong() << endl;
+ saveLast = time(0);
+ n = 0;
+ }
- if (tailing) {
- okResultCode = 0; // don't sleep
- }
+ BSONObj op = oplogReader.next();
- syncedTo = nextOpTime;
- save(txn); // note how far we are synced up to now
- nApplied = n;
- break;
+ int b = replApplyBatchSize.get();
+ bool justOne = b == 1;
+ unique_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(txn->lockState()));
+ while (1) {
+ BSONElement ts = op.getField("ts");
+ if (!(ts.type() == Date || ts.type() == bsonTimestamp)) {
+ log() << "sync error: problem querying remote oplog record" << endl;
+ log() << "op: " << op.toString() << endl;
+ log() << "halting replication" << endl;
+ replInfo = replAllDead = "sync error: no ts found querying remote oplog record";
+ throw SyncException();
}
-
- OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) {
- // periodically note our progress, in case we are doing a lot of work and crash
+ Timestamp last = nextOpTime;
+ nextOpTime = Timestamp(ts.date());
+ if (!(last < nextOpTime)) {
+ log() << "sync error: last applied optime at slave >= nextOpTime from master"
+ << endl;
+ log() << " last: " << last.toStringLong() << endl;
+ log() << " nextOpTime: " << nextOpTime.toStringLong() << endl;
+ log() << " halting replication" << endl;
+ replInfo = replAllDead = "sync error last >= nextOpTime";
+ uassert(
+ 10123,
+ "replication error last applied optime at slave >= nextOpTime from master",
+ false);
+ }
+ const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
+ if (replSettings.slavedelay &&
+ (unsigned(time(0)) < nextOpTime.getSecs() + replSettings.slavedelay)) {
+ verify(justOne);
+ oplogReader.putBack(op);
+ _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1;
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite lk(txn->lockState());
- syncedTo = nextOpTime;
- // can't update local log ts since there are pending operations from our peer
- save(txn);
- log() << "checkpoint applied " << n << " operations" << endl;
+ if (n > 0) {
+ syncedTo = last;
+ save(txn);
+ }
+ log() << "applied " << n << " operations" << endl;
log() << "syncedTo: " << syncedTo.toStringLong() << endl;
- saveLast = time(0);
- n = 0;
+ log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl;
+ return okResultCode;
}
- BSONObj op = oplogReader.next();
-
- int b = replApplyBatchSize.get();
- bool justOne = b == 1;
- unique_ptr<Lock::GlobalWrite> lk(justOne ? 0 : new Lock::GlobalWrite(txn->lockState()));
- while( 1 ) {
-
- BSONElement ts = op.getField("ts");
- if( !( ts.type() == Date || ts.type() == bsonTimestamp ) ) {
- log() << "sync error: problem querying remote oplog record" << endl;
- log() << "op: " << op.toString() << endl;
- log() << "halting replication" << endl;
- replInfo = replAllDead = "sync error: no ts found querying remote oplog record";
- throw SyncException();
- }
- Timestamp last = nextOpTime;
- nextOpTime = Timestamp( ts.date() );
- if ( !( last < nextOpTime ) ) {
- log() << "sync error: last applied optime at slave >= nextOpTime from master" << endl;
- log() << " last: " << last.toStringLong() << endl;
- log() << " nextOpTime: " << nextOpTime.toStringLong() << endl;
- log() << " halting replication" << endl;
- replInfo = replAllDead = "sync error last >= nextOpTime";
- uassert( 10123 , "replication error last applied optime at slave >= nextOpTime from master", false);
- }
- const ReplSettings& replSettings =
- getGlobalReplicationCoordinator()->getSettings();
- if ( replSettings.slavedelay && ( unsigned( time( 0 ) ) < nextOpTime.getSecs() + replSettings.slavedelay ) ) {
- verify( justOne );
- oplogReader.putBack( op );
- _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1;
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lk(txn->lockState());
- if ( n > 0 ) {
- syncedTo = last;
- save(txn);
- }
- log() << "applied " << n << " operations" << endl;
- log() << "syncedTo: " << syncedTo.toStringLong() << endl;
- log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl;
- return okResultCode;
- }
-
- _sync_pullOpLog_applyOperation(txn, op, !justOne);
- n++;
+ _sync_pullOpLog_applyOperation(txn, op, !justOne);
+ n++;
- if( --b == 0 )
- break;
- // if to here, we are doing mulpile applications in a singel write lock acquisition
- if( !oplogReader.moreInCurrentBatch() ) {
- // break if no more in batch so we release lock while reading from the master
- break;
- }
- op = oplogReader.next();
+ if (--b == 0)
+ break;
+ // if to here, we are doing mulpile applications in a singel write lock acquisition
+ if (!oplogReader.moreInCurrentBatch()) {
+ // break if no more in batch so we release lock while reading from the master
+ break;
}
+ op = oplogReader.next();
}
}
-
- return okResultCode;
}
+ return okResultCode;
+}
- /* note: not yet in mutex at this point.
- returns >= 0 if ok. return -1 if you want to reconnect.
- return value of zero indicates no sleep necessary before next call
- */
- int ReplSource::sync(OperationContext* txn, int& nApplied) {
- _sleepAdviceTime = 0;
- ReplInfo r("sync");
- if (!serverGlobalParams.quiet) {
- LogstreamBuilder l = log();
- l << "syncing from ";
- if( sourceName() != "main" ) {
- l << "source:" << sourceName() << ' ';
- }
- l << "host:" << hostName << endl;
- }
- nClonedThisPass = 0;
-
- // FIXME Handle cases where this db isn't on default port, or default port is spec'd in hostName.
- if ((string("localhost") == hostName || string("127.0.0.1") == hostName) &&
- serverGlobalParams.port == ServerGlobalParams::DefaultDBPort) {
- log() << "can't sync from self (localhost). sources configuration may be wrong." << endl;
- sleepsecs(5);
- return -1;
- }
- if ( !_connect(&oplogReader,
- HostAndPort(hostName),
- getGlobalReplicationCoordinator()->getMyRID()) ) {
- LOG(4) << "can't connect to sync source" << endl;
- return -1;
- }
+/* note: not yet in mutex at this point.
+ returns >= 0 if ok. return -1 if you want to reconnect.
+ return value of zero indicates no sleep necessary before next call
+*/
+int ReplSource::sync(OperationContext* txn, int& nApplied) {
+ _sleepAdviceTime = 0;
+ ReplInfo r("sync");
+ if (!serverGlobalParams.quiet) {
+ LogstreamBuilder l = log();
+ l << "syncing from ";
+ if (sourceName() != "main") {
+ l << "source:" << sourceName() << ' ';
+ }
+ l << "host:" << hostName << endl;
+ }
+ nClonedThisPass = 0;
+
+ // FIXME Handle cases where this db isn't on default port, or default port is spec'd in hostName.
+ if ((string("localhost") == hostName || string("127.0.0.1") == hostName) &&
+ serverGlobalParams.port == ServerGlobalParams::DefaultDBPort) {
+ log() << "can't sync from self (localhost). sources configuration may be wrong." << endl;
+ sleepsecs(5);
+ return -1;
+ }
- return _sync_pullOpLog(txn, nApplied);
+ if (!_connect(
+ &oplogReader, HostAndPort(hostName), getGlobalReplicationCoordinator()->getMyRID())) {
+ LOG(4) << "can't connect to sync source" << endl;
+ return -1;
}
- /* --------------------------------------------------------------*/
+ return _sync_pullOpLog(txn, nApplied);
+}
- static bool _replMainStarted = false;
+/* --------------------------------------------------------------*/
- /*
- TODO:
- _ source has autoptr to the cursor
- _ reuse that cursor when we can
- */
+static bool _replMainStarted = false;
- /* returns: # of seconds to sleep before next pass
- 0 = no sleep recommended
- 1 = special sentinel indicating adaptive sleep recommended
- */
- int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nApplied) {
- {
- ReplInfo r("replMain load sources");
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lk(txn->lockState());
- ReplSource::loadAll(txn, sources);
+/*
+TODO:
+_ source has autoptr to the cursor
+_ reuse that cursor when we can
+*/
- // only need this param for initial reset
- _replMainStarted = true;
- }
+/* returns: # of seconds to sleep before next pass
+ 0 = no sleep recommended
+ 1 = special sentinel indicating adaptive sleep recommended
+*/
+int _replMain(OperationContext* txn, ReplSource::SourceVector& sources, int& nApplied) {
+ {
+ ReplInfo r("replMain load sources");
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ ReplSource::loadAll(txn, sources);
- if ( sources.empty() ) {
- /* replication is not configured yet (for --slave) in local.sources. Poll for config it
- every 20 seconds.
- */
- log() << "no source given, add a master to local.sources to start replication" << endl;
- return 20;
- }
+ // only need this param for initial reset
+ _replMainStarted = true;
+ }
- int sleepAdvice = 1;
- for ( ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++ ) {
- ReplSource *s = i->get();
- int res = -1;
- try {
- res = s->sync(txn, nApplied);
- bool moreToSync = s->haveMoreDbsToSync();
- if( res < 0 ) {
- sleepAdvice = 3;
- }
- else if( moreToSync ) {
- sleepAdvice = 0;
- }
- else if ( s->sleepAdvice() ) {
- sleepAdvice = s->sleepAdvice();
- }
- else
- sleepAdvice = res;
- }
- catch ( const SyncException& ) {
- log() << "caught SyncException" << endl;
- return 10;
- }
- catch ( AssertionException& e ) {
- if ( e.severe() ) {
- log() << "replMain AssertionException " << e.what() << endl;
- return 60;
- }
- else {
- log() << "AssertionException " << e.what() << endl;
- }
- replInfo = "replMain caught AssertionException";
- }
- catch ( const DBException& e ) {
- log() << "DBException " << e.what() << endl;
- replInfo = "replMain caught DBException";
- }
- catch ( const std::exception &e ) {
- log() << "std::exception " << e.what() << endl;
- replInfo = "replMain caught std::exception";
- }
- catch ( ... ) {
- log() << "unexpected exception during replication. replication will halt" << endl;
- replAllDead = "caught unexpected exception during replication";
- }
- if ( res < 0 )
- s->oplogReader.resetConnection();
- }
- return sleepAdvice;
+ if (sources.empty()) {
+ /* replication is not configured yet (for --slave) in local.sources. Poll for config it
+ every 20 seconds.
+ */
+ log() << "no source given, add a master to local.sources to start replication" << endl;
+ return 20;
}
- static void replMain(OperationContext* txn) {
- ReplSource::SourceVector sources;
- while ( 1 ) {
- int s = 0;
- {
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lk(txn->lockState());
- if ( replAllDead ) {
- // throttledForceResyncDead can throw
- if ( !getGlobalReplicationCoordinator()->getSettings().autoresync ||
- !ReplSource::throttledForceResyncDead( txn, "auto" ) ) {
- log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" << endl;
- break;
- }
- }
- verify( syncing == 0 ); // i.e., there is only one sync thread running. we will want to change/fix this.
- syncing++;
+ int sleepAdvice = 1;
+ for (ReplSource::SourceVector::iterator i = sources.begin(); i != sources.end(); i++) {
+ ReplSource* s = i->get();
+ int res = -1;
+ try {
+ res = s->sync(txn, nApplied);
+ bool moreToSync = s->haveMoreDbsToSync();
+ if (res < 0) {
+ sleepAdvice = 3;
+ } else if (moreToSync) {
+ sleepAdvice = 0;
+ } else if (s->sleepAdvice()) {
+ sleepAdvice = s->sleepAdvice();
+ } else
+ sleepAdvice = res;
+ } catch (const SyncException&) {
+ log() << "caught SyncException" << endl;
+ return 10;
+ } catch (AssertionException& e) {
+ if (e.severe()) {
+ log() << "replMain AssertionException " << e.what() << endl;
+ return 60;
+ } else {
+ log() << "AssertionException " << e.what() << endl;
}
+ replInfo = "replMain caught AssertionException";
+ } catch (const DBException& e) {
+ log() << "DBException " << e.what() << endl;
+ replInfo = "replMain caught DBException";
+ } catch (const std::exception& e) {
+ log() << "std::exception " << e.what() << endl;
+ replInfo = "replMain caught std::exception";
+ } catch (...) {
+ log() << "unexpected exception during replication. replication will halt" << endl;
+ replAllDead = "caught unexpected exception during replication";
+ }
+ if (res < 0)
+ s->oplogReader.resetConnection();
+ }
+ return sleepAdvice;
+}
- try {
- int nApplied = 0;
- s = _replMain(txn, sources, nApplied);
- if( s == 1 ) {
- if( nApplied == 0 ) s = 2;
- else if( nApplied > 100 ) {
- // sleep very little - just enough that we aren't truly hammering master
- sleepmillis(75);
- s = 0;
- }
+static void replMain(OperationContext* txn) {
+ ReplSource::SourceVector sources;
+ while (1) {
+ int s = 0;
+ {
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ if (replAllDead) {
+ // throttledForceResyncDead can throw
+ if (!getGlobalReplicationCoordinator()->getSettings().autoresync ||
+ !ReplSource::throttledForceResyncDead(txn, "auto")) {
+ log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds"
+ << endl;
+ break;
}
}
- catch (...) {
- log() << "caught exception in _replMain" << endl;
- s = 4;
- }
-
- {
- ScopedTransaction transaction(txn, MODE_X);
- Lock::GlobalWrite lk(txn->lockState());
- verify( syncing == 1 );
- syncing--;
- }
-
- if( relinquishSyncingSome ) {
- relinquishSyncingSome = 0;
- s = 1; // sleep before going back in to syncing=1
- }
+ verify(
+ syncing ==
+ 0); // i.e., there is only one sync thread running. we will want to change/fix this.
+ syncing++;
+ }
- if ( s ) {
- stringstream ss;
- ss << "sleep " << s << " sec before next pass";
- string msg = ss.str();
- if (!serverGlobalParams.quiet)
- log() << msg << endl;
- ReplInfo r(msg.c_str());
- sleepsecs(s);
+ try {
+ int nApplied = 0;
+ s = _replMain(txn, sources, nApplied);
+ if (s == 1) {
+ if (nApplied == 0)
+ s = 2;
+ else if (nApplied > 100) {
+ // sleep very little - just enough that we aren't truly hammering master
+ sleepmillis(75);
+ s = 0;
+ }
}
+ } catch (...) {
+ log() << "caught exception in _replMain" << endl;
+ s = 4;
}
- }
-
- static void replMasterThread() {
- sleepsecs(4);
- Client::initThread("replmaster");
- int toSleep = 10;
- while( 1 ) {
- sleepsecs(toSleep);
- // Write a keep-alive like entry to the log. This will make things like
- // printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date even
- // when things are idle.
- OperationContextImpl txn;
- AuthorizationSession::get(txn.getClient())->grantInternalAuthorization();
+ {
+ ScopedTransaction transaction(txn, MODE_X);
+ Lock::GlobalWrite lk(txn->lockState());
+ verify(syncing == 1);
+ syncing--;
+ }
- Lock::GlobalWrite globalWrite(txn.lockState(), 1);
- if (globalWrite.isLocked()) {
- toSleep = 10;
+ if (relinquishSyncingSome) {
+ relinquishSyncingSome = 0;
+ s = 1; // sleep before going back in to syncing=1
+ }
- try {
- WriteUnitOfWork wuow(&txn);
- getGlobalServiceContext()->getOpObserver()->onOpMessage(&txn, BSONObj());
- wuow.commit();
- }
- catch (...) {
- log() << "caught exception in replMasterThread()" << endl;
- }
- }
- else {
- LOG(5) << "couldn't logKeepalive" << endl;
- toSleep = 1;
- }
+ if (s) {
+ stringstream ss;
+ ss << "sleep " << s << " sec before next pass";
+ string msg = ss.str();
+ if (!serverGlobalParams.quiet)
+ log() << msg << endl;
+ ReplInfo r(msg.c_str());
+ sleepsecs(s);
}
}
-
- static void replSlaveThread() {
- sleepsecs(1);
- Client::initThread("replslave");
-
+}
+
+static void replMasterThread() {
+ sleepsecs(4);
+ Client::initThread("replmaster");
+ int toSleep = 10;
+ while (1) {
+ sleepsecs(toSleep);
+
+ // Write a keep-alive like entry to the log. This will make things like
+ // printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date even
+ // when things are idle.
OperationContextImpl txn;
AuthorizationSession::get(txn.getClient())->grantInternalAuthorization();
- DisableDocumentValidation validationDisabler(&txn);
- while ( 1 ) {
+ Lock::GlobalWrite globalWrite(txn.lockState(), 1);
+ if (globalWrite.isLocked()) {
+ toSleep = 10;
+
try {
- replMain(&txn);
- sleepsecs(5);
- }
- catch ( AssertionException& ) {
- ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry");
- log() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- }
- catch ( DBException& e ) {
- log() << "exception in replSlaveThread(): " << e.what()
- << ", sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
- }
- catch ( ... ) {
- log() << "error in replSlaveThread(): sleeping 5 minutes before retry" << endl;
- sleepsecs(300);
+ WriteUnitOfWork wuow(&txn);
+ getGlobalServiceContext()->getOpObserver()->onOpMessage(&txn, BSONObj());
+ wuow.commit();
+ } catch (...) {
+ log() << "caught exception in replMasterThread()" << endl;
}
+ } else {
+ LOG(5) << "couldn't logKeepalive" << endl;
+ toSleep = 1;
}
}
+}
- void startMasterSlave(OperationContext* txn) {
-
- const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
- if( !replSettings.slave && !replSettings.master )
- return;
+static void replSlaveThread() {
+ sleepsecs(1);
+ Client::initThread("replslave");
- AuthorizationSession::get(txn->getClient())->grantInternalAuthorization();
+ OperationContextImpl txn;
+ AuthorizationSession::get(txn.getClient())->grantInternalAuthorization();
+ DisableDocumentValidation validationDisabler(&txn);
- {
- ReplSource temp(txn); // Ensures local.me is populated
+ while (1) {
+ try {
+ replMain(&txn);
+ sleepsecs(5);
+ } catch (AssertionException&) {
+ ReplInfo r("Assertion in replSlaveThread(): sleeping 5 minutes before retry");
+ log() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl;
+ sleepsecs(300);
+ } catch (DBException& e) {
+ log() << "exception in replSlaveThread(): " << e.what()
+ << ", sleeping 5 minutes before retry" << endl;
+ sleepsecs(300);
+ } catch (...) {
+ log() << "error in replSlaveThread(): sleeping 5 minutes before retry" << endl;
+ sleepsecs(300);
}
+ }
+}
- if ( replSettings.slave ) {
- verify( replSettings.slave == SimpleSlave );
- LOG(1) << "slave=true" << endl;
- stdx::thread repl_thread(replSlaveThread);
- }
+void startMasterSlave(OperationContext* txn) {
+ const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
+ if (!replSettings.slave && !replSettings.master)
+ return;
- if ( replSettings.master ) {
- LOG(1) << "master=true" << endl;
- createOplog(txn);
- stdx::thread t(replMasterThread);
- }
+ AuthorizationSession::get(txn->getClient())->grantInternalAuthorization();
- if (replSettings.fastsync) {
- while(!_replMainStarted) // don't allow writes until we've set up from log
- sleepmillis( 50 );
- }
+ {
+ ReplSource temp(txn); // Ensures local.me is populated
}
- int _dummy_z;
-
- void pretouchN(vector<BSONObj>& v, unsigned a, unsigned b) {
- Client::initThreadIfNotAlready("pretouchN");
- OperationContextImpl txn; // XXX
- ScopedTransaction transaction(&txn, MODE_S);
- Lock::GlobalRead lk(txn.lockState());
+ if (replSettings.slave) {
+ verify(replSettings.slave == SimpleSlave);
+ LOG(1) << "slave=true" << endl;
+ stdx::thread repl_thread(replSlaveThread);
+ }
- for( unsigned i = a; i <= b; i++ ) {
- const BSONObj& op = v[i];
- const char *which = "o";
- const char *opType = op.getStringField("op");
- if ( *opType == 'i' )
- ;
- else if( *opType == 'u' )
- which = "o2";
- else
- continue;
- /* todo : other operations */
+ if (replSettings.master) {
+ LOG(1) << "master=true" << endl;
+ createOplog(txn);
+ stdx::thread t(replMasterThread);
+ }
- try {
- BSONObj o = op.getObjectField(which);
- BSONElement _id;
- if( o.getObjectID(_id) ) {
- const char *ns = op.getStringField("ns");
- BSONObjBuilder b;
- b.append(_id);
- BSONObj result;
- OldClientContext ctx(&txn, ns);
- if( Helpers::findById(&txn, ctx.db(), ns, b.done(), result) )
- _dummy_z += result.objsize(); // touch
- }
- }
- catch( DBException& e ) {
- log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' ' << e.toString() << endl;
- }
- }
+ if (replSettings.fastsync) {
+ while (!_replMainStarted) // don't allow writes until we've set up from log
+ sleepmillis(50);
}
+}
+int _dummy_z;
- void pretouchOperation(OperationContext* txn, const BSONObj& op) {
+void pretouchN(vector<BSONObj>& v, unsigned a, unsigned b) {
+ Client::initThreadIfNotAlready("pretouchN");
- if (txn->lockState()->isWriteLocked()) {
- return; // no point pretouching if write locked. not sure if this will ever fire, but just in case.
- }
+ OperationContextImpl txn; // XXX
+ ScopedTransaction transaction(&txn, MODE_S);
+ Lock::GlobalRead lk(txn.lockState());
- const char *which = "o";
- const char *opType = op.getStringField("op");
- if ( *opType == 'i' )
+ for (unsigned i = a; i <= b; i++) {
+ const BSONObj& op = v[i];
+ const char* which = "o";
+ const char* opType = op.getStringField("op");
+ if (*opType == 'i')
;
- else if( *opType == 'u' )
+ else if (*opType == 'u')
which = "o2";
else
- return;
+ continue;
/* todo : other operations */
try {
BSONObj o = op.getObjectField(which);
BSONElement _id;
- if( o.getObjectID(_id) ) {
- const char *ns = op.getStringField("ns");
+ if (o.getObjectID(_id)) {
+ const char* ns = op.getStringField("ns");
BSONObjBuilder b;
b.append(_id);
BSONObj result;
- AutoGetCollectionForRead ctx(txn, ns );
- if (Helpers::findById(txn, ctx.getDb(), ns, b.done(), result)) {
- _dummy_z += result.objsize(); // touch
- }
+ OldClientContext ctx(&txn, ns);
+ if (Helpers::findById(&txn, ctx.db(), ns, b.done(), result))
+ _dummy_z += result.objsize(); // touch
}
+ } catch (DBException& e) {
+ log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' '
+ << e.toString() << endl;
}
- catch( DBException& ) {
- log() << "ignoring assertion in pretouchOperation()" << endl;
+ }
+}
+/*
+ * Looks up, by _id, the document that the replicated operation `op` refers to and adds
+ * the size of the fetched document to `_dummy_z`; the fetched document itself is
+ * discarded. Presumably the point is to pull that document into memory before the
+ * operation is applied -- TODO confirm against the callers.
+ *
+ * Returns without doing anything when `txn` already holds a write lock, or when the
+ * first character of the "op" field is neither 'i' nor 'u'. A DBException raised
+ * inside the lookup is logged and not rethrown.
+ */
+void pretouchOperation(OperationContext* txn, const BSONObj& op) {
+ if (txn->lockState()->isWriteLocked()) {
+ return; // no point pretouching if write locked. not sure if this will ever fire, but just in case.
+ }
+ // Pick the field of `op` that holds the sub-document whose _id is looked up.
+ const char* which = "o";
+ const char* opType = op.getStringField("op");
+ if (*opType == 'i')
+ ; // 'i': keep the default field, "o"
+ else if (*opType == 'u')
+ which = "o2"; // 'u': read the _id from "o2" instead
+ else
+ return;
+ /* todo : other operations */
+ // Build a query holding only the _id element and run it against the namespace in "ns".
+ try {
+ BSONObj o = op.getObjectField(which);
+ BSONElement _id;
+ if (o.getObjectID(_id)) {
+ const char* ns = op.getStringField("ns");
+ BSONObjBuilder b;
+ b.append(_id);
+ BSONObj result;
+ AutoGetCollectionForRead ctx(txn, ns);
+ if (Helpers::findById(txn, ctx.getDb(), ns, b.done(), result)) {
+ _dummy_z += result.objsize(); // touch
+ }
 }
+ } catch (DBException&) {
+ log() << "ignoring assertion in pretouchOperation()" << endl;
 }
+}
-} // namespace repl
-} // namespace mongo
+} // namespace repl
+} // namespace mongo