path: root/src/mongo/db/repl/sync_source_resolver.cpp
diff options
authorBenety Goh <>2016-10-30 21:26:25 -0400
committerBenety Goh <>2016-11-02 18:57:01 -0400
commit053a2dbc9c503bf55ba52bc3c937bc138cc67b8a (patch)
treec679dc006202681f545b21c79267be079f670983 /src/mongo/db/repl/sync_source_resolver.cpp
parent16b2afc48459fbd2670c5e824bd662d6ac7b584f (diff)
SERVER-25145 SyncSourceResolver selects sync sources based on required optime if given
Diffstat (limited to 'src/mongo/db/repl/sync_source_resolver.cpp')
1 files changed, 119 insertions, 1 deletions
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index cb7b272be20..c3c563e6649 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -52,19 +52,25 @@ const Seconds SyncSourceResolver::kOplogEmptyBlacklistDuration(10);
const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10);
const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10);
const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1);
+const Seconds SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration(60);
SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor,
SyncSourceSelector* syncSourceSelector,
const OpTime& lastOpTimeFetched,
+ const OpTime& requiredOpTime,
const OnCompletionFn& onCompletion)
: _taskExecutor(taskExecutor),
+ _requiredOpTime(requiredOpTime),
_onCompletion(onCompletion) {
uassert(ErrorCodes::BadValue, "task executor cannot be null", taskExecutor);
uassert(ErrorCodes::BadValue, "sync source selector cannot be null", syncSourceSelector);
ErrorCodes::BadValue, "last fetched optime cannot be null", !lastOpTimeFetched.isNull());
+ uassert(ErrorCodes::BadValue,
+ "required optime (if provided) must be more recent than last fetched optime",
+ requiredOpTime.isNull() || requiredOpTime > lastOpTimeFetched);
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
@@ -157,6 +163,26 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher(
+// Builds a Fetcher that asks 'candidate' for the oplog entry whose "ts" equals the timestamp of
+// '_requiredOpTime'. The response is delivered to _requiredOpTimeFetcherCallback() together with
+// 'candidate' and 'earliestOpTimeSeen'.
+std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate,
+                                                                        OpTime earliestOpTimeSeen) {
+    const auto requiredTimestamp = _requiredOpTime.getTimestamp();
+
+    // This query is structured so that it is executed on the sync source using the oplog
+    // start hack (oplogReplay=true and $gt/$gte predicate over "ts").
+    const auto timestampRange = BSON("$gte" << requiredTimestamp << "$lte" << requiredTimestamp);
+    const auto findCommand = BSON("find" << kLocalOplogNss.coll() << "oplogReplay" << true
+                                         << "filter"
+                                         << BSON("ts" << timestampRange));
+
+    // Only the first callback argument (the query result) is forwarded; the candidate and the
+    // earliest optime seen so far are bound by value.
+    auto onQueryResult = stdx::bind(&SyncSourceResolver::_requiredOpTimeFetcherCallback,
+                                    this,
+                                    stdx::placeholders::_1,
+                                    candidate,
+                                    earliestOpTimeSeen);
+
+    const auto selectionMetadata = rpc::ServerSelectionMetadata(true, boost::none).toBSON();
+
+    return stdx::make_unique<Fetcher>(_taskExecutor,
+                                      candidate,
+                                      kLocalOplogNss.db().toString(),
+                                      findCommand,
+                                      onQueryResult,
+                                      selectionMetadata,
+                                      kFetcherTimeout);
Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
// Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task
@@ -269,6 +295,93 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback(
+ // Schedules fetcher to look for '_requiredOpTime' in the remote oplog.
+ if (!_requiredOpTime.isNull()) {
+ auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
+ if (!status.isOK()) {
+ _finishCallback(status);
+ }
+ return;
+ }
+ _finishCallback(candidate);
+/**
+ * Checks whether the first document in 'queryResponse' is the oplog entry at '_requiredOpTime'.
+ *
+ * Returns:
+ *   - NoMatchingDocument if the response contains no documents;
+ *   - BadValue if the optime of the first document differs from '_requiredOpTime';
+ *   - Status::OK() otherwise.
+ */
+Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
+    const Fetcher::QueryResponse& queryResponse) {
+    if (queryResponse.documents.empty()) {
+        return Status(
+            ErrorCodes::NoMatchingDocument,
+            "remote oplog does not contain entry with optime matching our required optime");
+    }
+    const OplogEntry oplogEntry(queryResponse.documents.front());
+    const auto opTime = oplogEntry.getOpTime();
+    // A single whole-optime comparison is sufficient. A follow-up check of the terms alone could
+    // only run once the optimes already compared equal, so it could never fail.
+    // NOTE(review): relies on OpTime inequality taking the term into account - confirm in optime.h.
+    if (_requiredOpTime != opTime) {
+        return Status(ErrorCodes::BadValue,
+                      str::stream() << "remote oplog contain entry with matching timestamp "
+                                    << opTime.getTimestamp().toString()
+                                    << " but optime "
+                                    << opTime.toString()
+                                    << " does not "
+                                       "match our required optime");
+    }
+    return Status::OK();
+void SyncSourceResolver::_requiredOpTimeFetcherCallback(
+ const StatusWith<Fetcher::QueryResponse>& queryResult,
+ HostAndPort candidate,
+ OpTime earliestOpTimeSeen) {
+ if (_isShuttingDown()) {
+ _finishCallback(Status(ErrorCodes::CallbackCanceled,
+ str::stream() << "sync source resolver shut down while looking for "
+ "required optime "
+ << _requiredOpTime.toString()
+ << " in candidate's oplog: "
+ << candidate));
+ return;
+ }
+ if (ErrorCodes::CallbackCanceled == queryResult.getStatus()) {
+ _finishCallback(queryResult.getStatus());
+ return;
+ }
+ if (!queryResult.isOK()) {
+ // We got an error.
+ const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
+ log() << "Blacklisting " << candidate << " due to required optime fetcher error: '"
+ << queryResult.getStatus() << "' for " << kFetcherErrorBlacklistDuration
+ << " until: " << until << ". required optime: " << _requiredOpTime;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+ _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+ return;
+ }
+ const auto& queryResponse = queryResult.getValue();
+ auto status = _compareRequiredOpTimeWithQueryResponse(queryResponse);
+ if (!status.isOK()) {
+ const auto until = _taskExecutor->now() + kNoRequiredOpTimeBlacklistDuration;
+ warning() << "We cannot use " << candidate.toString()
+ << " as a sync source because it does not contain the necessary "
+ "operations for us to reach a consistent state: "
+ << status << " last fetched optime: " << _lastOpTimeFetched
+ << ". required optime: " << _requiredOpTime
+ << ". Blacklisting this sync source for " << kNoRequiredOpTimeBlacklistDuration
+ << " until: " << until;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+ _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+ return;
+ }
@@ -305,7 +418,12 @@ Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) {
Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) {
- _onCompletion(response);
+ try {
+ _onCompletion(response);
+ } catch (...) {
+ warning() << "sync source resolver finish callback threw exception: "
+ << exceptionToStatus();
+ }
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(_state != State::kComplete);