diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-30 21:26:25 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-11-02 18:57:01 -0400 |
commit | 053a2dbc9c503bf55ba52bc3c937bc138cc67b8a (patch) | |
tree | c679dc006202681f545b21c79267be079f670983 /src/mongo/db/repl/sync_source_resolver.cpp | |
parent | 16b2afc48459fbd2670c5e824bd662d6ac7b584f (diff) | |
download | mongo-053a2dbc9c503bf55ba52bc3c937bc138cc67b8a.tar.gz |
SERVER-25145 SyncSourceResolver selects sync sources based on required optime if given
Diffstat (limited to 'src/mongo/db/repl/sync_source_resolver.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 120 |
1 file changed, 119 insertions, 1 deletion
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index cb7b272be20..c3c563e6649 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -52,19 +52,25 @@ const Seconds SyncSourceResolver::kOplogEmptyBlacklistDuration(10); const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10); const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10); const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1); +const Seconds SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration(60); SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor, SyncSourceSelector* syncSourceSelector, const OpTime& lastOpTimeFetched, + const OpTime& requiredOpTime, const OnCompletionFn& onCompletion) : _taskExecutor(taskExecutor), _syncSourceSelector(syncSourceSelector), _lastOpTimeFetched(lastOpTimeFetched), + _requiredOpTime(requiredOpTime), _onCompletion(onCompletion) { uassert(ErrorCodes::BadValue, "task executor cannot be null", taskExecutor); uassert(ErrorCodes::BadValue, "sync source selector cannot be null", syncSourceSelector); uassert( ErrorCodes::BadValue, "last fetched optime cannot be null", !lastOpTimeFetched.isNull()); + uassert(ErrorCodes::BadValue, + "required optime (if provided) must be more recent than last fetched optime", + requiredOpTime.isNull() || requiredOpTime > lastOpTimeFetched); uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); } @@ -157,6 +163,26 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher( kFetcherTimeout); } +std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate, + OpTime earliestOpTimeSeen) { + // This query is structured so that it is executed on the sync source using the oplog + // start hack (oplogReplay=true and $gt/$gte predicate over "ts"). 
+ return stdx::make_unique<Fetcher>( + _taskExecutor, + candidate, + kLocalOplogNss.db().toString(), + BSON("find" << kLocalOplogNss.coll() << "oplogReplay" << true << "filter" + << BSON("ts" << BSON("$gte" << _requiredOpTime.getTimestamp() << "$lte" + << _requiredOpTime.getTimestamp()))), + stdx::bind(&SyncSourceResolver::_requiredOpTimeFetcherCallback, + this, + stdx::placeholders::_1, + candidate, + earliestOpTimeSeen), + rpc::ServerSelectionMetadata(true, boost::none).toBSON(), + kFetcherTimeout); +} + Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) { stdx::lock_guard<stdx::mutex> lk(_mutex); // Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task @@ -269,6 +295,93 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback( return; } + // Schedules fetcher to look for '_requiredOpTime' in the remote oplog. + if (!_requiredOpTime.isNull()) { + auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen)); + if (!status.isOK()) { + _finishCallback(status); + } + return; + } + + _finishCallback(candidate); +} + +Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse( + const Fetcher::QueryResponse& queryResponse) { + if (queryResponse.documents.empty()) { + return Status( + ErrorCodes::NoMatchingDocument, + "remote oplog does not contain entry with optime matching our required optime"); + } + const OplogEntry oplogEntry(queryResponse.documents.front()); + const auto opTime = oplogEntry.getOpTime(); + if (_requiredOpTime != opTime) { + return Status(ErrorCodes::BadValue, + str::stream() << "remote oplog contain entry with matching timestamp " + << opTime.getTimestamp().toString() + << " but optime " + << opTime.toString() + << " does not " + "match our required optime"); + } + if (_requiredOpTime.getTerm() != opTime.getTerm()) { + return Status(ErrorCodes::BadValue, + str::stream() << "remote oplog contain entry with term " << opTime.getTerm() + << " that 
does not " + "match the term in our required optime"); + } + return Status::OK(); +} + +void SyncSourceResolver::_requiredOpTimeFetcherCallback( + const StatusWith<Fetcher::QueryResponse>& queryResult, + HostAndPort candidate, + OpTime earliestOpTimeSeen) { + if (_isShuttingDown()) { + _finishCallback(Status(ErrorCodes::CallbackCanceled, + str::stream() << "sync source resolver shut down while looking for " + "required optime " + << _requiredOpTime.toString() + << " in candidate's oplog: " + << candidate)); + return; + } + + if (ErrorCodes::CallbackCanceled == queryResult.getStatus()) { + _finishCallback(queryResult.getStatus()); + return; + } + + if (!queryResult.isOK()) { + // We got an error. + const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration; + log() << "Blacklisting " << candidate << " due to required optime fetcher error: '" + << queryResult.getStatus() << "' for " << kFetcherErrorBlacklistDuration + << " until: " << until << ". required optime: " << _requiredOpTime; + _syncSourceSelector->blacklistSyncSource(candidate, until); + + _chooseAndProbeNextSyncSource(earliestOpTimeSeen); + return; + } + + const auto& queryResponse = queryResult.getValue(); + auto status = _compareRequiredOpTimeWithQueryResponse(queryResponse); + if (!status.isOK()) { + const auto until = _taskExecutor->now() + kNoRequiredOpTimeBlacklistDuration; + warning() << "We cannot use " << candidate.toString() + << " as a sync source because it does not contain the necessary " + "operations for us to reach a consistent state: " + << status << " last fetched optime: " << _lastOpTimeFetched + << ". required optime: " << _requiredOpTime + << ". 
Blacklisting this sync source for " << kNoRequiredOpTimeBlacklistDuration + << " until: " << until; + _syncSourceSelector->blacklistSyncSource(candidate, until); + + _chooseAndProbeNextSyncSource(earliestOpTimeSeen); + return; + } + _finishCallback(candidate); } @@ -305,7 +418,12 @@ Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) { } Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) { - _onCompletion(response); + try { + _onCompletion(response); + } catch (...) { + warning() << "sync source resolver finish callback threw exception: " + << exceptionToStatus(); + } stdx::lock_guard<stdx::mutex> lock(_mutex); invariant(_state != State::kComplete); |