diff options
author | Eric Milkie <milkie@10gen.com> | 2014-09-23 14:46:55 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2014-09-26 11:27:06 -0400 |
commit | e86e08deff7293b5778fad27df9031c013595b12 (patch) | |
tree | 9c24931717b261980a0591ab40192cbac9d101ce /src/mongo/db/repl/oplogreader.cpp | |
parent | 128ef4c4bcf312fbe6339181e377d12744165cf9 (diff) | |
download | mongo-e86e08deff7293b5778fad27df9031c013595b12.tar.gz |
SERVER-15089 Add new Applier class and remove theReplSet references from BackgroundSync
Diffstat (limited to 'src/mongo/db/repl/oplogreader.cpp')
-rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 81 |
1 files changed, 79 insertions, 2 deletions
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index 23997eabeb3..eb542669ccf 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -40,11 +40,14 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/authorization_session.h" -#include "mongo/db/commands/server_status_metric.h" #include "mongo/db/auth/security_key.h" +#include "mongo/db/commands/server_status_metric.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" -#include "mongo/db/repl/rs.h" // theReplSet +#include "mongo/db/repl/minvalid.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/repl_coordinator.h" +#include "mongo/db/repl/rslog.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -135,5 +138,79 @@ namespace repl { return _host; } + void OplogReader::connectToSyncSource(OperationContext* txn, + OpTime lastOpTimeFetched, + ReplicationCoordinator* replCoord) { + const OpTime sentinel(Milliseconds(curTimeMillis64()).total_seconds(), 0); + OpTime oldestOpTimeSeen = sentinel; + + invariant(conn() == NULL); + + while (true) { + HostAndPort candidate = replCoord->chooseNewSyncSource(); + + if (candidate.empty()) { + if (oldestOpTimeSeen == sentinel) { + // If, in this invocation of connectToSyncSource(), we did not successfully + // connect to any node ahead of us, + // we apparently have no sync sources to connect to. + // This situation is common; e.g. if there are no writes to the primary at + // the moment. + return; + } + + // Connected to at least one member, but in all cases we were too stale to use them + // as a sync source. + log() << "replSet error RS102 too stale to catch up" << rsLog; + log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog; + log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() << + rsLog; + log() << "replSet " + "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember" + << rsLog; + setMinValid(txn, oldestOpTimeSeen); + replCoord->setFollowerMode(MemberState::RS_RECOVERING); + return; + } + + if (!connect(candidate)) { + LOG(2) << "replSet can't connect to " << candidate.toString() << + " to read operations" << rsLog; + resetConnection(); + replCoord->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000)); + continue; + } + // Read the first (oldest) op and confirm that it's not newer than our last + // fetched op. Otherwise, we have fallen off the back of that source's oplog. + BSONObj remoteOldestOp(findOne(rsoplog, Query())); + BSONElement tsElem(remoteOldestOp["ts"]); + if (tsElem.type() != Timestamp) { + // This member's got a bad op in its oplog. + warning() << "oplog invalid format on node " << candidate.toString(); + resetConnection(); + replCoord->blacklistSyncSource(candidate, + Date_t(curTimeMillis64() + 600*1000)); + continue; + } + OpTime remoteOldOpTime = tsElem._opTime(); + + if (lastOpTimeFetched < remoteOldOpTime) { + // We're too stale to use this sync source. + resetConnection(); + replCoord->blacklistSyncSource(candidate, + Date_t(curTimeMillis64() + 600*1000)); + if (oldestOpTimeSeen > remoteOldOpTime) { + warning() << "we are too stale to use " << candidate.toString() << + " as a sync source"; + oldestOpTimeSeen = remoteOldOpTime; + } + continue; + } + + // Got a valid sync source. + return; + } // while (true) + } + } // namespace repl } // namespace mongo |