diff options
author | Benety Goh <benety@mongodb.com> | 2016-10-25 14:43:03 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-11-01 11:10:02 -0400 |
commit | 938885a1796d8b7e42748a4cf68c77872d75db98 (patch) | |
tree | 843f64cd0a6f664431ae87bded130f6f2862085f /src | |
parent | f25a36cdbe77f596adfea6e008fbf5a044cf3a83 (diff) | |
download | mongo-938885a1796d8b7e42748a4cf68c77872d75db98.tar.gz |
SERVER-23070 rewrote SyncSourceResolver to select sync source asynchronously
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/SConscript | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.cpp | 286 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver.h | 125 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_resolver_test.cpp | 536 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 5 |
6 files changed, 957 insertions, 27 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index c5588414d58..65be2421f30 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -753,6 +753,33 @@ env.CppUnitTest( ) env.Library( + target='sync_source_resolver', + source=[ + 'sync_source_resolver.cpp', + ], + LIBDEPS=[ + 'oplog_entry', + 'optime', + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/client/fetcher', + '$BUILD_DIR/mongo/db/namespace_string', + '$BUILD_DIR/mongo/executor/task_executor_interface', + '$BUILD_DIR/mongo/rpc/metadata', + '$BUILD_DIR/mongo/util/net/hostandport', + ], +) + +env.CppUnitTest( + target='sync_source_resolver_test', + source='sync_source_resolver_test.cpp', + LIBDEPS=[ + 'sync_source_resolver', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/unittest/task_executor_proxy', + ], +) + +env.Library( target='base_cloner_test_fixture', source=[ 'base_cloner_test_fixture.cpp', @@ -1091,7 +1118,6 @@ env.Library( "rs_initialsync.cpp", "rs_sync.cpp", "sync_source_feedback.cpp", - "sync_source_resolver.cpp", # TODO(dannenberg) move this into DataReplicator in repl/SConscript. ], LIBDEPS=[ 'bgsync', diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp index 0d33207f74a..cb7b272be20 100644 --- a/src/mongo/db/repl/sync_source_resolver.cpp +++ b/src/mongo/db/repl/sync_source_resolver.cpp @@ -26,32 +26,292 @@ * it in the license file. 
*/ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + #include "mongo/platform/basic.h" #include "mongo/db/repl/sync_source_resolver.h" -#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/sync_source_selector.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/destructor_guard.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" namespace mongo { namespace repl { -SyncSourceResolverResponse SyncSourceResolver::findSyncSource(OperationContext* txn, - const OpTime& lastOpTimeFetched) { - SyncSourceResolverResponse resp; - stdx::unique_lock<stdx::mutex> lock(_mutex); +const NamespaceString SyncSourceResolver::kLocalOplogNss("local.oplog.rs"); +const Seconds SyncSourceResolver::kFetcherTimeout(30); +const Seconds SyncSourceResolver::kFetcherErrorBlacklistDuration(10); +const Seconds SyncSourceResolver::kOplogEmptyBlacklistDuration(10); +const Seconds SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration(10); +const Seconds SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration(10); +const Minutes SyncSourceResolver::kTooStaleBlacklistDuration(1); + +SyncSourceResolver::SyncSourceResolver(executor::TaskExecutor* taskExecutor, + SyncSourceSelector* syncSourceSelector, + const OpTime& lastOpTimeFetched, + const OnCompletionFn& onCompletion) + : _taskExecutor(taskExecutor), + _syncSourceSelector(syncSourceSelector), + _lastOpTimeFetched(lastOpTimeFetched), + _onCompletion(onCompletion) { + uassert(ErrorCodes::BadValue, "task executor cannot be null", taskExecutor); + uassert(ErrorCodes::BadValue, "sync source selector cannot be null", syncSourceSelector); + uassert( + ErrorCodes::BadValue, "last fetched optime cannot be null", !lastOpTimeFetched.isNull()); + uassert(ErrorCodes::BadValue, "callback function cannot 
be null", onCompletion); +} + +SyncSourceResolver::~SyncSourceResolver() { + DESTRUCTOR_GUARD(shutdown(); join();); +} + +bool SyncSourceResolver::isActive() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _isActive_inlock(); +} + +bool SyncSourceResolver::_isActive_inlock() const { + return State::kRunning == _state || State::kShuttingDown == _state; +} + +Status SyncSourceResolver::startup() { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (State::kPreStart != _state) { + return Status(ErrorCodes::IllegalOperation, "sync source resolver already started"); + } + _state = State::kRunning; + } - resp = _replCoord->selectSyncSource(txn, lastOpTimeFetched); - if (resp.isOK()) { - // Store the syncSource for future use. - _syncSource = resp.getSyncSource(); + return _chooseAndProbeNextSyncSource(OpTime()); +} + +void SyncSourceResolver::shutdown() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + // Transition directly from PreStart to Complete if not started yet. + if (State::kPreStart == _state) { + _state = State::kComplete; + return; } - return resp; + // Nothing to do if we are already in ShuttingDown or Complete state. 
+ if (State::kShuttingDown == _state || State::kComplete == _state) { + return; + } + + invariant(_state == State::kRunning); + _state = State::kShuttingDown; + + if (_fetcher) { + _fetcher->shutdown(); + } } -HostAndPort SyncSourceResolver::getActiveSyncSource() { - stdx::unique_lock<stdx::mutex> lock(_mutex); - return _syncSource; +void SyncSourceResolver::join() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_isActive_inlock(); }); +} + +bool SyncSourceResolver::_isShuttingDown() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return State::kShuttingDown == _state; +} + +StatusWith<HostAndPort> SyncSourceResolver::_chooseNewSyncSource() { + HostAndPort candidate; + try { + candidate = _syncSourceSelector->chooseNewSyncSource(_lastOpTimeFetched.getTimestamp()); + } catch (...) { + return exceptionToStatus(); + } + + if (_isShuttingDown()) { + return Status(ErrorCodes::CallbackCanceled, + str::stream() << "sync source resolver shut down before probing candidate: " + << candidate); + } + + return candidate; +} + +std::unique_ptr<Fetcher> SyncSourceResolver::_makeFirstOplogEntryFetcher( + HostAndPort candidate, OpTime earliestOpTimeSeen) { + return stdx::make_unique<Fetcher>( + _taskExecutor, + candidate, + kLocalOplogNss.db().toString(), + BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" << BSON("$natural" << 1)), + stdx::bind(&SyncSourceResolver::_firstOplogEntryFetcherCallback, + this, + stdx::placeholders::_1, + candidate, + earliestOpTimeSeen), + rpc::ServerSelectionMetadata(true, boost::none).toBSON(), + kFetcherTimeout); +} + +Status SyncSourceResolver::_scheduleFetcher(std::unique_ptr<Fetcher> fetcher) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + // Must schedule fetcher inside lock in case fetcher's callback gets invoked immediately by task + // executor. 
+ auto status = fetcher->schedule(); + if (status.isOK()) { + _shuttingDownFetcher = std::move(_fetcher); + _fetcher = std::move(fetcher); + } else { + error() << "Error scheduling fetcher to evaluate host as sync source, host:" + << fetcher->getSource() << ", error: " << status; + } + return status; +} + +OpTime SyncSourceResolver::_parseRemoteEarliestOpTime(const HostAndPort& candidate, + const Fetcher::QueryResponse& queryResponse) { + if (queryResponse.documents.empty()) { + // Remote oplog is empty. + const auto until = _taskExecutor->now() + kOplogEmptyBlacklistDuration; + log() << "Blacklisting due to empty oplog on host " << candidate << " for " + << kOplogEmptyBlacklistDuration << " until: " << until; + _syncSourceSelector->blacklistSyncSource(candidate, until); + return OpTime(); + } + + const auto& firstObjFound = queryResponse.documents.front(); + if (firstObjFound.isEmpty()) { + // First document in remote oplog is empty. + const auto until = _taskExecutor->now() + kFirstOplogEntryEmptyBlacklistDuration; + log() << "Blacklisting due to empty first document from host " << candidate << " for " + << kFirstOplogEntryEmptyBlacklistDuration << " until: " << until; + _syncSourceSelector->blacklistSyncSource(candidate, until); + return OpTime(); + } + + const OplogEntry oplogEntry(firstObjFound); + const auto remoteEarliestOpTime = oplogEntry.getOpTime(); + if (remoteEarliestOpTime.isNull()) { + // First document in remote oplog is empty. 
+ const auto until = _taskExecutor->now() + kFirstOplogEntryNullTimestampBlacklistDuration; + log() << "Blacklisting due to null timestamp in first document from host " << candidate + << " for " << kFirstOplogEntryNullTimestampBlacklistDuration << " until: " << until; + _syncSourceSelector->blacklistSyncSource(candidate, until); + return OpTime(); + } + + return remoteEarliestOpTime; +} + +void SyncSourceResolver::_firstOplogEntryFetcherCallback( + const StatusWith<Fetcher::QueryResponse>& queryResult, + HostAndPort candidate, + OpTime earliestOpTimeSeen) { + if (_isShuttingDown()) { + _finishCallback(Status(ErrorCodes::CallbackCanceled, + str::stream() + << "sync source resolver shut down while probing candidate: " + << candidate)); + return; + } + + if (ErrorCodes::CallbackCanceled == queryResult.getStatus()) { + _finishCallback(queryResult.getStatus()); + return; + } + + if (!queryResult.isOK()) { + // We got an error. + const auto until = _taskExecutor->now() + kFetcherErrorBlacklistDuration; + log() << "Blacklisting " << candidate << " due to error: '" << queryResult.getStatus() + << "' for " << kFetcherErrorBlacklistDuration << " until: " << until; + _syncSourceSelector->blacklistSyncSource(candidate, until); + + _chooseAndProbeNextSyncSource(earliestOpTimeSeen); + return; + } + + const auto& queryResponse = queryResult.getValue(); + const auto remoteEarliestOpTime = _parseRemoteEarliestOpTime(candidate, queryResponse); + if (remoteEarliestOpTime.isNull()) { + _chooseAndProbeNextSyncSource(earliestOpTimeSeen); + return; + } + + // remoteEarliestOpTime may come from a very old config, so we cannot compare their terms. + if (_lastOpTimeFetched.getTimestamp() < remoteEarliestOpTime.getTimestamp()) { + // We're too stale to use this sync source. + const auto blacklistDuration = kTooStaleBlacklistDuration; + const auto until = _taskExecutor->now() + Minutes(1); + + log() << "We are too stale to use " << candidate << " as a sync source. 
" + << "Blacklisting this sync source" + << " because our last fetched timestamp: " << _lastOpTimeFetched.getTimestamp() + << " is before their earliest timestamp: " << remoteEarliestOpTime.getTimestamp() + << " for " << blacklistDuration << " until: " << until; + + _syncSourceSelector->blacklistSyncSource(candidate, until); + + // If all the viable sync sources are too far ahead of us (i.e. we are "too stale" relative + // each sync source), we will want to return the starting timestamp of the sync source + // candidate that is closest to us. See SyncSourceResolverResponse::earliestOpTimeSeen. + // We use "earliestOpTimeSeen" to keep track of the current minimum starting timestamp. + if (earliestOpTimeSeen.isNull() || + earliestOpTimeSeen.getTimestamp() > remoteEarliestOpTime.getTimestamp()) { + earliestOpTimeSeen = remoteEarliestOpTime; + } + + _chooseAndProbeNextSyncSource(earliestOpTimeSeen); + return; + } + + _finishCallback(candidate); +} + +Status SyncSourceResolver::_chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen) { + auto candidateResult = _chooseNewSyncSource(); + if (!candidateResult.isOK()) { + return _finishCallback(candidateResult); + } + + if (candidateResult.getValue().empty()) { + if (earliestOpTimeSeen.isNull()) { + return _finishCallback(candidateResult); + } + + SyncSourceResolverResponse response; + response.syncSourceStatus = {ErrorCodes::OplogStartMissing, "too stale to catch up"}; + response.earliestOpTimeSeen = earliestOpTimeSeen; + return _finishCallback(response); + } + + auto status = _scheduleFetcher( + _makeFirstOplogEntryFetcher(candidateResult.getValue(), earliestOpTimeSeen)); + if (!status.isOK()) { + return _finishCallback(status); + } + + return Status::OK(); +} + +Status SyncSourceResolver::_finishCallback(StatusWith<HostAndPort> result) { + SyncSourceResolverResponse response; + response.syncSourceStatus = std::move(result); + return _finishCallback(response); +} + +Status SyncSourceResolver::_finishCallback(const 
SyncSourceResolverResponse& response) { + _onCompletion(response); + + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_state != State::kComplete); + _state = State::kComplete; + _condition.notify_all(); + return response.syncSourceStatus.getStatus(); } } // namespace repl diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h index fd098374882..e0365c7ad1a 100644 --- a/src/mongo/db/repl/sync_source_resolver.h +++ b/src/mongo/db/repl/sync_source_resolver.h @@ -28,22 +28,32 @@ #pragma once +#include <memory> + +#include "mongo/base/status.h" #include "mongo/base/status_with.h" +#include "mongo/client/fetcher.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/repl/optime.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/functional.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" namespace mongo { class OperationContext; class Status; +class OperationContext; namespace repl { -class ReplicationCoordinator; +class SyncSourceSelector; /** - * SyncSourceResolverResponse contains the result of a call to findSyncSource. This result will + * SyncSourceResolverResponse contains the result from running SyncSourceResolver. This result will * indicate one of the following: * 1. A new sync source was selected. isOK() will return true and getSyncSource() will * return the HostAndPort of the new sync source. @@ -72,28 +82,117 @@ struct SyncSourceResolverResponse { /** * Supplies a sync source to Fetcher, Rollback and Reporter. + * Obtains sync source candidates to probe from SyncSourceSelector. + * Each instance is created as needed whenever a new sync source is required and + * is meant to be discarded after the sync source resolution is finished - 'onCompletion' + * callback is invoked with the results contained in SyncSourceResolverResponse. 
*/ class SyncSourceResolver { public: - SyncSourceResolver(ReplicationCoordinator* replCoord) : _replCoord(replCoord){}; + static const NamespaceString kLocalOplogNss; + static const Seconds kFetcherTimeout; + static const Seconds kFetcherErrorBlacklistDuration; + static const Seconds kOplogEmptyBlacklistDuration; + static const Seconds kFirstOplogEntryEmptyBlacklistDuration; + static const Seconds kFirstOplogEntryNullTimestampBlacklistDuration; + static const Minutes kTooStaleBlacklistDuration; + + /** + * Callback function to report final status of resolving sync source. + */ + typedef stdx::function<void(const SyncSourceResolverResponse&)> OnCompletionFn; + + SyncSourceResolver(executor::TaskExecutor* taskExecutor, + SyncSourceSelector* syncSourceSelector, + const OpTime& lastOpTimeFetched, + const OnCompletionFn& onCompletion); + virtual ~SyncSourceResolver(); /** - * Uses the provided lastOpTimeFetched and replCoord to find a new sync source for - * DataReplicator components. + * Returns true if we are currently probing sync source candidates. */ - SyncSourceResolverResponse findSyncSource(OperationContext* txn, - const OpTime& lastOpTimeFetched); + bool isActive() const; /** - * Returns current sync source, which may be empty if there is no valid sync source available. + * Starts probing sync source candidates returned by the sync source selector. */ - HostAndPort getActiveSyncSource(); + Status startup(); + + /** + * Cancels all remote commands. + */ + void shutdown(); + + /** + * Block until inactive. + */ + void join(); private: - ReplicationCoordinator* _replCoord; - // Protects _syncSource. - stdx::mutex _mutex; - HostAndPort _syncSource; + bool _isActive_inlock() const; + bool _isShuttingDown() const; + + /** + * Returns new sync source from selector. + */ + StatusWith<HostAndPort> _chooseNewSyncSource(); + + /** + * Creates fetcher to read the first oplog entry on sync source. 
+ */ + std::unique_ptr<Fetcher> _makeFirstOplogEntryFetcher(HostAndPort candidate, + OpTime earliestOpTimeSeen); + + /** + * Schedules fetcher to read oplog on sync source. + * Saves fetcher in '_fetcher' on success. + */ + Status _scheduleFetcher(std::unique_ptr<Fetcher> fetcher); + + /** + * Returns optime of first oplog entry from fetcher response. + * Returns null optime on error. + */ + OpTime _parseRemoteEarliestOpTime(const HostAndPort& candidate, + const Fetcher::QueryResponse& queryResponse); + + /** + * Callback for fetching first oplog entry on sync source. + */ + void _firstOplogEntryFetcherCallback(const StatusWith<Fetcher::QueryResponse>& queryResult, + HostAndPort candidate, + OpTime earliestOpTimeSeen); + + /** + * Obtains new sync source candidate and schedules remote command to fetcher first oplog entry. + * May transition state to Complete. + * Returns status that could be used as result for startup(). + */ + Status _chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen); + + /** + * Invokes completion callback and transitions state to State::kComplete. + * Returns result.getStatus(). + */ + Status _finishCallback(StatusWith<HostAndPort> result); + Status _finishCallback(const SyncSourceResolverResponse& response); + + executor::TaskExecutor* const _taskExecutor; + SyncSourceSelector* const _syncSourceSelector; + const OpTime _lastOpTimeFetched; + const OnCompletionFn _onCompletion; + + // Protects members of this sync source resolver. + mutable stdx::mutex _mutex; + mutable stdx::condition_variable _condition; + enum class State { kPreStart, kRunning, kShuttingDown, kComplete }; + State _state = State::kPreStart; + + // Fetches first oplog entry on sync source candidate. + std::unique_ptr<Fetcher> _fetcher; + + // Holds reference to fetcher in the process of shutting down. 
+ std::unique_ptr<Fetcher> _shuttingDownFetcher; }; } // namespace repl diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp new file mode 100644 index 00000000000..06d7934d7c8 --- /dev/null +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -0,0 +1,536 @@ +/** + * Copyright 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/db/cursor_id.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/sync_source_resolver.h" +#include "mongo/db/repl/sync_source_selector.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/task_executor_proxy.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" + +namespace { + +using namespace mongo; +using namespace mongo::repl; + +class TaskExecutorWithFailureInScheduleRemoteCommand : public unittest::TaskExecutorProxy { +public: + using ShouldFailRequestFn = stdx::function<bool(const executor::RemoteCommandRequest&)>; + + TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor, + ShouldFailRequestFn shouldFailRequest) + : unittest::TaskExecutorProxy(executor), _shouldFailRequest(shouldFailRequest) {} + + StatusWith<CallbackHandle> scheduleRemoteCommand(const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb) override { + if (_shouldFailRequest(request)) { + return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); + } + return getExecutor()->scheduleRemoteCommand(request, cb); + } + +private: + ShouldFailRequestFn _shouldFailRequest; +}; + +class SyncSourceSelectorMock : public SyncSourceSelector { +public: + void clearSyncSourceBlacklist() override {} + HostAndPort chooseNewSyncSource(const Timestamp& ts) override { + chooseNewSyncSourceHook(); + lastTimestampFetched = ts; + return syncSource; + } + void blacklistSyncSource(const HostAndPort& host, Date_t until) override { + blacklistHost = host; + blacklistUntil = until; + } + bool shouldChangeSyncSource(const HostAndPort&, const rpc::ReplSetMetadata&) { + return false; + } + SyncSourceResolverResponse selectSyncSource(OperationContext*, const OpTime&) { + 
return SyncSourceResolverResponse(); + } + + HostAndPort syncSource = HostAndPort("host1", 1234); + Timestamp lastTimestampFetched; + stdx::function<void()> chooseNewSyncSourceHook = []() {}; + + HostAndPort blacklistHost; + Date_t blacklistUntil; +}; + +class SyncSourceResolverTest : public executor::ThreadPoolExecutorTest { +private: + void setUp() override; + void tearDown() override; + +protected: + TaskExecutorWithFailureInScheduleRemoteCommand::ShouldFailRequestFn _shouldFailRequest; + std::unique_ptr<TaskExecutorWithFailureInScheduleRemoteCommand> _executorProxy; + + SyncSourceResolverResponse _response; + SyncSourceResolver::OnCompletionFn _onCompletion; + std::unique_ptr<SyncSourceSelectorMock> _selector; + + std::unique_ptr<SyncSourceResolver> _resolver; +}; + +const OpTime lastOpTimeFetched(Timestamp(Seconds(100), 1U), 1LL); + +void SyncSourceResolverTest::setUp() { + executor::ThreadPoolExecutorTest::setUp(); + + _shouldFailRequest = [](const executor::RemoteCommandRequest&) { return false; }; + _executorProxy = stdx::make_unique<TaskExecutorWithFailureInScheduleRemoteCommand>( + &getExecutor(), [this](const executor::RemoteCommandRequest& request) { + return _shouldFailRequest(request); + }); + + _response.syncSourceStatus = getDetectableErrorStatus(); + _onCompletion = [this](const SyncSourceResolverResponse& response) { _response = response; }; + + _selector = stdx::make_unique<SyncSourceSelectorMock>(); + _resolver = stdx::make_unique<SyncSourceResolver>( + _executorProxy.get(), + _selector.get(), + lastOpTimeFetched, + [this](const SyncSourceResolverResponse& response) { _onCompletion(response); }); + + launchExecutorThread(); +} + +void SyncSourceResolverTest::tearDown() { + executor::ThreadPoolExecutorTest::shutdownExecutorThread(); + executor::ThreadPoolExecutorTest::joinExecutorThread(); + + _resolver.reset(); + _selector.reset(); + _executorProxy.reset(); + + executor::ThreadPoolExecutorTest::tearDown(); +} + +const NamespaceString 
nss("local.oplog.rs"); + +BSONObj makeCursorResponse(CursorId cursorId, + const NamespaceString& nss, + std::vector<BSONObj> docs, + bool isFirstBatch = true) { + BSONObjBuilder bob; + { + BSONObjBuilder cursorBob(bob.subobjStart("cursor")); + cursorBob.append("id", cursorId); + cursorBob.append("ns", nss.toString()); + { + BSONArrayBuilder batchBob( + cursorBob.subarrayStart(isFirstBatch ? "firstBatch" : "nextBatch")); + for (const auto& doc : docs) { + batchBob.append(doc); + } + } + } + bob.append("ok", 1); + return bob.obj(); +} + +TEST_F(SyncSourceResolverTest, InvalidConstruction) { + SyncSourceSelectorMock selector; + const OpTime lastOpTimeFetched(Timestamp(Seconds(100), 1U), 1LL); + auto onCompletion = [](const SyncSourceResolverResponse&) {}; + + // Null task executor. + ASSERT_THROWS_CODE_AND_WHAT( + SyncSourceResolver(nullptr, &selector, lastOpTimeFetched, onCompletion), + UserException, + ErrorCodes::BadValue, + "task executor cannot be null"); + + // Null sync source selector. + ASSERT_THROWS_CODE_AND_WHAT( + SyncSourceResolver(&getExecutor(), nullptr, lastOpTimeFetched, onCompletion), + UserException, + ErrorCodes::BadValue, + "sync source selector cannot be null"); + + // Null last fetched optime. + ASSERT_THROWS_CODE_AND_WHAT( + SyncSourceResolver(&getExecutor(), &selector, OpTime(), onCompletion), + UserException, + ErrorCodes::BadValue, + "last fetched optime cannot be null"); + + // Null callback function. 
+ ASSERT_THROWS_CODE_AND_WHAT( + SyncSourceResolver( + &getExecutor(), &selector, lastOpTimeFetched, SyncSourceResolver::OnCompletionFn()), + UserException, + ErrorCodes::BadValue, + "callback function cannot be null"); +} + +TEST_F(SyncSourceResolverTest, StartupReturnsIllegalOperationIfAlreadyActive) { + ASSERT_FALSE(_resolver->isActive()); + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::IllegalOperation, _resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); +} + +TEST_F(SyncSourceResolverTest, StartupReturnsShutdownInProgressIfExecutorIsShutdown) { + ASSERT_FALSE(_resolver->isActive()); + getExecutor().shutdown(); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, _resolver->startup()); + ASSERT_FALSE(_resolver->isActive()); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverReturnsStatusOkAndAnEmptyHostWhenNoViableHostExists) { + _selector->syncSource = HostAndPort(); + ASSERT_OK(_resolver->startup()); + + // Resolver invokes callback with empty host and becomes inactive immediately. + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus)); + ASSERT_EQUALS(lastOpTimeFetched.getTimestamp(), _selector->lastTimestampFetched); + + // Cannot restart a completed resolver. 
+ ASSERT_EQUALS(ErrorCodes::IllegalOperation, _resolver->startup()); +} + +TEST_F( + SyncSourceResolverTest, + SyncSourceResolverReturnsCallbackCanceledIfResolverIsShutdownBeforeReturningEmptySyncSource) { + _selector->syncSource = HostAndPort(); + _selector->chooseNewSyncSourceHook = [this]() { _resolver->shutdown(); }; + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _resolver->startup()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverConvertsExceptionToStatusIfChoosingViableSyncSourceThrowsException) { + _selector->chooseNewSyncSourceHook = [this]() { + uassert(ErrorCodes::InternalError, "", false); + }; + ASSERT_EQUALS(ErrorCodes::InternalError, _resolver->startup()); + ASSERT_EQUALS(ErrorCodes::InternalError, _response.syncSourceStatus); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverReturnsScheduleErrorIfTaskExecutorFailsToScheduleRemoteCommand) { + _shouldFailRequest = [](const executor::RemoteCommandRequest&) { return true; }; + ASSERT_EQUALS(ErrorCodes::OperationFailed, _resolver->startup()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus); +} + +void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net, + SyncSourceSelectorMock* selector, + HostAndPort currentSyncSource, + HostAndPort nextSyncSource, + std::vector<BSONObj> docs) { + executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net); + ASSERT_TRUE(net->hasReadyRequests()); + auto request = net->scheduleSuccessfulResponse(makeCursorResponse(0, nss, docs)); + ASSERT_EQUALS(currentSyncSource, request.target); + ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.db(), request.dbname); + ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout); + auto firstElement = request.cmdObj.firstElement(); + ASSERT_EQUALS("find"_sd, firstElement.fieldNameStringData()); + ASSERT_EQUALS(SyncSourceResolver::kLocalOplogNss.coll(), firstElement.String()); + 
ASSERT_EQUALS(1, request.cmdObj.getIntField("limit")); + ASSERT_BSONOBJ_EQ(BSON("$natural" << 1), request.cmdObj.getObjectField("sort")); + + // Change next sync source candidate before delivering scheduled response. + selector->syncSource = nextSyncSource; + + net->runReadyNetworkOperations(); +} + +void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net, + SyncSourceSelectorMock* selector, + HostAndPort currentSyncSource, + HostAndPort nextSyncSource, + Timestamp ts) { + _scheduleFirstOplogEntryFetcherResponse( + net, selector, currentSyncSource, nextSyncSource, {BSON("ts" << ts << "t" << 0)}); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverReturnsStatusOkAndTheFoundHostWhenAnEligibleSyncSourceExists) { + HostAndPort candidate1("node1", 12345); + _selector->syncSource = candidate1; + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _scheduleFirstOplogEntryFetcherResponse( + getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2)); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus)); +} + +TEST_F(SyncSourceResolverTest, + ResolverReturnsCallbackCanceledIfResolverIsShutdownAfterSchedulingFetcher) { + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + _resolver->shutdown(); + executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations(); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus); +} + +TEST_F(SyncSourceResolverTest, + SyncSourceResolverReturnsCallbackCanceledIfExecutorIsShutdownAfterSchedulingFetcher) { + ASSERT_OK(_resolver->startup()); + ASSERT_TRUE(_resolver->isActive()); + + getExecutor().shutdown(); + + _resolver->join(); + ASSERT_FALSE(_resolver->isActive()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, _response.syncSourceStatus); +} + 
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeDoesNotHaveOldEnoughData) {
+    HostAndPort candidate1("node1", 12345);
+    HostAndPort candidate2("node2", 12345);
+    _selector->syncSource = candidate1;
+
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate1, candidate2, Timestamp(200, 2));
+
+    ASSERT_TRUE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+    ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kTooStaleBlacklistDuration,
+                  _selector->blacklistUntil);
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverReturnsOplogStartMissingAndEarliestOpTimeAvailableWhenAllSourcesTooFresh) {
+    HostAndPort candidate1("node1", 12345);
+    HostAndPort candidate2("node2", 12345);
+    HostAndPort candidate3("node3", 12345);
+    _selector->syncSource = candidate1;
+
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate1, candidate2, Timestamp(400, 2));
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate2, candidate3, Timestamp(200, 2));
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate3, HostAndPort(), Timestamp(300, 2));
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(ErrorCodes::OplogStartMissing, _response.syncSourceStatus);
+    ASSERT_EQUALS(Timestamp(200, 2), _response.earliestOpTimeSeen.getTimestamp());
+}
+
+void _scheduleNetworkErrorForFirstNode(executor::NetworkInterfaceMock* net,
+                                       SyncSourceSelectorMock* selector,
+                                       HostAndPort currentSyncSource,
+                                       HostAndPort nextSyncSource) {
+    executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
+    ASSERT_TRUE(net->hasReadyRequests());
+    auto request = net->scheduleErrorResponse(
+        Status(ErrorCodes::HostUnreachable, "Sad message from the network :("));
+    ASSERT_EQUALS(currentSyncSource, request.target);
+
+    // Change next sync source candidate before delivering error to callback.
+    selector->syncSource = nextSyncSource;
+
+    net->runReadyNetworkOperations();
+}
+
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasANetworkError) {
+    HostAndPort candidate1("node1", 12345);
+    HostAndPort candidate2("node2", 12345);
+    _selector->syncSource = candidate1;
+
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, candidate2);
+
+    ASSERT_TRUE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+    ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+                  _selector->blacklistUntil);
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverReturnsEmptyHostWhenNoViableNodeExistsAfterNetworkErrorOnFirstNode) {
+    HostAndPort candidate1("node1", 12345);
+    _selector->syncSource = candidate1;
+
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, HostAndPort());
+
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+    ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+                  _selector->blacklistUntil);
+
+    ASSERT_EQUALS(HostAndPort(), unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverReturnsScheduleErrorWhenTheSchedulingCommandToSecondNodeFails) {
+    HostAndPort candidate1("node1", 12345);
+    HostAndPort candidate2("node2", 12345);
+    _selector->syncSource = candidate1;
+
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _shouldFailRequest = [candidate2](const executor::RemoteCommandRequest& request) {
+        return candidate2 == request.target;
+    };
+
+    _scheduleNetworkErrorForFirstNode(getNet(), _selector.get(), candidate1, candidate2);
+
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+    ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFetcherErrorBlacklistDuration,
+                  _selector->blacklistUntil);
+
+    ASSERT_EQUALS(ErrorCodes::OperationFailed, _response.syncSourceStatus);
+}
+
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasAnEmptyOplog) {
+    HostAndPort candidate1("node1", 12345);
+    HostAndPort candidate2("node2", 12345);  // must differ from candidate1
+    _selector->syncSource = candidate1;
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate1, candidate2, std::vector<BSONObj>());
+
+    ASSERT_TRUE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+    ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kOplogEmptyBlacklistDuration,
+                  _selector->blacklistUntil);
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverWillTryOtherSourcesWhenTheFirstNodeHasAnEmptyFirstOplogEntry) {
+    HostAndPort candidate1("node1", 12345);
+    HostAndPort candidate2("node2", 12345);  // must differ from candidate1
+    _selector->syncSource = candidate1;
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate1, candidate2, {BSONObj()});
+
+    ASSERT_TRUE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+    ASSERT_EQUALS(getExecutor().now() + SyncSourceResolver::kFirstOplogEntryEmptyBlacklistDuration,
+                  _selector->blacklistUntil);
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+TEST_F(SyncSourceResolverTest,
+       SyncSourceResolverWillTryOtherSourcesWhenFirstNodeContainsOplogEntryWithNullTimestamp) {
+    HostAndPort candidate1("node1", 12345);
+    HostAndPort candidate2("node2", 12345);  // must differ from candidate1
+    _selector->syncSource = candidate1;
+    ASSERT_OK(_resolver->startup());
+    ASSERT_TRUE(_resolver->isActive());
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate1, candidate2, Timestamp(0, 0));
+
+    ASSERT_TRUE(_resolver->isActive());
+    ASSERT_EQUALS(candidate1, _selector->blacklistHost);
+    ASSERT_EQUALS(getExecutor().now() +
+                      SyncSourceResolver::kFirstOplogEntryNullTimestampBlacklistDuration,
+                  _selector->blacklistUntil);
+
+    _scheduleFirstOplogEntryFetcherResponse(
+        getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+
+    _resolver->join();
+    ASSERT_FALSE(_resolver->isActive());
+    ASSERT_EQUALS(candidate2, unittest::assertGet(_response.syncSourceStatus));
+}
+
+}  // namespace
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 1eed68daea5..8a0c44d90fc 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -664,6 +664,10 @@ NetworkInterfaceMock::InNetworkGuard::~InNetworkGuard() {
         _net->exitNetwork();
 }
 
+NetworkInterfaceMock* NetworkInterfaceMock::InNetworkGuard::operator->() const {
+    return _net;
+}
+
 NetworkInterfaceMockClockSource::NetworkInterfaceMockClockSource(NetworkInterfaceMock* net)
     : _net(net) {
     _tracksSystemClock = false;
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 0ba054f11fe..6a34ceb45b3 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -515,6 +515,11 @@ public:
      */
     ~InNetworkGuard();
 
+    /**
+     * Returns network interface mock pointer.
+     */
+    NetworkInterfaceMock* operator->() const;
+
 private:
     NetworkInterfaceMock* _net;
     bool _callExitNetwork = true;
|