summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/SConscript28
-rw-r--r--src/mongo/db/repl/sync_source_resolver.cpp286
-rw-r--r--src/mongo/db/repl/sync_source_resolver.h125
-rw-r--r--src/mongo/db/repl/sync_source_resolver_test.cpp536
-rw-r--r--src/mongo/executor/network_interface_mock.cpp4
-rw-r--r--src/mongo/executor/network_interface_mock.h5
6 files changed, 957 insertions, 27 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index c5588414d58..65be2421f30 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -753,6 +753,33 @@ env.CppUnitTest(
)
env.Library(
+ target='sync_source_resolver',
+ source=[
+ 'sync_source_resolver.cpp',
+ ],
+ LIBDEPS=[
+ 'oplog_entry',
+ 'optime',
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/client/fetcher',
+ '$BUILD_DIR/mongo/db/namespace_string',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
+ '$BUILD_DIR/mongo/rpc/metadata',
+ '$BUILD_DIR/mongo/util/net/hostandport',
+ ],
+)
+
+env.CppUnitTest(
+ target='sync_source_resolver_test',
+ source='sync_source_resolver_test.cpp',
+ LIBDEPS=[
+ 'sync_source_resolver',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/unittest/task_executor_proxy',
+ ],
+)
+
+env.Library(
target='base_cloner_test_fixture',
source=[
'base_cloner_test_fixture.cpp',
@@ -1091,7 +1118,6 @@ env.Library(
"rs_initialsync.cpp",
"rs_sync.cpp",
"sync_source_feedback.cpp",
- "sync_source_resolver.cpp", # TODO(dannenberg) move this into DataReplicator in repl/SConscript.
],
LIBDEPS=[
'bgsync',
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index 0d33207f74a..cb7b272be20 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -26,32 +26,292 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
#include "mongo/platform/basic.h"
#include "mongo/db/repl/sync_source_resolver.h"
-#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/destructor_guard.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
namespace mongo {
namespace repl {
-SyncSourceResolverResponse SyncSourceResolver::findSyncSource(OperationContext* txn,
- const OpTime& lastOpTimeFetched) {
- SyncSourceResolverResponse resp;
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+const NamespaceString SyncSourceResolver::kLocalOplogNss("local.oplog.rs");
+const Seconds SyncSourceResolver::kFetcherTimeout(30);
+const Seconds SyncSourceResolver::kFetcherErrorBlacklistDuration(10);
+const Seconds SyncSourceResolver::kOplogEmptyBlacklistDuration(10);
+const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10);
+const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10);
+const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1);
+
+SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor,
+ SyncSourceSelector* syncSourceSelector,
+ const OpTime& lastOpTimeFetched,
+ const OnCompletionFn& onCompletion)
+ : _taskExecutor(taskExecutor),
+ _syncSourceSelector(syncSourceSelector),
+ _lastOpTimeFetched(lastOpTimeFetched),
+ _onCompletion(onCompletion) {
+ uassert(ErrorCodes::BadValue, "task executor cannot be null", taskExecutor);
+ uassert(ErrorCodes::BadValue, "sync source selector cannot be null", syncSourceSelector);
+ uassert(
+ ErrorCodes::BadValue, "last fetched optime cannot be null", !lastOpTimeFetched.isNull());
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+}
+
+SyncSourceResolver::~SyncSourceResolver() {
+ DESTRUCTOR_GUARD(shutdown(); join(););
+}
+
+bool SyncSourceResolver::isActive() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _isActive_inlock();
+}
+
+bool SyncSourceResolver::_isActive_inlock() const {
+ return State::kRunning == _state || State::kShuttingDown == _state;
+}
+
+Status SyncSourceResolver::startup() {
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (State::kPreStart != _state) {
+ return Status(ErrorCodes::IllegalOperation, "sync source resolver already started");
+ }
+ _state = State::kRunning;
+ }
- resp = _replCoord->selectSyncSource(txn, lastOpTimeFetched);
- if (resp.isOK()) {
- // Store the syncSource for future use.
- _syncSource = resp.getSyncSource();
+ return _chooseAndProbeNextSyncSource(OpTime());
+}
+
+void SyncSourceResolver::shutdown() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ // Transition directly from PreStart to Complete if not started yet.
+ if (State::kPreStart == _state) {
+ _state = State::kComplete;
+ return;
}
- return resp;
+ // Nothing to do if we are already in ShuttingDown or Complete state.
+ if (State::kShuttingDown == _state || State::kComplete == _state) {
+ return;
+ }
+
+ invariant(_state == State::kRunning);
+ _state = State::kShuttingDown;
+
+ if (_fetcher) {
+ _fetcher->shutdown();
+ }
}
-HostAndPort SyncSourceResolver::getActiveSyncSource() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- return _syncSource;
+void SyncSourceResolver::join() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_isActive_inlock(); });
+}
+
+bool SyncSourceResolver::_isShuttingDown() const {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return State::kShuttingDown == _state;
+}
+
+StatusWith<HostAndPort> SyncSourceResolver::_chooseNewSyncSource() {
+ HostAndPort candidate;
+ try {
+ candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched.getTimestamp());
+ } catch (...) {
+ return exceptionToStatus();
+ }
+
+ if (_isShuttingDown()) {
+ return Status(ErrorCodes::CallbackCanceled,
+ str::stream() << "sync source resolver shut down before probing candidate: "
+ << candidate);
+ }
+
+ return candidate;
+}
+
+std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher(
+ HostAndPort candidate, OpTime earliestOpTimeSeen) {
+ return stdx::make_unique<Fetcher>(
+ _taskExecutor,
+ candidate,
+ kLocalOplogNss.db().toString(),
+ BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" << BSON("$natural" << 1)),
+ stdx::bind(&SyncSourceResolver::_firstOplogEntryFetcherCallback,
+ this,
+ stdx::placeholders::_1,
+ candidate,
+ earliestOpTimeSeen),
+ rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ kFetcherTimeout);
+}
+
+Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ // Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task
+ // executor.
+ auto status = fetcher->schedule();
+ if (status.isOK()) {
+ _shuttingDownFetcher = std::move(_fetcher);
+ _fetcher = std::move(fetcher);
+ } else {
+ error() << "Error scheduling fetcher to evaluate host as sync source, host:"
+ << fetcher->getSource() << ", error: " << status;
+ }
+ return status;
+}
+
+OpTime SyncSourceResolver::_parseRemoteEarliestOpTime(const HostAndPort& candidate,
+ const Fetcher::QueryResponse& queryResponse) {
+ if (queryResponse.documents.empty()) {
+ // Remote oplog is empty.
+ const auto until = _taskExecutor->now() + kOplogEmptyBlacklistDuration;
+ log() << "Blacklisting due to empty oplog on host " << candidate << " for "
+ << kOplogEmptyBlacklistDuration << " until: " << until;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+ return OpTime();
+ }
+
+ const auto& firstObjFound = queryResponse.documents.front();
+ if (firstObjFound.isEmpty()) {
+ // First document in remote oplog is empty.
+ const auto until = _taskExecutor->now() + kFirstOplogEntryEmptyBlacklistDuration;
+ log() << "Blacklisting due to empty first document from host " << candidate << " for "
+ << kFirstOplogEntryEmptyBlacklistDuration << " until: " << until;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+ return OpTime();
+ }
+
+ const OplogEntry oplogEntry(firstObjFound);
+ const auto remoteEarliestOpTime = oplogEntry.getOpTime();
+ if (remoteEarliestOpTime.isNull()) {
+ // First document in remote oplog is empty.
+ const auto until = _taskExecutor->now() + kFirstOplogEntryNullTimestampBlacklistDuration;
+ log() << "Blacklisting due to null timestamp in first document from host " << candidate
+ << " for " << kFirstOplogEntryNullTimestampBlacklistDuration << " until: " << until;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+ return OpTime();
+ }
+
+ return remoteEarliestOpTime;
+}
+
+void SyncSourceResolver::_firstOplogEntryFetcherCallback(
+ const StatusWith<Fetcher::QueryResponse>& queryResult,
+ HostAndPort candidate,
+ OpTime earliestOpTimeSeen) {
+ if (_isShuttingDown()) {
+ _finishCallback(Status(ErrorCodes::CallbackCanceled,
+ str::stream()
+ << "sync source resolver shut down while probing candidate: "
+ << candidate));
+ return;
+ }
+
+ if (ErrorCodes::CallbackCanceled == queryResult.getStatus()) {
+ _finishCallback(queryResult.getStatus());
+ return;
+ }
+
+ if (!queryResult.isOK()) {
+ // We got an error.
+ const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration;
+ log() << "Blacklisting " << candidate << " due to error: '" << queryResult.getStatus()
+ << "' for " << kFetcherErrorBlacklistDuration << " until: " << until;
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+
+ _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+ return;
+ }
+
+ const auto& queryResponse = queryResult.getValue();
+ const auto remoteEarliestOpTime = _parseRemoteEarliestOpTime(candidate, queryResponse);
+ if (remoteEarliestOpTime.isNull()) {
+ _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+ return;
+ }
+
+ // remoteEarliestOpTime may come from a very old config, so we cannot compare their terms.
+ if (_lastOpTimeFetched.getTimestamp() < remoteEarliestOpTime.getTimestamp()) {
+ // We're too stale to use this sync source.
+ const auto blacklistDuration = kTooStaleBlacklistDuration;
+ const auto until = _taskExecutor->now() + Minutes(1);
+
+ log() << "We are too stale to use " << candidate << " as a sync source. "
+ << "Blacklisting this sync source"
+ << " because our last fetched timestamp: " << _lastOpTimeFetched.getTimestamp()
+ << " is before their earliest timestamp: " << remoteEarliestOpTime.getTimestamp()
+ << " for " << blacklistDuration << " until: " << until;
+
+ _syncSourceSelector->blacklistSyncSource(candidate, until);
+
+ // If all the viable sync sources are too far ahead of us (i.e. we are "too stale" relative
+ // each sync source), we will want to return the starting timestamp of the sync source
+ // candidate that is closest to us. See SyncSourceResolverResponse::earliestOpTimeSeen.
+ // We use "earliestOpTimeSeen" to keep track of the current minimum starting timestamp.
+ if (earliestOpTimeSeen.isNull() ||
+ earliestOpTimeSeen.getTimestamp() > remoteEarliestOpTime.getTimestamp()) {
+ earliestOpTimeSeen = remoteEarliestOpTime;
+ }
+
+ _chooseAndProbeNextSyncSource(earliestOpTimeSeen);
+ return;
+ }
+
+ _finishCallback(candidate);
+}
+
+Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen) {
+ auto candidateResult = _chooseNewSyncSource();
+ if (!candidateResult.isOK()) {
+ return _finishCallback(candidateResult);
+ }
+
+ if (candidateResult.getValue().empty()) {
+ if (earliestOpTimeSeen.isNull()) {
+ return _finishCallback(candidateResult);
+ }
+
+ SyncSourceResolverResponse response;
+ response.syncSourceStatus = {ErrorCodes::OplogStartMissing, "too stale to catch up"};
+ response.earliestOpTimeSeen = earliestOpTimeSeen;
+ return _finishCallback(response);
+ }
+
+ auto status = _scheduleFetcher(
+ _makeFirstOplogEntryFetcher(candidateResult.getValue(), earliestOpTimeSeen));
+ if (!status.isOK()) {
+ return _finishCallback(status);
+ }
+
+ return Status::OK();
+}
+
+Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) {
+ SyncSourceResolverResponse response;
+ response.syncSourceStatus = std::move(result);
+ return _finishCallback(response);
+}
+
+Status SyncSourceResolver::_finishCallback(const SyncSourceResolverResponse& response) {
+ _onCompletion(response);
+
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_state != State::kComplete);
+ _state = State::kComplete;
+ _condition.notify_all();
+ return response.syncSourceStatus.getStatus();
}
} // namespace repl
diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h
index fd098374882..e0365c7ad1a 100644
--- a/src/mongo/db/repl/sync_source_resolver.h
+++ b/src/mongo/db/repl/sync_source_resolver.h
@@ -28,22 +28,32 @@
#pragma once
+#include <memory>
+
+#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
+#include "mongo/client/fetcher.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/functional.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
namespace mongo {
class OperationContext;
class Status;
+class OperationContext;
namespace repl {
-class ReplicationCoordinator;
+class SyncSourceSelector;
/**
- * SyncSourceResolverResponse contains the result of a call to findSyncSource. This result will
+ * SyncSourceResolverResponse contains the result from running SyncSourceResolver. This result will
* indicate one of the following:
* 1. A new sync source was selected. isOK() will return true and getSyncSource() will
* return the HostAndPort of the new sync source.
@@ -72,28 +82,117 @@ struct SyncSourceResolverResponse {
/**
* Supplies a sync source to Fetcher, Rollback and Reporter.
+ * Obtains sync source candidates to probe from SyncSourceSelector.
+ * Each instance is created as needed whenever a new sync source is required and
+ * is meant to be discarded after the sync source resolution is finished - 'onCompletion'
+ * callback is invoked with the results contained in SyncSourceResolverResponse.
*/
class SyncSourceResolver {
public:
- SyncSourceResolver(ReplicationCoordinator* replCoord) : _replCoord(replCoord){};
+ static const NamespaceString kLocalOplogNss;
+ static const Seconds kFetcherTimeout;
+ static const Seconds kFetcherErrorBlacklistDuration;
+ static const Seconds kOplogEmptyBlacklistDuration;
+ static const Seconds kFirstOplogEntryEmptyBlacklistDuration;
+ static const Seconds kFirstOplogEntryNullTimestampBlacklistDuration;
+ static const Minutes kTooStaleBlacklistDuration;
+
+ /**
+ * Callback function to report final status of resolving sync source.
+ */
+ typedef stdx::function<void(const SyncSourceResolverResponse&)> OnCompletionFn;
+
+ SyncSourceResolver(executor::TaskExecutor* taskExecutor,
+ SyncSourceSelector* syncSourceSelector,
+ const OpTime& lastOpTimeFetched,
+ const OnCompletionFn& onCompletion);
+ virtual ~SyncSourceResolver();
/**
- * Uses the provided lastOpTimeFetched and replCoord to find a new sync source for
- * DataReplicator components.
+ * Returns true if we are currently probing sync source candidates.
*/
- SyncSourceResolverResponse findSyncSource(OperationContext* txn,
- const OpTime& lastOpTimeFetched);
+ bool isActive() const;
/**
- * Returns current sync source, which may be empty if there is no valid sync source available.
+ * Starts probing sync source candidates returned by the sync source selector.
*/
- HostAndPort getActiveSyncSource();
+ Status startup();
+
+ /**
+ * Cancels all remote commands.
+ */
+ void shutdown();
+
+ /**
+ * Block until inactive.
+ */
+ void join();
private:
- ReplicationCoordinator* _replCoord;
- // Protects _syncSource.
- stdx::mutex _mutex;
- HostAndPort _syncSource;
+ bool _isActive_inlock() const;
+ bool _isShuttingDown() const;
+
+ /**
+ * Returns new sync source from selector.
+ */
+ StatusWith<HostAndPort> _chooseNewSyncSource();
+
+ /**
+ * Creates fetcher to read the first oplog entry on sync source.
+ */
+ std::unique_ptr<Fetcher> _makeFirstOplogEntryFetcher(HostAndPort candidate,
+ OpTime earliestOpTimeSeen);
+
+ /**
+ * Schedules fetcher to read oplog on sync source.
+ * Saves fetcher in '_fetcher' on success.
+ */
+ Status _scheduleFetcher(std::unique_ptr<Fetcher> fetcher);
+
+ /**
+ * Returns optime of first oplog entry from fetcher response.
+ * Returns null optime on error.
+ */
+ OpTime _parseRemoteEarliestOpTime(const HostAndPort& candidate,
+ const Fetcher::QueryResponse& queryResponse);
+
+ /**
+ * Callback for fetching first oplog entry on sync source.
+ */
+ void _firstOplogEntryFetcherCallback(const StatusWith<Fetcher::QueryResponse>& queryResult,
+ HostAndPort candidate,
+ OpTime earliestOpTimeSeen);
+
+ /**
+ * Obtains new sync source candidate and schedules remote command to fetcher first oplog entry.
+ * May transition state to Complete.
+ * Returns status that could be used as result for startup().
+ */
+ Status _chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen);
+
+ /**
+ * Invokes completion callback and transitions state to State::kComplete.
+ * Returns result.getStatus().
+ */
+ Status _finishCallback(StatusWith<HostAndPort> result);
+ Status _finishCallback(const SyncSourceResolverResponse& response);
+
+ executor::TaskExecutor* const _taskExecutor;
+ SyncSourceSelector* const _syncSourceSelector;
+ const OpTime _lastOpTimeFetched;
+ const OnCompletionFn _onCompletion;
+
+ // Protects members of this sync source resolver.
+ mutable stdx::mutex _mutex;
+ mutable stdx::condition_variable _condition;
+ enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
+ State _state = State::kPreStart;
+
+ // Fetches first oplog entry on sync source candidate.
+ std::unique_ptr<Fetcher> _fetcher;
+
+ // Holds reference to fetcher in the process of shutting down.
+ std::unique_ptr<Fetcher> _shuttingDownFetcher;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
new file mode 100644
index 00000000000..06d7934d7c8
--- /dev/null
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -0,0 +1,536 @@
+/**
+ * Copyright 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/db/cursor_id.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/sync_source_resolver.h"
+#include "mongo/db/repl/sync_source_selector.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/task_executor_proxy.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+
+namespace {
+
+using namespace mongo;
+using namespace mongo::repl;
+
+class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy {
+public:
+ using ShouldFailRequestFn = stdx::function<bool(const executor::RemoteCommandRequest&)>;
+
+ TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor,
+ ShouldFailRequestFn shouldFailRequest)
+ : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {}
+
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb) override {
+ if (_shouldFailRequest(request)) {
+ return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
+ }
+ return getExecutor()->scheduleRemoteCommand(request, cb);
+ }
+
+private:
+ ShouldFailRequestFn _shouldFailRequest;
+};
+
+class SyncSourceSelectorMock : public SyncSourceSelector {
+public:
+ void clearSyncSourceBlacklist() override {}
+ HostAndPort chooseNewSyncSource(const Timestamp& ts) override {
+ chooseNewSyncSourceHook();
+ lastTimestampFetched = ts;
+ return syncSource;
+ }
+ void blacklistSyncSource(const HostAndPort& host, Date_t until) override {
+ blacklistHost = host;
+ blacklistUntil = until;
+ }
+ bool shouldChangeSyncSource(const HostAndPort&, const rpc::ReplSetMetadata&) {
+ return false;
+ }
+ SyncSourceResolverResponse selectSyncSource(OperationContext*, const OpTime&) {
+ return SyncSourceResolverResponse();
+ }
+
+ HostAndPort syncSource = HostAndPort("host1", 1234);
+ Timestamp lastTimestampFetched;
+ stdx::function<void()> chooseNewSyncSourceHook = []() {};
+
+ HostAndPort blacklistHost;
+ Date_t blacklistUntil;
+};
+
+class SyncSourceResolverTest : public executor::ThreadPoolExecutorTest {
+private:
+ void setUp() override;
+ void tearDown() override;
+
+protected:
+ TaskExecutorWithFailureInScheduleRemoteCommand::ShouldFailRequestFn _shouldFailRequest;
+ std::unique_ptr<TaskExecutorWithFailureInScheduleRemoteCommand> _executorProxy;
+
+ SyncSourceResolverResponse _response;
+ SyncSourceResolver::OnCompletionFn _onCompletion;
+ std::unique_ptr<SyncSourceSelectorMock> _selector;
+
+ std::unique_ptr<SyncSourceResolver> _resolver;
+};
+
+const OpTime lastOpTimeFetched(Timestamp(Seconds(100), 1U), 1LL);
+
+void SyncSourceResolverTest::setUp() {
+ executor::ThreadPoolExecutorTest::setUp();
+
+ _shouldFailRequest = [](const executor::RemoteCommandRequest&) { return false; };
+ _executorProxy = stdx::make_unique<TaskExecutorWithFailureInScheduleRemoteCommand>(
+ &getExecutor(), [this](const executor::RemoteCommandRequest& request) {
+ return _shouldFailRequest(request);
+ });
+
+ _response.syncSourceStatus = getDetectableErrorStatus();
+ _onCompletion = [this](const SyncSourceResolverResponse& response) { _response = response; };
+
+ _selector = stdx::make_unique<SyncSourceSelectorMock>();
+ _resolver = stdx::make_unique<SyncSourceResolver>(
+ _executorProxy.get(),
+ _selector.get(),
+ lastOpTimeFetched,
+ [this](const SyncSourceResolverResponse& response) { _onCompletion(response); });
+
+ launchExecutorThread();
+}
+
+void SyncSourceResolverTest::tearDown() {
+ executor::ThreadPoolExecutorTest::shutdownExecutorThread();
+ executor::ThreadPoolExecutorTest::joinExecutorThread();
+
+ _resolver.reset();
+ _selector.reset();
+ _executorProxy.reset();
+
+ executor::ThreadPoolExecutorTest::tearDown();
+}
+
+const NamespaceString nss("local.oplog.rs");
+
+BSONObj makeCursorResponse(CursorId cursorId,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ bool isFirstBatch = true) {
+ BSONObjBuilder bob;
+ {
+ BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
+ cursorBob.append("id", cursorId);
+ cursorBob.append("ns", nss.toString());
+ {
+ BSONArrayBuilder batchBob(
+ cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch"));
+ for (const auto& doc : docs) {
+ batchBob.append(doc);
+ }
+ }
+ }
+ bob.append("ok", 1);
+ return bob.obj();
+}
+
+TEST_F(SyncSourceResolverTest, InvalidConstruction) {
+ SyncSourceSelectorMock selector;
+ const OpTime lastOpTimeFetched(Timestamp(Seconds(100), 1U), 1LL);
+ auto onCompletion = [](const SyncSourceResolverResponse&) {};
+
+ // Null task executor.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, onCompletion),
+ UserException,
+ ErrorCodes::BadValue,
+ "task executor cannot be null");
+
+ // Null sync source selector.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ SyncSourceResolver(&getExecutor(), nullptr, lastOpTimeFetched, onCompletion),
+ UserException,
+ ErrorCodes::BadValue,
+ "sync source selector cannot be null");
+
+ // Null last fetched optime.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ SyncSourceResolver(&getExecutor(), &selector, OpTime(), onCompletion),
+ UserException,
+ ErrorCodes::BadValue,
+ "last fetched optime cannot be null");
+
+ // Null callback function.
+ ASSERT_THROWS_CODE_AND_WHAT(
+ SyncSourceResolver(
+ &getExecutor(), &selector, lastOpTimeFetched, SyncSourceResolver::OnCompletionFn()),
+ UserException,
+ ErrorCodes::BadValue,
+ "callback function cannot be null");
+}
+
+TEST_F(SyncSourceResolverTest, StartupReturnsIllegalOperationIfAlreadyActive) {
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, _resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+}
+
+TEST_F(SyncSourceResolverTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) {
+ ASSERT_FALSE(_resolver->isActive());
+ getExecutor().shutdown();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _resolver->startup());
+ ASSERT_FALSE(_resolver->isActive());
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsStatusOkAndAnEmptyHostWhenNoViableHostExists) {
+ _selector->syncSource = HostAndPort();
+ ASSERT_OK(_resolver->startup());
+
+ // Resolver invokes callback with empty host and becomes inactive immediately.
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
+ ASSERT_EQUALS(lastOpTimeFetched.getTimestamp(), _selector->lastTimestampFetched);
+
+ // Cannot restart a completed resolver.
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, _resolver->startup());
+}
+
+TEST_F(
+ SyncSourceResolverTest,
+ SyncSourceResolverReturnsCallbackCanceledIfResolverIsShutdownBeforeReturningEmptySyncSource) {
+ _selector->syncSource = HostAndPort();
+ _selector->chooseNewSyncSourceHook = [this]() { _resolver->shutdown(); };
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _resolver->startup());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverConvertsExceptionToStatusIfChoosingViableSyncSourceThrowsException) {
+ _selector->chooseNewSyncSourceHook = [this]() {
+ uassert(ErrorCodes::InternalError, "", false);
+ };
+ ASSERT_EQUALS(ErrorCodes::InternalError, _resolver->startup());
+ ASSERT_EQUALS(ErrorCodes::InternalError, _response.syncSourceStatus);
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsScheduleErrorIfTaskExecutorFailsToScheduleRemoteCommand) {
+ _shouldFailRequest = [](const executor::RemoteCommandRequest&) { return true; };
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _resolver->startup());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus);
+}
+
+void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net,
+ SyncSourceSelectorMock* selector,
+ HostAndPort currentSyncSource,
+ HostAndPort nextSyncSource,
+ std::vector<BSONObj> docs) {
+ executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto request = net->scheduleSuccessfulResponse(makeCursorResponse(0, nss, docs));
+ ASSERT_EQUALS(currentSyncSource, request.target);
+ ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.db(), request.dbname);
+ ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout);
+ auto firstElement = request.cmdObj.firstElement();
+ ASSERT_EQUALS("find"_sd, firstElement.fieldNameStringData());
+ ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.coll(), firstElement.String());
+ ASSERT_EQUALS(1, request.cmdObj.getIntField("limit"));
+ ASSERT_BSONOBJ_EQ(BSON("$natural" << 1), request.cmdObj.getObjectField("sort"));
+
+ // Change next sync source candidate before delivering scheduled response.
+ selector->syncSource = nextSyncSource;
+
+ net->runReadyNetworkOperations();
+}
+
+void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net,
+ SyncSourceSelectorMock* selector,
+ HostAndPort currentSyncSource,
+ HostAndPort nextSyncSource,
+ Timestamp ts) {
+ _scheduleFirstOplogEntryFetcherResponse(
+ net, selector, currentSyncSource, nextSyncSource, {BSON("ts" << ts << "t" << 0)});
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsStatusOkAndTheFoundHostWhenAnEligibleSyncSourceExists) {
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ ResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingFetcher) {
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _resolver->shutdown();
+ executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingFetcher) {
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ getExecutor().shutdown();
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus);
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeDoesNotHaveOldEnoughData) {
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(200, 2));
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kTooStaleBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsOplogStartMissingAndEarliestOpTimeAvailableWhenAllSourcesTooFresh) {
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ HostAndPort candidate3("node3", 12345);
+ _selector->syncSource = candidate1;
+
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(400, 2));
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, candidate3, Timestamp(200, 2));
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate3, HostAndPort(), Timestamp(300, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(ErrorCodes::OplogStartMissing, _response.syncSourceStatus);
+ ASSERT_EQUALS(Timestamp(200, 2), _response.earliestOpTimeSeen.getTimestamp());
+}
+
+void _scheduleNetworkErrorForFirstNode(executor::NetworkInterfaceMock* net,
+ SyncSourceSelectorMock* selector,
+ HostAndPort currentSyncSource,
+ HostAndPort nextSyncSource) {
+ executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto request = net->scheduleErrorResponse(
+ Status(ErrorCodes::HostUnreachable, "Sad message from the network :("));
+ ASSERT_EQUALS(currentSyncSource, request.target);
+
+ // Change next sync source candidate before delivering error to callback.
+ selector->syncSource = nextSyncSource;
+
+ net->runReadyNetworkOperations();
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasANetworkError) {
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, candidate2);
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsEmptyHostWhenNoViableNodeExistsAfterNetworkErrorOnFirstNode) {
+ HostAndPort candidate1("node1", 12345);
+ _selector->syncSource = candidate1;
+
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort());
+
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+ _selector->blacklistUntil);
+
+ ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverReturnsScheduleErrorWhenTheSchedulingCommandToSecondNodeFails) {
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node2", 12345);
+ _selector->syncSource = candidate1;
+
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _shouldFailRequest = [candidate2](const executor::RemoteCommandRequest& request) {
+ return candidate2 == request.target;
+ };
+
+ _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, candidate2);
+
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+ _selector->blacklistUntil);
+
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus);
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasAnEmptyOplog) {
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, std::vector<BSONObj>());
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kOplogEmptyBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasAnEmptyFirstOplogEntry) {
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, {BSONObj()});
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+ SyncSourceResolverWillTryOtherSourcesWhenFirstNodeContainsOplogEntryWithNullTimestamp) {
+ HostAndPort candidate1("node1", 12345);
+ HostAndPort candidate2("node1", 12345);
+ _selector->syncSource = candidate1;
+ ASSERT_OK(_resolver->startup());
+ ASSERT_TRUE(_resolver->isActive());
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate1, candidate2, Timestamp(0, 0));
+
+ ASSERT_TRUE(_resolver->isActive());
+ ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+ ASSERT_EQUALS(getExecutor().now() +
+ SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration,
+ _selector->blacklistUntil);
+
+ _scheduleFirstOplogEntryFetcherResponse(
+ getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+ _resolver->join();
+ ASSERT_FALSE(_resolver->isActive());
+ ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+} // namespace
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 1eed68daea5..8a0c44d90fc 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -664,6 +664,10 @@ NetworkInterfaceMock::InNetworkGuard::~InNetworkGuard() {
_net->exitNetwork();
}
+NetworkInterfaceMock* NetworkInterfaceMock::InNetworkGuard::operator->() const {
+ return _net;
+}
+
NetworkInterfaceMockClockSource::NetworkInterfaceMockClockSource(NetworkInterfaceMock* net)
: _net(net) {
_tracksSystemClock = false;
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 0ba054f11fe..6a34ceb45b3 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -515,6 +515,11 @@ public:
*/
~InNetworkGuard();
+ /**
+ * Returns network interface mock pointer.
+ */
+ NetworkInterfaceMock* operator->() const;
+
private:
NetworkInterfaceMock* _net;
bool _callExitNetwork = true;