diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-28 20:30:54 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-11-03 10:32:55 -0400 |
commit | aeca1c4b7eaeacff19a32facee8169662cbf3bd6 (patch) | |
tree | 45f9a8b96f56bc69fc72309bdb2ab09bf8fbee8e | |
parent | 7a7d6144bb18878895087d775ffefd5cfdbb0d65 (diff) | |
download | mongo-aeca1c4b7eaeacff19a32facee8169662cbf3bd6.tar.gz |
SERVER-25145 OplogReader::connectToSyncSource selects sync sources based on required optime if given
(cherry picked from commit d16d537fe262cc5f2e18b2fbe413ba80515e8c36)

| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 3 |
| -rw-r--r-- | src/mongo/db/repl/oplogreader.cpp | 49 |
| -rw-r--r-- | src/mongo/db/repl/oplogreader.h | 7 |
| -rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 2 |

4 files changed, 59 insertions, 2 deletions

```diff
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index a461d5f2c8c..b2269fa5f2e 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -289,7 +289,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
         _syncSourceHost = HostAndPort();
     }
     OplogReader syncSourceReader;
-    syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord);
+    OpTime minValid;
+    syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord);
 
     // no server found
     if (syncSourceReader.getHost().empty()) {
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index f0cb34c7a1e..63832b201af 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -137,8 +137,34 @@ HostAndPort OplogReader::getHost() const {
     return _host;
 }
 
+Status OplogReader::_compareRequiredOpTimeWithQueryResponse(const OpTime& requiredOpTime) {
+    auto containsMinValid = more();
+    if (!containsMinValid) {
+        return Status(
+            ErrorCodes::NoMatchingDocument,
+            "remote oplog does not contain entry with optime matching our required optime");
+    }
+    auto doc = nextSafe();
+    const auto opTime = fassertStatusOK(40351, OpTime::parseFromOplogEntry(doc));
+    if (requiredOpTime != opTime) {
+        return Status(ErrorCodes::BadValue,
+                      str::stream() << "remote oplog contain entry with matching timestamp "
+                                    << opTime.getTimestamp().toString() << " but optime "
+                                    << opTime.toString() << " does not "
+                                                            "match our required optime");
+    }
+    if (requiredOpTime.getTerm() != opTime.getTerm()) {
+        return Status(ErrorCodes::BadValue,
+                      str::stream() << "remote oplog contain entry with term " << opTime.getTerm()
+                                    << " that does not "
+                                       "match the term in our required optime");
+    }
+    return Status::OK();
+}
+
 void OplogReader::connectToSyncSource(OperationContext* txn,
                                       const OpTime& lastOpTimeFetched,
+                                      const OpTime& requiredOpTime,
                                       ReplicationCoordinator* replCoord) {
     const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
     const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max());
@@ -203,6 +229,29 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
             continue;
         }
 
+        // Check if sync source contains required optime.
+        if (!requiredOpTime.isNull()) {
+            // This query is structured so that it is executed on the sync source using the oplog
+            // start hack (oplogReplay=true and $gt/$gte predicate over "ts").
+            auto ts = requiredOpTime.getTimestamp();
+            tailingQuery(rsOplogName.c_str(), BSON("ts" << BSON("$gte" << ts << "$lte" << ts)));
+            auto status = _compareRequiredOpTimeWithQueryResponse(requiredOpTime);
+            if (!status.isOK()) {
+                const auto blacklistDuration = Seconds(60);
+                const auto until = Date_t::now() + blacklistDuration;
+                warning() << "We cannot use " << candidate.toString()
+                          << " as a sync source because it does not contain the necessary "
+                             "operations for us to reach a consistent state: " << status
+                          << " last fetched optime: " << lastOpTimeFetched
+                          << ". required optime: " << requiredOpTime
+                          << ". Blacklisting this sync source for " << blacklistDuration
+                          << " until: " << until;
+                resetConnection();
+                replCoord->blacklistSyncSource(candidate, until);
+                continue;
+            }
+            resetCursor();
+        }
         // TODO: If we were too stale (recovering with maintenance mode on), then turn it off, to
         // allow becoming secondary/etc.
 
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 65e101415a0..1434125697a 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -157,7 +157,14 @@ public:
      */
     void connectToSyncSource(OperationContext* txn,
                              const OpTime& lastOpTimeFetched,
+                             const OpTime& requiredOpTime,
                              ReplicationCoordinator* replCoord);
+
+private:
+    /**
+     * Checks query response for required optime.
+     */
+    Status _compareRequiredOpTimeWithQueryResponse(const OpTime& requiredOpTime);
 };
 
 }  // namespace repl
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index a2b4bc1ccf5..d475d53d644 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -317,7 +317,7 @@ Status _initialSync() {
     while (r.getHost().empty()) {
         // We must prime the sync source selector so that it considers all candidates regardless
         // of oplog position, by passing in null OpTime as the last op fetched time.
-        r.connectToSyncSource(&txn, OpTime(), replCoord);
+        r.connectToSyncSource(&txn, OpTime(), OpTime(), replCoord);
         if (r.getHost().empty()) {
             std::string msg =
                 "no valid sync sources found in current replset to do an initial sync";
```