diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-30 21:26:25 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-11-02 18:57:01 -0400 |
commit | 053a2dbc9c503bf55ba52bc3c937bc138cc67b8a (patch) | |
tree | c679dc006202681f545b21c79267be079f670983 /src/mongo/db/repl/sync_source_resolver_test.cpp | |
parent | 16b2afc48459fbd2670c5e824bd662d6ac7b584f (diff) | |
download | mongo-053a2dbc9c503bf55ba52bc3c937bc138cc67b8a.tar.gz |
SERVER-25145 SyncSourceResolver selects sync sources based on required optime if given
Diffstat (limited to 'src/mongo/db/repl/sync_source_resolver_test.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver_test.cpp | 372 |
1 files changed, 361 insertions, 11 deletions
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index a01a59ffff6..f94da2e3eb9 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -97,6 +97,8 @@ private: void tearDown() override; protected: + std::unique_ptr<SyncSourceResolver> _makeResolver(const OpTime& lastOpTimeFetched, + const OpTime& requiredOpTime); TaskExecutorWithFailureInScheduleRemoteCommand::ShouldFailRequestFn _shouldFailRequest; std::unique_ptr<TaskExecutorWithFailureInScheduleRemoteCommand> _executorProxy; @@ -122,11 +124,7 @@ void SyncSourceResolverTest::setUp() { _onCompletion = [this](const SyncSourceResolverResponse& response) { _response = response; }; _selector = stdx::make_unique<SyncSourceSelectorMock>(); - _resolver = stdx::make_unique<SyncSourceResolver>( - _executorProxy.get(), - _selector.get(), - lastOpTimeFetched, - [this](const SyncSourceResolverResponse& response) { _onCompletion(response); }); + _resolver = _makeResolver(lastOpTimeFetched, OpTime()); launchExecutorThread(); } @@ -142,6 +140,16 @@ void SyncSourceResolverTest::tearDown() { executor::ThreadPoolExecutorTest::tearDown(); } +std::unique_ptr<SyncSourceResolver> SyncSourceResolverTest::_makeResolver( + const OpTime& lastOpTimeFetched, const OpTime& requiredOpTime) { + return stdx::make_unique<SyncSourceResolver>( + _executorProxy.get(), + _selector.get(), + lastOpTimeFetched, + requiredOpTime, + [this](const SyncSourceResolverResponse& response) { _onCompletion(response); }); +} + const NamespaceString nss("local.oplog.rs"); BSONObj makeCursorResponse(CursorId cursorId, @@ -168,36 +176,57 @@ BSONObj makeCursorResponse(CursorId cursorId, TEST_F(SyncSourceResolverTest, InvalidConstruction) { SyncSourceSelectorMock selector; const OpTime lastOpTimeFetched(Timestamp(Seconds(100), 1U), 1LL); + const OpTime requiredOpTime; auto onCompletion = [](const SyncSourceResolverResponse&) {}; // Null task executor. ASSERT_THROWS_CODE_AND_WHAT( - SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, onCompletion), + SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, requiredOpTime, onCompletion), UserException, ErrorCodes::BadValue, "task executor cannot be null"); // Null sync source selector. ASSERT_THROWS_CODE_AND_WHAT( - SyncSourceResolver(&getExecutor(), nullptr, lastOpTimeFetched, onCompletion), + SyncSourceResolver( + &getExecutor(), nullptr, lastOpTimeFetched, requiredOpTime, onCompletion), UserException, ErrorCodes::BadValue, "sync source selector cannot be null"); // Null last fetched optime. ASSERT_THROWS_CODE_AND_WHAT( - SyncSourceResolver(&getExecutor(), &selector, OpTime(), onCompletion), + SyncSourceResolver(&getExecutor(), &selector, OpTime(), requiredOpTime, onCompletion), UserException, ErrorCodes::BadValue, "last fetched optime cannot be null"); - // Null callback function. + // If provided, required optime must be more recent than last fetched optime. + ASSERT_THROWS_CODE_AND_WHAT( + SyncSourceResolver(&getExecutor(), + &selector, + lastOpTimeFetched, + OpTime(Timestamp(Seconds(50), 1U), 1LL), + onCompletion), + UserException, + ErrorCodes::BadValue, + "required optime (if provided) must be more recent than last fetched optime"); ASSERT_THROWS_CODE_AND_WHAT( SyncSourceResolver( - &getExecutor(), &selector, lastOpTimeFetched, SyncSourceResolver::OnCompletionFn()), + &getExecutor(), &selector, lastOpTimeFetched, lastOpTimeFetched, onCompletion), UserException, ErrorCodes::BadValue, - "callback function cannot be null"); + "required optime (if provided) must be more recent than last fetched optime"); + + // Null callback function. + ASSERT_THROWS_CODE_AND_WHAT(SyncSourceResolver(&getExecutor(), + &selector, + lastOpTimeFetched, + requiredOpTime, + SyncSourceResolver::OnCompletionFn()), + UserException, + ErrorCodes::BadValue, + "callback function cannot be null"); } TEST_F(SyncSourceResolverTest, StartupReturnsIllegalOperationIfAlreadyActive) { @@ -302,6 +331,26 @@ TEST_F(SyncSourceResolverTest, } TEST_F(SyncSourceResolverTest, + SyncSourceResolverTransitionsToCompleteWhenFinishCallbackThrowsException) { + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + _onCompletion = [this](const SyncSourceResolverResponse& response) { + _response = response; + uassert(ErrorCodes::InternalError, "", false); + }; + + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2)); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, ResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingFetcher) { ASSERT_OK(_resolver->startup()); ASSERT_TRUE(_resolver->isActive()); @@ -530,4 +579,305 @@ TEST_F(SyncSourceResolverTest, ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); } +/** + * Constructs and schedules a network interface response using the given documents to the required + * optime on the sync source candidate. + */ +void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net, + SyncSourceSelectorMock* selector, + HostAndPort currentSyncSource, + OpTime requiredOpTime, + std::vector<BSONObj> docs) { + executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net); + ASSERT_TRUE(net->hasReadyRequests()); + auto request = net->scheduleSuccessfulResponse(makeCursorResponse(0, nss, docs)); + ASSERT_EQUALS(currentSyncSource, request.target); + ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.db(), request.dbname); + ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout); + auto firstElement = request.cmdObj.firstElement(); + ASSERT_EQUALS("find"_sd, firstElement.fieldNameStringData()); + ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.coll(), firstElement.String()); + ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); + auto filter = request.cmdObj.getObjectField("filter"); + ASSERT_TRUE(filter.hasField("ts")) << request.cmdObj; + auto tsFilter = filter.getObjectField("ts"); + ASSERT_TRUE(tsFilter.hasField("$gte")) << request.cmdObj; + ASSERT_EQUALS(requiredOpTime.getTimestamp(), tsFilter["$gte"].timestamp()) << request.cmdObj; + ASSERT_TRUE(tsFilter.hasField("$lte")) << request.cmdObj; + ASSERT_EQUALS(requiredOpTime.getTimestamp(), tsFilter["$lte"].timestamp()) << request.cmdObj; + + + net->runReadyNetworkOperations(); +} + +/** + * Constructs and schedules a network interface response using the given optime to the required + * optime on the sync source candidate. + */ +void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net, + SyncSourceSelectorMock* selector, + HostAndPort currentSyncSource, + OpTime requiredOpTime) { + _scheduleRequiredOpTimeFetcherResponse( + net, + selector, + currentSyncSource, + requiredOpTime, + {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())}); +} + +const OpTime requiredOpTime(Timestamp(200, 1U), 1LL); + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverWillCheckForRequiredOpTimeUsingOplogReplayQueryIfRequiredOpTimeIsProvided) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRemoteTermIsUninitialized) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse( + getNet(), + _selector.get(), + candidate1, + requiredOpTime, + {BSON("ts" << requiredOpTime.getTimestamp() << "t" << OpTime::kUninitializedTerm)}); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration, + _selector->blacklistUntil); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0)); + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRequiredOpTimesTermIsUninitialized) { + auto requireOpTimeWithUninitializedTerm = + OpTime(requiredOpTime.getTimestamp(), OpTime::kUninitializedTerm); + _resolver = _makeResolver(lastOpTimeFetched, requireOpTimeWithUninitializedTerm); + + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration, + _selector->blacklistUntil); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0)); + _scheduleRequiredOpTimeFetcherResponse( + getNet(), _selector.get(), candidate2, requireOpTimeWithUninitializedTerm); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimeIsNotFoundInRemoteOplog) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse( + getNet(), _selector.get(), candidate1, requiredOpTime, {}); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration, + _selector->blacklistUntil); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0)); + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimesTermIsNotFoundInRemoteOplog) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + HostAndPort candidate2("node2", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _scheduleRequiredOpTimeFetcherResponse( + getNet(), + _selector.get(), + candidate1, + requiredOpTime, + {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm() + 1)}); + + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration, + _selector->blacklistUntil); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0)); + _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverReturnsScheduleErrorWhenSchedulingRequiredOpTimeFindCommandFails) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + _shouldFailRequest = [](const executor::RemoteCommandRequest& request) { + return request.cmdObj.getBoolField("oplogReplay"); + }; + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0)); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingRequiredOpTimeFetcher) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + _resolver->shutdown(); + executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations(); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingRequiredOpTimeFetcher) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0)); + + ASSERT_TRUE(_resolver->isActive()); + + getExecutor().shutdown(); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRequiredOpTimeCommand) { + _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime); + + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0)); + + _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort()); + + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, _selector->blacklistHost); + ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration, + _selector->blacklistUntil); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus)); +} + } // namespace |