summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/sync_source_resolver_test.cpp
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-10-30 21:26:25 -0400
committerBenety Goh <benety@mongodb.com>2016-11-02 18:57:01 -0400
commit053a2dbc9c503bf55ba52bc3c937bc138cc67b8a (patch)
treec679dc006202681f545b21c79267be079f670983 /src/mongo/db/repl/sync_source_resolver_test.cpp
parent16b2afc48459fbd2670c5e824bd662d6ac7b584f (diff)
downloadmongo-053a2dbc9c503bf55ba52bc3c937bc138cc67b8a.tar.gz
SERVER-25145 SyncSourceResolver selects sync sources based on required optime if given
Diffstat (limited to 'src/mongo/db/repl/sync_source_resolver_test.cpp')
-rw-r--r--src/mongo/db/repl/sync_source_resolver_test.cpp372
1 files changed, 361 insertions, 11 deletions
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index a01a59ffff6..f94da2e3eb9 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -97,6 +97,8 @@ private:
void tearDown() override;
protected:
+ std::unique_ptr<SyncSourceResolver> _makeResolver(const OpTime& lastOpTimeFetched,
+ const OpTime& requiredOpTime);
TaskExecutorWithFailureInScheduleRemoteCommand::ShouldFailRequestFn _shouldFailRequest;
std::unique_ptr<TaskExecutorWithFailureInScheduleRemoteCommand> _executorProxy;
@@ -122,11 +124,7 @@ void SyncSourceResolverTest::setUp() {
_onCompletion = [this](const SyncSourceResolverResponse& response) { _response = response; };
_selector = stdx::make_unique<SyncSourceSelectorMock>();
- _resolver = stdx::make_unique<SyncSourceResolver>(
- _executorProxy.get(),
- _selector.get(),
- lastOpTimeFetched,
- [this](const SyncSourceResolverResponse& response) { _onCompletion(response); });
+ _resolver = _makeResolver(lastOpTimeFetched, OpTime());
launchExecutorThread();
}
@@ -142,6 +140,16 @@ void SyncSourceResolverTest::tearDown() {
executor::ThreadPoolExecutorTest::tearDown();
}
+std::unique_ptr<SyncSourceResolver> SyncSourceResolverTest::_makeResolver(
+ const OpTime& lastOpTimeFetched, const OpTime& requiredOpTime) {
+ return stdx::make_unique<SyncSourceResolver>(
+ _executorProxy.get(),
+ _selector.get(),
+ lastOpTimeFetched,
+ requiredOpTime,
+ [this](const SyncSourceResolverResponse& response) { _onCompletion(response); });
+}
+
const NamespaceString nss("local.oplog.rs");
BSONObj makeCursorResponse(CursorId cursorId,
@@ -168,36 +176,57 @@ BSONObj makeCursorResponse(CursorId cursorId,
TEST_F(SyncSourceResolverTest, InvalidConstruction) {
SyncSourceSelectorMock selector;
const OpTime lastOpTimeFetched(Timestamp(Seconds(100), 1U), 1LL);
+ const OpTime requiredOpTime;
auto onCompletion = [](const SyncSourceResolverResponse&) {};
// Null task executor.
ASSERT_THROWS_CODE_AND_WHAT(
- SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, onCompletion),
+ SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, requiredOpTime, onCompletion),
UserException,
ErrorCodes::BadValue,
"task executor cannot be null");
// Null sync source selector.
ASSERT_THROWS_CODE_AND_WHAT(
- SyncSourceResolver(&getExecutor(), nullptr, lastOpTimeFetched, onCompletion),
+ SyncSourceResolver(
+ &getExecutor(), nullptr, lastOpTimeFetched, requiredOpTime, onCompletion),
UserException,
ErrorCodes::BadValue,
"sync source selector cannot be null");
// Null last fetched optime.
ASSERT_THROWS_CODE_AND_WHAT(
- SyncSourceResolver(&getExecutor(), &selector, OpTime(), onCompletion),
+ SyncSourceResolver(&getExecutor(), &selector, OpTime(), requiredOpTime, onCompletion),
UserException,
ErrorCodes::BadValue,
"last fetched optime cannot be null");
- // Null callback function.
+ // If provided, required optime must be more recent than last fetched optime.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ SyncSourceResolver(&getExecutor(),
+ &selector,
+ lastOpTimeFetched,
+ OpTime(Timestamp(Seconds(50), 1U), 1LL),
+ onCompletion),
+ UserException,
+ ErrorCodes::BadValue,
+ "required optime (if provided) must be more recent than last fetched optime");
ASSERT_THROWS_CODE_AND_WHAT(
SyncSourceResolver(
- &getExecutor(), &selector, lastOpTimeFetched, SyncSourceResolver::OnCompletionFn()),
+ &getExecutor(), &selector, lastOpTimeFetched, lastOpTimeFetched, onCompletion),
UserException,
ErrorCodes::BadValue,
- "callback function cannot be null");
+ "required optime (if provided) must be more recent than last fetched optime");
+
+ // Null callback function.
+ ASSERT_THROWS_CODE_AND_WHAT(SyncSourceResolver(&getExecutor(),
+ &selector,
+ lastOpTimeFetched,
+ requiredOpTime,
+ SyncSourceResolver::OnCompletionFn()),
+ UserException,
+ ErrorCodes::BadValue,
+ "callback function cannot be null");
}
TEST_F(SyncSourceResolverTest, StartupReturnsIllegalOperationIfAlreadyActive) {
@@ -302,6 +331,26 @@ TEST_F(SyncSourceResolverTest,
}
TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverTransitionsToCompleteWhenFinishCallbackThrowsException) {
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ _onCompletion = [this](const SyncSourceResolverResponse& response) {
+ _response = response;
+ uassert(ErrorCodes::InternalError, "", false);
+ };
+
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
ResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingFetcher) {
ASSERT_OK(_resolver->startup());
ASSERT_TRUE(_resolver->isActive());
@@ -530,4 +579,305 @@ TEST_F(SyncSourceResolverTest,
ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
}
+/**
+ * Constructs and schedules a network interface response using the given documents to the required
+ * optime on the sync source candidate.
+ */
+void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
+ SyncSourceSelectorMock* selector,
+ HostAndPort currentSyncSource,
+ OpTime requiredOpTime,
+ std::vector<BSONObj> docs) {
+ executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(0, nss, docs));
+ ASSERT_EQUALS(currentSyncSource, request.target);
+ ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.db(), request.dbname);
+ ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout);
+ auto firstElement = request.cmdObj.firstElement();
+ ASSERT_EQUALS("find"_sd, firstElement.fieldNameStringData());
+ ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.coll(), firstElement.String());
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ auto filter = request.cmdObj.getObjectField("filter");
+ ASSERT_TRUE(filter.hasField("ts")) << request.cmdObj;
+ auto tsFilter = filter.getObjectField("ts");
+ ASSERT_TRUE(tsFilter.hasField("$gte")) << request.cmdObj;
+ ASSERT_EQUALS(requiredOpTime.getTimestamp(), tsFilter["$gte"].timestamp()) << request.cmdObj;
+ ASSERT_TRUE(tsFilter.hasField("$lte")) << request.cmdObj;
+ ASSERT_EQUALS(requiredOpTime.getTimestamp(), tsFilter["$lte"].timestamp()) << request.cmdObj;
+
+
+ net->runReadyNetworkOperations();
+}
+
+/**
+ * Constructs and schedules a network interface response using the given optime to the required
+ * optime on the sync source candidate.
+ */
+void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
+ SyncSourceSelectorMock* selector,
+ HostAndPort currentSyncSource,
+ OpTime requiredOpTime) {
+ _scheduleRequiredOpTimeFetcherResponse(
+ net,
+ selector,
+ currentSyncSource,
+ requiredOpTime,
+ {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())});
+}
+
+const OpTime requiredOpTime(Timestamp(200, 1U), 1LL);
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverWillCheckForRequiredOpTimeUsingOplogReplayQueryIfRequiredOpTimeIsProvided) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRemoteTermIsUninitialized) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(
+ getNet(),
+ _selector.get(),
+ candidate1,
+ requiredOpTime,
+ {BSON("ts" << requiredOpTime.getTimestamp() << "t" << OpTime::kUninitializedTerm)});
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRequiredOpTimesTermIsUninitialized) {
+ auto requireOpTimeWithUninitializedTerm =
+ OpTime(requiredOpTime.getTimestamp(), OpTime::kUninitializedTerm);
+ _resolver = _makeResolver(lastOpTimeFetched, requireOpTimeWithUninitializedTerm);
+
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime);
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRequiredOpTimeFetcherResponse(
+ getNet(), _selector.get(), candidate2, requireOpTimeWithUninitializedTerm);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimeIsNotFoundInRemoteOplog) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(
+ getNet(), _selector.get(), candidate1, requiredOpTime, {});
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimesTermIsNotFoundInRemoteOplog) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(
+ getNet(),
+ _selector.get(),
+ candidate1,
+ requiredOpTime,
+ {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm() + 1)});
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsScheduleErrorWhenSchedulingRequiredOpTimeFindCommandFails) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ _shouldFailRequest = [](const executor::RemoteCommandRequest& request) {
+ return request.cmdObj.getBoolField("oplogReplay");
+ };
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus);
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingRequiredOpTimeFetcher) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _resolver->shutdown();
+ executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingRequiredOpTimeFetcher) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ getExecutor().shutdown();
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRequiredOpTimeCommand) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0));
+
+ _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort());
+
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
+}
+
} // namespace