summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/bgsync.cpp3
-rw-r--r--src/mongo/db/repl/oplogreader.cpp49
-rw-r--r--src/mongo/db/repl/oplogreader.h7
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp2
4 files changed, 59 insertions, 2 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index a461d5f2c8c..b2269fa5f2e 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -289,7 +289,8 @@ void BackgroundSync::_produce(OperationContext* txn) {
_syncSourceHost = HostAndPort();
}
OplogReader syncSourceReader;
- syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord);
+ OpTime minValid;
+ syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, minValid, _replCoord);
// no server found
if (syncSourceReader.getHost().empty()) {
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index f0cb34c7a1e..63832b201af 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -137,8 +137,34 @@ HostAndPort OplogReader::getHost() const {
return _host;
}
+Status OplogReader::_compareRequiredOpTimeWithQueryResponse(const OpTime& requiredOpTime) {
+ auto containsMinValid = more();
+ if (!containsMinValid) {
+ return Status(
+ ErrorCodes::NoMatchingDocument,
+ "remote oplog does not contain entry with optime matching our required optime");
+ }
+ auto doc = nextSafe();
+ const auto opTime = fassertStatusOK(40351, OpTime::parseFromOplogEntry(doc));
+ if (requiredOpTime != opTime) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "remote oplog contain entry with matching timestamp "
+ << opTime.getTimestamp().toString() << " but optime "
+ << opTime.toString() << " does not "
+ "match our required optime");
+ }
+ if (requiredOpTime.getTerm() != opTime.getTerm()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "remote oplog contain entry with term " << opTime.getTerm()
+ << " that does not "
+ "match the term in our required optime");
+ }
+ return Status::OK();
+}
+
void OplogReader::connectToSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched,
+ const OpTime& requiredOpTime,
ReplicationCoordinator* replCoord) {
const Timestamp sentinelTimestamp(duration_cast<Seconds>(Milliseconds(curTimeMillis64())), 0);
const OpTime sentinel(sentinelTimestamp, std::numeric_limits<long long>::max());
@@ -203,6 +229,29 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
continue;
}
+ // Check if sync source contains required optime.
+ if (!requiredOpTime.isNull()) {
+ // This query is structured so that it is executed on the sync source using the oplog
+ // start hack (oplogReplay=true and $gt/$gte predicate over "ts").
+ auto ts = requiredOpTime.getTimestamp();
+ tailingQuery(rsOplogName.c_str(), BSON("ts" << BSON("$gte" << ts << "$lte" << ts)));
+ auto status = _compareRequiredOpTimeWithQueryResponse(requiredOpTime);
+ if (!status.isOK()) {
+ const auto blacklistDuration = Seconds(60);
+ const auto until = Date_t::now() + blacklistDuration;
+ warning() << "We cannot use " << candidate.toString()
+ << " as a sync source because it does not contain the necessary "
+ "operations for us to reach a consistent state: " << status
+ << " last fetched optime: " << lastOpTimeFetched
+ << ". required optime: " << requiredOpTime
+ << ". Blacklisting this sync source for " << blacklistDuration
+ << " until: " << until;
+ resetConnection();
+ replCoord->blacklistSyncSource(candidate, until);
+ continue;
+ }
+ resetCursor();
+ }
// TODO: If we were too stale (recovering with maintenance mode on), then turn it off, to
// allow becoming secondary/etc.
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 65e101415a0..1434125697a 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -157,7 +157,14 @@ public:
*/
void connectToSyncSource(OperationContext* txn,
const OpTime& lastOpTimeFetched,
+ const OpTime& requiredOpTime,
ReplicationCoordinator* replCoord);
+
+private:
+ /**
+ * Checks query response for required optime.
+ */
+ Status _compareRequiredOpTimeWithQueryResponse(const OpTime& requiredOpTime);
};
} // namespace repl
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index a2b4bc1ccf5..d475d53d644 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -317,7 +317,7 @@ Status _initialSync() {
while (r.getHost().empty()) {
// We must prime the sync source selector so that it considers all candidates regardless
// of oplog position, by passing in null OpTime as the last op fetched time.
- r.connectToSyncSource(&txn, OpTime(), replCoord);
+ r.connectToSyncSource(&txn, OpTime(), OpTime(), replCoord);
if (r.getHost().empty()) {
std::string msg =
"no valid sync sources found in current replset to do an initial sync";