diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/SConscript | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 56 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_noop.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_noop.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer.cpp | 198 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer.h | 141 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_version_observer_test.cpp | 180 |
11 files changed, 636 insertions, 13 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 204e8042cd2..bb51efa6b36 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1352,6 +1352,19 @@ env.CppUnitTest( ) env.CppUnitTest( + target='topology_version_observer_test', + source=[ + 'topology_version_observer_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/bson/util/bson_extract', + 'repl_coordinator_impl', + 'repl_coordinator_test_fixture', + 'topology_version_observer', + ] +) + +env.CppUnitTest( target='db_repl_cloners_test', source=[ 'all_database_cloner_test.cpp', @@ -1410,3 +1423,16 @@ env.Library( 'election_reason_counter', ], ) + +env.Library( + target='topology_version_observer', + source=[ + 'topology_version_observer.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context', + 'repl_coordinator_interface', + 'replica_set_messages', + ], +) diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index ef796d6877c..42b5e7503c8 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -966,6 +966,15 @@ public: boost::optional<Date_t> deadline) const = 0; /** + * The futurized version of `awaitIsMasterResponse()`: + * * The future is ready for all cases that `awaitIsMasterResponse()` returns immediately. + * * For cases that `awaitIsMasterResponse()` blocks, calling `get()` on the future is blocking. + */ + virtual SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture( + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion) const = 0; + + /** * Returns the OpTime that consists of the timestamp of the latest oplog entry and the current * term. 
* This function throws if: diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index bbbbaf3250a..ba2d3a58528 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1962,12 +1962,11 @@ std::shared_ptr<IsMasterResponse> ReplicationCoordinatorImpl::_makeIsMasterRespo return response; } -std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMasterResponse( - OperationContext* opCtx, +SharedSemiFuture<ReplicationCoordinatorImpl::SharedIsMasterResponse> +ReplicationCoordinatorImpl::_getIsMasterResponseFuture( + WithLock lk, const SplitHorizon::Parameters& horizonParams, - boost::optional<TopologyVersion> clientTopologyVersion, - boost::optional<Date_t> deadline) const { - stdx::unique_lock lk(_mutex); + boost::optional<TopologyVersion> clientTopologyVersion) const { const MemberState myState = _topCoord->getMemberState(); if (!_rsConfig.isInitialized() || myState.removed()) { @@ -1977,7 +1976,8 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste auto response = std::make_shared<IsMasterResponse>(); response->setTopologyVersion(_topCoord->getTopologyVersion()); response->markAsNoConfig(); - return response; + return SharedSemiFuture<SharedIsMasterResponse>( + SharedIsMasterResponse(std::move(response))); } const auto& self = _rsConfig.getMemberAt(_selfIndex); @@ -1986,12 +1986,10 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste const StringData horizonString = self.determineHorizon(horizonParams); if (!clientTopologyVersion) { // The client is not using awaitable isMaster so we respond immediately. - return _makeIsMasterResponse(horizonString, lk); + return SharedSemiFuture<SharedIsMasterResponse>( + SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk))); } - // If clientTopologyVersion is not none, deadline must also be not none. 
- invariant(deadline); - // Each awaitable isMaster will wait on their specific horizon. We always expect horizonString // to exist in _horizonToPromiseMap. auto horizonIter = _horizonToPromiseMap.find(horizonString); @@ -2003,7 +2001,8 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste if (clientTopologyVersion->getProcessId() != topologyVersion.getProcessId()) { // Getting a different process id indicates that the server has restarted so we return // immediately with the updated process id. - return _makeIsMasterResponse(horizonString, lk); + return SharedSemiFuture<SharedIsMasterResponse>( + SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk))); } auto prevCounter = clientTopologyVersion->getCounter(); @@ -2017,9 +2016,40 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste if (prevCounter < topologyVersionCounter) { // The received isMaster command contains a stale topology version so we respond // immediately with a more current topology version. 
- return _makeIsMasterResponse(horizonString, lk); + return SharedSemiFuture<SharedIsMasterResponse>( + SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk))); + } + + return future; +} + +SharedSemiFuture<ReplicationCoordinatorImpl::SharedIsMasterResponse> +ReplicationCoordinatorImpl::getIsMasterResponseFuture( + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion) const { + stdx::lock_guard lk(_mutex); + return _getIsMasterResponseFuture(lk, horizonParams, clientTopologyVersion); +} + +std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMasterResponse( + OperationContext* opCtx, + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion, + boost::optional<Date_t> deadline) const { + stdx::unique_lock lk(_mutex); + + auto future = _getIsMasterResponseFuture(lk, horizonParams, clientTopologyVersion); + if (future.isReady()) { + return future.get(); } + // If clientTopologyVersion is not none, deadline must also be not none. + invariant(deadline); + const auto myState = _topCoord->getMemberState(); + invariant(_rsConfig.isInitialized() && !myState.removed()); + const auto& self = _rsConfig.getMemberAt(_selfIndex); + const StringData horizonString = self.determineHorizon(horizonParams); + const TopologyVersion topologyVersion = _topCoord->getTopologyVersion(); lk.unlock(); if (MONGO_unlikely(waitForIsMasterResponse.shouldFail())) { @@ -2034,7 +2064,7 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste "Waiting for an isMaster response from a topology change or until deadline: " "{deadline_get}. 
Current TopologyVersion counter is {topologyVersionCounter}", "deadline_get"_attr = deadline.get(), - "topologyVersionCounter"_attr = topologyVersionCounter); + "topologyVersionCounter"_attr = topologyVersion.getCounter()); auto statusWithIsMaster = futureGetNoThrowWithDeadline(opCtx, future, deadline.get(), opCtx->getTimeoutError()); auto status = statusWithIsMaster.getStatus(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index ac68f2f3d63..e252ce389a5 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -341,6 +341,12 @@ public: virtual void incrementTopologyVersion(OperationContext* opCtx) override; + using SharedIsMasterResponse = std::shared_ptr<const IsMasterResponse>; + + virtual SharedSemiFuture<SharedIsMasterResponse> getIsMasterResponseFuture( + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion) const override; + virtual std::shared_ptr<const IsMasterResponse> awaitIsMasterResponse( OperationContext* opCtx, const SplitHorizon::Parameters& horizonParams, @@ -1131,6 +1137,13 @@ private: */ std::shared_ptr<IsMasterResponse> _makeIsMasterResponse(const StringData horizonString, WithLock) const; + /** + * Creates a semi-future for isMasterResponse. 
+ */ + virtual SharedSemiFuture<SharedIsMasterResponse> _getIsMasterResponseFuture( + WithLock, + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion) const; /** * Utility method that schedules or performs actions specified by a HeartbeatResponseAction diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index ef691cbcb53..074a4e20103 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -563,6 +563,16 @@ void ReplicationCoordinatorMock::incrementTopologyVersion(OperationContext* opCt return; } +SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> +ReplicationCoordinatorMock::getIsMasterResponseFuture( + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion) const { + auto response = + awaitIsMasterResponse(nullptr, horizonParams, clientTopologyVersion, Date_t::now()); + return SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>( + std::shared_ptr<const IsMasterResponse>(response)); +} + std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorMock::awaitIsMasterResponse( OperationContext* opCtx, const SplitHorizon::Parameters& horizonParams, diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index db8e460605d..2b24406bfb2 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -329,6 +329,10 @@ public: boost::optional<TopologyVersion> clientTopologyVersion, boost::optional<Date_t> deadline) const override; + virtual SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture( + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion) const override; + virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const 
override; virtual HostAndPort getCurrentPrimaryHostAndPort() const override; diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp index 59d96c0bbfd..0107f1a1aae 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.cpp +++ b/src/mongo/db/repl/replication_coordinator_noop.cpp @@ -480,6 +480,14 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorNoOp::awaitIsMaste MONGO_UNREACHABLE; } + +SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> +ReplicationCoordinatorNoOp::getIsMasterResponseFuture( + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion) const { + MONGO_UNREACHABLE; +} + OpTime ReplicationCoordinatorNoOp::getLatestWriteOpTime(OperationContext* opCtx) const { return getMyLastAppliedOpTime(); } diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h index 4b385d86423..d5e17658516 100644 --- a/src/mongo/db/repl/replication_coordinator_noop.h +++ b/src/mongo/db/repl/replication_coordinator_noop.h @@ -266,6 +266,10 @@ public: boost::optional<TopologyVersion> clientTopologyVersion, boost::optional<Date_t> deadline) const final; + SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture( + const SplitHorizon::Parameters& horizonParams, + boost::optional<TopologyVersion> clientTopologyVersion) const final; + OpTime getLatestWriteOpTime(OperationContext* opCtx) const override; HostAndPort getCurrentPrimaryHostAndPort() const override; diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp new file mode 100644 index 00000000000..9d7cef07fd2 --- /dev/null +++ b/src/mongo/db/repl/topology_version_observer.cpp @@ -0,0 +1,198 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication + +#include "mongo/db/repl/topology_version_observer.h" + +#include "mongo/base/status_with.h" +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/logv2/log.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace repl { + +void TopologyVersionObserver::init(ReplicationCoordinator* replCoordinator) noexcept { + invariant(replCoordinator); + stdx::lock_guard<Mutex> lk(_mutex); + invariant(_state.load() == State::kUninitialized); + + LOGV2_INFO(40440, + "Starting {topologyVersionObserverName}", + "topologyVersionObserverName"_attr = toString()); + _replCoordinator = replCoordinator; + + invariant(!_thread); + _thread = stdx::thread([&]() { this->_workerThreadBody(); }); + + // Wait for the observer thread to update the status. + while (_state.load() != State::kRunning) { + } +} + +void TopologyVersionObserver::shutdown() noexcept { + stdx::unique_lock<Mutex> lk(_mutex); + if (_state.load() == State::kUninitialized) { + return; + } + + // Check if another `shutdown()` has already completed. + if (!_thread) { + return; + } + + LOGV2_INFO(40441, + "Stopping {topologyVersionObserverName}", + "topologyVersionObserverName"_attr = toString()); + auto state = _state.load(); + invariant(state == State::kRunning || state == State::kShutdown); + + // Wait for the observer client to exit from its main loop. + // Observer thread must update the state before attempting to acquire the mutex. 
+ while (_state.load() == State::kRunning) { + invariant(_observerClient); + _observerClient->lock(); + auto opCtx = _observerClient->getOperationContext(); + if (opCtx) { + opCtx->markKilled(ErrorCodes::ShutdownInProgress); + } + _observerClient->unlock(); + } + + invariant(_state.load() == State::kShutdown); + auto thread = std::exchange(_thread, boost::none); + lk.unlock(); + + invariant(thread); + thread->join(); +} + +std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::getCached() noexcept { + // Acquires the lock to avoid potential races with `_workerThreadBody()`. + // Atomics cannot be used here as `shared_ptr` cannot be atomically updated. + stdx::lock_guard<Mutex> lk(_mutex); + return _cache; +} + +std::string TopologyVersionObserver::toString() const { + return str::stream() << kTopologyVersionObserverName; +} + +std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::_getIsMasterResponse( + boost::optional<TopologyVersion> topologyVersion, bool* shouldShutdown) try { + invariant(*shouldShutdown == false); + ServiceContext::UniqueOperationContext opCtx; + try { + opCtx = Client::getCurrent()->makeOperationContext(); + } catch (...) { + // Failure to create an operation context could cause deadlocks. 
+ *shouldShutdown = true; + LOGV2_WARNING(40442, "Observer was unable to create a new OperationContext."); + return nullptr; + } + + invariant(opCtx); + auto future = _replCoordinator->getIsMasterResponseFuture({}, topologyVersion); + auto response = future.get(opCtx.get()); + if (!response->isConfigSet()) { + return nullptr; + } + + return response; +} catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) { + LOGV2_WARNING( + 40443, "Observer was interrupted by {exception}", "exception"_attr = e.toString()); + *shouldShutdown = true; + return nullptr; +} catch (DBException& e) { + LOGV2_WARNING(40444, + "Observer could not retrieve isMasterResponse: {exception}", + "exception"_attr = e.toString()); + return nullptr; +} + +void TopologyVersionObserver::_workerThreadBody() noexcept { + invariant(_state.load() == State::kUninitialized); + + // Creates a new client and makes `_observerClient` to point to it, which allows `shutdown()` + // to access the client object. + Client::initThread(kTopologyVersionObserverName); + _observerClient = Client::getCurrent(); + + // `init()` must hold the mutex until the observer updates the state. + invariant(!_mutex.try_lock()); + // The following notifies `init()` that `_observerClient` is set and ready to use. + _state.store(State::kRunning); + + auto getTopologyVersion = [&]() -> boost::optional<TopologyVersion> { + // Only the observer thread updates `_cache`, thus there is no need to hold the lock before + // accessing `_cache` here. + if (_cache) { + return _cache->getTopologyVersion(); + } + return boost::none; + }; + + ON_BLOCK_EXIT([&] { + // Once the observer detects a shutdown, it must update the state first before attempting + // to acquire the lock. This is necessary to avoid deadlocks. 
+ invariant(_state.load() == State::kRunning); + _state.store(State::kShutdown); + + stdx::unique_lock lock(_mutex); + + // Invalidate the cache as it is no longer updated + _cache.reset(); + + // Client object is local to this thread, and will no longer be available. + _observerClient = nullptr; + + LOGV2_INFO(40447, + "Stopped {topologyVersionObserverName}", + "topologyVersionObserverName"_attr = toString()); + }); + + bool shouldShutdown = false; + LOGV2_INFO(40445, + "Started {topologyVersionObserverName}", + "topologyVersionObserverName"_attr = toString()); + while (!shouldShutdown) { + auto response = _getIsMasterResponse(getTopologyVersion(), &shouldShutdown); + // Only update if the version is more recent than the cached version, or `_cache` is null. + if (!shouldShutdown && response != _cache) { + stdx::lock_guard lock(_mutex); + _cache = response; + } + } +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/topology_version_observer.h b/src/mongo/db/repl/topology_version_observer.h new file mode 100644 index 00000000000..afc55321ab2 --- /dev/null +++ b/src/mongo/db/repl/topology_version_observer.h @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>.
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/none.hpp> +#include <boost/optional.hpp> +#include <memory> +#include <string> + +#include "mongo/db/repl/is_master_response.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class Client; + +namespace repl { + +constexpr auto kTopologyVersionObserverName = "TopologyVersionObserver"; + +/** + * A utility to observe topology changes asynchronously and cache updates. + * `getCached()` is thread-safe (through `_mutex`), but its behavior is undefined during the + * execution of the constructor/destructor methods. + * + * The life-cycle for instances of this class (`_state`) is as follows: + * * Uninitialized: immediately after construction. + * Call `init()` to initialize the instance and start the observer thread. + * You may only call `init()` once -- otherwise, it will terminate the process. + * * Running: anytime after returning from `init()` and before calling `shutdown()`. + * Note that if the observer thread stops due to an error, it will set the state to Shutdown.
+ * * Shutdown: the object is ready for destruction. + * You must wait for `shutdown()` to return before deleting the object. + * Multiple, multithreaded calls to `shutdown()` are safe, and only one will proceed. + * After transitioning to shutdown, you can only call the destructor. + * + * constructor() -> init() -> getCached() ... getCached() -> shutdown() -> destructor() + */ +class TopologyVersionObserver final { +public: + TopologyVersionObserver() {} + + TopologyVersionObserver(const TopologyVersionObserver&) = delete; + + TopologyVersionObserver(TopologyVersionObserver&&) noexcept = delete; + + TopologyVersionObserver& operator=(const TopologyVersionObserver&) = delete; + + TopologyVersionObserver& operator=(TopologyVersionObserver&&) = delete; + + ~TopologyVersionObserver() { + auto state = _state.load(); + invariant(state == State::kShutdown || state == State::kUninitialized); + } + + void init(ReplicationCoordinator* replCoordinator) noexcept; + + void shutdown() noexcept; + + /** + * Returns a reference (shared pointer) to the cached version of `IsMasterResponse`. + * Note that the reference is initially set to `nullptr`. + * Also, the reference is set back to `nullptr` if the thread that updates `_cache` terminates + * due to an error (i.e., exception), or it receives an invalid response. + */ + std::shared_ptr<const IsMasterResponse> getCached() noexcept; + + std::string toString() const; + +private: + enum class State { + kUninitialized, + kRunning, + kShutdown, + }; + + std::shared_ptr<const IsMasterResponse> _getIsMasterResponse(boost::optional<TopologyVersion>, + bool*); + + void _workerThreadBody() noexcept; + + /** + * Protects shared accesses to `_cache`, `_observerClient`, and serializes calls to `init()` + * and `shutdown()` methods. + * + * Accessing the cached `IsMasterResponse` follows a single-producer, multi-consumer model: + * consumers are readers of `_cache` and the producer is the observer thread.
The assumption + * is that the contention on this lock is insignificant. + */ + mutable Mutex _mutex = MONGO_MAKE_LATCH(kTopologyVersionObserverName); + + // The reference to the latest cached version of `IsMasterResponse` + std::shared_ptr<const IsMasterResponse> _cache; + + // Holds a reference to the observer client to allow `shutdown()` to stop the observer thread. + Client* _observerClient; + + /** + * Represents the current state of the observer. + * The observer thread sets it to `kRunning` once started and to `kShutdown` when it exits. + */ + AtomicWord<State> _state; + + boost::optional<stdx::thread> _thread; + + ReplicationCoordinator* _replCoordinator; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/topology_version_observer_test.cpp b/src/mongo/db/repl/topology_version_observer_test.cpp new file mode 100644 index 00000000000..348a1b62fbd --- /dev/null +++ b/src/mongo/db/repl/topology_version_observer_test.cpp @@ -0,0 +1,180 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library.
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include <functional> +#include <iostream> +#include <memory> + +#include "mongo/bson/timestamp.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/repl/is_master_response.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/replication_coordinator_test_fixture.h" +#include "mongo/db/repl/topology_version_observer.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace repl { +namespace { + +/** + * Sets up and tears down the test environment for `TopologyVersionObserver` + */ +class TopologyVersionObserverTest : public ReplCoordTest { +protected: + BSONObj getConfigObj() { + BSONObjBuilder configBuilder; + configBuilder << "_id" + << "mySet"; + configBuilder << "version" << 1; + configBuilder << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345")); + configBuilder << "protocolVersion" << 1; + return configBuilder.obj(); + } + +public: + TopologyVersionObserverTest() { + auto configObj = getConfigObj(); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + replCoord = getReplCoord(); + + 
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1), Date_t() + Seconds(100)); + replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1), Date_t() + Seconds(100)); + simulateSuccessfulV1Election(); + ASSERT(replCoord->getMemberState().primary()); + + getNet()->enterNetwork(); + getNet()->advanceTime(Date_t::now() + sleepTime); + getNet()->exitNetwork(); + + observer = std::make_unique<TopologyVersionObserver>(); + observer->init(replCoord); + } + + ~TopologyVersionObserverTest() { + observer->shutdown(); + } + + auto getObserverCache() { + // Wait for observer to initialize its cache. Due to the unpredictable nature of thread + // scheduling, do not change the following to a fixed-wait. + auto cache = observer->getCached(); + while (!cache) { + sleepFor(sleepTime); + cache = observer->getCached(); + } + + return cache; + } + +protected: + ReplicationCoordinatorImpl* replCoord; + + const Milliseconds sleepTime = Milliseconds(100); + + std::unique_ptr<TopologyVersionObserver> observer; +}; + + +TEST_F(TopologyVersionObserverTest, PopulateCache) { + auto cachedResponse = getObserverCache(); + ASSERT(cachedResponse); + + auto opCtx = makeOperationContext(); + auto expectedResponse = + replCoord->awaitIsMasterResponse(opCtx.get(), {}, boost::none, boost::none); + ASSERT_EQ(cachedResponse->toBSON().toString(), expectedResponse->toBSON().toString()); +} + +TEST_F(TopologyVersionObserverTest, UpdateCache) { + auto cachedResponse = getObserverCache(); + ASSERT(cachedResponse); + + // Force an election to advance topology version + auto opCtx = makeOperationContext(); + auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest(); + simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get()); + + // Wait for the observer to update its cache + while (observer->getCached()->getTopologyVersion()->getCounter() == + 
cachedResponse->getTopologyVersion()->getCounter()) { + sleepFor(sleepTime); + } + + auto newResponse = observer->getCached(); + ASSERT(newResponse && newResponse->getTopologyVersion()); + ASSERT(newResponse->getTopologyVersion()->getCounter() > + cachedResponse->getTopologyVersion()->getCounter()); + + auto expectedResponse = + replCoord->awaitIsMasterResponse(opCtx.get(), {}, boost::none, boost::none); + ASSERT(expectedResponse && expectedResponse->getTopologyVersion()); + + ASSERT_EQ(newResponse->getTopologyVersion()->getCounter(), + expectedResponse->getTopologyVersion()->getCounter()); +} + +TEST_F(TopologyVersionObserverTest, HandleDBException) { + auto cachedResponse = getObserverCache(); + ASSERT(cachedResponse); + + // Kill the operation waiting on the `isMaster` future to make it throw + auto cur = ServiceContext::LockedClientsCursor(getGlobalServiceContext()); + while (auto client = cur.next()) { + if (client->desc() == kTopologyVersionObserverName) { + client->lock(); + ASSERT(client->getOperationContext()); + client->getOperationContext()->markKilled(ErrorCodes::ShutdownInProgress); + client->unlock(); + } + } + + // Observer thread must handle the exception and fetch the most recent IMR + auto newResponse = getObserverCache(); + ASSERT(newResponse->getTopologyVersion()->getCounter() == + cachedResponse->getTopologyVersion()->getCounter()); +} + +} // namespace +} // namespace repl +} // namespace mongo |