summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplogreader.cpp
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2014-09-23 14:46:55 -0400
committerEric Milkie <milkie@10gen.com>2014-09-26 11:27:06 -0400
commite86e08deff7293b5778fad27df9031c013595b12 (patch)
tree9c24931717b261980a0591ab40192cbac9d101ce /src/mongo/db/repl/oplogreader.cpp
parent128ef4c4bcf312fbe6339181e377d12744165cf9 (diff)
downloadmongo-e86e08deff7293b5778fad27df9031c013595b12.tar.gz
SERVER-15089 Add new Applier class and remove theReplSet references from BackgroundSync
Diffstat (limited to 'src/mongo/db/repl/oplogreader.cpp')
-rw-r--r--src/mongo/db/repl/oplogreader.cpp81
1 files changed, 79 insertions, 2 deletions
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 23997eabeb3..eb542669ccf 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -40,11 +40,14 @@
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/auth/security_key.h"
+#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
-#include "mongo/db/repl/rs.h" // theReplSet
+#include "mongo/db/repl/minvalid.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/repl_coordinator.h"
+#include "mongo/db/repl/rslog.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
@@ -135,5 +138,79 @@ namespace repl {
return _host;
}
+ void OplogReader::connectToSyncSource(OperationContext* txn,
+ OpTime lastOpTimeFetched,
+ ReplicationCoordinator* replCoord) {
+ const OpTime sentinel(Milliseconds(curTimeMillis64()).total_seconds(), 0);
+ OpTime oldestOpTimeSeen = sentinel;
+
+ invariant(conn() == NULL);
+
+ while (true) {
+ HostAndPort candidate = replCoord->chooseNewSyncSource();
+
+ if (candidate.empty()) {
+ if (oldestOpTimeSeen == sentinel) {
+ // If, in this invocation of connectToSyncSource(), we did not successfully
+ // connect to any node ahead of us,
+ // we apparently have no sync sources to connect to.
+ // This situation is common; e.g. if there are no writes to the primary at
+ // the moment.
+ return;
+ }
+
+ // Connected to at least one member, but in all cases we were too stale to use them
+ // as a sync source.
+ log() << "replSet error RS102 too stale to catch up" << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog;
+ log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() <<
+ rsLog;
+ log() << "replSet "
+ "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"
+ << rsLog;
+ setMinValid(txn, oldestOpTimeSeen);
+ replCoord->setFollowerMode(MemberState::RS_RECOVERING);
+ return;
+ }
+
+ if (!connect(candidate)) {
+ LOG(2) << "replSet can't connect to " << candidate.toString() <<
+ " to read operations" << rsLog;
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000));
+ continue;
+ }
+ // Read the first (oldest) op and confirm that it's not newer than our last
+ // fetched op. Otherwise, we have fallen off the back of that source's oplog.
+ BSONObj remoteOldestOp(findOne(rsoplog, Query()));
+ BSONElement tsElem(remoteOldestOp["ts"]);
+ if (tsElem.type() != Timestamp) {
+ // This member's got a bad op in its oplog.
+ warning() << "oplog invalid format on node " << candidate.toString();
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate,
+ Date_t(curTimeMillis64() + 600*1000));
+ continue;
+ }
+ OpTime remoteOldOpTime = tsElem._opTime();
+
+ if (lastOpTimeFetched < remoteOldOpTime) {
+ // We're too stale to use this sync source.
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate,
+ Date_t(curTimeMillis64() + 600*1000));
+ if (oldestOpTimeSeen > remoteOldOpTime) {
+ warning() << "we are too stale to use " << candidate.toString() <<
+ " as a sync source";
+ oldestOpTimeSeen = remoteOldOpTime;
+ }
+ continue;
+ }
+
+ // Got a valid sync source.
+ return;
+ } // while (true)
+ }
+
} // namespace repl
} // namespace mongo