summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAmirsaman Memaripour <amirsaman.memaripour@10gen.com>2020-02-13 23:54:42 +0000
committerevergreen <evergreen@mongodb.com>2020-02-13 23:54:42 +0000
commit336ef860c53c1c538ba637817c2b9da2bdf30046 (patch)
treed37114efda541c2fa9572b9a0d3d328f891053fc /src/mongo
parentfa4944cf85dbf40319c61e9c286c4f37c07fd084 (diff)
downloadmongo-336ef860c53c1c538ba637817c2b9da2bdf30046.tar.gz
SERVER-45815 Make observer for topologyVersion change
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/SConscript26
-rw-r--r--src/mongo/db/repl/replication_coordinator.h9
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp56
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h13
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_noop.h4
-rw-r--r--src/mongo/db/repl/topology_version_observer.cpp198
-rw-r--r--src/mongo/db/repl/topology_version_observer.h141
-rw-r--r--src/mongo/db/repl/topology_version_observer_test.cpp180
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.cpp7
-rw-r--r--src/mongo/embedded/replication_coordinator_embedded.h4
13 files changed, 647 insertions, 13 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 204e8042cd2..bb51efa6b36 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1352,6 +1352,19 @@ env.CppUnitTest(
)
env.CppUnitTest(
+ target='topology_version_observer_test',
+ source=[
+ 'topology_version_observer_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/bson/util/bson_extract',
+ 'repl_coordinator_impl',
+ 'repl_coordinator_test_fixture',
+ 'topology_version_observer',
+ ]
+)
+
+env.CppUnitTest(
target='db_repl_cloners_test',
source=[
'all_database_cloner_test.cpp',
@@ -1410,3 +1423,16 @@ env.Library(
'election_reason_counter',
],
)
+
+env.Library(
+ target='topology_version_observer',
+ source=[
+ 'topology_version_observer.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/service_context',
+ 'repl_coordinator_interface',
+ 'replica_set_messages',
+ ],
+)
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index ef796d6877c..42b5e7503c8 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -966,6 +966,15 @@ public:
boost::optional<Date_t> deadline) const = 0;
/**
+ * The futurized version of `awaitIsMasterResponse()`:
+ * * The future is ready for all cases that `awaitIsMasterResponse()` returns immediately.
+ * * For cases that `awaitIsMasterResponse()` blocks, calling `get()` on the future is blocking.
+ */
+ virtual SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture(
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const = 0;
+
+ /**
* Returns the OpTime that consists of the timestamp of the latest oplog entry and the current
* term.
* This function throws if:
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index bbbbaf3250a..ba2d3a58528 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1962,12 +1962,11 @@ std::shared_ptr<IsMasterResponse> ReplicationCoordinatorImpl::_makeIsMasterRespo
return response;
}
-std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMasterResponse(
- OperationContext* opCtx,
+SharedSemiFuture<ReplicationCoordinatorImpl::SharedIsMasterResponse>
+ReplicationCoordinatorImpl::_getIsMasterResponseFuture(
+ WithLock lk,
const SplitHorizon::Parameters& horizonParams,
- boost::optional<TopologyVersion> clientTopologyVersion,
- boost::optional<Date_t> deadline) const {
- stdx::unique_lock lk(_mutex);
+ boost::optional<TopologyVersion> clientTopologyVersion) const {
const MemberState myState = _topCoord->getMemberState();
if (!_rsConfig.isInitialized() || myState.removed()) {
@@ -1977,7 +1976,8 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste
auto response = std::make_shared<IsMasterResponse>();
response->setTopologyVersion(_topCoord->getTopologyVersion());
response->markAsNoConfig();
- return response;
+ return SharedSemiFuture<SharedIsMasterResponse>(
+ SharedIsMasterResponse(std::move(response)));
}
const auto& self = _rsConfig.getMemberAt(_selfIndex);
@@ -1986,12 +1986,10 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste
const StringData horizonString = self.determineHorizon(horizonParams);
if (!clientTopologyVersion) {
// The client is not using awaitable isMaster so we respond immediately.
- return _makeIsMasterResponse(horizonString, lk);
+ return SharedSemiFuture<SharedIsMasterResponse>(
+ SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk)));
}
- // If clientTopologyVersion is not none, deadline must also be not none.
- invariant(deadline);
-
// Each awaitable isMaster will wait on their specific horizon. We always expect horizonString
// to exist in _horizonToPromiseMap.
auto horizonIter = _horizonToPromiseMap.find(horizonString);
@@ -2003,7 +2001,8 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste
if (clientTopologyVersion->getProcessId() != topologyVersion.getProcessId()) {
// Getting a different process id indicates that the server has restarted so we return
// immediately with the updated process id.
- return _makeIsMasterResponse(horizonString, lk);
+ return SharedSemiFuture<SharedIsMasterResponse>(
+ SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk)));
}
auto prevCounter = clientTopologyVersion->getCounter();
@@ -2017,9 +2016,40 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste
if (prevCounter < topologyVersionCounter) {
// The received isMaster command contains a stale topology version so we respond
// immediately with a more current topology version.
- return _makeIsMasterResponse(horizonString, lk);
+ return SharedSemiFuture<SharedIsMasterResponse>(
+ SharedIsMasterResponse(_makeIsMasterResponse(horizonString, lk)));
+ }
+
+ return future;
+}
+
+SharedSemiFuture<ReplicationCoordinatorImpl::SharedIsMasterResponse>
+ReplicationCoordinatorImpl::getIsMasterResponseFuture(
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const {
+ stdx::lock_guard lk(_mutex);
+ return _getIsMasterResponseFuture(lk, horizonParams, clientTopologyVersion);
+}
+
+std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMasterResponse(
+ OperationContext* opCtx,
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion,
+ boost::optional<Date_t> deadline) const {
+ stdx::unique_lock lk(_mutex);
+
+ auto future = _getIsMasterResponseFuture(lk, horizonParams, clientTopologyVersion);
+ if (future.isReady()) {
+ return future.get();
}
+ // If clientTopologyVersion is not none, deadline must also be not none.
+ invariant(deadline);
+ const auto myState = _topCoord->getMemberState();
+ invariant(_rsConfig.isInitialized() && !myState.removed());
+ const auto& self = _rsConfig.getMemberAt(_selfIndex);
+ const StringData horizonString = self.determineHorizon(horizonParams);
+ const TopologyVersion topologyVersion = _topCoord->getTopologyVersion();
lk.unlock();
if (MONGO_unlikely(waitForIsMasterResponse.shouldFail())) {
@@ -2034,7 +2064,7 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste
"Waiting for an isMaster response from a topology change or until deadline: "
"{deadline_get}. Current TopologyVersion counter is {topologyVersionCounter}",
"deadline_get"_attr = deadline.get(),
- "topologyVersionCounter"_attr = topologyVersionCounter);
+ "topologyVersionCounter"_attr = topologyVersion.getCounter());
auto statusWithIsMaster =
futureGetNoThrowWithDeadline(opCtx, future, deadline.get(), opCtx->getTimeoutError());
auto status = statusWithIsMaster.getStatus();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index ac68f2f3d63..e252ce389a5 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -341,6 +341,12 @@ public:
virtual void incrementTopologyVersion(OperationContext* opCtx) override;
+ using SharedIsMasterResponse = std::shared_ptr<const IsMasterResponse>;
+
+ virtual SharedSemiFuture<SharedIsMasterResponse> getIsMasterResponseFuture(
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const override;
+
virtual std::shared_ptr<const IsMasterResponse> awaitIsMasterResponse(
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
@@ -1131,6 +1137,13 @@ private:
*/
std::shared_ptr<IsMasterResponse> _makeIsMasterResponse(const StringData horizonString,
WithLock) const;
+ /**
+ * Creates a semi-future for isMasterResponse.
+ */
+ virtual SharedSemiFuture<SharedIsMasterResponse> _getIsMasterResponseFuture(
+ WithLock,
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const;
/**
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index ef691cbcb53..074a4e20103 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -563,6 +563,16 @@ void ReplicationCoordinatorMock::incrementTopologyVersion(OperationContext* opCt
return;
}
+SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>
+ReplicationCoordinatorMock::getIsMasterResponseFuture(
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const {
+ auto response =
+ awaitIsMasterResponse(nullptr, horizonParams, clientTopologyVersion, Date_t::now());
+ return SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>(
+ std::shared_ptr<const IsMasterResponse>(response));
+}
+
std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorMock::awaitIsMasterResponse(
OperationContext* opCtx,
const SplitHorizon::Parameters& horizonParams,
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index db8e460605d..2b24406bfb2 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -329,6 +329,10 @@ public:
boost::optional<TopologyVersion> clientTopologyVersion,
boost::optional<Date_t> deadline) const override;
+ virtual SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture(
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const override;
+
virtual OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
virtual HostAndPort getCurrentPrimaryHostAndPort() const override;
diff --git a/src/mongo/db/repl/replication_coordinator_noop.cpp b/src/mongo/db/repl/replication_coordinator_noop.cpp
index 59d96c0bbfd..0107f1a1aae 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.cpp
+++ b/src/mongo/db/repl/replication_coordinator_noop.cpp
@@ -480,6 +480,14 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorNoOp::awaitIsMaste
MONGO_UNREACHABLE;
}
+
+SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>
+ReplicationCoordinatorNoOp::getIsMasterResponseFuture(
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const {
+ MONGO_UNREACHABLE;
+}
+
OpTime ReplicationCoordinatorNoOp::getLatestWriteOpTime(OperationContext* opCtx) const {
return getMyLastAppliedOpTime();
}
diff --git a/src/mongo/db/repl/replication_coordinator_noop.h b/src/mongo/db/repl/replication_coordinator_noop.h
index 4b385d86423..d5e17658516 100644
--- a/src/mongo/db/repl/replication_coordinator_noop.h
+++ b/src/mongo/db/repl/replication_coordinator_noop.h
@@ -266,6 +266,10 @@ public:
boost::optional<TopologyVersion> clientTopologyVersion,
boost::optional<Date_t> deadline) const final;
+ SharedSemiFuture<std::shared_ptr<const IsMasterResponse>> getIsMasterResponseFuture(
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const final;
+
OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
HostAndPort getCurrentPrimaryHostAndPort() const override;
diff --git a/src/mongo/db/repl/topology_version_observer.cpp b/src/mongo/db/repl/topology_version_observer.cpp
new file mode 100644
index 00000000000..9d7cef07fd2
--- /dev/null
+++ b/src/mongo/db/repl/topology_version_observer.cpp
@@ -0,0 +1,198 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplication
+
+#include "mongo/db/repl/topology_version_observer.h"
+
+#include "mongo/base/status_with.h"
+#include "mongo/db/client.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/logv2/log.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace repl {
+
+void TopologyVersionObserver::init(ReplicationCoordinator* replCoordinator) noexcept {
+ invariant(replCoordinator);
+ stdx::lock_guard<Mutex> lk(_mutex);
+ invariant(_state.load() == State::kUninitialized);
+
+ LOGV2_INFO(40440,
+ "Starting {topologyVersionObserverName}",
+ "topologyVersionObserverName"_attr = toString());
+ _replCoordinator = replCoordinator;
+
+ invariant(!_thread);
+ _thread = stdx::thread([&]() { this->_workerThreadBody(); });
+
+ // Wait for the observer thread to update the status.
+ while (_state.load() != State::kRunning) {
+ }
+}
+
+void TopologyVersionObserver::shutdown() noexcept {
+ stdx::unique_lock<Mutex> lk(_mutex);
+ if (_state.load() == State::kUninitialized) {
+ return;
+ }
+
+ // Check if another `shutdown()` has already completed.
+ if (!_thread) {
+ return;
+ }
+
+ LOGV2_INFO(40441,
+ "Stopping {topologyVersionObserverName}",
+ "topologyVersionObserverName"_attr = toString());
+ auto state = _state.load();
+ invariant(state == State::kRunning || state == State::kShutdown);
+
+ // Wait for the observer client to exit from its main loop.
+ // Observer thread must update the state before attempting to acquire the mutex.
+ while (_state.load() == State::kRunning) {
+ invariant(_observerClient);
+ _observerClient->lock();
+ auto opCtx = _observerClient->getOperationContext();
+ if (opCtx) {
+ opCtx->markKilled(ErrorCodes::ShutdownInProgress);
+ }
+ _observerClient->unlock();
+ }
+
+ invariant(_state.load() == State::kShutdown);
+ auto thread = std::exchange(_thread, boost::none);
+ lk.unlock();
+
+ invariant(thread);
+ thread->join();
+}
+
+std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::getCached() noexcept {
+ // Acquires the lock to avoid potential races with `_workerThreadBody()`.
+ // Atomics cannot be used here as `shared_ptr` cannot be atomically updated.
+ stdx::lock_guard<Mutex> lk(_mutex);
+ return _cache;
+}
+
+std::string TopologyVersionObserver::toString() const {
+ return str::stream() << kTopologyVersionObserverName;
+}
+
+std::shared_ptr<const IsMasterResponse> TopologyVersionObserver::_getIsMasterResponse(
+ boost::optional<TopologyVersion> topologyVersion, bool* shouldShutdown) try {
+ invariant(*shouldShutdown == false);
+ ServiceContext::UniqueOperationContext opCtx;
+ try {
+ opCtx = Client::getCurrent()->makeOperationContext();
+ } catch (...) {
+ // Failure to create an operation context could cause deadlocks.
+ *shouldShutdown = true;
+ LOGV2_WARNING(40442, "Observer was unable to create a new OperationContext.");
+ return nullptr;
+ }
+
+ invariant(opCtx);
+ auto future = _replCoordinator->getIsMasterResponseFuture({}, topologyVersion);
+ auto response = future.get(opCtx.get());
+ if (!response->isConfigSet()) {
+ return nullptr;
+ }
+
+ return response;
+} catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
+ LOGV2_WARNING(
+ 40443, "Observer was interrupted by {exception}", "exception"_attr = e.toString());
+ *shouldShutdown = true;
+ return nullptr;
+} catch (DBException& e) {
+ LOGV2_WARNING(40444,
+ "Observer could not retrieve isMasterResponse: {exception}",
+ "exception"_attr = e.toString());
+ return nullptr;
+}
+
+void TopologyVersionObserver::_workerThreadBody() noexcept {
+ invariant(_state.load() == State::kUninitialized);
+
+ // Creates a new client and makes `_observerClient` to point to it, which allows `shutdown()`
+ // to access the client object.
+ Client::initThread(kTopologyVersionObserverName);
+ _observerClient = Client::getCurrent();
+
+ // `init()` must hold the mutex until the observer updates the state.
+ invariant(!_mutex.try_lock());
+ // The following notifies `init()` that `_observerClient` is set and ready to use.
+ _state.store(State::kRunning);
+
+ auto getTopologyVersion = [&]() -> boost::optional<TopologyVersion> {
+ // Only the observer thread updates `_cache`, thus there is no need to hold the lock before
+ // accessing `_cache` here.
+ if (_cache) {
+ return _cache->getTopologyVersion();
+ }
+ return boost::none;
+ };
+
+ ON_BLOCK_EXIT([&] {
+ // Once the observer detects a shutdown, it must update the state first before attempting
+ // to acquire the lock. This is necessary to avoid deadlocks.
+ invariant(_state.load() == State::kRunning);
+ _state.store(State::kShutdown);
+
+ stdx::unique_lock lock(_mutex);
+
+ // Invalidate the cache as it is no longer updated
+ _cache.reset();
+
+ // Client object is local to this thread, and is no longer be available.
+ _observerClient = nullptr;
+
+ LOGV2_INFO(40447,
+ "Stopped {topologyVersionObserverName}",
+ "topologyVersionObserverName"_attr = toString());
+ });
+
+ bool shouldShutdown = false;
+ LOGV2_INFO(40445,
+ "Started {topologyVersionObserverName}",
+ "topologyVersionObserverName"_attr = toString());
+ while (!shouldShutdown) {
+ auto response = _getIsMasterResponse(getTopologyVersion(), &shouldShutdown);
+ // Only update if the version is more recent than the cached version, or `_cache` is null.
+ if (!shouldShutdown && response != _cache) {
+ stdx::lock_guard lock(_mutex);
+ _cache = response;
+ }
+ }
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/topology_version_observer.h b/src/mongo/db/repl/topology_version_observer.h
new file mode 100644
index 00000000000..afc55321ab2
--- /dev/null
+++ b/src/mongo/db/repl/topology_version_observer.h
@@ -0,0 +1,141 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/none.hpp>
+#include <boost/optional.hpp>
+#include <memory>
+#include <string>
+
+#include "mongo/db/repl/is_master_response.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class Client;
+
+namespace repl {
+
+constexpr auto kTopologyVersionObserverName = "TopologyVersionObserver";
+
+/**
+ * An utility to observe topology changes asynchronously and cache updates.
+ * `getCached()` is thread-safe (through `_mutex`), but its behavior is undefined during the
+ * execution of the constructor/destructor methods.
+ *
+ * The life-cycle for instances of this class (`_state`) is as follows:
+ * * Uninitialized: immediately after construction.
+ * Call `init()` to initialize the instance and start the observer thread.
+ * You may only call `init()` once -- otherwise, it will terminate the process.
+ * * Running: anytime after returning from `init()` and before calling `shutdown()`.
+ * Note that if the observer thread stops due to an error, it will set the state to Shutdown.
+ * * Shutdown: the object is ready for destruction.
+ * You must wait for `shutdown()` to return before deleting the object.
+ * Multiple, multithreaded calls to `shutdown()` is safe, and only one will proceed.
+ * After transitioning to shutdown, you can only call the destructor.
+ *
+ * constructor() -> init() -> getCached() ... getCached() -> shutdown() -> destructor()
+ */
+class TopologyVersionObserver final {
+public:
+ TopologyVersionObserver() {}
+
+ TopologyVersionObserver(const TopologyVersionObserver&) = delete;
+
+ TopologyVersionObserver(TopologyVersionObserver&&) noexcept = delete;
+
+ TopologyVersionObserver& operator=(const TopologyVersionObserver&) = delete;
+
+ TopologyVersionObserver& operator=(TopologyVersionObserver&&) = delete;
+
+ ~TopologyVersionObserver() {
+ auto state = _state.load();
+ invariant(state == State::kShutdown || state == State::kUninitialized);
+ }
+
+ void init(ReplicationCoordinator* replCoordinator) noexcept;
+
+ void shutdown() noexcept;
+
+ /**
+ * Returns a reference (shared pointer) to the cached version of `IsMasterResponse`.
+ * Note that the reference is initially set to `nullptr`.
+ * Also, the reference is set back to `nullptr` if the thread that updates `_cache` terminates
+ * due to an error (i.e., exception), or it receives an invalid response.
+ */
+ std::shared_ptr<const IsMasterResponse> getCached() noexcept;
+
+ std::string toString() const;
+
+private:
+ enum class State {
+ kUninitialized,
+ kRunning,
+ kShutdown,
+ };
+
+ std::shared_ptr<const IsMasterResponse> _getIsMasterResponse(boost::optional<TopologyVersion>,
+ bool*);
+
+ void _workerThreadBody() noexcept;
+
+ /**
+ * Protects shared accesses to `_cache`, `_observerClient`, and serializes calls to `init()`
+ * and `shutdown()` methods.
+ *
+ * Accessing the cached `IsMasterResponse` follows a single-producer, multi-consumer model:
+ * consumers are readers of `_cache` and the producer is the observer thread. The assumption
+ * is that the contention on this lock is insignificant.
+ */
+ mutable Mutex _mutex = MONGO_MAKE_LATCH(kTopologyVersionObserverName);
+
+ // The reference to the latest cached version of `IsMasterResponse`
+ std::shared_ptr<const IsMasterResponse> _cache;
+
+ // Holds a reference to the observer client to allow `shutdown()` to stop the observer thread.
+ Client* _observerClient;
+
+ /**
+ * Represents the current state of the observer.
+ * Note that the observer thread never updates the state.
+ */
+ AtomicWord<State> _state;
+
+ boost::optional<stdx::thread> _thread;
+
+ ReplicationCoordinator* _replCoordinator;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/topology_version_observer_test.cpp b/src/mongo/db/repl/topology_version_observer_test.cpp
new file mode 100644
index 00000000000..348a1b62fbd
--- /dev/null
+++ b/src/mongo/db/repl/topology_version_observer_test.cpp
@@ -0,0 +1,180 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include <functional>
+#include <iostream>
+#include <memory>
+
+#include "mongo/bson/timestamp.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/repl/is_master_response.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/replication_coordinator_test_fixture.h"
+#include "mongo/db/repl/topology_version_observer.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace repl {
+namespace {
+
+/**
+ * Sets up and tears down the test environment for `TopologyVersionObserver`
+ */
+class TopologyVersionObserverTest : public ReplCoordTest {
+protected:
+ BSONObj getConfigObj() {
+ BSONObjBuilder configBuilder;
+ configBuilder << "_id"
+ << "mySet";
+ configBuilder << "version" << 1;
+ configBuilder << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:12345"));
+ configBuilder << "protocolVersion" << 1;
+ return configBuilder.obj();
+ }
+
+public:
+ TopologyVersionObserverTest() {
+ auto configObj = getConfigObj();
+ assertStartSuccess(configObj, HostAndPort("node1", 12345));
+ ReplSetConfig config = assertMakeRSConfig(configObj);
+ replCoord = getReplCoord();
+
+ ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+ replCoordSetMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 1), Date_t() + Seconds(100));
+ replCoordSetMyLastDurableOpTime(OpTime(Timestamp(100, 1), 1), Date_t() + Seconds(100));
+ simulateSuccessfulV1Election();
+ ASSERT(replCoord->getMemberState().primary());
+
+ getNet()->enterNetwork();
+ getNet()->advanceTime(Date_t::now() + sleepTime);
+ getNet()->exitNetwork();
+
+ observer = std::make_unique<TopologyVersionObserver>();
+ observer->init(replCoord);
+ }
+
+ ~TopologyVersionObserverTest() {
+ observer->shutdown();
+ }
+
+ auto getObserverCache() {
+ // Wait for observer to initialize its cache. Due to the unpredictable nature of thread
+ // scheduling, do not change the following to a fixed-wait.
+ auto cache = observer->getCached();
+ while (!cache) {
+ sleepFor(sleepTime);
+ cache = observer->getCached();
+ }
+
+ return cache;
+ }
+
+protected:
+ ReplicationCoordinatorImpl* replCoord;
+
+ const Milliseconds sleepTime = Milliseconds(100);
+
+ std::unique_ptr<TopologyVersionObserver> observer;
+};
+
+
+TEST_F(TopologyVersionObserverTest, PopulateCache) {
+ auto cachedResponse = getObserverCache();
+ ASSERT(cachedResponse);
+
+ auto opCtx = makeOperationContext();
+ auto expectedResponse =
+ replCoord->awaitIsMasterResponse(opCtx.get(), {}, boost::none, boost::none);
+ ASSERT_EQ(cachedResponse->toBSON().toString(), expectedResponse->toBSON().toString());
+}
+
+TEST_F(TopologyVersionObserverTest, UpdateCache) {
+ auto cachedResponse = getObserverCache();
+ ASSERT(cachedResponse);
+
+ // Force an election to advance topology version
+ auto opCtx = makeOperationContext();
+ auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
+ simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTimeoutWhen, opCtx.get());
+
+ // Wait for the observer to update its cache
+ while (observer->getCached()->getTopologyVersion()->getCounter() ==
+ cachedResponse->getTopologyVersion()->getCounter()) {
+ sleepFor(sleepTime);
+ }
+
+ auto newResponse = observer->getCached();
+ ASSERT(newResponse && newResponse->getTopologyVersion());
+ ASSERT(newResponse->getTopologyVersion()->getCounter() >
+ cachedResponse->getTopologyVersion()->getCounter());
+
+ auto expectedResponse =
+ replCoord->awaitIsMasterResponse(opCtx.get(), {}, boost::none, boost::none);
+ ASSERT(expectedResponse && expectedResponse->getTopologyVersion());
+
+ ASSERT_EQ(newResponse->getTopologyVersion()->getCounter(),
+ expectedResponse->getTopologyVersion()->getCounter());
+}
+
+TEST_F(TopologyVersionObserverTest, HandleDBException) {
+ auto cachedResponse = getObserverCache();
+ ASSERT(cachedResponse);
+
+ // Kill the operation waiting on the `isMaster` future to make it throw
+ auto cur = ServiceContext::LockedClientsCursor(getGlobalServiceContext());
+ while (auto client = cur.next()) {
+ if (client->desc() == kTopologyVersionObserverName) {
+ client->lock();
+ ASSERT(client->getOperationContext());
+ client->getOperationContext()->markKilled(ErrorCodes::ShutdownInProgress);
+ client->unlock();
+ }
+ }
+
+ // Observer thread must handle the exception and fetch the most recent IMR
+ auto newResponse = getObserverCache();
+ ASSERT(newResponse->getTopologyVersion()->getCounter() ==
+ cachedResponse->getTopologyVersion()->getCounter());
+}
+
+} // namespace
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/embedded/replication_coordinator_embedded.cpp b/src/mongo/embedded/replication_coordinator_embedded.cpp
index 6dcdf347463..61add653db5 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.cpp
+++ b/src/mongo/embedded/replication_coordinator_embedded.cpp
@@ -506,6 +506,13 @@ std::shared_ptr<const repl::IsMasterResponse> ReplicationCoordinatorEmbedded::aw
UASSERT_NOT_IMPLEMENTED;
};
+SharedSemiFuture<std::shared_ptr<const IsMasterResponse>>
+ReplicationCoordinatorEmbedded::getIsMasterResponseFuture(
+ const SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const {
+ UASSERT_NOT_IMPLEMENTED;
+}
+
OpTime ReplicationCoordinatorEmbedded::getLatestWriteOpTime(OperationContext* opCtx) const {
return getMyLastAppliedOpTime();
}
diff --git a/src/mongo/embedded/replication_coordinator_embedded.h b/src/mongo/embedded/replication_coordinator_embedded.h
index 8790c2bb2a2..3b419911bb9 100644
--- a/src/mongo/embedded/replication_coordinator_embedded.h
+++ b/src/mongo/embedded/replication_coordinator_embedded.h
@@ -273,6 +273,10 @@ public:
boost::optional<TopologyVersion> previous,
boost::optional<Date_t> deadline) const override;
+ virtual SharedSemiFuture<std::shared_ptr<const repl::IsMasterResponse>>
+ getIsMasterResponseFuture(const repl::SplitHorizon::Parameters& horizonParams,
+ boost::optional<TopologyVersion> clientTopologyVersion) const;
+
repl::OpTime getLatestWriteOpTime(OperationContext* opCtx) const override;
HostAndPort getCurrentPrimaryHostAndPort() const override;