diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-30 21:26:25 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-11-02 18:57:01 -0400 |
commit | 053a2dbc9c503bf55ba52bc3c937bc138cc67b8a (patch) | |
tree | c679dc006202681f545b21c79267be079f670983 /src/mongo/db/repl | |
parent | 16b2afc48459fbd2670c5e824bd662d6ac7b584f (diff) | |
download | mongo-053a2dbc9c503bf55ba52bc3c937bc138cc67b8a.tar.gz |
SERVER-25145 SyncSourceResolver selects sync sources based on required optime if given
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 120 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.h | 40 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver_test.cpp | 372 |
4 files changed, 522 insertions, 12 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index d190db0e2e5..63500f6994e 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -285,6 +285,7 @@ void BackgroundSync::_produce(OperationContext* txn) { HostAndPort source; SyncSourceResolverResponse syncSourceResp; SyncSourceResolver* syncSourceResolver; + OpTime minValid; { stdx::unique_lock<stdx::mutex> lock(_mutex); lastOpTimeFetched = _lastOpTimeFetched; @@ -293,6 +294,7 @@ void BackgroundSync::_produce(OperationContext* txn) { _replicationCoordinatorExternalState->getTaskExecutor(), _replCoord, lastOpTimeFetched, + minValid, [&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; }); syncSourceResolver = _syncSourceResolver.get(); } diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index cb7b272be20..c3c563e6649 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -52,19 +52,25 @@ const Seconds SyncSourceResolver::kOplogEmptyBlacklistDuration(10); const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10); const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10); const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1); +const Seconds SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration(60); SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor, SyncSourceSelector* syncSourceSelector, const OpTime& lastOpTimeFetched, + const OpTime& requiredOpTime, const OnCompletionFn& onCompletion) : _taskExecutor(taskExecutor), _syncSourceSelector(syncSourceSelector), _lastOpTimeFetched(lastOpTimeFetched), + _requiredOpTime(requiredOpTime), _onCompletion(onCompletion) { uassert(ErrorCodes::BadValue, "task executor cannot be null", taskExecutor); uassert(ErrorCodes::BadValue, "sync source selector cannot be null", syncSourceSelector); uassert( ErrorCodes::BadValue, "last fetched optime cannot be null", !lastOpTimeFetched.isNull()); + uassert(ErrorCodes::BadValue, + "required optime (if provided) must be more recent than last fetched optime", + requiredOpTime.isNull() || requiredOpTime > lastOpTimeFetched); uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); } @@ -157,6 +163,26 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher( kFetcherTimeout); } +std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate, + OpTime earliestOpTimeSeen) { + // This query is structured so that it is executed on the sync source using the oplog + // start hack (oplogReplay=true and $gt/$gte predicate over "ts"). + return stdx::make_unique<Fetcher>( + _taskExecutor, + candidate, + kLocalOplogNss.db().toString(), + BSON("find" << kLocalOplogNss.coll() << "oplogReplay" << true << "filter" + << BSON("ts" << BSON("$gte" << _requiredOpTime.getTimestamp() << "$lte" + << _requiredOpTime.getTimestamp()))), + stdx::bind(&SyncSourceResolver::_requiredOpTimeFetcherCallback, + this, + stdx::placeholders::_1, + candidate, + earliestOpTimeSeen), + rpc::ServerSelectionMetadata(true, boost::none).toBSON(), + kFetcherTimeout); +} + Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) { stdx::lock_guard<stdx::mutex> lk(_mutex); // Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task @@ -269,6 +295,93 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback( return; } + // Schedules fetcher to look for '_requiredOpTime' in the remote oplog. + if (!_requiredOpTime.isNull()) { + auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen)); + if (!status.isOK()) { + _finishCallback(status); + } + return; + } + + _finishCallback(candidate); +} + +Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse( + const Fetcher::QueryResponse& queryResponse) { + if (queryResponse.documents.empty()) { + return Status( + ErrorCodes::NoMatchingDocument, + "remote oplog does not contain entry with optime matching our required optime"); + } + const OplogEntry oplogEntry(queryResponse.documents.front()); + const auto opTime = oplogEntry.getOpTime(); + if (_requiredOpTime != opTime) { + return Status(ErrorCodes::BadValue, + str::stream() << "remote oplog contain entry with matching timestamp " + << opTime.getTimestamp().toString() + << " but optime " + << opTime.toString() + << " does not " + "match our required optime"); + } + if (_requiredOpTime.getTerm() != opTime.getTerm()) { + return Status(ErrorCodes::BadValue, + str::stream() << "remote oplog contain entry with term " << opTime.getTerm() + << " that does not " + "match the term in our required optime"); + } + return Status::OK(); +} + +void SyncSourceResolver::_requiredOpTimeFetcherCallback( + const StatusWith<Fetcher::QueryResponse>& queryResult, + HostAndPort candidate, + OpTime earliestOpTimeSeen) { + if (_isShuttingDown()) { + _finishCallback(Status(ErrorCodes::CallbackCanceled, + str::stream() << "sync source resolver shut down while looking for " + "required optime " + << _requiredOpTime.toString() + << " in candidate's oplog: " + << candidate)); + return; + } + + if (ErrorCodes::CallbackCanceled == queryResult.getStatus()) { + _finishCallback(queryResult.getStatus()); + return; + } + + if (!queryResult.isOK()) { + // We got an error. + const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration; + log() << "Blacklisting " << candidate << " due to required optime fetcher error: '" + << queryResult.getStatus() << "' for " << kFetcherErrorBlacklistDuration + << " until: " << until << ". required optime: " << _requiredOpTime; + _syncSourceSelector->blacklistSyncSource(candidate, until); + + _chooseAndProbeNextSyncSource(earliestOpTimeSeen); + return; + } + + const auto& queryResponse = queryResult.getValue(); + auto status = _compareRequiredOpTimeWithQueryResponse(queryResponse); + if (!status.isOK()) { + const auto until = _taskExecutor->now() + kNoRequiredOpTimeBlacklistDuration; + warning() << "We cannot use " << candidate.toString() + << " as a sync source because it does not contain the necessary " + "operations for us to reach a consistent state: " + << status << " last fetched optime: " << _lastOpTimeFetched + << ". required optime: " << _requiredOpTime + << ". Blacklisting this sync source for " << kNoRequiredOpTimeBlacklistDuration + << " until: " << until; + _syncSourceSelector->blacklistSyncSource(candidate, until); + + _chooseAndProbeNextSyncSource(earliestOpTimeSeen); + return; + } + _finishCallback(candidate); } @@ -305,7 +418,12 @@ Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) { } Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) { - _onCompletion(response); + try { + _onCompletion(response); + } catch (...) { + warning() << "sync source resolver finish callback threw exception: " + << exceptionToStatus(); + } stdx::lock_guard<stdx::mutex> lock(_mutex); invariant(_state != State::kComplete); diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h index e0365c7ad1a..099b56994e2 100644 --- a/src/mongo/db/repl/sync_source_resolver.h +++ b/src/mongo/db/repl/sync_source_resolver.h @@ -96,6 +96,7 @@ public: static const Seconds kFirstOplogEntryEmptyBlacklistDuration; static const Seconds kFirstOplogEntryNullTimestampBlacklistDuration; static const Minutes kTooStaleBlacklistDuration; + static const Seconds kNoRequiredOpTimeBlacklistDuration; /** * Callback function to report final status of resolving sync source. @@ -105,6 +106,7 @@ public: SyncSourceResolver(executor::TaskExecutor* taskExecutor, SyncSourceSelector* syncSourceSelector, const OpTime& lastOpTimeFetched, + const OpTime& requiredOpTime, const OnCompletionFn& onCompletion); virtual ~SyncSourceResolver(); @@ -144,6 +146,12 @@ private: OpTime earliestOpTimeSeen); /** + * Creates fetcher to check the remote oplog for '_requiredOpTime'. + */ + std::unique_ptr<Fetcher> _makeRequiredOpTimeFetcher(HostAndPort candidate, + OpTime earliestOpTimeSeen); + + /** * Schedules fetcher to read oplog on sync source. * Saves fetcher in '_fetcher' on success. */ @@ -164,6 +172,18 @@ private: OpTime earliestOpTimeSeen); /** + * Checks query response for required optime. + */ + Status _compareRequiredOpTimeWithQueryResponse(const Fetcher::QueryResponse& queryResponse); + + /** + * Callback for checking if the remote oplog contains '_requiredOpTime'. + */ + void _requiredOpTimeFetcherCallback(const StatusWith<Fetcher::QueryResponse>& queryResult, + HostAndPort candidate, + OpTime earliestOpTimeSeen); + + /** * Obtains new sync source candidate and schedules remote command to fetcher first oplog entry. * May transition state to Complete. * Returns status that could be used as result for startup(). @@ -177,14 +197,34 @@ private: Status _finishCallback(StatusWith<HostAndPort> result); Status _finishCallback(const SyncSourceResolverResponse& response); + // Executor used to send remote commands to sync source candidates. executor::TaskExecutor* const _taskExecutor; + + // Sync source selector used to obtain sync source candidates and for us to blacklist non-viable + // candidates. SyncSourceSelector* const _syncSourceSelector; + + // A viable sync source must contain a starting oplog entry with a timestamp equal or earlier + // than the timestamp in '_lastOpTimeFetched'. const OpTime _lastOpTimeFetched; + + // If '_requiredOpTime' is not null, a viable sync source must contain an oplog entry with an + // optime equal to this value. + const OpTime _requiredOpTime; + + // This is invoked exactly once after startup. The caller gets the results of the sync source + // resolver via this callback in a SyncSourceResolverResponse struct when the resolver finishes. const OnCompletionFn _onCompletion; // Protects members of this sync source resolver. mutable stdx::mutex _mutex; mutable stdx::condition_variable _condition; + + // State transitions: + // PreStart --> Running --> ShuttingDown --> Complete + // It is possible to skip intermediate states. For example, + // Calling shutdown() when the resolver has not started will transition from PreStart directly + // to Complete. enum class State { kPreStart, kRunning, kShuttingDown, kComplete }; State _state = State::kPreStart; diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index a01a59ffff6..f94da2e3eb9 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -97,6 +97,8 @@ private: void tearDown() override; protected: + std::unique_ptr<SyncSourceResolver> _makeResolver(const OpTime& lastOpTimeFetched, + const OpTime& requiredOpTime); TaskExecutorWithFailureInScheduleRemoteCommand::ShouldFailRequestFn _shouldFailRequest; std::unique_ptr<TaskExecutorWithFailureInScheduleRemoteCommand> _executorProxy; @@ -122,11 +124,7 @@ void SyncSourceResolverTest::setUp() { _onCompletion = [this](const SyncSourceResolverResponse& response) { _response = response; }; _selector = stdx::make_unique<SyncSourceSelectorMock>(); - _resolver = stdx::make_unique<SyncSourceResolver>( - _executorProxy.get(), - _selector.get(), - lastOpTimeFetched, - [this](const SyncSourceResolverResponse& response) { _onCompletion(response); }); + _resolver = _makeResolver(lastOpTimeFetched, OpTime()); launchExecutorThread(); } @@ -142,6 +140,16 @@ void SyncSourceResolverTest::tearDown() { executor::ThreadPoolExecutorTest::tearDown(); } +std::unique_ptr<SyncSourceResolver> SyncSourceResolverTest::_makeResolver( + const OpTime& lastOpTimeFetched, const OpTime& requiredOpTime) { + return stdx::make_unique<SyncSourceResolver>( + _executorProxy.get(), + _selector.get(), + lastOpTimeFetched, + requiredOpTime, + [this](const SyncSourceResolverResponse& response) { _onCompletion(response); }); +} + const NamespaceString nss("local.oplog.rs"); BSONObj makeCursorResponse(CursorId cursorId, @@ -168,36 +176,57 @@ BSONObj makeCursorResponse(CursorId cursorId, TEST_F(SyncSourceResolverTest, InvalidConstruction) { SyncSourceSelectorMock selector; const OpTime lastOpTimeFetched(Timestamp(Seconds(100), 1U), 1LL); + const OpTime requiredOpTime; auto onCompletion = [](const SyncSourceResolverResponse&) {}; // Null task executor. ASSERT_THROWS_CODE_AND_WHAT( - SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, onCompletion), + SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, requiredOpTime, onCompletion), UserException, ErrorCodes::BadValue, "task executor cannot be null"); // Null sync source selector. ASSERT_THROWS_CODE_AND_WHAT( - SyncSourceResolver(&getExecutor(), nullptr, lastOpTimeFetched, onCompletion), + SyncSourceResolver( + &getExecutor(), nullptr, lastOpTimeFetched, requiredOpTime, onCompletion), UserException, ErrorCodes::BadValue, "sync source selector cannot be null"); // Null last fetched optime. ASSERT_THROWS_CODE_AND_WHAT( - SyncSourceResolver(&getExecutor(), &selector, OpTime(), onCompletion), + SyncSourceResolver(&getExecutor(), &selector, OpTime(), requiredOpTime, onCompletion), UserException, ErrorCodes::BadValue, "last fetched optime cannot be null"); - // Null callback function. + // If provided, required optime must be more recent than last fetched optime. + ASSERT_THROWS_CODE_AND_WHAT( + SyncSourceResolver(&getExecutor(), + &selector, + lastOpTimeFetched, + OpTime(Timestamp(Seconds(50), 1U), 1LL), + onCompletion), + UserException, + ErrorCodes::BadValue, + "required optime (if provided) must be more recent than last fetched optime"); ASSERT_THROWS_CODE_AND_WHAT( SyncSourceResolver( - &getExecutor(), &selector, lastOpTimeFetched, SyncSourceResolver::OnCompletionFn()), + &getExecutor(), &selector, lastOpTimeFetched, lastOpTimeFetched, onCompletion), UserException, ErrorCodes::BadValue, - "callback function cannot be null"); + "required optime (if provided) must be more recent than last fetched optime"); + + // Null callback function. + ASSERT_THROWS_CODE_AND_WHAT(SyncSourceResolver(&getExecutor(), + &selector, + lastOpTimeFetched, + requiredOpTime, + SyncSourceResolver::OnCompletionFn()), + UserException, + ErrorCodes::BadValue, + "callback function cannot be null"); } TEST_F(SyncSourceResolverTest, StartupReturnsIllegalOperationIfAlreadyActive) { @@ -302,6 +331,26 @@ TEST_F(SyncSourceResolverTest, } TEST_F(SyncSourceResolverTest, + SyncSourceResolverTransitionsToCompleteWhenFinishCallbackThrowsException) { + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + _onCompletion = [this](const SyncSourceResolverResponse& response) { + _response = response; + uassert(ErrorCodes::InternalError, "", false); + }; + + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2)); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, ResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingFetcher) { ASSERT_OK(_resolver->startup()); ASSERT_TRUE(_resolver->isActive()); @@ -530,4 +579,305 @@ TEST_F(SyncSourceResolverTest, ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); } +/** + * Constructs and schedules a network interface response using the given documents to the required + * optime on the sync source candidate. + */ +void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net, + SyncSourceSelectorMock* selector, + HostAndPort currentSyncSource, + OpTime requiredOpTime, + std::vector<BSONObj> docs) { + executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net); + ASSERT_TRUE(net->hasReadyRequests()); + auto request = net->scheduleSuccessfulResponse(makeCursorResponse(0, nss, docs)); + ASSERT_EQUALS(currentSyncSource, request.target); + ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.db(), request.dbname); + ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout); + auto firstElement = request.cmdObj.firstElement(); + ASSERT_EQUALS("find"_sd, firstElement.fieldNameStringData()); + ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.coll(), firstElement.String()); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + auto filter = request.cmdObj.getObjectField("filter"); + ASSERT_TRUE(filter.hasField("ts")) << request.cmdObj; + auto tsFilter = filter.getObjectField("ts"); + ASSERT_TRUE(tsFilter.hasField("$gte")) << request.cmdObj; + ASSERT_EQUALS(requiredOpTime.getTimestamp(), tsFilter["$gte"].timestamp()) << request.cmdObj; + ASSERT_TRUE(tsFilter.hasField("$lte")) << request.cmdObj; + ASSERT_EQUALS(requiredOpTime.getTimestamp(), tsFilter["$lte"].timestamp()) << request.cmdObj; + + + net->runReadyNetworkOperations(); +} + +/** + * Constructs and schedules a network interface response using the given optime to the required + * optime on the sync source candidate. + */ +void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net, + SyncSourceSelectorMock* selector, + HostAndPort currentSyncSource, + OpTime requiredOpTime) { + _scheduleRequiredOpTimeFetcherResponse( + net, + selector, + currentSyncSource, + requiredOpTime, + {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())}); +} + +const OpTime requiredOpTime(Timestamp(200, 1U), 1LL); + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverWillCheckForRequiredOpTimeUsingOplogReplayQueryIfRequiredOpTimeIsProvided) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRemoteTermIsUninitialized) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse( + getNet(), + _selector.get(), + candidate1, + requiredOpTime, + {BSON("ts" << requiredOpTime.getTimestamp() << "t" << OpTime::kUninitializedTerm)}); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration, + _selector->blacklistUntil); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0)); + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRequiredOpTimesTermIsUninitialized) { + auto requireOpTimeWithUninitializedTerm = + OpTime(requiredOpTime.getTimestamp(), OpTime::kUninitializedTerm); + _resolver = _makeResolver(lastOpTimeFetched, requireOpTimeWithUninitializedTerm); + + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration, + _selector->blacklistUntil); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0)); + _scheduleRequiredOpTimeFetcherResponse( + getNet(), _selector.get(), candidate2, requireOpTimeWithUninitializedTerm); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimeIsNotFoundInRemoteOplog) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse( + getNet(), _selector.get(), candidate1, requiredOpTime, {}); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration, + _selector->blacklistUntil); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0)); + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimesTermIsNotFoundInRemoteOplog) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse( + getNet(), + _selector.get(), + candidate1, + requiredOpTime, + {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm() + 1)}); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration, + _selector->blacklistUntil); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0)); + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverReturnsScheduleErrorWhenSchedulingRequiredOpTimeFindCommandFails) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + _shouldFailRequest = [](const executor::RemoteCommandRequest& request) { + return request.cmdObj.getBoolField("oplogReplay"); + }; + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0)); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingRequiredOpTimeFetcher) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _resolver->shutdown(); + executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations(); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingRequiredOpTimeFetcher) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + getExecutor().shutdown(); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRequiredOpTimeCommand) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0)); + + _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort()); + + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration, + _selector->blacklistUntil); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus)); +} + } // namespace |