summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/bgsync.cpp2
-rw-r--r--src/mongo/db/repl/sync_source_resolver.cpp120
-rw-r--r--src/mongo/db/repl/sync_source_resolver.h40
-rw-r--r--src/mongo/db/repl/sync_source_resolver_test.cpp372
4 files changed, 522 insertions, 12 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index d190db0e2e5..63500f6994e 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -285,6 +285,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
HostAndPort source;
SyncSourceResolverResponse syncSourceResp;
SyncSourceResolver* syncSourceResolver;
+ OpTime minValid;
{
stdx::unique_lock<stdx::mutex> lock(_mutex);
lastOpTimeFetched = _lastOpTimeFetched;
@@ -293,6 +294,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
_replicationCoordinatorExternalState->getTaskExecutor(),
_replCoord,
lastOpTimeFetched,
+ minValid,
[&syncSourceResp](const SyncSourceResolverResponse& resp) { syncSourceResp = resp; });
syncSourceResolver = _syncSourceResolver.get();
}
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index cb7b272be20..c3c563e6649 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -52,19 +52,25 @@ const Seconds SyncSourceResolver::kOplogEmptyBlacklistDuration(10);
const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10);
const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10);
const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1);
+const Seconds SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration(60);
SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor,
SyncSourceSelector* syncSourceSelector,
const OpTime& lastOpTimeFetched,
+ const OpTime& requiredOpTime,
const OnCompletionFn& onCompletion)
: _taskExecutor(taskExecutor),
_syncSourceSelector(syncSourceSelector),
_lastOpTimeFetched(lastOpTimeFetched),
+ _requiredOpTime(requiredOpTime),
_onCompletion(onCompletion) {
uassert(ErrorCodes::BadValue, "task executor cannot be null", taskExecutor);
uassert(ErrorCodes::BadValue, "sync source selector cannot be null", syncSourceSelector);
uassert(
ErrorCodes::BadValue, "last fetched optime cannot be null", !lastOpTimeFetched.isNull());
+ uassert(ErrorCodes::BadValue,
+ "required optime (if provided) must be more recent than last fetched optime",
+ requiredOpTime.isNull() || requiredOpTime > lastOpTimeFetched);
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
}
@@ -157,6 +163,26 @@ std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher(
kFetcherTimeout);
}
+std::unique_ptr<Fetcher> SyncSourceResolver::_makeRequiredOpTimeFetcher(HostAndPort candidate,
+ OpTime earliestOpTimeSeen) {
+ // This query is structured so that it is executed on the sync source using the oplog
+ // start hack (oplogReplay=true and $gt/$gte predicate over "ts").
+ return stdx::make_unique<Fetcher>(
+ _taskExecutor,
+ candidate,
+ kLocalOplogNss.db().toString(),
+ BSON("find" << kLocalOplogNss.coll() << "oplogReplay" << true << "filter"
+ << BSON("ts" << BSON("$gte" << _requiredOpTime.getTimestamp() << "$lte"
+ << _requiredOpTime.getTimestamp()))),
+ stdx::bind(&SyncSourceResolver::_requiredOpTimeFetcherCallback,
+ this,
+ stdx::placeholders::_1,
+ candidate,
+ earliestOpTimeSeen),
+ rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ kFetcherTimeout);
+}
+
Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
// Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task
@@ -269,6 +295,93 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback(
return;
}
+ // Schedules fetcher to look for '_requiredOpTime' in the remote oplog.
+ if (!_requiredOpTime.isNull()) {
+ auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
+ if (!status.isOK()) {
+ _finishCallback(status);
+ }
+ return;
+ }
+
+ _finishCallback(candidate);
+}
+
+Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
+ const Fetcher::QueryResponse& queryResponse) {
+ if (queryResponse.documents.empty()) {
+ return Status(
+ ErrorCodes::NoMatchingDocument,
+ "remote oplog does not contain entry with optime matching our required optime");
+ }
+ const OplogEntry oplogEntry(queryResponse.documents.front());
+ const auto opTime = oplogEntry.getOpTime();
+ if (_requiredOpTime != opTime) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "remote oplog contain entry with matching timestamp "
+ << opTime.getTimestamp().toString()
+ << " but optime "
+ << opTime.toString()
+ << " does not "
+ "match our required optime");
+ }
+ if (_requiredOpTime.getTerm() != opTime.getTerm()) {
+ return Status(ErrorCodes::BadValue,
+ str::stream() << "remote oplog contain entry with term " << opTime.getTerm()
+ << " that does not "
+ "match the term in our required optime");
+ }
+ return Status::OK();
+}
+
+void SyncSourceResolver::_requiredOpTimeFetcherCallback(
+ const StatusWith<Fetcher::QueryResponse>& queryResult,
+ HostAndPort candidate,
+ OpTime earliestOpTimeSeen) {
+ if (_isShuttingDown()) {
+ _finishCallback(Status(ErrorCodes::CallbackCanceled,
+ str::stream() << "sync source resolver shut down while looking for "
+ "required optime "
+ << _requiredOpTime.toString()
+ << " in candidate's oplog: "
+ << candidate));
+ return;
+ }
+
+ if (ErrorCodes::CallbackCanceled == queryResult.getStatus()) {
+ _finishCallback(queryResult.getStatus());
+ return;
+ }
+
+ if (!queryResult.isOK()) {
+ // We got an error.
+ const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
+ log() << "Blacklisting " << candidate << " due to required optime fetcher error: '"
+ << queryResult.getStatus() << "' for " << kFetcherErrorBlacklistDuration
+ << " until: " << until << ". required optime: " << _requiredOpTime;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+
+ _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+ return;
+ }
+
+ const auto& queryResponse = queryResult.getValue();
+ auto status = _compareRequiredOpTimeWithQueryResponse(queryResponse);
+ if (!status.isOK()) {
+ const auto until = _taskExecutor->now() + kNoRequiredOpTimeBlacklistDuration;
+ warning() << "We cannot use " << candidate.toString()
+ << " as a sync source because it does not contain the necessary "
+ "operations for us to reach a consistent state: "
+ << status << " last fetched optime: " << _lastOpTimeFetched
+ << ". required optime: " << _requiredOpTime
+ << ". Blacklisting this sync source for " << kNoRequiredOpTimeBlacklistDuration
+ << " until: " << until;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+
+ _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+ return;
+ }
+
_finishCallback(candidate);
}
@@ -305,7 +418,12 @@ Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) {
}
Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) {
- _onCompletion(response);
+ try {
+ _onCompletion(response);
+ } catch (...) {
+ warning() << "sync source resolver finish callback threw exception: "
+ << exceptionToStatus();
+ }
stdx::lock_guard<stdx::mutex> lock(_mutex);
invariant(_state != State::kComplete);
diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h
index e0365c7ad1a..099b56994e2 100644
--- a/src/mongo/db/repl/sync_source_resolver.h
+++ b/src/mongo/db/repl/sync_source_resolver.h
@@ -96,6 +96,7 @@ public:
static const Seconds kFirstOplogEntryEmptyBlacklistDuration;
static const Seconds kFirstOplogEntryNullTimestampBlacklistDuration;
static const Minutes kTooStaleBlacklistDuration;
+ static const Seconds kNoRequiredOpTimeBlacklistDuration;
/**
* Callback function to report final status of resolving sync source.
@@ -105,6 +106,7 @@ public:
SyncSourceResolver(executor::TaskExecutor* taskExecutor,
SyncSourceSelector* syncSourceSelector,
const OpTime& lastOpTimeFetched,
+ const OpTime& requiredOpTime,
const OnCompletionFn& onCompletion);
virtual ~SyncSourceResolver();
@@ -144,6 +146,12 @@ private:
OpTime earliestOpTimeSeen);
/**
+ * Creates fetcher to check the remote oplog for '_requiredOpTime'.
+ */
+ std::unique_ptr<Fetcher> _makeRequiredOpTimeFetcher(HostAndPort candidate,
+ OpTime earliestOpTimeSeen);
+
+ /**
* Schedules fetcher to read oplog on sync source.
* Saves fetcher in '_fetcher' on success.
*/
@@ -164,6 +172,18 @@ private:
OpTime earliestOpTimeSeen);
/**
+ * Checks query response for required optime.
+ */
+ Status _compareRequiredOpTimeWithQueryResponse(const Fetcher::QueryResponse& queryResponse);
+
+ /**
+ * Callback for checking if the remote oplog contains '_requiredOpTime'.
+ */
+ void _requiredOpTimeFetcherCallback(const StatusWith<Fetcher::QueryResponse>& queryResult,
+ HostAndPort candidate,
+ OpTime earliestOpTimeSeen);
+
+ /**
* Obtains new sync source candidate and schedules remote command to fetcher first oplog entry.
* May transition state to Complete.
* Returns status that could be used as result for startup().
@@ -177,14 +197,34 @@ private:
Status _finishCallback(StatusWith<HostAndPort> result);
Status _finishCallback(const SyncSourceResolverResponse& response);
+ // Executor used to send remote commands to sync source candidates.
executor::TaskExecutor* const _taskExecutor;
+
+ // Sync source selector used to obtain sync source candidates and for us to blacklist non-viable
+ // candidates.
SyncSourceSelector* const _syncSourceSelector;
+
+ // A viable sync source must contain a starting oplog entry with a timestamp equal or earlier
+ // than the timestamp in '_lastOpTimeFetched'.
const OpTime _lastOpTimeFetched;
+
+ // If '_requiredOpTime' is not null, a viable sync source must contain an oplog entry with an
+ // optime equal to this value.
+ const OpTime _requiredOpTime;
+
+ // This is invoked exactly once after startup. The caller gets the results of the sync source
+ // resolver via this callback in a SyncSourceResolverResponse struct when the resolver finishes.
const OnCompletionFn _onCompletion;
// Protects members of this sync source resolver.
mutable stdx::mutex _mutex;
mutable stdx::condition_variable _condition;
+
+ // State transitions:
+ // PreStart --> Running --> ShuttingDown --> Complete
+ // It is possible to skip intermediate states. For example,
+ // Calling shutdown() when the resolver has not started will transition from PreStart directly
+ // to Complete.
enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
State _state = State::kPreStart;
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index a01a59ffff6..f94da2e3eb9 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -97,6 +97,8 @@ private:
void tearDown() override;
protected:
+ std::unique_ptr<SyncSourceResolver> _makeResolver(const OpTime& lastOpTimeFetched,
+ const OpTime& requiredOpTime);
TaskExecutorWithFailureInScheduleRemoteCommand::ShouldFailRequestFn _shouldFailRequest;
std::unique_ptr<TaskExecutorWithFailureInScheduleRemoteCommand> _executorProxy;
@@ -122,11 +124,7 @@ void SyncSourceResolverTest::setUp() {
_onCompletion = [this](const SyncSourceResolverResponse& response) { _response = response; };
_selector = stdx::make_unique<SyncSourceSelectorMock>();
- _resolver = stdx::make_unique<SyncSourceResolver>(
- _executorProxy.get(),
- _selector.get(),
- lastOpTimeFetched,
- [this](const SyncSourceResolverResponse& response) { _onCompletion(response); });
+ _resolver = _makeResolver(lastOpTimeFetched, OpTime());
launchExecutorThread();
}
@@ -142,6 +140,16 @@ void SyncSourceResolverTest::tearDown() {
executor::ThreadPoolExecutorTest::tearDown();
}
+std::unique_ptr<SyncSourceResolver> SyncSourceResolverTest::_makeResolver(
+ const OpTime& lastOpTimeFetched, const OpTime& requiredOpTime) {
+ return stdx::make_unique<SyncSourceResolver>(
+ _executorProxy.get(),
+ _selector.get(),
+ lastOpTimeFetched,
+ requiredOpTime,
+ [this](const SyncSourceResolverResponse& response) { _onCompletion(response); });
+}
+
const NamespaceString nss("local.oplog.rs");
BSONObj makeCursorResponse(CursorId cursorId,
@@ -168,36 +176,57 @@ BSONObj makeCursorResponse(CursorId cursorId,
TEST_F(SyncSourceResolverTest, InvalidConstruction) {
SyncSourceSelectorMock selector;
const OpTime lastOpTimeFetched(Timestamp(Seconds(100), 1U), 1LL);
+ const OpTime requiredOpTime;
auto onCompletion = [](const SyncSourceResolverResponse&) {};
// Null task executor.
ASSERT_THROWS_CODE_AND_WHAT(
- SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, onCompletion),
+ SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, requiredOpTime, onCompletion),
UserException,
ErrorCodes::BadValue,
"task executor cannot be null");
// Null sync source selector.
ASSERT_THROWS_CODE_AND_WHAT(
- SyncSourceResolver(&getExecutor(), nullptr, lastOpTimeFetched, onCompletion),
+ SyncSourceResolver(
+ &getExecutor(), nullptr, lastOpTimeFetched, requiredOpTime, onCompletion),
UserException,
ErrorCodes::BadValue,
"sync source selector cannot be null");
// Null last fetched optime.
ASSERT_THROWS_CODE_AND_WHAT(
- SyncSourceResolver(&getExecutor(), &selector, OpTime(), onCompletion),
+ SyncSourceResolver(&getExecutor(), &selector, OpTime(), requiredOpTime, onCompletion),
UserException,
ErrorCodes::BadValue,
"last fetched optime cannot be null");
- // Null callback function.
+ // If provided, required optime must be more recent than last fetched optime.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ SyncSourceResolver(&getExecutor(),
+ &selector,
+ lastOpTimeFetched,
+ OpTime(Timestamp(Seconds(50), 1U), 1LL),
+ onCompletion),
+ UserException,
+ ErrorCodes::BadValue,
+ "required optime (if provided) must be more recent than last fetched optime");
ASSERT_THROWS_CODE_AND_WHAT(
SyncSourceResolver(
- &getExecutor(), &selector, lastOpTimeFetched, SyncSourceResolver::OnCompletionFn()),
+ &getExecutor(), &selector, lastOpTimeFetched, lastOpTimeFetched, onCompletion),
UserException,
ErrorCodes::BadValue,
- "callback function cannot be null");
+ "required optime (if provided) must be more recent than last fetched optime");
+
+ // Null callback function.
+ ASSERT_THROWS_CODE_AND_WHAT(SyncSourceResolver(&getExecutor(),
+ &selector,
+ lastOpTimeFetched,
+ requiredOpTime,
+ SyncSourceResolver::OnCompletionFn()),
+ UserException,
+ ErrorCodes::BadValue,
+ "callback function cannot be null");
}
TEST_F(SyncSourceResolverTest, StartupReturnsIllegalOperationIfAlreadyActive) {
@@ -302,6 +331,26 @@ TEST_F(SyncSourceResolverTest,
}
TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverTransitionsToCompleteWhenFinishCallbackThrowsException) {
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ _onCompletion = [this](const SyncSourceResolverResponse& response) {
+ _response = response;
+ uassert(ErrorCodes::InternalError, "", false);
+ };
+
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
ResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingFetcher) {
ASSERT_OK(_resolver->startup());
ASSERT_TRUE(_resolver->isActive());
@@ -530,4 +579,305 @@ TEST_F(SyncSourceResolverTest,
ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
}
+/**
+ * Constructs and schedules a network interface response using the given documents to the required
+ * optime on the sync source candidate.
+ */
+void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
+ SyncSourceSelectorMock* selector,
+ HostAndPort currentSyncSource,
+ OpTime requiredOpTime,
+ std::vector<BSONObj> docs) {
+ executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(0, nss, docs));
+ ASSERT_EQUALS(currentSyncSource, request.target);
+ ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.db(), request.dbname);
+ ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout);
+ auto firstElement = request.cmdObj.firstElement();
+ ASSERT_EQUALS("find"_sd, firstElement.fieldNameStringData());
+ ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.coll(), firstElement.String());
+ ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
+ auto filter = request.cmdObj.getObjectField("filter");
+ ASSERT_TRUE(filter.hasField("ts")) << request.cmdObj;
+ auto tsFilter = filter.getObjectField("ts");
+ ASSERT_TRUE(tsFilter.hasField("$gte")) << request.cmdObj;
+ ASSERT_EQUALS(requiredOpTime.getTimestamp(), tsFilter["$gte"].timestamp()) << request.cmdObj;
+ ASSERT_TRUE(tsFilter.hasField("$lte")) << request.cmdObj;
+ ASSERT_EQUALS(requiredOpTime.getTimestamp(), tsFilter["$lte"].timestamp()) << request.cmdObj;
+
+
+ net->runReadyNetworkOperations();
+}
+
+/**
+ * Constructs and schedules a network interface response using the given optime to the required
+ * optime on the sync source candidate.
+ */
+void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
+ SyncSourceSelectorMock* selector,
+ HostAndPort currentSyncSource,
+ OpTime requiredOpTime) {
+ _scheduleRequiredOpTimeFetcherResponse(
+ net,
+ selector,
+ currentSyncSource,
+ requiredOpTime,
+ {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())});
+}
+
+const OpTime requiredOpTime(Timestamp(200, 1U), 1LL);
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverWillCheckForRequiredOpTimeUsingOplogReplayQueryIfRequiredOpTimeIsProvided) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRemoteTermIsUninitialized) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(
+ getNet(),
+ _selector.get(),
+ candidate1,
+ requiredOpTime,
+ {BSON("ts" << requiredOpTime.getTimestamp() << "t" << OpTime::kUninitializedTerm)});
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverRejectsRemoteOpTimeWhenCheckingRequiredOpTimeIfRequiredOpTimesTermIsUninitialized) {
+ auto requireOpTimeWithUninitializedTerm =
+ OpTime(requiredOpTime.getTimestamp(), OpTime::kUninitializedTerm);
+ _resolver = _makeResolver(lastOpTimeFetched, requireOpTimeWithUninitializedTerm);
+
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate1, requiredOpTime);
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRequiredOpTimeFetcherResponse(
+ getNet(), _selector.get(), candidate2, requireOpTimeWithUninitializedTerm);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimeIsNotFoundInRemoteOplog) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(
+ getNet(), _selector.get(), candidate1, requiredOpTime, {});
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesIfRequiredOpTimesTermIsNotFoundInRemoteOplog) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleRequiredOpTimeFetcherResponse(
+ getNet(),
+ _selector.get(),
+ candidate1,
+ requiredOpTime,
+ {BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm() + 1)});
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kNoRequiredOpTimeBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 0));
+ _scheduleRequiredOpTimeFetcherResponse(getNet(), _selector.get(), candidate2, requiredOpTime);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsScheduleErrorWhenSchedulingRequiredOpTimeFindCommandFails) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ _shouldFailRequest = [](const executor::RemoteCommandRequest& request) {
+ return request.cmdObj.getBoolField("oplogReplay");
+ };
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus);
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingRequiredOpTimeFetcher) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ _resolver->shutdown();
+ executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingRequiredOpTimeFetcher) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+
+ getExecutor().shutdown();
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverReturnsEmptyHostIfNoViableHostExistsAfterNetworkErrorOnRequiredOpTimeCommand) {
+ _resolver = _makeResolver(lastOpTimeFetched, requiredOpTime);
+
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate1, Timestamp(10, 0));
+
+ _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort());
+
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
+}
+
} // namespace