summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Lamont Nelson <lamont.nelson@mongodb.com> 2020-02-21 17:12:29 -0500
committer: Lamont Nelson <lamont.nelson@mongodb.com> 2020-02-24 13:35:49 -0500
commit: 50da4a76ed2f3abbdf9d8d40ad57c5fb5169b5fb (patch)
tree: e5337b799d2a110faed4629a77954b7a753a0245
parent: 5c833792316fe542445683b865a66a64eb793a4a (diff)
download: mongo-50da4a76ed2f3abbdf9d8d40ad57c5fb5169b5fb.tar.gz
SERVER-43332: new RSM implementation based on sdam
-rw-r--r-- jstests/noPassthrough/replica_set_connection_error_codes.js | 17
-rw-r--r-- src/mongo/client/SConscript | 4
-rw-r--r-- src/mongo/client/replica_set_monitor_manager.cpp | 305
-rw-r--r-- src/mongo/client/replica_set_monitor_manager.h | 4
-rw-r--r-- src/mongo/client/sdam/SConscript | 14
-rw-r--r-- src/mongo/client/sdam/sdam.h | 37
-rw-r--r-- src/mongo/client/sdam/sdam_configuration.cpp | 95
-rw-r--r-- src/mongo/client/sdam/sdam_configuration.h | 100
-rw-r--r-- src/mongo/client/sdam/sdam_datatypes.h | 6
-rw-r--r-- src/mongo/client/sdam/server_description.cpp | 27
-rw-r--r-- src/mongo/client/sdam/server_description.h | 7
-rw-r--r-- src/mongo/client/sdam/server_selector.cpp | 267
-rw-r--r-- src/mongo/client/sdam/server_selector.h | 193
-rw-r--r-- src/mongo/client/sdam/server_selector_test.cpp | 463
-rw-r--r-- src/mongo/client/sdam/topology_description.cpp | 65
-rw-r--r-- src/mongo/client/sdam/topology_description.h | 72
-rw-r--r-- src/mongo/client/sdam/topology_description_test.cpp | 90
-rw-r--r-- src/mongo/client/sdam/topology_listener.cpp | 163
-rw-r--r-- src/mongo/client/sdam/topology_listener.h | 77
-rw-r--r-- src/mongo/client/sdam/topology_manager.cpp | 68
-rw-r--r-- src/mongo/client/sdam/topology_manager.h | 30
-rw-r--r-- src/mongo/client/sdam/topology_state_machine.h | 1
-rw-r--r-- src/mongo/client/server_is_master_monitor.cpp | 375
-rw-r--r-- src/mongo/client/server_is_master_monitor.h | 135
-rw-r--r-- src/mongo/client/streamable_replica_set_monitor.cpp | 553
-rw-r--r-- src/mongo/client/streamable_replica_set_monitor.h | 163
-rw-r--r-- src/mongo/client/streamable_replica_set_monitor_query_processor.cpp | 67
-rw-r--r-- src/mongo/client/streamable_replica_set_monitor_query_processor.h | 49
28 files changed, 3060 insertions, 387 deletions
diff --git a/jstests/noPassthrough/replica_set_connection_error_codes.js b/jstests/noPassthrough/replica_set_connection_error_codes.js
index d431415ee6d..835d712a45e 100644
--- a/jstests/noPassthrough/replica_set_connection_error_codes.js
+++ b/jstests/noPassthrough/replica_set_connection_error_codes.js
@@ -56,14 +56,15 @@ const awaitShell = stepDownPrimary(rst);
rst.getPrimary();
rst.awaitNodesAgreeOnPrimary();
-// DBClientRS will continue to send command requests to the node it believed to be primary even
-// after it stepped down so long as it hasn't closed its connection.
-assert.commandFailedWithCode(rsConn.getDB("test").runCommand({create: "mycoll"}),
- ErrorCodes.NotMaster);
-
-// However, once the server responds back with a ErrorCodes.NotMaster error, DBClientRS will
-// cause the ReplicaSetMonitor to attempt to discover the current primary.
-assert.commandWorked(rsConn.getDB("test").runCommand({create: "mycoll"}));
+// DBClientRS should discover the current primary eventually and get NotMaster errors in the
+// meantime.
+assert.soon(() => {
+    const res = rsConn.getDB("test").runCommand({create: "mycoll"});
+    if (!res.ok) {
+        // NotMaster is the only failure tolerated while the monitor catches up. Attach the
+        // whole response so an unexpected error code is visible in the test log.
+        assert.eq(res.code, ErrorCodes.NotMaster, tojson(res));
+    }
+    // Keep retrying until the command succeeds against the newly discovered primary.
+    return res.ok;
+});
try {
assert.commandWorked(directConn.adminCommand({configureFailPoint: failpoint, mode: "off"}));
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index b06b7d6f20d..dd7c410279c 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -199,6 +199,9 @@ clientDriverEnv.Library(
'replica_set_monitor_manager.cpp',
env.Idlc('replica_set_monitor_params.idl')[0],
'server_ping_monitor.cpp',
+ 'server_is_master_monitor.cpp',
+ 'streamable_replica_set_monitor.cpp',
+ 'streamable_replica_set_monitor_query_processor.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/write_concern_options',
@@ -210,6 +213,7 @@ clientDriverEnv.Library(
'$BUILD_DIR/mongo/util/background_job',
'$BUILD_DIR/mongo/util/md5',
'$BUILD_DIR/mongo/util/net/network',
+ '$BUILD_DIR/mongo/client/sdam/sdam',
'clientdriver_minimal',
'read_preference',
],
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index 1e81bbbb688..193a799806d 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -38,8 +38,8 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/mongo_uri.h"
-#include "mongo/client/replica_set_monitor_params_gen.h"
#include "mongo/client/scanning_replica_set_monitor.h"
+#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/streamable_replica_set_monitor.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_factory.h"
@@ -47,201 +47,196 @@
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/logv2/log.h"
#include "mongo/platform/mutex.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/util/map_util.h"
+#include "mongo/client/replica_set_monitor_params_gen.h"
-namespace mongo {
+ namespace mongo {
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::vector;
+ using std::set;
+ using std::shared_ptr;
+ using std::string;
+ using std::vector;
-using executor::NetworkInterface;
-using executor::NetworkInterfaceThreadPool;
-using executor::TaskExecutor;
-using executor::TaskExecutorPool;
-using executor::ThreadPoolTaskExecutor;
+ using executor::NetworkInterface;
+ using executor::NetworkInterfaceThreadPool;
+ using executor::TaskExecutor;
+ using executor::TaskExecutorPool;
+ using executor::ThreadPoolTaskExecutor;
-namespace {
-const auto getGlobalRSMMonitorManager =
- ServiceContext::declareDecoration<ReplicaSetMonitorManager>();
-} // namespace
+ namespace {
+ const auto getGlobalRSMMonitorManager =
+ ServiceContext::declareDecoration<ReplicaSetMonitorManager>();
+ } // namespace
-ReplicaSetMonitorManager::~ReplicaSetMonitorManager() {
- shutdown();
-}
+ ReplicaSetMonitorManager::~ReplicaSetMonitorManager() {
+ shutdown();
+ }
-ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() {
- return &getGlobalRSMMonitorManager(getGlobalServiceContext());
-}
+ ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() {
+ return &getGlobalRSMMonitorManager(getGlobalServiceContext());
+ }
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData setName) {
- stdx::lock_guard<Latch> lk(_mutex);
+ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData setName) {
+ stdx::lock_guard<Latch> lk(_mutex);
- if (auto monitor = _monitors[setName].lock()) {
- return monitor;
- } else {
- return shared_ptr<ReplicaSetMonitor>();
+ if (auto monitor = _monitors[setName].lock()) {
+ return monitor;
+ } else {
+ return shared_ptr<ReplicaSetMonitor>();
+ }
}
-}
-void ReplicaSetMonitorManager::_setupTaskExecutorInLock() {
- if (_isShutdown || _taskExecutor) {
- // do not restart taskExecutor if is in shutdown
- return;
- }
+ void ReplicaSetMonitorManager::_setupTaskExecutorInLock() {
+ if (_isShutdown || _taskExecutor) {
+ // do not restart taskExecutor if is in shutdown
+ return;
+ }
- // construct task executor
- auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
- auto net = executor::makeNetworkInterface(
- "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList));
- auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
- _taskExecutor = std::make_unique<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
- _taskExecutor->startup();
-}
-
-namespace {
-void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) {
- uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b);
-}
-} // namespace
-
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
- const ConnectionString& connStr) {
- return getOrCreateMonitor(MongoURI(connStr));
-}
-
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const MongoURI& uri) {
- invariant(uri.type() == ConnectionString::SET);
-
- stdx::lock_guard<Latch> lk(_mutex);
- uassert(ErrorCodes::ShutdownInProgress,
- str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown",
- !_isShutdown);
-
- _setupTaskExecutorInLock();
- const auto& setName = uri.getSetName();
- auto monitor = _monitors[setName].lock();
- if (monitor) {
- uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode());
- return monitor;
+ // construct task executor
+ auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+ auto net = executor::makeNetworkInterface(
+ "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList));
+ auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
+ _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _taskExecutor->startup();
}
- LOGV2(20186, "Starting new replica set monitor for {uri}", "uri"_attr = uri.toString());
+ namespace {
+ void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) {
+ uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b);
+ }
+ } // namespace
- if (disableStreamableReplicaSetMonitor.load()) {
- auto newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri);
- _monitors[setName] = newMonitor;
- newMonitor->init();
- return newMonitor;
- } else {
- uasserted(31451, "StreamableReplicaSetMonitor is not yet implemented");
+ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
+ const ConnectionString& connStr) {
+ return getOrCreateMonitor(MongoURI(connStr));
}
-}
-vector<string> ReplicaSetMonitorManager::getAllSetNames() {
- vector<string> allNames;
+ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
+ const MongoURI& uri) {
+ invariant(uri.type() == ConnectionString::SET);
+ stdx::lock_guard<Latch> lk(_mutex);
+ uassert(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown",
+ !_isShutdown);
+
+ _setupTaskExecutorInLock();
+ const auto& setName = uri.getSetName();
+ auto monitor = _monitors[setName].lock();
+ if (monitor) {
+ uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode());
+ return monitor;
+ }
- stdx::lock_guard<Latch> lk(_mutex);
+ log() << "Starting new replica set monitor for " << uri.toString();
- for (const auto& entry : _monitors) {
- allNames.push_back(entry.first);
+ std::shared_ptr<ReplicaSetMonitor> newMonitor;
+ if (disableStreamableReplicaSetMonitor.load()) {
+ newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri);
+ newMonitor->init();
+ } else {
+ newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor());
+ }
+ _monitors[setName] = newMonitor;
+ return newMonitor;
}
- return allNames;
-}
+ vector<string> ReplicaSetMonitorManager::getAllSetNames() {
+ vector<string> allNames;
+
+ stdx::lock_guard<Latch> lk(_mutex);
-void ReplicaSetMonitorManager::removeMonitor(StringData setName) {
- stdx::lock_guard<Latch> lk(_mutex);
- ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName);
- if (it != _monitors.end()) {
- if (auto monitor = it->second.lock()) {
- monitor->drop();
+ for (const auto& entry : _monitors) {
+ allNames.push_back(entry.first);
}
- _monitors.erase(it);
- LOGV2(
- 20187, "Removed ReplicaSetMonitor for replica set {setName}", "setName"_attr = setName);
+
+ return allNames;
}
-}
-void ReplicaSetMonitorManager::shutdown() {
- decltype(_monitors) monitors;
- decltype(_taskExecutor) taskExecutor;
- {
+ void ReplicaSetMonitorManager::removeMonitor(StringData setName) {
stdx::lock_guard<Latch> lk(_mutex);
- if (std::exchange(_isShutdown, true)) {
- return;
+ ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName);
+ if (it != _monitors.end()) {
+ if (auto monitor = it->second.lock()) {
+ monitor->drop();
+ }
+ _monitors.erase(it);
+ log() << "Removed ReplicaSetMonitor for replica set " << setName;
}
-
- monitors = std::exchange(_monitors, {});
- taskExecutor = std::exchange(_taskExecutor, {});
}
- if (taskExecutor) {
- LOGV2_DEBUG(20188, 1, "Shutting down task executor used for monitoring replica sets");
- taskExecutor->shutdown();
- }
+ void ReplicaSetMonitorManager::shutdown() {
+ decltype(_monitors) monitors;
+ decltype(_taskExecutor) taskExecutor;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (std::exchange(_isShutdown, true)) {
+ return;
+ }
+
+ monitors = std::exchange(_monitors, {});
+ taskExecutor = std::exchange(_taskExecutor, {});
+ }
- if (monitors.size()) {
- LOGV2(20189, "Dropping all ongoing scans against replica sets");
- }
- for (auto& [name, monitor] : monitors) {
- auto anchor = monitor.lock();
- if (!anchor) {
- continue;
+ if (taskExecutor) {
+ LOG(1) << "Shutting down task executor used for monitoring replica sets";
+ taskExecutor->shutdown();
}
- anchor->drop();
- }
+ for (auto& [name, monitor] : monitors) {
+ auto anchor = monitor.lock();
+ if (!anchor) {
+ continue;
+ }
+ anchor->drop();
+ }
- if (taskExecutor) {
- taskExecutor->join();
+ if (taskExecutor) {
+ taskExecutor->join();
+ }
}
-}
-void ReplicaSetMonitorManager::removeAllMonitors() {
- shutdown();
+ void ReplicaSetMonitorManager::removeAllMonitors() {
+ shutdown();
- {
- stdx::lock_guard<Latch> lk(_mutex);
- _isShutdown = false;
- }
-}
-
-void ReplicaSetMonitorManager::report(BSONObjBuilder* builder, bool forFTDC) {
- // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the
- // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and
- // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook potentially
- // calling ShardRegistry::updateConfigServerConnectionString.
- auto setNames = getAllSetNames();
-
- BSONObjBuilder setStats(
- builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets"));
-
- for (const auto& setName : setNames) {
- auto monitor = getMonitor(setName);
- if (!monitor) {
- continue;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _isShutdown = false;
}
- monitor->appendInfo(setStats, forFTDC);
}
-}
-TaskExecutor* ReplicaSetMonitorManager::getExecutor() {
- invariant(_taskExecutor);
- return _taskExecutor.get();
-}
+ void ReplicaSetMonitorManager::report(BSONObjBuilder * builder, bool forFTDC) {
+ // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the
+ // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and
+ // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook
+ // potentially calling ShardRegistry::updateConfigServerConnectionString.
+ auto setNames = getAllSetNames();
+
+ BSONObjBuilder setStats(
+ builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets"));
+
+ for (const auto& setName : setNames) {
+ auto monitor = getMonitor(setName);
+ if (!monitor) {
+ continue;
+ }
+ monitor->appendInfo(setStats, forFTDC);
+ }
+ }
-ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
- return _notifier;
-}
+ std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor() {
+ invariant(_taskExecutor);
+ return _taskExecutor;
+ }
-bool ReplicaSetMonitorManager::isShutdown() const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _isShutdown;
-}
+ ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
+ return _notifier;
+ }
+ bool ReplicaSetMonitorManager::isShutdown() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _isShutdown;
+ }
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index 10f8d5b4d19..7a19071014f 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -97,7 +97,7 @@ public:
/**
* Returns an executor for running RSM tasks.
*/
- executor::TaskExecutor* getExecutor();
+ std::shared_ptr<executor::TaskExecutor> getExecutor();
ReplicaSetChangeNotifier& getNotifier();
@@ -111,7 +111,7 @@ private:
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "ReplicaSetMonitorManager::_mutex");
// Executor for monitoring replica sets.
- std::unique_ptr<executor::TaskExecutor> _taskExecutor;
+ std::shared_ptr<executor::TaskExecutor> _taskExecutor;
// Widget to notify listeners when a RSM notices a change
ReplicaSetChangeNotifier _notifier;
diff --git a/src/mongo/client/sdam/SConscript b/src/mongo/client/sdam/SConscript
index 0a10d22332f..6cc5c642ad6 100644
--- a/src/mongo/client/sdam/SConscript
+++ b/src/mongo/client/sdam/SConscript
@@ -7,16 +7,21 @@ env = env.Clone()
env.Library(
target='sdam',
source=[
+ 'sdam_configuration.cpp',
'sdam_datatypes.cpp',
'server_description.cpp',
'topology_description.cpp',
+ 'topology_listener.cpp',
'topology_state_machine.cpp',
'topology_manager.cpp',
+ 'server_selector.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/util/clock_sources',
+ '$BUILD_DIR/mongo/client/read_preference',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/db/wire_version',
'$BUILD_DIR/mongo/rpc/metadata',
],
@@ -68,6 +73,15 @@ env.CppUnitTest(
)
env.CppUnitTest(
+ target='server_selector_test',
+ source=['server_selector_test.cpp'],
+ LIBDEPS=[
+ 'sdam',
+ 'sdam_test',
+ ],
+)
+
+env.CppUnitTest(
target='topology_state_machine_test',
source=['topology_state_machine_test.cpp'],
LIBDEPS=['sdam', 'sdam_test'],
diff --git a/src/mongo/client/sdam/sdam.h b/src/mongo/client/sdam/sdam.h
new file mode 100644
index 00000000000..16645e2418a
--- /dev/null
+++ b/src/mongo/client/sdam/sdam.h
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+// Umbrella header: including this file pulls in the SDAM headers listed below.
+#include "mongo/client/sdam/sdam_configuration.h"
+#include "mongo/client/sdam/sdam_datatypes.h"
+#include "mongo/client/sdam/server_description.h"
+#include "mongo/client/sdam/server_selector.h"
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/client/sdam/topology_listener.h"
+#include "mongo/client/sdam/topology_manager.h"
diff --git a/src/mongo/client/sdam/sdam_configuration.cpp b/src/mongo/client/sdam/sdam_configuration.cpp
new file mode 100644
index 00000000000..d9852ddae94
--- /dev/null
+++ b/src/mongo/client/sdam/sdam_configuration.cpp
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "sdam_configuration.h"
+
+namespace mongo::sdam {
+/**
+ * Stores the supplied settings and validates them. Throws (via uassert) when:
+ *  - a seed list is supplied but is empty (InvalidSeedList);
+ *  - the initial type is Single and the seed list is absent or does not contain exactly one
+ *    address (InvalidSeedList);
+ *  - a set name is supplied and the initial type is neither ReplicaSetNoPrimary nor Single
+ *    (InvalidTopologyType);
+ *  - the initial type is ReplicaSetNoPrimary and no set name is supplied
+ *    (TopologySetNameRequired);
+ *  - the heartbeat frequency is below kMinHeartbeatFrequencyMS (InvalidHeartBeatFrequency).
+ */
+SdamConfiguration::SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
+                                     TopologyType initialType,
+                                     mongo::Milliseconds heartBeatFrequencyMs,
+                                     boost::optional<std::string> setName)
+    : _seedList(seedList),
+      _initialType(initialType),
+      _heartBeatFrequencyMs(heartBeatFrequencyMs),
+      _setName(setName) {
+    uassert(ErrorCodes::InvalidSeedList,
+            "seed list size must be >= 1",
+            !_seedList || !_seedList->empty());
+
+    // The seed list is optional, so test that it is engaged before reading its size. A Single
+    // topology with no seed list is rejected here instead of dereferencing boost::none.
+    uassert(ErrorCodes::InvalidSeedList,
+            "TopologyType Single must have exactly one entry in the seed list.",
+            _initialType != TopologyType::kSingle || (_seedList && _seedList->size() == 1));
+
+    uassert(
+        ErrorCodes::InvalidTopologyType,
+        "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.",
+        !_setName ||
+            (_initialType == TopologyType::kReplicaSetNoPrimary ||
+             _initialType == TopologyType::kSingle));
+
+    uassert(ErrorCodes::TopologySetNameRequired,
+            "setName is required for ReplicaSetNoPrimary",
+            _initialType != TopologyType::kReplicaSetNoPrimary || _setName);
+
+    uassert(ErrorCodes::InvalidHeartBeatFrequency,
+            "topology heartbeat must be >= 500ms",
+            _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS);
+}
+
+// Returns the seed list given at construction; boost::none when none was supplied.
+const boost::optional<std::vector<ServerAddress>>& SdamConfiguration::getSeedList() const {
+    return _seedList;
+}
+
+// Returns the topology type given at construction.
+TopologyType SdamConfiguration::getInitialType() const {
+    return _initialType;
+}
+
+// Returns the heartbeat frequency given at construction.
+Milliseconds SdamConfiguration::getHeartBeatFrequency() const {
+    return _heartBeatFrequencyMs;
+}
+
+// Returns the replica set name given at construction; boost::none when none was supplied.
+const boost::optional<std::string>& SdamConfiguration::getSetName() const {
+    return _setName;
+}
+
+
+// Stores the two supplied durations. No validation is performed.
+ServerSelectionConfiguration::ServerSelectionConfiguration(
+    const Milliseconds localThresholdMs, const Milliseconds serverSelectionTimeoutMs)
+    : _localThresholdMs(localThresholdMs), _serverSelectionTimeoutMs(serverSelectionTimeoutMs) {}
+
+// Returns the local threshold given at construction.
+Milliseconds ServerSelectionConfiguration::getLocalThresholdMs() const {
+    return _localThresholdMs;
+}
+
+// Returns the server selection timeout given at construction.
+Milliseconds ServerSelectionConfiguration::getServerSelectionTimeoutMs() const {
+    return _serverSelectionTimeoutMs;
+}
+
+// Returns the stored heartbeat frequency. The constructor above does not set this member.
+Milliseconds ServerSelectionConfiguration::getHeartBeatFrequencyMs() const {
+    return _heartBeatFrequencyMs;
+}
+}  // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/sdam_configuration.h b/src/mongo/client/sdam/sdam_configuration.h
new file mode 100644
index 00000000000..895e52f4d87
--- /dev/null
+++ b/src/mongo/client/sdam/sdam_configuration.h
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/client/sdam/sdam_datatypes.h"
+
+namespace mongo::sdam {
+/**
+ * Holds the settings used to set up server discovery and monitoring: an optional seed list, the
+ * initial topology type, the heartbeat frequency and an optional replica set name.
+ */
+class SdamConfiguration {
+public:
+    /**
+     * Builds a configuration with no seed list and every other argument left at its default
+     * (TopologyType::kUnknown, kDefaultHeartbeatFrequencyMs, no set name).
+     */
+    SdamConfiguration() : SdamConfiguration(boost::none) {}
+
+    /**
+     * Initialize the SdamConfiguration. This constructor may uassert if the provided
+     * configuration options are not valid according to the Server Discovery & Monitoring Spec.
+     *
+     * Initial Servers
+     * initial servers may be set to a seed list of one or more server addresses.
+     *
+     * Initial TopologyType
+     * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary.
+     *
+     * Initial setName
+     * The client's initial replica set name is required in order to initially configure the
+     * topology type as ReplicaSetNoPrimary.
+     *
+     * Allowed configuration combinations
+     * TopologyType Single cannot be used with multiple seeds.
+     * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single are
+     * allowed.
+     */
+    explicit SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
+                               TopologyType initialType = TopologyType::kUnknown,
+                               Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs,
+                               boost::optional<std::string> setName = boost::none);
+
+    // Accessors for the values supplied at construction.
+    const boost::optional<std::vector<ServerAddress>>& getSeedList() const;
+    TopologyType getInitialType() const;
+    Milliseconds getHeartBeatFrequency() const;
+    const boost::optional<std::string>& getSetName() const;
+
+    // Heartbeat frequency used when the caller does not supply one (10 seconds).
+    static constexpr Milliseconds kDefaultHeartbeatFrequencyMs = Seconds(10);
+    // Lower bound for a heartbeat frequency (500 milliseconds).
+    static constexpr Milliseconds kMinHeartbeatFrequencyMS = Milliseconds(500);
+    // NOTE(review): not referenced in this header; presumably the connect timeout used by
+    // the monitoring code - confirm against its users.
+    static constexpr Milliseconds kDefaultConnectTimeoutMS = Milliseconds(100);
+
+private:
+    boost::optional<std::vector<ServerAddress>> _seedList;
+    TopologyType _initialType;
+    Milliseconds _heartBeatFrequencyMs;
+    boost::optional<std::string> _setName;
+};
+
+/**
+ * Holds the durations used for server selection: the local threshold, the server selection
+ * timeout and a heartbeat frequency.
+ */
+class ServerSelectionConfiguration {
+public:
+    /**
+     * Stores the given local threshold and server selection timeout. The heartbeat frequency is
+     * not a constructor parameter, so it keeps its in-class default.
+     */
+    explicit ServerSelectionConfiguration(const Milliseconds localThresholdMs,
+                                          const Milliseconds serverSelectionTimeoutMs);
+
+    Milliseconds getLocalThresholdMs() const;
+    Milliseconds getServerSelectionTimeoutMs() const;
+    Milliseconds getHeartBeatFrequencyMs() const;
+
+    // Values used by defaultConfiguration(): 15 milliseconds and 30000 milliseconds.
+    static constexpr Milliseconds kDefaultLocalThresholdMS = Milliseconds(15);
+    static constexpr Milliseconds kDefaultServerSelectionTimeoutMs = Milliseconds(30000);
+
+    // Returns a configuration built from the two default constants above.
+    static ServerSelectionConfiguration defaultConfiguration() {
+        return ServerSelectionConfiguration{kDefaultLocalThresholdMS,
+                                            kDefaultServerSelectionTimeoutMs};
+    }
+
+private:
+    Milliseconds _localThresholdMs = kDefaultLocalThresholdMS;
+    Milliseconds _serverSelectionTimeoutMs = kDefaultServerSelectionTimeoutMs;
+    Milliseconds _heartBeatFrequencyMs = SdamConfiguration::kDefaultHeartbeatFrequencyMs;
+};
+}  // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h
index d3f7e7f3b50..520d517da87 100644
--- a/src/mongo/client/sdam/sdam_datatypes.h
+++ b/src/mongo/client/sdam/sdam_datatypes.h
@@ -126,4 +126,10 @@ using ServerDescriptionPtr = std::shared_ptr<ServerDescription>;
class TopologyDescription;
using TopologyDescriptionPtr = std::shared_ptr<TopologyDescription>;
+
+class TopologyManager;
+using TopologyManagerPtr = std::unique_ptr<TopologyManager>;
+
+class TopologyListener;
+using TopologyListenerPtr = std::shared_ptr<TopologyListener>;
}; // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/server_description.cpp b/src/mongo/client/sdam/server_description.cpp
index 1674840a5a6..42dd37d0cdc 100644
--- a/src/mongo/client/sdam/server_description.cpp
+++ b/src/mongo/client/sdam/server_description.cpp
@@ -26,9 +26,9 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/client/sdam/server_description.h"
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include <algorithm>
#include <boost/algorithm/string.hpp>
@@ -137,6 +137,14 @@ void ServerDescription::saveTags(BSONObj tagsObj) {
}
}
+// Appends every entry of _tags to `builder` as a field named after the tag key.
+void ServerDescription::appendBsonTags(BSONObjBuilder& builder) const {
+    for (const auto& [tagName, tagValue] : _tags) {
+        builder.append(tagName, tagValue);
+    }
+}
+
void ServerDescription::saveElectionId(BSONElement electionId) {
if (electionId.type() == jstOID) {
_electionId = electionId.OID();
@@ -399,6 +407,11 @@ BSONObj ServerDescription::toBson() const {
bson.append("arbiters", _arbiters);
bson.append("passives", _passives);
+ if (getTags().size()) {
+ BSONObjBuilder tagsBuilder(bson.subobjStart("tags"));
+ appendBsonTags(tagsBuilder);
+ }
+
return bson.obj();
}
@@ -414,6 +427,18 @@ std::string ServerDescription::toString() const {
return toBson().toString();
}
+// Returns a newly allocated copy of this description whose _rtt is replaced by `rtt`.
+// This object is left unchanged.
+ServerDescriptionPtr ServerDescription::cloneWithRTT(IsMasterRTT rtt) {
+    auto copy = std::make_shared<ServerDescription>(*this);
+    copy->_rtt = rtt;
+    return copy;
+}
+
+// Returns boost::none when no topology description has been associated with this server.
+// Otherwise returns the result of locking the stored weak reference; that pointer is null if
+// the referenced TopologyDescription has already been destroyed.
+const boost::optional<TopologyDescriptionPtr> ServerDescription::getTopologyDescription() {
+    if (!_topologyDescription) {
+        return boost::none;
+    }
+    return boost::optional<TopologyDescriptionPtr>(_topologyDescription->lock());
+}
+
bool operator==(const mongo::sdam::ServerDescription& a, const mongo::sdam::ServerDescription& b) {
return a.isEquivalent(b);
diff --git a/src/mongo/client/sdam/server_description.h b/src/mongo/client/sdam/server_description.h
index 6fe02a5a9d8..7d95331fa0e 100644
--- a/src/mongo/client/sdam/server_description.h
+++ b/src/mongo/client/sdam/server_description.h
@@ -78,6 +78,7 @@ public:
const boost::optional<ServerAddress>& getMe() const;
const boost::optional<std::string>& getSetName() const;
const std::map<std::string, std::string>& getTags() const;
+ void appendBsonTags(BSONObjBuilder& builder) const;
// network attributes
const boost::optional<std::string>& getError() const;
@@ -104,9 +105,11 @@ public:
const boost::optional<int>& getSetVersion() const;
const boost::optional<OID>& getElectionId() const;
const boost::optional<TopologyVersion>& getTopologyVersion() const;
+ const boost::optional<TopologyDescriptionPtr> getTopologyDescription();
BSONObj toBson() const;
std::string toString() const;
+ ServerDescriptionPtr cloneWithRTT(IsMasterRTT rtt);
private:
/**
@@ -202,6 +205,10 @@ private:
// pool for server. Incremented on network error or timeout.
int _poolResetCounter = 0;
+ // The topology description that this server description is a part of.
+ boost::optional<std::weak_ptr<TopologyDescription>> _topologyDescription;
+
+ friend class TopologyDescription;
friend class ServerDescriptionBuilder;
};
diff --git a/src/mongo/client/sdam/server_selector.cpp b/src/mongo/client/sdam/server_selector.cpp
new file mode 100644
index 00000000000..31bacb1b43e
--- /dev/null
+++ b/src/mongo/client/sdam/server_selector.cpp
@@ -0,0 +1,267 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "server_selector.h"
+
+#include <algorithm>
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/platform/random.h"
+#include "mongo/util/log.h"
+
+namespace mongo::sdam {
+// Out-of-line definition of the virtual destructor; there is nothing to release.
+ServerSelector::~ServerSelector() = default;
+
+// Stores a copy of the selection configuration and seeds the pseudo-random generator (used to
+// pick among equally eligible servers) with a value drawn from SecureRandom.
+SdamServerSelector::SdamServerSelector(const ServerSelectionConfiguration& config)
+ : _config(config), _random(PseudoRandom(SecureRandom().nextInt64())) {}
+
+// Fills 'result' with the servers of 'topologyDescription' that satisfy 'criteria'.
+//
+// The read preference mode decides which server types are eligible. The "preferred" modes first
+// look for the preferred type and fall back to the other type only when nothing was found.
+// Primary queries ignore tags and max staleness; every other query is additionally filtered by
+// the tag set of the criteria.
+//
+// If the criteria carry a minOpTime that no primary or secondary has reached, the criteria are
+// relaxed to just the read preference mode. The relaxation is applied to a private copy;
+// 'criteria' itself is never modified.
+void SdamServerSelector::_getCandidateServers(std::vector<ServerDescriptionPtr>* result,
+                                              const TopologyDescriptionPtr topologyDescription,
+                                              const ReadPreferenceSetting& criteria) {
+    // Work on a copy so that relaxing the criteria below never writes through the caller's
+    // const reference.
+    ReadPreferenceSetting effectiveCriteria = criteria;
+
+    // when querying the primary we don't need to consider tags
+    bool shouldTagFilter = true;
+
+    // TODO: check to see if we want to enforce minOpTime at all since
+    // it was effectively optional in the original implementation.
+    // TODO: the old version of the RSM does this, and many of
+    // the tests seem to rely on this behavior for correctness.
+    if (!effectiveCriteria.minOpTime.isNull()) {
+        auto eligibleServers = topologyDescription->findServers([](const ServerDescriptionPtr& s) {
+            return (s->getType() == ServerType::kRSPrimary ||
+                    s->getType() == ServerType::kRSSecondary);
+        });
+
+        auto beginIt = eligibleServers.begin();
+        auto endIt = eligibleServers.end();
+        auto maxIt = std::max_element(
+            beginIt, endIt, [](const ServerDescriptionPtr& left, const ServerDescriptionPtr& right) {
+                return left->getOpTime() < right->getOpTime();
+            });
+        if (maxIt != endIt) {
+            auto maxOpTime = (*maxIt)->getOpTime();
+            if (maxOpTime && maxOpTime < effectiveCriteria.minOpTime) {
+                // ignore minOpTime
+                effectiveCriteria = ReadPreferenceSetting(effectiveCriteria.pref);
+                log() << "ignoring minOpTime for " << effectiveCriteria.toString();
+            }
+        }
+    }
+
+    switch (effectiveCriteria.pref) {
+        case ReadPreference::Nearest:
+            *result = topologyDescription->findServers(nearestFilter(effectiveCriteria));
+            break;
+
+        case ReadPreference::SecondaryOnly:
+            *result = topologyDescription->findServers(secondaryFilter(effectiveCriteria));
+            break;
+
+        case ReadPreference::PrimaryOnly: {
+            const auto primaryCriteria = ReadPreferenceSetting(effectiveCriteria.pref);
+            *result = topologyDescription->findServers(primaryFilter(primaryCriteria));
+            shouldTagFilter = false;
+            break;
+        }
+
+        case ReadPreference::PrimaryPreferred: {
+            // ignore tags and max staleness for primary query
+            auto primaryCriteria = ReadPreferenceSetting(ReadPreference::PrimaryOnly);
+            _getCandidateServers(result, topologyDescription, primaryCriteria);
+            if (result->size()) {
+                shouldTagFilter = false;
+                break;
+            }
+
+            // keep tags and maxStaleness for secondary query
+            auto secondaryCriteria = effectiveCriteria;
+            secondaryCriteria.pref = ReadPreference::SecondaryOnly;
+            _getCandidateServers(result, topologyDescription, secondaryCriteria);
+            break;
+        }
+
+        case ReadPreference::SecondaryPreferred: {
+            // keep tags and maxStaleness for secondary query
+            auto secondaryCriteria = effectiveCriteria;
+            secondaryCriteria.pref = ReadPreference::SecondaryOnly;
+            _getCandidateServers(result, topologyDescription, secondaryCriteria);
+            if (result->size()) {
+                break;
+            }
+
+            // ignore tags and maxStaleness for primary query
+            shouldTagFilter = false;
+            auto primaryCriteria = ReadPreferenceSetting(ReadPreference::PrimaryOnly);
+            _getCandidateServers(result, topologyDescription, primaryCriteria);
+            break;
+        }
+
+        default:
+            MONGO_UNREACHABLE
+    }
+
+    if (shouldTagFilter) {
+        filterTags(result, effectiveCriteria.tags);
+    }
+}
+
+// Returns the servers eligible for 'criteria' whose round trip time lies inside the latency
+// window anchored at the fastest candidate, or boost::none when nothing is selectable.
+// Throws IncompatibleServerVersion when the topology is not wire version compatible.
+boost::optional<std::vector<ServerDescriptionPtr>> SdamServerSelector::selectServers(
+    const TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) {
+
+    // If the topology wire version is invalid, raise an error
+    if (!topologyDescription->isWireVersionCompatible()) {
+        uasserted(ErrorCodes::IncompatibleServerVersion,
+                  *topologyDescription->getWireVersionCompatibleError());
+    }
+
+    if (topologyDescription->getType() == TopologyType::kUnknown) {
+        return boost::none;
+    }
+
+    if (topologyDescription->getType() == TopologyType::kSingle) {
+        // A single-server topology offers its only server, provided its type is known.
+        auto servers = topologyDescription->getServers();
+        if (!servers.size() || servers[0]->getType() == ServerType::kUnknown) {
+            return boost::none;
+        }
+        return boost::optional<std::vector<ServerDescriptionPtr>>{{servers[0]}};
+    }
+
+    std::vector<ServerDescriptionPtr> candidates;
+    _getCandidateServers(&candidates, topologyDescription, criteria);
+    if (!candidates.size()) {
+        return boost::none;
+    }
+
+    ServerDescriptionPtr fastest =
+        *std::min_element(candidates.begin(), candidates.end(), LatencyWindow::rttCompareFn);
+    invariant(fastest->getRtt());
+
+    auto latencyWindow = LatencyWindow(*fastest->getRtt(), _config.getLocalThresholdMs());
+    latencyWindow.filterServers(&candidates);
+
+    // latency window should always leave at least one result
+    invariant(candidates.size());
+
+    return candidates;
+}
+
+// Picks one element of 'servers' using the selector's pseudo-random generator.
+ServerDescriptionPtr SdamServerSelector::_randomSelect(
+    const std::vector<ServerDescriptionPtr>& servers) const {
+    const auto index = _random.nextInt64(servers.size());
+    return servers[index];
+}
+
+// Selects a single server: runs selectServers() and, when it produced candidates, returns one
+// of them chosen at random. Returns boost::none otherwise.
+boost::optional<ServerDescriptionPtr> SdamServerSelector::selectServer(
+    const TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) {
+    auto candidates = selectServers(topologyDescription, criteria);
+    if (!candidates) {
+        return boost::none;
+    }
+    return boost::optional<ServerDescriptionPtr>(_randomSelect(*candidates));
+}
+
+// Returns true when every field of 'tags' is present in the server's tag map with an equal
+// string value. An empty 'tags' object therefore matches every server.
+bool SdamServerSelector::_containsAllTags(ServerDescriptionPtr server, const BSONObj& tags) {
+    // getTags() returns a const reference; bind to it instead of copying the whole map on
+    // every call (this runs once per server per tag set).
+    const auto& serverTags = server->getTags();
+    for (const auto& checkTag : tags) {
+        auto checkKey = checkTag.fieldName();
+        auto checkValue = checkTag.String();
+        auto pos = serverTags.find(checkKey);
+        if (pos == serverTags.end() || pos->second != checkValue) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Removes from 'servers', in place, every server that matches none of the tag documents in
+// 'tagSet'. A tag set without any fields leaves 'servers' untouched.
+void SdamServerSelector::filterTags(std::vector<ServerDescriptionPtr>* servers,
+                                    const TagSet& tagSet) {
+    const auto& checkTags = tagSet.getTagBSON();
+
+    if (checkTags.nFields() == 0)
+        return;
+
+    // true  -> the server matched no tag document and has to be removed
+    // false -> the server matched at least one tag document and is kept
+    const auto matchesNoTagDocument = [&](const ServerDescriptionPtr& s) {
+        for (auto it = checkTags.begin(); it != checkTags.end(); ++it) {
+            if (!it->isABSONObj()) {
+                log() << "invalid tags specified for server selection; tags should be specified as "
+                         "a bson Obj: "
+                      << it->toString();
+                continue;
+            }
+
+            const BSONObj& tags = it->Obj();
+            if (_containsAllTags(s, tags)) {
+                return false;
+            }
+        }
+        return true;
+    };
+
+    auto firstRemoved = std::remove_if(servers->begin(), servers->end(), matchesNoTagDocument);
+    servers->erase(firstRemoved, servers->end());
+}
+
+// Returns true when 's' satisfies the recency constraints of 'readPref': its opTime is at least
+// minOpTime (when one is set) and its staleness does not exceed maxStalenessSeconds (when set).
+bool SdamServerSelector::recencyFilter(const ReadPreferenceSetting& readPref,
+                                       const ServerDescriptionPtr& s) {
+    // TODO: check to see if we want to enforce minOpTime at all since
+    // it was effectively optional in the original implementation.
+    const bool meetsMinOpTime =
+        readPref.minOpTime.isNull() || (s->getOpTime() >= readPref.minOpTime);
+
+    if (!readPref.maxStalenessSeconds.count()) {
+        return meetsMinOpTime;
+    }
+
+    // The staleness is evaluated even if the minOpTime check already failed, so the invariant
+    // below is enforced for every server that is examined.
+    auto topologyDescription = s->getTopologyDescription();
+    invariant(topologyDescription);
+    auto staleness = _calculateStaleness(*topologyDescription, s);
+    return meetsMinOpTime && (staleness <= readPref.maxStalenessSeconds);
+}
+
+
+// Removes from 'servers', in place, every server whose round trip time lies outside the window.
+void LatencyWindow::filterServers(std::vector<ServerDescriptionPtr>* servers) {
+    const auto isOutsideWindow = [this](const ServerDescriptionPtr& s) {
+        // Servers that have made it to this stage are not ServerType
+        // == kUnknown, so they must have an associated latency.
+        invariant(s->getType() != ServerType::kUnknown);
+        invariant(s->getRtt());
+        return !isWithinWindow(*s->getRtt());
+    };
+
+    auto firstRemoved = std::remove_if(servers->begin(), servers->end(), isOutsideWindow);
+    servers->erase(firstRemoved, servers->end());
+}
+
+// Returns true when 'latency' lies in the closed interval [lower, upper].
+bool LatencyWindow::isWithinWindow(IsMasterRTT latency) {
+    const bool atOrAboveLower = lower <= latency;
+    return atOrAboveLower && latency <= upper;
+}
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/server_selector.h b/src/mongo/client/sdam/server_selector.h
new file mode 100644
index 00000000000..9cb676bd3b6
--- /dev/null
+++ b/src/mongo/client/sdam/server_selector.h
@@ -0,0 +1,193 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+#include <functional>
+#include <vector>
+
+#include "mongo/client/read_preference.h"
+#include "mongo/client/sdam/sdam_configuration.h"
+#include "mongo/client/sdam/sdam_datatypes.h"
+#include "mongo/client/sdam/server_description.h"
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/platform/random.h"
+
+namespace mongo::sdam {
+/**
+ * This is the interface that allows one to select a server to satisfy a DB operation given a
+ * TopologyDescription and a ReadPreferenceSetting.
+ */
+class ServerSelector {
+public:
+ /**
+ * Finds a list of candidate servers according to the ReadPreferenceSetting.
+ * The result is optional; the SdamServerSelector implementation returns boost::none when no
+ * server can be selected.
+ */
+ virtual boost::optional<std::vector<ServerDescriptionPtr>> selectServers(
+ TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) = 0;
+
+ /**
+ * Select a single server according to the ReadPreference and latency of the
+ * ServerDescription(s). The server is selected randomly from those that match the criteria.
+ */
+ virtual boost::optional<ServerDescriptionPtr> selectServer(
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria) = 0;
+
+ // Virtual so that implementations can be destroyed through a ServerSelector pointer.
+ virtual ~ServerSelector();
+};
+using ServerSelectorPtr = std::unique_ptr<ServerSelector>;
+
+// ServerSelector implementation that filters the servers of a TopologyDescription by read
+// preference mode, recency (minOpTime / maxStalenessSeconds), tags and latency window.
+class SdamServerSelector : public ServerSelector {
+public:
+ explicit SdamServerSelector(const ServerSelectionConfiguration& config);
+
+ boost::optional<std::vector<ServerDescriptionPtr>> selectServers(
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria) override;
+
+ boost::optional<ServerDescriptionPtr> selectServer(
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria) override;
+
+ // remove servers that do not match the TagSet
+ void filterTags(std::vector<ServerDescriptionPtr>* servers, const TagSet& tagSet);
+
+private:
+ // Fills 'result' with the servers of the topology that satisfy 'criteria'; the latency
+ // window is applied afterwards by selectServers.
+ void _getCandidateServers(std::vector<ServerDescriptionPtr>* result,
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria);
+
+ // Returns true when every field of 'tags' is present in the server's tags with an equal value.
+ bool _containsAllTags(ServerDescriptionPtr server, const BSONObj& tags);
+
+ // Returns one element of 'servers' chosen with the pseudo-random generator.
+ ServerDescriptionPtr _randomSelect(const std::vector<ServerDescriptionPtr>& servers) const;
+
+ // staleness for a ServerDescription is defined here:
+ // https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#maxstalenessseconds
+ Milliseconds _calculateStaleness(const TopologyDescriptionPtr& topologyDescription,
+ const ServerDescriptionPtr& serverDescription) {
+ // Only secondaries are considered stale; every other server type reports zero.
+ if (serverDescription->getType() != ServerType::kRSSecondary)
+ return Milliseconds(0);
+
+ // A server without a lastWriteDate is treated as having written at Date_t::min().
+ const Date_t& lastWriteDate = serverDescription->getLastWriteDate()
+ ? *serverDescription->getLastWriteDate()
+ : Date_t::min();
+
+ if (topologyDescription->getType() == TopologyType::kReplicaSetWithPrimary) {
+ // (S.lastUpdateTime - S.lastWriteDate) - (P.lastUpdateTime - P.lastWriteDate) +
+ // heartbeatFrequencyMS
+
+ // topologyType == kReplicaSetWithPrimary implies the validity of the primary server
+ // description.
+ invariant(topologyDescription->getPrimary());
+ const auto& primaryDescription = *topologyDescription->getPrimary();
+
+ const auto& primaryLastWriteDate = primaryDescription->getLastWriteDate()
+ ? *primaryDescription->getLastWriteDate()
+ : Date_t::min();
+
+ auto result = (serverDescription->getLastUpdateTime() - lastWriteDate) -
+ (primaryDescription->getLastUpdateTime() - primaryLastWriteDate) +
+ _config.getHeartBeatFrequencyMs();
+ return duration_cast<Milliseconds>(result);
+ } else if (topologyDescription->getType() == TopologyType::kReplicaSetNoPrimary) {
+ // SMax.lastWriteDate - S.lastWriteDate + heartbeatFrequencyMS
+ Date_t maxLastWriteDate = Date_t::min();
+
+ // identify secondary with max last write date.
+ for (const auto& s : topologyDescription->getServers()) {
+ if (s->getType() != ServerType::kRSSecondary)
+ continue;
+
+ const auto& sLastWriteDate =
+ s->getLastWriteDate() ? *s->getLastWriteDate() : Date_t::min();
+
+ if (sLastWriteDate > maxLastWriteDate) {
+ maxLastWriteDate = sLastWriteDate;
+ }
+ }
+
+ auto result = (maxLastWriteDate - lastWriteDate) + _config.getHeartBeatFrequencyMs();
+ return duration_cast<Milliseconds>(result);
+ } else {
+ // Not a replica set
+ return Milliseconds(0);
+ }
+ }
+
+ // Returns true when 's' satisfies the minOpTime and maxStalenessSeconds constraints of
+ // 'readPref'; a constraint that is not set is not checked.
+ bool recencyFilter(const ReadPreferenceSetting& readPref, const ServerDescriptionPtr& s);
+
+ // A SelectionFilter is a higher order function used to filter out servers from the current
+ // Topology. Its return value is used as input to the TopologyDescription::findServers
+ // function, and is a function that takes a ServerDescriptionPtr and returns a bool indicating
+ // whether to keep this server or not based on the ReadPreference, server type, and recency
+ // metrics of the server.
+ //
+ // NOTE(review): the returned predicates capture 'readPref' by reference, so they must not
+ // outlive the ReadPreferenceSetting that was passed to the filter.
+ using SelectionFilter = unique_function<std::function<bool(const ServerDescriptionPtr&)>(
+ const ReadPreferenceSetting&)>;
+
+ // Keeps secondaries that pass the recency filter.
+ const SelectionFilter secondaryFilter = [this](const ReadPreferenceSetting& readPref) {
+ return [&](const ServerDescriptionPtr& s) {
+ return (s->getType() == ServerType::kRSSecondary) && recencyFilter(readPref, s);
+ };
+ };
+
+ // Keeps primaries that pass the recency filter.
+ const SelectionFilter primaryFilter = [this](const ReadPreferenceSetting& readPref) {
+ return [&](const ServerDescriptionPtr& s) {
+ return (s->getType() == ServerType::kRSPrimary) && recencyFilter(readPref, s);
+ };
+ };
+
+ // Keeps primaries and secondaries that pass the recency filter.
+ const SelectionFilter nearestFilter = [this](const ReadPreferenceSetting& readPref) {
+ return [&](const ServerDescriptionPtr& s) {
+ return (s->getType() == ServerType::kRSPrimary ||
+ s->getType() == ServerType::kRSSecondary) &&
+ recencyFilter(readPref, s);
+ };
+ };
+
+ ServerSelectionConfiguration _config;
+ // mutable so that the const _randomSelect() can advance the generator.
+ mutable PseudoRandom _random;
+};
+
+// This is used to filter out servers based on their current latency measurements.
+struct LatencyWindow {
+ // Inclusive bounds of the window; upper is lowerBound + windowWidth.
+ const IsMasterRTT lower;
+ const IsMasterRTT upper;
+
+ explicit LatencyWindow(const IsMasterRTT lowerBound, const IsMasterRTT windowWidth)
+ : lower(lowerBound), upper(lowerBound + windowWidth) {}
+
+ // Returns true when lower <= latency <= upper.
+ bool isWithinWindow(IsMasterRTT latency);
+
+ // remove servers not in the latency window in-place.
+ void filterServers(std::vector<ServerDescriptionPtr>* servers);
+
+ // Ordering predicate on the servers' round trip times; used to find the fastest server.
+ static bool rttCompareFn(const ServerDescriptionPtr& a, const ServerDescriptionPtr& b) {
+ return a->getRtt() < b->getRtt();
+ }
+};
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/server_selector_test.cpp b/src/mongo/client/sdam/server_selector_test.cpp
new file mode 100644
index 00000000000..233c00b32a5
--- /dev/null
+++ b/src/mongo/client/sdam/server_selector_test.cpp
@@ -0,0 +1,463 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/client/sdam/server_selector.h"
+
+#include <boost/optional/optional_io.hpp>
+
+#include "mongo/client/sdam/sdam_test_base.h"
+#include "mongo/client/sdam/server_description_builder.h"
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/client/sdam/topology_manager.h"
+#include "mongo/db/wire_version.h"
+#include "mongo/util/system_clock_source.h"
+
+namespace mongo::sdam {
+
+// Shared fixture for the server selector tests: provides configurations, canned tag sets,
+// helpers to build ServerDescriptions, and a selector instance under test.
+class ServerSelectorTestFixture : public SdamTestFixture {
+public:
+ static inline const auto clockSource = SystemClockSource::get();
+ static inline const auto sdamConfiguration = SdamConfiguration({{"s0"}});
+ static inline const auto selectionConfig =
+ ServerSelectionConfiguration(Milliseconds(10), Milliseconds(10));
+
+ static constexpr auto SET_NAME = "set";
+ // Number of selections performed by the tests that check the random distribution.
+ static constexpr int NUM_ITERATIONS = 1000;
+
+ // Tag documents and the TagSets (arrays of tag documents) built from them.
+ struct TagSets {
+ static inline const auto eastProduction = BSON("dc"
+ << "east"
+ << "usage"
+ << "production");
+ static inline const auto westProduction = BSON("dc"
+ << "west"
+ << "usage"
+ << "production");
+ static inline const auto northTest = BSON("dc"
+ << "north"
+ << "usage"
+ << "test");
+ static inline const auto northProduction = BSON("dc"
+ << "north"
+ << "usage"
+ << "production");
+ static inline const auto production = BSON("usage"
+ << "production");
+
+ static inline const auto test = BSON("usage"
+ << "test");
+
+ static inline const auto integration = BSON("usage"
+ << "integration");
+
+ static inline const auto primary = BSON("tag"
+ << "primary");
+ static inline const auto secondary = BSON("tag"
+ << "secondary");
+
+ static inline const auto emptySet = TagSet{BSONArray(BSONObj())};
+ static inline const auto eastOrWestProductionSet =
+ TagSet(BSON_ARRAY(eastProduction << westProduction));
+ static inline const auto westProductionSet = TagSet(BSON_ARRAY(westProduction));
+ static inline const auto productionSet = TagSet(BSON_ARRAY(production));
+ static inline const auto testSet = TagSet(BSON_ARRAY(test));
+ static inline const auto integrationOrTestSet = TagSet(BSON_ARRAY(integration << test));
+ static inline const auto integrationSet = TagSet(BSON_ARRAY(integration));
+
+ static inline const auto primarySet = TagSet(BSON_ARRAY(primary));
+ static inline const auto secondarySet = TagSet(BSON_ARRAY(secondary));
+ };
+
+ // Builds a ServerDescription in set SET_NAME with the given RTT, address, type and tags.
+ // The server type defaults to kRSPrimary and the tags default to none.
+ static ServerDescriptionPtr make_with_latency(IsMasterRTT latency,
+ ServerAddress address,
+ ServerType serverType = ServerType::kRSPrimary,
+ std::map<std::string, std::string> tags = {}) {
+ auto builder = ServerDescriptionBuilder()
+ .withType(serverType)
+ .withAddress(address)
+ .withSetName(SET_NAME)
+ .withRtt(latency)
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastUpdateTime(Date_t::now());
+
+ for (auto it = tags.begin(); it != tags.end(); ++it) {
+ builder.withTag(it->first, it->second);
+ }
+
+ return builder.instance();
+ }
+
+ // Five secondaries with identical latency: three tagged usage=production (east, west, north)
+ // and two tagged usage=test (east, west).
+ static auto makeServerDescriptionList() {
+ return std::vector<ServerDescriptionPtr>{
+ make_with_latency(Milliseconds(1),
+ "s1",
+ ServerType::kRSSecondary,
+ {{"dc", "east"}, {"usage", "production"}}),
+ make_with_latency(Milliseconds(1),
+ "s1-test",
+ ServerType::kRSSecondary,
+ {{"dc", "east"}, {"usage", "test"}}),
+ make_with_latency(Milliseconds(1),
+ "s2",
+ ServerType::kRSSecondary,
+ {{"dc", "west"}, {"usage", "production"}}),
+ make_with_latency(Milliseconds(1),
+ "s2-test",
+ ServerType::kRSSecondary,
+ {{"dc", "west"}, {"usage", "test"}}),
+ make_with_latency(Milliseconds(1),
+ "s3",
+ ServerType::kRSSecondary,
+ {{"dc", "north"}, {"usage", "production"}})};
+ };
+
+ // The selector under test.
+ SdamServerSelector selector = SdamServerSelector(selectionConfig);
+};
+
+// LatencyWindow::filterServers must keep the servers whose RTT lies within [lower, upper],
+// both bounds included, and drop the servers below and above the window.
+TEST_F(ServerSelectorTestFixture, ShouldFilterCorrectlyByLatencyWindow) {
+ const auto delta = Milliseconds(10);
+ const auto windowWidth = Milliseconds(100);
+ const auto lowerBound = Milliseconds(100);
+
+ auto window = LatencyWindow(lowerBound, windowWidth);
+
+ std::vector<ServerDescriptionPtr> servers = {
+ make_with_latency(window.lower - delta, "less"),
+ make_with_latency(window.lower, "boundary-lower"),
+ make_with_latency(window.lower + delta, "within"),
+ make_with_latency(window.upper, "boundary-upper"),
+ make_with_latency(window.upper + delta, "greater")};
+
+ window.filterServers(&servers);
+
+ // The relative order of the surviving servers is preserved.
+ ASSERT_EQ(3, servers.size());
+ ASSERT_EQ("boundary-lower", servers[0]->getAddress());
+ ASSERT_EQ("within", servers[1]->getAddress());
+ ASSERT_EQ("boundary-upper", servers[2]->getAddress());
+}
+
+// selectServers must throw IncompatibleServerVersion when the topology contains a server whose
+// wire version range makes the topology incompatible.
+TEST_F(ServerSelectorTestFixture, ShouldThrowOnWireError) {
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+ // Replace the seeded server with one that only speaks a very old wire version.
+ auto oldServer = ServerDescriptionBuilder()
+ .withAddress(topologyDescription->getServers().back()->getAddress())
+ .withType(ServerType::kRSPrimary)
+ .withMaxWireVersion(WireVersion::RELEASE_2_4_AND_BEFORE)
+ .withMinWireVersion(WireVersion::RELEASE_2_4_AND_BEFORE)
+ .instance();
+ topologyDescription->installServerDescription(oldServer);
+
+ ASSERT(!topologyDescription->isWireVersionCompatible());
+ ASSERT_THROWS_CODE(selector.selectServers(topologyDescription, ReadPreferenceSetting()),
+ DBException,
+ ErrorCodes::IncompatibleServerVersion);
+}
+
+// A freshly constructed topology has type kUnknown; selecting from it must yield boost::none.
+TEST_F(ServerSelectorTestFixture, ShouldReturnNoneIfTopologyUnknown) {
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+ ASSERT_EQ(TopologyType::kUnknown, topologyDescription->getType());
+ ASSERT_EQ(boost::none, selector.selectServers(topologyDescription, ReadPreferenceSetting()));
+}
+
+// With read preference Nearest, every server inside the latency window (s0, s1, s2) must be
+// selected at least once over NUM_ITERATIONS selections, and the server outside the window (s3)
+// must never be selected.
+TEST_F(ServerSelectorTestFixture, ShouldSelectRandomlyWhenMultipleOptionsAreAvailable) {
+ TopologyStateMachine stateMachine(sdamConfiguration);
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+ // s0 is the primary and has the lowest latency, so it anchors the latency window.
+ const auto s0Latency = Milliseconds(1);
+ auto primary = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kRSPrimary)
+ .withRtt(s0Latency)
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withHost("s3")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, primary);
+
+ // s1 lies inside the latency window
+ const auto s1Latency = Milliseconds((s0Latency + selectionConfig.getLocalThresholdMs()) / 2);
+ auto secondaryInLatencyWindow = make_with_latency(s1Latency, "s1", ServerType::kRSSecondary);
+ stateMachine.onServerDescription(*topologyDescription, secondaryInLatencyWindow);
+
+ // s2 is on the boundary of the latency window
+ const auto s2Latency = s0Latency + selectionConfig.getLocalThresholdMs();
+ auto secondaryOnBoundaryOfLatencyWindow =
+ make_with_latency(s2Latency, "s2", ServerType::kRSSecondary);
+ stateMachine.onServerDescription(*topologyDescription, secondaryOnBoundaryOfLatencyWindow);
+
+ // s3 should not be selected
+ const auto s3Latency = s2Latency + Milliseconds(10);
+ auto secondaryTooFar = make_with_latency(s3Latency, "s3", ServerType::kRSSecondary);
+ stateMachine.onServerDescription(*topologyDescription, secondaryTooFar);
+
+ // Count how often each address is selected.
+ std::map<ServerAddress, int> frequencyInfo{{"s0", 0}, {"s1", 0}, {"s2", 0}, {"s3", 0}};
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ auto server = selector.selectServer(topologyDescription,
+ ReadPreferenceSetting(ReadPreference::Nearest));
+ if (server) {
+ frequencyInfo[(*server)->getAddress()]++;
+ }
+ }
+
+ ASSERT(frequencyInfo["s0"]);
+ ASSERT(frequencyInfo["s1"]);
+ ASSERT(frequencyInfo["s2"]);
+ ASSERT_FALSE(frequencyInfo["s3"]);
+}
+
+// With a max staleness of sixty seconds, the primary (s0) and the recently written secondary
+// (s1) must be selectable, while the secondary whose last write is two minutes old (s2) must
+// never be selected.
+TEST_F(ServerSelectorTestFixture, ShouldFilterByLastWriteTime) {
+ TopologyStateMachine stateMachine(sdamConfiguration);
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+ const int MAX_STALENESS = 60;
+ const auto sixtySeconds = Seconds(MAX_STALENESS);
+ const auto now = Date_t::now();
+
+
+ // primary, last write one second ago
+ const auto d0 = now - Milliseconds(1000);
+ const auto s0 = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kRSPrimary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s0);
+
+ // secondary, last write five seconds ago
+ const auto d1 = now - Milliseconds(1000 * 5);
+ const auto s1 = ServerDescriptionBuilder()
+ .withAddress("s1")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d1)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s1);
+
+ // d2 is stale, so s2 should not be selected.
+ const auto d2 = now - sixtySeconds - sixtySeconds;
+ const auto s2 = ServerDescriptionBuilder()
+ .withAddress("s2")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d2)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s2);
+
+ const auto readPref =
+ ReadPreferenceSetting(ReadPreference::Nearest, TagSets::emptySet, sixtySeconds);
+
+ // Count how often each address is selected.
+ std::map<ServerAddress, int> frequencyInfo{{"s0", 0}, {"s1", 0}, {"s2", 0}};
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ auto server = selector.selectServer(topologyDescription, readPref);
+
+ if (server) {
+ frequencyInfo[(*server)->getAddress()]++;
+ }
+ }
+
+ ASSERT(frequencyInfo["s0"]);
+ ASSERT(frequencyInfo["s1"]);
+ ASSERT_FALSE(frequencyInfo["s2"]);
+}
+
+// With one primary (s0) and one secondary (s1) available:
+//  - PrimaryPreferred selects the primary, even when the tag set only matches the secondary
+//  - SecondaryPreferred selects the secondary, with and without a tag set
+TEST_F(ServerSelectorTestFixture, ShouldSelectPreferredIfAvailable) {
+    TopologyStateMachine stateMachine(sdamConfiguration);
+    auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+    const auto now = Date_t::now();
+    const auto d0 = now - Milliseconds(1000);
+
+    const auto s0 = ServerDescriptionBuilder()
+                        .withAddress("s0")
+                        .withType(ServerType::kRSPrimary)
+                        .withRtt(selectionConfig.getLocalThresholdMs())
+                        .withSetName("set")
+                        .withHost("s0")
+                        .withHost("s1")
+                        .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+                        .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+                        .withLastWriteDate(d0)
+                        .withTag("tag", "primary")
+                        .instance();
+    stateMachine.onServerDescription(*topologyDescription, s0);
+
+    const auto s1 = ServerDescriptionBuilder()
+                        .withAddress("s1")
+                        .withType(ServerType::kRSSecondary)
+                        .withRtt(selectionConfig.getLocalThresholdMs())
+                        .withSetName("set")
+                        .withHost("s0")
+                        .withHost("s1")
+                        .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+                        .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+                        .withLastWriteDate(d0)
+                        .withTag("tag", "secondary")
+                        .instance();
+    stateMachine.onServerDescription(*topologyDescription, s1);
+
+    const auto primaryPreferredTagSecondary =
+        ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSets::secondarySet);
+    auto result1 = selector.selectServer(topologyDescription, primaryPreferredTagSecondary);
+    ASSERT(result1 != boost::none);
+    ASSERT_EQ("s0", (*result1)->getAddress());
+
+    const auto secondaryPreferredWithTag =
+        ReadPreferenceSetting(ReadPreference::SecondaryPreferred, TagSets::secondarySet);
+    auto result2 = selector.selectServer(topologyDescription, secondaryPreferredWithTag);
+    ASSERT(result2 != boost::none);
+    ASSERT_EQ("s1", (*result2)->getAddress());
+
+    const auto secondaryPreferredNoTag = ReadPreferenceSetting(ReadPreference::SecondaryPreferred);
+    auto result3 = selector.selectServer(topologyDescription, secondaryPreferredNoTag);
+    ASSERT(result3 != boost::none);
+    // Check the result of this query (result3), not the previous one.
+    ASSERT_EQ("s1", (*result3)->getAddress());
+}
+
+// When the primary has become unavailable, PrimaryPreferred must fall back to the secondaries
+// and honour the tag set: of the two secondaries only the tagged one (s1) may be selected.
+TEST_F(ServerSelectorTestFixture, ShouldSelectTaggedSecondaryIfPreferredPrimaryNotAvailable) {
+ TopologyStateMachine stateMachine(sdamConfiguration);
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+ const int MAX_STALENESS = 60;
+ const auto sixtySeconds = Seconds(MAX_STALENESS);
+ const auto now = Date_t::now();
+
+ const auto d0 = now - Milliseconds(1000);
+
+ const auto s0 = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kRSPrimary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .withTag("tag", "primary")
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s0);
+
+ // old primary unavailable
+ const auto s0_failed = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kUnknown)
+ .withSetName("set")
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s0_failed);
+
+ // secondary carrying the tag requested by the read preference
+ const auto s1 = ServerDescriptionBuilder()
+ .withAddress("s1")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .withTag("tag", "secondary")
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s1);
+
+ // secondary without any tags
+ const auto s2 = ServerDescriptionBuilder()
+ .withAddress("s2")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s2);
+
+ const auto primaryPreferredTagSecondary =
+ ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSets::secondarySet);
+ auto result1 = selector.selectServer(topologyDescription, primaryPreferredTagSecondary);
+ ASSERT(result1 != boost::none);
+ ASSERT_EQ("s1", (*result1)->getAddress());
+}
+
+// Runs filterTags against the five servers of makeServerDescriptionList() with several tag sets
+// and checks how many servers survive each one.
+TEST_F(ServerSelectorTestFixture, ShouldFilterByTags) {
+ // usage=production matches s1, s2 and s3
+ auto tags = TagSets::productionSet;
+ auto servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(3, servers.size());
+
+ // a server is kept when it matches any tag document of the set (east or west production)
+ tags = TagSets::eastOrWestProductionSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(2, servers.size());
+
+ tags = TagSets::testSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(2, servers.size());
+
+ // no server is tagged usage=integration, so only the two test servers remain
+ tags = TagSets::integrationOrTestSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(2, servers.size());
+
+ tags = TagSets::westProductionSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(1, servers.size());
+
+ tags = TagSets::integrationSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(0, servers.size());
+
+ // the empty tag set keeps every server
+ tags = TagSets::emptySet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(makeServerDescriptionList().size(), servers.size());
+}
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_description.cpp b/src/mongo/client/sdam/topology_description.cpp
index d3783719864..f236f388e90 100644
--- a/src/mongo/client/sdam/topology_description.cpp
+++ b/src/mongo/client/sdam/topology_description.cpp
@@ -107,6 +107,9 @@ const boost::optional<ServerDescriptionPtr> TopologyDescription::findServerByAdd
boost::optional<ServerDescriptionPtr> TopologyDescription::installServerDescription(
const ServerDescriptionPtr& newServerDescription) {
+ LOG(2) << "(" << getSetName() << ") install ServerDescription "
+ << newServerDescription->toString();
+
boost::optional<ServerDescriptionPtr> previousDescription;
if (getType() == TopologyType::kSingle) {
// For Single, there is always one ServerDescription in TopologyDescription.servers;
@@ -131,6 +134,8 @@ boost::optional<ServerDescriptionPtr> TopologyDescription::installServerDescript
}
}
+ newServerDescription->_topologyDescription = shared_from_this();
+
checkWireCompatibilityVersions();
calculateLogicalSessionTimeout();
return previousDescription;
@@ -174,13 +179,12 @@ void TopologyDescription::checkWireCompatibilityVersions() {
break;
}
}
-
_compatibleError = (_compatible) ? boost::none : boost::make_optional(errorOss.str());
}
const std::string TopologyDescription::minimumRequiredMongoVersionString(int version) {
switch (version) {
- case RESUMABLE_INITIAL_SYNC:
+ case RESUMABLE_INITIAL_SYNC:
return "4.4";
case SHARDED_TRANSACTIONS:
return "4.2";
@@ -270,54 +274,15 @@ std::string TopologyDescription::toString() {
return toBSON().toString();
}
-////////////////////////
-// SdamConfiguration
-////////////////////////
-SdamConfiguration::SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
- TopologyType initialType,
- mongo::Milliseconds heartBeatFrequencyMs,
- boost::optional<std::string> setName)
- : _seedList(seedList),
- _initialType(initialType),
- _heartBeatFrequencyMs(heartBeatFrequencyMs),
- _setName(setName) {
- uassert(ErrorCodes::InvalidSeedList,
- "seed list size must be >= 1",
- !seedList || (*seedList).size() >= 1);
-
- uassert(ErrorCodes::InvalidSeedList,
- "TopologyType Single must have exactly one entry in the seed list.",
- _initialType != TopologyType::kSingle || (*seedList).size() == 1);
-
- uassert(
- ErrorCodes::InvalidTopologyType,
- "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.",
- !_setName ||
- (_initialType == TopologyType::kReplicaSetNoPrimary ||
- _initialType == TopologyType::kSingle));
-
- uassert(ErrorCodes::TopologySetNameRequired,
- "setName is required for ReplicaSetNoPrimary",
- _initialType != TopologyType::kReplicaSetNoPrimary || _setName);
-
- uassert(ErrorCodes::InvalidHeartBeatFrequency,
- "topology heartbeat must be >= 500ms",
- _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS);
-}
-const boost::optional<std::vector<ServerAddress>>& SdamConfiguration::getSeedList() const {
- return _seedList;
-}
-
-TopologyType SdamConfiguration::getInitialType() const {
- return _initialType;
-}
-
-Milliseconds SdamConfiguration::getHeartBeatFrequency() const {
- return _heartBeatFrequencyMs;
-}
+/**
+ * Returns the server whose type is kRSPrimary, or boost::none when the topology type is not
+ * kReplicaSetWithPrimary.
+ */
+boost::optional<ServerDescriptionPtr> TopologyDescription::getPrimary() {
+ if (getType() != TopologyType::kReplicaSetWithPrimary) {
+ return boost::none;
+ }
-const boost::optional<std::string>& SdamConfiguration::getSetName() const {
- return _setName;
+ auto foundPrimaries = findServers(
+ [](const ServerDescriptionPtr& s) { return s->getType() == ServerType::kRSPrimary; });
+ // A topology of type kReplicaSetWithPrimary must contain exactly one primary; anything else
+ // aborts via the invariant.
+ invariant(foundPrimaries.size() == 1);
+ return foundPrimaries[0];
}
-}; // namespace mongo::sdam
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_description.h b/src/mongo/client/sdam/topology_description.h
index 5894ad0d20f..a0469eb118b 100644
--- a/src/mongo/client/sdam/topology_description.h
+++ b/src/mongo/client/sdam/topology_description.h
@@ -36,55 +36,13 @@
#include "mongo/bson/oid.h"
#include "mongo/client/read_preference.h"
+#include "mongo/client/sdam/sdam_configuration.h"
#include "mongo/client/sdam/sdam_datatypes.h"
#include "mongo/client/sdam/server_description.h"
#include "mongo/platform/basic.h"
namespace mongo::sdam {
-class SdamConfiguration {
-public:
- SdamConfiguration() : SdamConfiguration(boost::none){};
-
- /**
- * Initialize the TopologyDescription. This constructor may uassert if the provided
- * configuration options are not valid according to the Server Discovery & Monitoring Spec.
- *
- * Initial Servers
- * initial servers may be set to a seed list of one or more server addresses.
- *
- * Initial TopologyType
- * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary.
- *
- * Initial setName
- * The client's initial replica set name is required in order to initially configure the
- * topology type as ReplicaSetNoPrimary.
- *
- * Allowed configuration combinations
- * TopologyType Single cannot be used with multiple seeds.
- * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single, are
- * allowed.
- */
- SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
- TopologyType initialType = TopologyType::kUnknown,
- mongo::Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs,
- boost::optional<std::string> setName = boost::none);
-
- const boost::optional<std::vector<ServerAddress>>& getSeedList() const;
- TopologyType getInitialType() const;
- Milliseconds getHeartBeatFrequency() const;
- const boost::optional<std::string>& getSetName() const;
-
- static inline const mongo::Milliseconds kDefaultHeartbeatFrequencyMs = mongo::Seconds(10);
- static inline const mongo::Milliseconds kMinHeartbeatFrequencyMS = mongo::Milliseconds(500);
-
-private:
- boost::optional<std::vector<ServerAddress>> _seedList;
- TopologyType _initialType;
- mongo::Milliseconds _heartBeatFrequencyMs;
- boost::optional<std::string> _setName;
-};
-
-class TopologyDescription {
+class TopologyDescription : public std::enable_shared_from_this<TopologyDescription> {
public:
TopologyDescription() : TopologyDescription(SdamConfiguration()) {}
TopologyDescription(const TopologyDescription& source) = default;
@@ -113,6 +71,7 @@ public:
bool containsServerAddress(const ServerAddress& address) const;
std::vector<ServerDescriptionPtr> findServers(
std::function<bool(const ServerDescriptionPtr&)> predicate) const;
+ boost::optional<ServerDescriptionPtr> getPrimary();
/**
* Adds the given ServerDescription or swaps it with an existing one
@@ -129,12 +88,29 @@ public:
std::string toString();
private:
+    /**
+     * Two descriptions compare equal when their set name, topology type, max set version, max
+     * election id, server list, wire-compatibility flag and logical session timeout all compare
+     * equal.
+     */
+    friend bool operator==(const TopologyDescription& lhs, const TopologyDescription& rhs) {
+        // Project the compared members into a tuple of references once, so both operands are
+        // always compared on the same fields in the same order.
+        const auto comparedFields = [](const TopologyDescription& description) {
+            return std::tie(description._setName,
+                            description._type,
+                            description._maxSetVersion,
+                            description._maxElectionId,
+                            description._servers,
+                            description._compatible,
+                            description._logicalSessionTimeoutMinutes);
+        };
+        return comparedFields(lhs) == comparedFields(rhs);
+    }
+
/**
* Checks if all server descriptions are compatible with this server's WireVersion. If an
- * incompatible description is found, we set the topologyDescription's _compatible flag to false
- * and store an error message in _compatibleError. A ServerDescription which is not Unknown is
- * incompatible if:
- * minWireVersion > serverMaxWireVersion, or maxWireVersion < serverMinWireVersion
+ * incompatible description is found, we set the topologyDescription's _compatible flag to
+ * false and store an error message in _compatibleError. A ServerDescription which is not
+ * Unknown is incompatible if: minWireVersion > serverMaxWireVersion, or maxWireVersion <
+ * serverMinWireVersion
*/
void checkWireCompatibilityVersions();
diff --git a/src/mongo/client/sdam/topology_description_test.cpp b/src/mongo/client/sdam/topology_description_test.cpp
index 9f7a6a2dbee..d43be9642b8 100644
--- a/src/mongo/client/sdam/topology_description_test.cpp
+++ b/src/mongo/client/sdam/topology_description_test.cpp
@@ -26,8 +26,6 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-
#include "mongo/client/sdam/sdam_test_base.h"
#include "mongo/client/sdam/topology_description.h"
@@ -36,7 +34,6 @@
#include "mongo/client/sdam/server_description.h"
#include "mongo/client/sdam/server_description_builder.h"
#include "mongo/db/wire_version.h"
-#include "mongo/logv2/log.h"
#include "mongo/unittest/death_test.h"
namespace mongo {
@@ -107,9 +104,10 @@ TEST_F(TopologyDescriptionTestFixture, ShouldAllowTypeSingleWithASingleSeed) {
}
TEST_F(TopologyDescriptionTestFixture, DoesNotAllowMultipleSeedsWithSingle) {
- ASSERT_THROWS_CODE(TopologyDescription({kTwoServersNormalCase, TopologyType::kSingle}),
- DBException,
- ErrorCodes::InvalidSeedList);
+ ASSERT_THROWS_CODE(
+ TopologyDescription(SdamConfiguration(kTwoServersNormalCase, TopologyType::kSingle)),
+ DBException,
+ ErrorCodes::InvalidSeedList);
}
TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) {
@@ -121,10 +119,10 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) {
}
TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowSettingTheReplicaSetNameWithWrongType) {
- ASSERT_THROWS_CODE(
- TopologyDescription({kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName}),
- DBException,
- ErrorCodes::InvalidTopologyType);
+ ASSERT_THROWS_CODE(TopologyDescription(SdamConfiguration(
+ kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName)),
+ DBException,
+ ErrorCodes::InvalidTopologyType);
}
TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowTopologyTypeRSNoPrimaryWithoutSetName) {
@@ -146,9 +144,8 @@ TEST_F(TopologyDescriptionTestFixture, ShouldOnlyAllowSingleAndRsNoPrimaryWithSe
topologyTypes.end());
for (const auto topologyType : topologyTypes) {
- LOGV2(20217,
- "Check TopologyType {topologyType} with setName value.",
- "topologyType"_attr = toString(topologyType));
+ unittest::log() << "Check TopologyType " << toString(topologyType)
+ << " with setName value.";
ASSERT_THROWS_CODE(
SdamConfiguration(kOneServer, topologyType, mongo::Seconds(10), kSetName),
DBException,
@@ -180,7 +177,7 @@ TEST_F(TopologyDescriptionTestFixture,
ShouldSetWireCompatibilityErrorForMinWireVersionWhenMinWireVersionIsGreater) {
const auto outgoingMaxWireVersion = WireSpec::instance().outgoing.maxWireVersion;
const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10));
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto serverDescriptionMinVersion = ServerDescriptionBuilder()
.withAddress(kOneServer[0])
.withMe(kOneServer[0])
@@ -188,16 +185,16 @@ TEST_F(TopologyDescriptionTestFixture,
.withMinWireVersion(outgoingMaxWireVersion + 1)
.instance();
- ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
- topologyDescription.installServerDescription(serverDescriptionMinVersion);
- ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
+ ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
+ topologyDescription->installServerDescription(serverDescriptionMinVersion);
+ ASSERT_NOT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
}
TEST_F(TopologyDescriptionTestFixture,
ShouldSetWireCompatibilityErrorForMinWireVersionWhenMaxWireVersionIsLess) {
const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion;
const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10));
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto serverDescriptionMaxVersion = ServerDescriptionBuilder()
.withAddress(kOneServer[0])
.withMe(kOneServer[0])
@@ -205,31 +202,31 @@ TEST_F(TopologyDescriptionTestFixture,
.withMaxWireVersion(outgoingMinWireVersion - 1)
.instance();
- ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
- topologyDescription.installServerDescription(serverDescriptionMaxVersion);
- ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
+ ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
+ topologyDescription->installServerDescription(serverDescriptionMaxVersion);
+ ASSERT_NOT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
}
TEST_F(TopologyDescriptionTestFixture, ShouldNotSetWireCompatibilityErrorWhenServerTypeIsUnknown) {
const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion;
const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10));
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto serverDescriptionMaxVersion =
ServerDescriptionBuilder().withMaxWireVersion(outgoingMinWireVersion - 1).instance();
- ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
- topologyDescription.installServerDescription(serverDescriptionMaxVersion);
- ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
+ ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
+ topologyDescription->installServerDescription(serverDescriptionMaxVersion);
+ ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
}
TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllServerDescriptions) {
const auto config = SdamConfiguration(kThreeServers);
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto logicalSessionTimeouts = std::vector{300, 100, 200};
auto timeoutIt = logicalSessionTimeouts.begin();
const auto serverDescriptionsWithTimeouts = map<ServerDescriptionPtr, ServerDescriptionPtr>(
- topologyDescription.getServers(), [&timeoutIt](const ServerDescriptionPtr& description) {
+ topologyDescription->getServers(), [&timeoutIt](const ServerDescriptionPtr& description) {
auto newInstanceBuilder = ServerDescriptionBuilder()
.withType(ServerType::kRSSecondary)
.withAddress(description->getAddress())
@@ -240,26 +237,26 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllS
});
for (auto description : serverDescriptionsWithTimeouts) {
- topologyDescription.installServerDescription(description);
+ topologyDescription->installServerDescription(description);
}
int expectedLogicalSessionTimeout =
*std::min_element(logicalSessionTimeouts.begin(), logicalSessionTimeouts.end());
ASSERT_EQUALS(expectedLogicalSessionTimeout,
- topologyDescription.getLogicalSessionTimeoutMinutes());
+ topologyDescription->getLogicalSessionTimeoutMinutes());
}
TEST_F(TopologyDescriptionTestFixture,
ShouldSetLogicalSessionTimeoutToNoneIfAnyServerDescriptionHasNone) {
const auto config = SdamConfiguration(kThreeServers);
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto logicalSessionTimeouts = std::vector{300, 100, 200};
auto timeoutIt = logicalSessionTimeouts.begin();
const auto serverDescriptionsWithTimeouts = map<ServerDescriptionPtr, ServerDescriptionPtr>(
- topologyDescription.getServers(), [&](const ServerDescriptionPtr& description) {
+ topologyDescription->getServers(), [&](const ServerDescriptionPtr& description) {
auto timeoutValue = (timeoutIt == logicalSessionTimeouts.begin())
? boost::none
: boost::make_optional(*timeoutIt);
@@ -275,19 +272,19 @@ TEST_F(TopologyDescriptionTestFixture,
});
for (auto description : serverDescriptionsWithTimeouts) {
- topologyDescription.installServerDescription(description);
+ topologyDescription->installServerDescription(description);
}
- ASSERT_EQUALS(boost::none, topologyDescription.getLogicalSessionTimeoutMinutes());
+ ASSERT_EQUALS(boost::none, topologyDescription->getLogicalSessionTimeoutMinutes());
}
TEST_F(TopologyDescriptionTestFixture, ShouldUpdateTopologyVersionOnSuccess) {
const auto config = SdamConfiguration(kThreeServers);
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
// Deafult topologyVersion is null
- ASSERT_EQUALS(topologyDescription.getServers().size(), 3);
- auto serverDescription = topologyDescription.getServers()[1];
+ ASSERT_EQUALS(topologyDescription->getServers().size(), 3);
+ auto serverDescription = topologyDescription->getServers()[1];
ASSERT(serverDescription->getTopologyVersion() == boost::none);
// Create new serverDescription with topologyVersion, topologyDescription should have the new
@@ -300,20 +297,19 @@ TEST_F(TopologyDescriptionTestFixture, ShouldUpdateTopologyVersionOnSuccess) {
.withTopologyVersion(TopologyVersion(processId, 1))
.instance();
- topologyDescription.installServerDescription(newDescription);
- ASSERT_EQUALS(topologyDescription.getServers().size(), 3);
- auto topologyVersion = topologyDescription.getServers()[1]->getTopologyVersion();
- ASSERT(topologyVersion->getProcessId() == processId);
- ASSERT(topologyVersion->getCounter() == 1);
+ topologyDescription->installServerDescription(newDescription);
+ ASSERT_EQUALS(topologyDescription->getServers().size(), 3);
+ auto topologyVersion = topologyDescription->getServers()[1]->getTopologyVersion();
+ ASSERT(topologyVersion == TopologyVersion(processId, 1));
}
TEST_F(TopologyDescriptionTestFixture, ShouldNotUpdateTopologyVersionOnError) {
const auto config = SdamConfiguration(kThreeServers);
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
// Deafult topologyVersion is null
- ASSERT_EQUALS(topologyDescription.getServers().size(), 3);
- auto serverDescription = topologyDescription.getServers()[1];
+ ASSERT_EQUALS(topologyDescription->getServers().size(), 3);
+ auto serverDescription = topologyDescription->getServers()[1];
ASSERT(serverDescription->getTopologyVersion() == boost::none);
auto newDescription = ServerDescriptionBuilder()
@@ -321,9 +317,9 @@ TEST_F(TopologyDescriptionTestFixture, ShouldNotUpdateTopologyVersionOnError) {
.withError("error")
.instance();
- topologyDescription.installServerDescription(newDescription);
- ASSERT_EQUALS(topologyDescription.getServers().size(), 3);
- auto topologyVersion = topologyDescription.getServers()[1]->getTopologyVersion();
+ topologyDescription->installServerDescription(newDescription);
+ ASSERT_EQUALS(topologyDescription->getServers().size(), 3);
+ auto topologyVersion = topologyDescription->getServers()[1]->getTopologyVersion();
ASSERT(topologyVersion == boost::none);
}
}; // namespace sdam
diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp
new file mode 100644
index 00000000000..995685b71b9
--- /dev/null
+++ b/src/mongo/client/sdam/topology_listener.cpp
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/client/sdam/topology_listener.h"
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+#include "mongo/util/log.h"
+
+namespace mongo::sdam {
+/**
+ * Appends `listener` to the list of listeners that published events are delivered to. No
+ * de-duplication is performed: a listener registered twice is stored twice.
+ */
+void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) {
+    stdx::lock_guard listenersLock(_mutex);
+    _listeners.emplace_back(listener);
+}
+
+/**
+ * Removes every registration of `listener`; does nothing if it was never registered.
+ */
+void TopologyEventsPublisher::removeListener(TopologyListenerPtr listener) {
+    stdx::lock_guard listenersLock(_mutex);
+    // Compact the entries to keep at the front, then drop the leftover tail.
+    const auto firstRemoved = std::remove(_listeners.begin(), _listeners.end(), listener);
+    _listeners.erase(firstRemoved, _listeners.end());
+}
+
+/**
+ * Marks the publisher as closed and drops all registered listeners. Once closed,
+ * _nextDelivery() returns without delivering anything.
+ */
+void TopologyEventsPublisher::close() {
+    stdx::lock_guard listenersLock(_mutex);
+    _isClosed = true;
+    _listeners.clear();
+}
+
+/**
+ * Queues a TOPOLOGY_DESCRIPTION_CHANGED event carrying the topology id and both descriptions,
+ * then schedules a delivery on the executor.
+ */
+void TopologyEventsPublisher::onTopologyDescriptionChangedEvent(
+    UUID topologyId,
+    TopologyDescriptionPtr previousDescription,
+    TopologyDescriptionPtr newDescription) {
+    {
+        stdx::lock_guard queueLock(_eventQueueMutex);
+        auto changed = std::make_unique<Event>();
+        changed->type = EventType::TOPOLOGY_DESCRIPTION_CHANGED;
+        changed->previousDescription = previousDescription;
+        changed->newDescription = newDescription;
+        changed->topologyId = std::move(topologyId);
+        _eventQueue.emplace_back(std::move(changed));
+    }
+    // The queue lock is released before the delivery is scheduled.
+    _scheduleNextDelivery();
+}
+
+/**
+ * Queues a HEARTBEAT_SUCCESS event carrying the duration, the server address and the reply,
+ * then schedules a delivery on the executor.
+ */
+void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
+                                                              const ServerAddress& hostAndPort,
+                                                              const BSONObj reply) {
+    {
+        stdx::lock_guard queueLock(_eventQueueMutex);
+        auto succeeded = std::make_unique<Event>();
+        succeeded->type = EventType::HEARTBEAT_SUCCESS;
+        succeeded->hostAndPort = hostAndPort;
+        succeeded->duration = duration_cast<IsMasterRTT>(durationMs);
+        succeeded->reply = reply;
+        _eventQueue.emplace_back(std::move(succeeded));
+    }
+    // The queue lock is released before the delivery is scheduled.
+    _scheduleNextDelivery();
+}
+
+/**
+ * Queues a HEARTBEAT_FAILURE event carrying the duration, the error status, the server address
+ * and the reply, then schedules a delivery on the executor.
+ */
+void TopologyEventsPublisher::onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
+                                                            Status errorStatus,
+                                                            const ServerAddress& hostAndPort,
+                                                            const BSONObj reply) {
+    {
+        stdx::lock_guard queueLock(_eventQueueMutex);
+        auto failed = std::make_unique<Event>();
+        failed->type = EventType::HEARTBEAT_FAILURE;
+        failed->status = errorStatus;
+        failed->hostAndPort = hostAndPort;
+        failed->duration = duration_cast<IsMasterRTT>(durationMs);
+        failed->reply = reply;
+        _eventQueue.emplace_back(std::move(failed));
+    }
+    // The queue lock is released before the delivery is scheduled.
+    _scheduleNextDelivery();
+}
+
+/**
+ * Asks the executor to run one _nextDelivery() call asynchronously.
+ */
+void TopologyEventsPublisher::_scheduleNextDelivery() {
+    // The task holds a shared_ptr to this publisher, keeping it alive until the task has run.
+    // The status handed to the task is not inspected.
+    auto deliverOne = [publisher = shared_from_this()](const Status&) {
+        publisher->_nextDelivery();
+    };
+    _executor->schedule(std::move(deliverOne));
+}
+
+// Currently a no-op: ping failures are neither queued nor forwarded to the registered listeners.
+void TopologyEventsPublisher::onServerPingFailedEvent(const ServerAddress& hostAndPort,
+ const Status& status) {}
+
+// Currently a no-op: ping successes are neither queued nor forwarded to the registered listeners.
+void TopologyEventsPublisher::onServerPingSucceededEvent(IsMasterRTT durationMS,
+ const ServerAddress& hostAndPort) {}
+
+// TODO: this could be done in batches if this is a bottleneck.
+/**
+ * Pops at most one event off the queue and hands it to every currently registered listener.
+ * Returns without delivering when the queue is empty or the publisher has been closed; in the
+ * closed case the popped event is discarded.
+ */
+void TopologyEventsPublisher::_nextDelivery() {
+    // Take the oldest pending event, if any.
+    EventPtr pending;
+    {
+        stdx::lock_guard queueLock(_eventQueueMutex);
+        if (_eventQueue.empty()) {
+            return;
+        }
+        pending = std::move(_eventQueue.front());
+        _eventQueue.pop_front();
+    }
+
+    // Snapshot the listeners and release the lock before sending, to avoid deadlock in the case
+    // there are events generated by sending the current one.
+    std::vector<TopologyListenerPtr> recipients;
+    {
+        stdx::lock_guard listenersLock(_mutex);
+        if (_isClosed) {
+            return;
+        }
+        recipients = _listeners;
+    }
+
+    // No lock is held while the listeners run.
+    for (const auto& recipient : recipients) {
+        _sendEvent(recipient, *pending);
+    }
+}
+
+/**
+ * Delivers `event` to `listener` by invoking the listener callback that matches the event type.
+ *
+ * Only HEARTBEAT_SUCCESS, HEARTBEAT_FAILURE and TOPOLOGY_DESCRIPTION_CHANGED events can be
+ * delivered; any other event type hits MONGO_UNREACHABLE.
+ */
+void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Event& event) {
+    switch (event.type) {
+        case EventType::HEARTBEAT_SUCCESS:
+            listener->onServerHeartbeatSucceededEvent(
+                duration_cast<IsMasterRTT>(event.duration), event.hostAndPort, event.reply);
+            break;
+        case EventType::HEARTBEAT_FAILURE:
+            listener->onServerHeartbeatFailureEvent(duration_cast<IsMasterRTT>(event.duration),
+                                                    event.status,
+                                                    event.hostAndPort,
+                                                    event.reply);
+            break;
+        case EventType::TOPOLOGY_DESCRIPTION_CHANGED:
+            // Forward the id that was captured when the event was published. Generating a new
+            // UUID here would hand every listener a different, meaningless topology id.
+            // onTopologyDescriptionChangedEvent() always sets the id for this event type.
+            invariant(event.topologyId != boost::none);
+            listener->onTopologyDescriptionChangedEvent(
+                *event.topologyId, event.previousDescription, event.newDescription);
+            break;
+        default:
+            MONGO_UNREACHABLE;
+    }
+}
+}; // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h
index 0cf38ddf68d..dde1ae3b683 100644
--- a/src/mongo/client/sdam/topology_listener.h
+++ b/src/mongo/client/sdam/topology_listener.h
@@ -26,10 +26,13 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
#pragma once
+#include <deque>
+#include <memory>
+#include <vector>
#include "mongo/client/sdam/sdam_datatypes.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/util/uuid.h"
namespace mongo::sdam {
@@ -49,13 +52,18 @@ public:
TopologyDescriptionPtr previousDescription,
TopologyDescriptionPtr newDescription){};
+ /**
+ * Failure counterpart of onServerHeartbeatSucceededEvent: called when a heartbeat sent to the
+ * server at hostAndPort did not succeed. errorStatus carries the failure. The default
+ * implementation does nothing.
+ * NOTE(review): what durationMs measures and what reply holds on failure are not visible
+ * here - confirm against the caller.
+ */
+ virtual void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
+ Status errorStatus,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply){};
/**
* Called when a ServerHeartBeatSucceededEvent is published - A heartbeat sent to the server at
* hostAndPort succeeded. durationMS is the execution time of the event, including the time it
* took to send the message and recieve the reply from the server.
*/
virtual void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
- const ServerAddress& hostAndPort){};
+ const ServerAddress& hostAndPort,
+ const BSONObj reply){};
/*
* Called when a ServerPingFailedEvent is published - A monitoring ping to the server at
@@ -70,4 +78,69 @@ public:
virtual void onServerPingSucceededEvent(IsMasterRTT durationMS,
const ServerAddress& hostAndPort){};
};
+
+/**
+ * This class publishes TopologyListener events to a group of registered listeners.
+ *
+ * To publish an event to all registered listeners call the corresponding event function on the
+ * TopologyEventsPublisher instance. Topology-changed and heartbeat events are placed on an
+ * internal queue and handed to the listeners from tasks scheduled on the executor; ping events
+ * are currently not forwarded.
+ *
+ * Instances must be owned by a std::shared_ptr, because scheduling a delivery calls
+ * shared_from_this().
+ */
+class TopologyEventsPublisher final : public TopologyListener,
+ public std::enable_shared_from_this<TopologyEventsPublisher> {
+public:
+ // 'executor' is the task executor on which event deliveries are scheduled.
+ TopologyEventsPublisher(std::shared_ptr<executor::TaskExecutor> executor)
+ : _executor(executor){};
+ // Adds 'listener' to the listeners that receive published events.
+ void registerListener(TopologyListenerPtr listener);
+ // Removes every registration of 'listener'.
+ void removeListener(TopologyListenerPtr listener);
+ // Drops all listeners and marks the publisher closed; later deliveries are skipped.
+ void close();
+
+ // TopologyListener interface. The topology-changed and heartbeat overrides enqueue an event
+ // and schedule its delivery; the two ping overrides are no-ops.
+ void onTopologyDescriptionChangedEvent(UUID topologyId,
+ TopologyDescriptionPtr previousDescription,
+ TopologyDescriptionPtr newDescription) override;
+ void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) override;
+ void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
+ Status errorStatus,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) override;
+ void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override;
+ void onServerPingSucceededEvent(IsMasterRTT durationMS,
+ const ServerAddress& hostAndPort) override;
+
+
+private:
+ // Kind of a queued event; selects the listener callback used for delivery.
+ enum class EventType {
+ HEARTBEAT_SUCCESS,
+ HEARTBEAT_FAILURE,
+ PING_SUCCESS,
+ PING_FAILURE,
+ TOPOLOGY_DESCRIPTION_CHANGED
+ };
+ // A queued event. Which fields are populated depends on 'type'.
+ struct Event {
+ EventType type;
+ ServerAddress hostAndPort;
+ IsMasterRTT duration;
+ BSONObj reply;
+ TopologyDescriptionPtr previousDescription;
+ TopologyDescriptionPtr newDescription;
+ boost::optional<UUID> topologyId;
+ Status status = Status::OK();
+ };
+ using EventPtr = std::unique_ptr<Event>;
+
+ // Invokes the callback on 'listener' that corresponds to event.type.
+ void _sendEvent(TopologyListenerPtr listener, const TopologyEventsPublisher::Event& event);
+ // Pops one queued event and sends it to all registered listeners.
+ void _nextDelivery();
+ // Schedules one _nextDelivery() call on the executor.
+ void _scheduleNextDelivery();
+
+ // Lock acquisition order to avoid deadlock is _eventQueueMutex -> _mutex
+ // _eventQueueMutex guards _eventQueue.
+ Mutex _eventQueueMutex;
+ std::deque<EventPtr> _eventQueue;
+
+ // _mutex guards _isClosed and _listeners.
+ Mutex _mutex;
+ bool _isClosed = false;
+ std::shared_ptr<executor::TaskExecutor> _executor;
+ std::vector<TopologyListenerPtr> _listeners;
+};
+using TopologyEventsPublisherPtr = std::shared_ptr<TopologyEventsPublisher>;
} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp
index 7897e6948f7..64d74e114d9 100644
--- a/src/mongo/client/sdam/topology_manager.cpp
+++ b/src/mongo/client/sdam/topology_manager.cpp
@@ -32,9 +32,9 @@
#include "mongo/client/sdam/topology_state_machine.h"
#include "mongo/logv2/log.h"
+#include "mongo/rpc/topology_version_gen.h"
namespace mongo::sdam {
-
namespace {
/* Compare topologyVersions to determine if the isMaster response's topologyVersion is stale
@@ -57,16 +57,19 @@ bool isStaleTopologyVersion(boost::optional<TopologyVersion> lastTopologyVersion
return false;
}
-
} // namespace
-TopologyManager::TopologyManager(SdamConfiguration config, ClockSource* clockSource)
+
+TopologyManager::TopologyManager(SdamConfiguration config,
+ ClockSource* clockSource,
+ TopologyEventsPublisherPtr eventsPublisher)
: _config(std::move(config)),
_clockSource(clockSource),
_topologyDescription(std::make_unique<TopologyDescription>(_config)),
- _topologyStateMachine(std::make_unique<TopologyStateMachine>(_config)) {}
+ _topologyStateMachine(std::make_unique<TopologyStateMachine>(_config)),
+ _topologyEventsPublisher(eventsPublisher) {}
-void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) {
+bool TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) {
stdx::lock_guard<mongo::Mutex> lock(_mutex);
boost::optional<IsMasterRTT> lastRTT;
@@ -85,11 +88,11 @@ void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome
if (isStaleTopologyVersion(lastTopologyVersion, newTopologyVersion)) {
LOGV2(
23930,
- "Ignoring this isMaster response because our topologyVersion: {lastTopologyVersion}is "
+ "Ignoring this isMaster response because our topologyVersion: {lastTopologyVersion} is "
"fresher than the provided topologyVersion: {newTopologyVersion}",
"lastTopologyVersion"_attr = lastTopologyVersion->toBSON(),
"newTopologyVersion"_attr = newTopologyVersion->toBSON());
- return;
+ return false;
}
boost::optional<int> poolResetCounter = lastPoolResetCounter;
@@ -98,17 +101,60 @@ void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome
poolResetCounter = ++lastPoolResetCounter.get();
}
- // newTopologyVersion will be null if the isMaster response did not provide one.
auto newServerDescription = std::make_shared<ServerDescription>(
_clockSource, isMasterOutcome, lastRTT, newTopologyVersion, poolResetCounter);
- auto newTopologyDescription = std::make_unique<TopologyDescription>(*_topologyDescription);
- _topologyStateMachine->onServerDescription(*newTopologyDescription, newServerDescription);
- _topologyDescription = std::move(newTopologyDescription);
+ auto oldTopologyDescription = _topologyDescription;
+ _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription);
+
+ // if we are equal to the old description, just install the new description without
+ // performing any actions on the state machine.
+ auto isEqualToOldServerDescription =
+ (lastServerDescription && (*lastServerDescription->get()) == *newServerDescription);
+ if (isEqualToOldServerDescription) {
+ _topologyDescription->installServerDescription(newServerDescription);
+ } else {
+ _topologyStateMachine->onServerDescription(*_topologyDescription, newServerDescription);
+ }
+
+ _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription);
+ return true;
}
const std::shared_ptr<TopologyDescription> TopologyManager::getTopologyDescription() const {
stdx::lock_guard<mongo::Mutex> lock(_mutex);
return _topologyDescription;
}
+
+/**
+ * Replaces the stored RTT of `hostAndPort` without running any state machine actions.
+ *
+ * While holding _mutex: looks the server up in the current topology; when it is present, clones
+ * its ServerDescription with the new RTT, installs the clone into a copy of the current
+ * TopologyDescription, makes that copy the current one and publishes a topology-changed event.
+ * When the server is not part of the topology, only a log line is emitted.
+ */
+void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt) {
+    stdx::lock_guard<mongo::Mutex> lock(_mutex);
+
+    auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort);
+    if (oldServerDescription) {
+        auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt);
+
+        auto oldTopologyDescription = _topologyDescription;
+        _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription);
+        _topologyDescription->installServerDescription(newServerDescription);
+
+        _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription);
+
+        return;
+    }
+
+    // otherwise, the server was removed from the topology. Nothing to do.
+    // _topologyDescription is read directly here: getTopologyDescription() locks _mutex, which
+    // this function already holds.
+    LOGV2(433301,
+          "Not updating RTT. Server {server} does not exist in {setName}",
+          "server"_attr = hostAndPort,
+          "setName"_attr = _topologyDescription->getSetName());
+}
+
+// Forwards a topology-changed event to _topologyEventsPublisher; does nothing when it is null.
+void TopologyManager::_publishTopologyDescriptionChanged(
+    const TopologyDescriptionPtr& oldTopologyDescription,
+    const TopologyDescriptionPtr& newTopologyDescription) const {
+    if (!_topologyEventsPublisher) {
+        return;
+    }
+
+    // The event is identified by the id of the new description.
+    const auto& topologyId = newTopologyDescription->getId();
+    _topologyEventsPublisher->onTopologyDescriptionChangedEvent(
+        topologyId, oldTopologyDescription, newTopologyDescription);
+}
}; // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_manager.h b/src/mongo/client/sdam/topology_manager.h
index 292d48b6e8e..5ae64d94644 100644
--- a/src/mongo/client/sdam/topology_manager.h
+++ b/src/mongo/client/sdam/topology_manager.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2019-present MongoDB, Inc.
+ * Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -31,6 +31,7 @@
#include "mongo/client/sdam/sdam_datatypes.h"
#include "mongo/client/sdam/topology_description.h"
+#include "mongo/client/sdam/topology_listener.h"
#include "mongo/client/sdam/topology_state_machine.h"
namespace mongo::sdam {
@@ -44,7 +45,9 @@ class TopologyManager {
TopologyManager(const TopologyManager&) = delete;
public:
- TopologyManager(SdamConfiguration config, ClockSource* clockSource);
+ explicit TopologyManager(SdamConfiguration config,
+ ClockSource* clockSource,
+ TopologyEventsPublisherPtr eventsPublisher = nullptr);
/**
* This function atomically:
@@ -57,7 +60,19 @@ public:
* IsMasterOutcomes serially, as required by:
* https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#process-one-ismaster-outcome-at-a-time
*/
- void onServerDescription(const IsMasterOutcome& isMasterOutcome);
+ bool onServerDescription(const IsMasterOutcome& isMasterOutcome);
+
+
+ /**
+ * This function updates the RTT value for a server without executing any state machine actions.
+ * It atomically:
+ * 1. Clones the current TopologyDescription
+ * 2. Clones the ServerDescription corresponding to hostAndPort such that it contains the new
+ * RTT value.
+ * 3. Installs the cloned ServerDescription into the TopologyDescription from step 1
+ * 4. Installs the cloned TopologyDescription as the current one.
+ */
+ void onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt);
/**
* Get the current TopologyDescription. This is safe to call from multiple threads.
@@ -65,10 +80,15 @@ public:
const TopologyDescriptionPtr getTopologyDescription() const;
private:
+ void _publishTopologyDescriptionChanged(
+ const TopologyDescriptionPtr& oldTopologyDescription,
+ const TopologyDescriptionPtr& newTopologyDescription) const;
+
mutable mongo::Mutex _mutex = MONGO_MAKE_LATCH("TopologyManager");
const SdamConfiguration _config;
ClockSource* _clockSource;
- std::shared_ptr<TopologyDescription> _topologyDescription;
- std::unique_ptr<TopologyStateMachine> _topologyStateMachine;
+ TopologyDescriptionPtr _topologyDescription;
+ TopologyStateMachinePtr _topologyStateMachine;
+ TopologyEventsPublisherPtr _topologyEventsPublisher;
};
} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_state_machine.h b/src/mongo/client/sdam/topology_state_machine.h
index abed9bc854f..fcb5b7e0c99 100644
--- a/src/mongo/client/sdam/topology_state_machine.h
+++ b/src/mongo/client/sdam/topology_state_machine.h
@@ -101,4 +101,5 @@ private:
static inline auto kLogPrefix = "sdam : ";
};
+using TopologyStateMachinePtr = std::unique_ptr<TopologyStateMachine>;
} // namespace mongo::sdam
diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp
new file mode 100644
index 00000000000..8ae85924058
--- /dev/null
+++ b/src/mongo/client/server_is_master_monitor.cpp
@@ -0,0 +1,375 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/client/server_is_master_monitor.h"
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+#include "mongo/client/sdam/sdam.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+const BSONObj IS_MASTER_BSON = BSON("isMaster" << 1);
+
+using executor::NetworkInterface;
+using executor::NetworkInterfaceThreadPool;
+using executor::TaskExecutor;
+using executor::ThreadPoolTaskExecutor;
+
+const Milliseconds kZeroMs = Milliseconds{0};
+} // namespace
+
+/**
+ * Stores the monitoring parameters for one host. The requested heartbeat frequency is passed
+ * through _overrideRefreshPeriod() before being stored. The monitor is created with
+ * _isShutdown set; init() clears the flag and schedules the first isMaster.
+ */
+SingleServerIsMasterMonitor::SingleServerIsMasterMonitor(
+    const MongoURI& setUri,
+    const sdam::ServerAddress& host,
+    Milliseconds heartbeatFrequencyMS,
+    sdam::TopologyEventsPublisherPtr eventListener,
+    std::shared_ptr<executor::TaskExecutor> executor)
+    : _host(host),
+      _eventListener(eventListener),
+      _executor(executor),
+      _heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)),
+      _isShutdown(true),
+      _setUri(setUri) {
+    LOG(kLogLevel.lessSevere()) << "Created Replica Set SingleServerIsMasterMonitor for host "
+                                << host;
+}
+
+// Clears the shut-down flag and schedules the first isMaster with zero delay.
+void SingleServerIsMasterMonitor::init() {
+    stdx::lock_guard lk(_mutex);
+    _isShutdown = false;
+    _scheduleNextIsMaster(lk, kZeroMs);
+}
+
+/**
+ * Switches the monitor into expedited mode and, unless an isMaster is already in flight,
+ * decides whether the pending check has to be cancelled and rescheduled.
+ *
+ * The check is rescheduled only when no isMaster has completed yet or when more than one
+ * (current) refresh period has elapsed since the last one; otherwise the function returns and
+ * leaves the existing schedule untouched.
+ */
+void SingleServerIsMasterMonitor::requestImmediateCheck() {
+    stdx::lock_guard lock(_mutex);
+    if (_isShutdown) {
+        return;
+    }
+
+    // remain in expedited mode until the replica set recovers
+    if (!_isExpedited) {
+        // log on the transition only, to save some log lines.
+        LOG(kLogLevel) << "[SingleServerIsMasterMonitor] Monitoring " << _host
+                       << " in expedited mode until we detect a primary.";
+        _isExpedited = true;
+    }
+
+    // .. but continue with rescheduling the next request.
+    if (_isMasterOutstanding) {
+        LOG(kLogLevel) << "[SingleServerIsMasterMonitor] immediate isMaster check requested, but "
+                          "there is already an "
+                          "outstanding request.";
+        return;
+    }
+
+    const auto refreshPeriod = _currentRefreshPeriod(lock);
+    const bool hasCompletedCheck = static_cast<bool>(_lastIsMasterAt);
+
+    const Milliseconds sinceLastCheck =
+        hasCompletedCheck ? _executor->now() - *_lastIsMasterAt : Milliseconds::max();
+
+    // Time left until one refresh period has passed since the last check; negative when overdue.
+    const Milliseconds remaining = refreshPeriod - sinceLastCheck;
+
+    const Milliseconds delayUntilNextCheck =
+        (hasCompletedCheck && (sinceLastCheck < refreshPeriod)) ? remaining : kZeroMs;
+
+    // if our calculated delay is less than the next scheduled call, then run the check sooner.
+    // Otherwise, do nothing. Three cases to cancel the existing request:
+    // 1. refresh period has changed to expedited, so the remaining time is < 0
+    // 2. calculated delay is less than the next scheduled isMaster
+    // 3. isMaster was never scheduled.
+    // NOTE(review): with the values computed above, delayUntilNextCheck is either `remaining`
+    // or zero, so case 2 only differs from case 1 when `remaining` is positive and the delay is
+    // zero, which cannot happen here -- confirm whether that is intended.
+    const bool mustReschedule = (remaining < kZeroMs) || (delayUntilNextCheck < remaining) ||
+        sinceLastCheck == Milliseconds::max();
+    if (!mustReschedule) {
+        return;
+    }
+
+    _cancelOutstandingRequest(lock);
+
+    LOG(kLogLevel) << "[SingleServerIsMasterMonitor] Rescheduling next isMaster check for "
+                   << this->_host << " in " << delayUntilNextCheck;
+    _scheduleNextIsMaster(lock, delayUntilNextCheck);
+}
+
+/**
+ * Asks the executor to run _doRemoteCommand() once `delay` has elapsed and remembers the
+ * resulting handle in _nextIsMasterHandle. Does nothing after shutdown. If the executor
+ * refuses the work, the failure is reported through _onIsMasterFailure() with an empty reply.
+ */
+void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds delay) {
+    if (_isShutdown) {
+        return;
+    }
+
+    invariant(!_isMasterOutstanding);
+
+    // Only used to report a latency when scheduling itself fails.
+    Timer timer;
+
+    const auto runAt = _executor->now() + delay;
+    auto swCbHandle = _executor->scheduleWorkAt(
+        runAt, [self = shared_from_this()](const executor::TaskExecutor::CallbackArgs& cbData) {
+            // A non-OK status means the timer did not fire normally; skip the isMaster.
+            if (cbData.status.isOK()) {
+                self->_doRemoteCommand();
+            }
+        });
+
+    if (swCbHandle.isOK()) {
+        _nextIsMasterHandle = swCbHandle.getValue();
+        return;
+    }
+
+    Microseconds latency(timer.micros());
+    _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
+}
+
+/**
+ * Sends one isMaster command to _host through the executor.
+ *
+ * The response callback clears _isMasterOutstanding, drops the response after shutdown or on a
+ * cancellation error, otherwise records the completion time, schedules the next isMaster and
+ * then reports success or failure to the event listener (outside the lock).
+ *
+ * If the executor refuses the command, the failure is reported and a uassert with code 31448
+ * is raised. NOTE(review): that exception propagates to whoever invoked this function --
+ * confirm the executor callback path tolerates it.
+ */
+void SingleServerIsMasterMonitor::_doRemoteCommand() {
+    auto request = executor::RemoteCommandRequest(
+        HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS);
+    request.sslMode = _setUri.getSSLMode();
+
+    stdx::lock_guard lock(_mutex);
+    if (_isShutdown) {
+        return;
+    }
+
+    // Copied into the callback so the measured latency starts at scheduling time.
+    Timer timer;
+    auto swCbHandle = _executor->scheduleRemoteCommand(
+        std::move(request),
+        [self = shared_from_this(),
+         timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
+            const auto& response = result.response;
+            {
+                stdx::lock_guard lk(self->_mutex);
+                self->_isMasterOutstanding = false;
+
+                const bool dropResponse =
+                    self->_isShutdown || ErrorCodes::isCancelationError(response.status);
+                if (dropResponse) {
+                    LOG(kLogLevel) << "[SingleServerIsMasterMonitor] not processing response: "
+                                   << response.status;
+                    return;
+                }
+
+                self->_lastIsMasterAt = self->_executor->now();
+                const Milliseconds nextRefreshPeriod = self->_currentRefreshPeriod(lk);
+                LOG(kLogLevel.lessSevere())
+                    << "next refresh period in " + nextRefreshPeriod.toString();
+                self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+            }
+
+            // The listener is notified after the monitor's lock has been released.
+            Microseconds latency(timer.micros());
+            if (response.isOK()) {
+                self->_onIsMasterSuccess(latency, response.data);
+            } else {
+                self->_onIsMasterFailure(latency, response.status, response.data);
+            }
+        });
+
+    if (!swCbHandle.isOK()) {
+        Microseconds latency(timer.micros());
+        _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
+        uasserted(31448, swCbHandle.getStatus().toString());
+    }
+
+    _isMasterOutstanding = true;
+    _remoteCommandHandle = swCbHandle.getValue();
+}
+
+/**
+ * Stops this monitor: sets the shut-down flag, cancels the scheduled and in-flight isMaster
+ * work and drops the reference to the executor.
+ */
+void SingleServerIsMasterMonitor::shutdown() {
+    stdx::lock_guard lk(_mutex);
+    LOG(kLogLevel.lessSevere()) << "Closing Replica Set SingleServerIsMasterMonitor for host "
+                                << _host;
+
+    _isShutdown = true;
+    _cancelOutstandingRequest(lk);
+
+    // The executor is needed by the cancellation above, so it is released last.
+    _executor.reset();
+
+    LOG(kLogLevel.lessSevere()) << "Done Closing Replica Set SingleServerIsMasterMonitor for host "
+                                << _host;
+}
+
+// Cancels the pending timer and the in-flight remote command (whichever handles are valid) and
+// clears the outstanding flag.
+void SingleServerIsMasterMonitor::_cancelOutstandingRequest(WithLock) {
+    const auto cancelIfValid = [this](const executor::TaskExecutor::CallbackHandle& handle) {
+        if (handle.isValid()) {
+            _executor->cancel(handle);
+        }
+    };
+
+    cancelIfValid(_nextIsMasterHandle);
+    cancelIfValid(_remoteCommandHandle);
+
+    _isMasterOutstanding = false;
+}
+
+// Logs a successful isMaster reply and publishes it (latency converted to milliseconds) as a
+// heartbeat-succeeded event.
+void SingleServerIsMasterMonitor::_onIsMasterSuccess(sdam::IsMasterRTT latency,
+                                                     const BSONObj bson) {
+    LOG(kLogLevel.lessSevere()) << "received successful isMaster for server " << _host << " ("
+                                << latency << "); " << bson.toString();
+
+    const auto latencyMs = duration_cast<Milliseconds>(latency);
+    _eventListener->onServerHeartbeatSucceededEvent(latencyMs, _host, bson);
+}
+
+// Logs a failed isMaster attempt and publishes it (latency converted to milliseconds) as a
+// heartbeat-failure event carrying the failure status.
+void SingleServerIsMasterMonitor::_onIsMasterFailure(sdam::IsMasterRTT latency,
+                                                     const Status& status,
+                                                     const BSONObj bson) {
+    LOG(kLogLevel) << "received failed isMaster for server " << _host << ": " << status.toString()
+                   << " (" << latency << "); " << bson.toString();
+
+    const auto latencyMs = duration_cast<Milliseconds>(latency);
+    _eventListener->onServerHeartbeatFailureEvent(latencyMs, status, _host, bson);
+}
+
+/**
+ * Returns `original` unless the "modifyReplicaSetMonitorDefaultRefreshPeriod" fail point is
+ * registered and its data has a "period" field, in which case that field, interpreted as
+ * seconds, is returned instead.
+ */
+Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds original) {
+    static constexpr auto kPeriodField = "period"_sd;
+
+    auto failPoint =
+        globalFailPointRegistry().find("modifyReplicaSetMonitorDefaultRefreshPeriod");
+    if (!failPoint) {
+        return original;
+    }
+
+    Milliseconds period = original;
+    failPoint->executeIf(
+        [&period](const BSONObj& data) {
+            period = duration_cast<Milliseconds>(Seconds{data.getIntField(kPeriodField)});
+        },
+        [](const BSONObj& data) { return data.hasField(kPeriodField); });
+    return period;
+}
+
+// Refresh period currently in effect: the SDAM minimum heartbeat frequency while expedited,
+// otherwise the configured heartbeat frequency.
+Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) {
+    if (_isExpedited) {
+        return sdam::SdamConfiguration::kMinHeartbeatFrequencyMS;
+    }
+    return _heartbeatFrequencyMS;
+}
+
+// Leaves expedited mode; later refresh periods use the configured heartbeat frequency again.
+void SingleServerIsMasterMonitor::disableExpeditedChecking() {
+    stdx::lock_guard lk(_mutex);
+    _isExpedited = false;
+}
+
+
+/**
+ * Creates the per-replica-set isMaster monitor. The executor is taken from the argument or
+ * created by _setupExecutor(). The initial topology is then fed through
+ * onTopologyDescriptionChangedEvent() (with no previous description) so that one single-server
+ * monitor is started for each server it contains.
+ */
+ServerIsMasterMonitor::ServerIsMasterMonitor(
+    const MongoURI& setUri,
+    const sdam::SdamConfiguration& sdamConfiguration,
+    sdam::TopologyEventsPublisherPtr eventsPublisher,
+    sdam::TopologyDescriptionPtr initialTopologyDescription,
+    std::shared_ptr<executor::TaskExecutor> executor)
+    : _sdamConfiguration(sdamConfiguration),
+      _eventPublisher(eventsPublisher),
+      _executor(_setupExecutor(executor)),
+      _isShutdown(false),
+      _setUri(setUri) {
+    const auto memberCount = initialTopologyDescription->getServers().size();
+    LOG(kLogLevel) << "Starting Replica Set IsMaster monitor with " << memberCount
+                   << " members.";
+
+    onTopologyDescriptionChangedEvent(
+        initialTopologyDescription->getId(), nullptr, initialTopologyDescription);
+}
+
+/**
+ * Shuts down every single-server monitor. Only the first call has an effect; later calls
+ * return immediately.
+ */
+void ServerIsMasterMonitor::shutdown() {
+    stdx::lock_guard lock(_mutex);
+    if (_isShutdown)
+        return;
+
+    _isShutdown = true;
+    // Iterate by reference: a by-value loop variable would copy the address string and the
+    // shared_ptr of every entry.
+    for (const auto& addressAndMonitor : _singleMonitors) {
+        addressAndMonitor.second->shutdown();
+    }
+}
+
+/**
+ * Reconciles the set of single-server monitors with `newDescription`:
+ *  - expedited checking is switched off when the new topology type is kSingle,
+ *    kReplicaSetWithPrimary or kSharded;
+ *  - monitors whose server is no longer in the topology are shut down and removed;
+ *  - a monitor is created and started for every server that does not have one yet.
+ * `topologyId` and `previousDescription` are not used. Does nothing after shutdown().
+ */
+void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent(
+    UUID topologyId,
+    sdam::TopologyDescriptionPtr previousDescription,
+    sdam::TopologyDescriptionPtr newDescription) {
+    stdx::lock_guard lock(_mutex);
+    if (_isShutdown)
+        return;
+
+    const auto newType = newDescription->getType();
+    using sdam::TopologyType;
+
+    if (newType == TopologyType::kSingle || newType == TopologyType::kReplicaSetWithPrimary ||
+        newType == TopologyType::kSharded) {
+        _disableExpeditedChecking(lock);
+    }
+
+    // remove monitors that are missing from the topology
+    for (auto it = _singleMonitors.begin(); it != _singleMonitors.end();) {
+        const auto& serverAddress = it->first;
+        if (newDescription->findServerByAddress(serverAddress) == boost::none) {
+            // Use the iterator's value instead of looking the key up again.
+            it->second->shutdown();
+            LOG(kLogLevel) << serverAddress << " was removed from the topology.";
+            it = _singleMonitors.erase(it);
+        } else {
+            ++it;
+        }
+    }
+
+    // add new monitors
+    newDescription->findServers([this](const sdam::ServerDescriptionPtr& serverDescription) {
+        const auto& serverAddress = serverDescription->getAddress();
+        const bool isMissing = _singleMonitors.find(serverAddress) == _singleMonitors.end();
+        if (isMissing) {
+            LOG(kLogLevel) << serverAddress << " was added to the topology.";
+            auto singleMonitor = std::make_shared<SingleServerIsMasterMonitor>(
+                _setUri,
+                serverAddress,
+                _sdamConfiguration.getHeartBeatFrequency(),
+                _eventPublisher,
+                _executor);
+            // Register the monitor before starting it, as before.
+            _singleMonitors[serverAddress] = singleMonitor;
+            singleMonitor->init();
+        }
+        return isMissing;
+    });
+}
+
+/**
+ * Returns `executor` when it is non-null. Otherwise builds a ThreadPoolTaskExecutor on top of a
+ * new network interface named "ServerIsMasterMonitor-TaskExecutor", starts it and returns it.
+ */
+std::shared_ptr<executor::TaskExecutor> ServerIsMasterMonitor::_setupExecutor(
+    const std::shared_ptr<executor::TaskExecutor>& executor) {
+    if (executor) {
+        return executor;
+    }
+
+    auto metadataHooks = std::make_unique<rpc::EgressMetadataHookList>();
+    auto networkInterface = executor::makeNetworkInterface(
+        "ServerIsMasterMonitor-TaskExecutor", nullptr, std::move(metadataHooks));
+
+    // The pool keeps a raw pointer to the network interface; both are handed to the executor.
+    auto threadPool =
+        std::make_unique<executor::NetworkInterfaceThreadPool>(networkInterface.get());
+
+    auto ownedExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(threadPool),
+                                                                  std::move(networkInterface));
+    ownedExecutor->startup();
+    return ownedExecutor;
+}
+
+// Asks every single-server monitor for an immediate check. Does nothing after shutdown().
+void ServerIsMasterMonitor::requestImmediateCheck() {
+    stdx::lock_guard lk(_mutex);
+    if (_isShutdown) {
+        return;
+    }
+
+    for (auto& entry : _singleMonitors) {
+        auto& monitor = entry.second;
+        monitor->requestImmediateCheck();
+    }
+}
+
+// Takes every single-server monitor out of expedited mode.
+void ServerIsMasterMonitor::_disableExpeditedChecking(WithLock) {
+    for (auto& entry : _singleMonitors) {
+        auto& monitor = entry.second;
+        monitor->disableExpeditedChecking();
+    }
+}
+} // namespace mongo
diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h
new file mode 100644
index 00000000000..14e2e202ba4
--- /dev/null
+++ b/src/mongo/client/server_is_master_monitor.h
@@ -0,0 +1,135 @@
+
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include <memory>
+
+#include "mongo/client/mongo_uri.h"
+#include "mongo/client/sdam/sdam.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/net/hostandport.h"
+
+namespace mongo {
+using namespace sdam;
+
+/**
+ * Periodically sends isMaster to one server through a TaskExecutor and reports every outcome
+ * to a TopologyEventsPublisher. Instances must be owned by a shared_ptr because the scheduled
+ * callbacks keep the monitor alive through shared_from_this().
+ */
+class SingleServerIsMasterMonitor
+    : public std::enable_shared_from_this<SingleServerIsMasterMonitor> {
+public:
+    explicit SingleServerIsMasterMonitor(const MongoURI& setUri,
+                                         const ServerAddress& host,
+                                         Milliseconds heartbeatFrequencyMS,
+                                         TopologyEventsPublisherPtr eventListener,
+                                         std::shared_ptr<executor::TaskExecutor> executor);
+
+    /**
+     * Starts monitoring: schedules the first isMaster without delay.
+     */
+    void init();
+
+    /**
+     * Stops monitoring and cancels scheduled and in-flight work.
+     */
+    void shutdown();
+
+    /**
+     * Request an immediate check. The server will be checked immediately if we haven't completed
+     * an isMaster less than SdamConfiguration::kMinHeartbeatFrequencyMS ago. Otherwise,
+     * we schedule a check that runs after SdamConfiguration::kMinHeartbeatFrequencyMS since
+     * the last isMaster.
+     */
+    void requestImmediateCheck();
+
+    /**
+     * Leaves the expedited mode entered by requestImmediateCheck().
+     */
+    void disableExpeditedChecking();
+
+private:
+    // Scheduling of the periodic check and of the remote command itself.
+    void _scheduleNextIsMaster(WithLock, Milliseconds delay);
+    void _doRemoteCommand();
+
+    // Outcome reporting to _eventListener.
+    void _onIsMasterSuccess(IsMasterRTT latency, const BSONObj bson);
+    void _onIsMasterFailure(IsMasterRTT latency, const Status& status, const BSONObj bson);
+
+    Milliseconds _overrideRefreshPeriod(Milliseconds original);
+    Milliseconds _currentRefreshPeriod(WithLock);
+    void _cancelOutstandingRequest(WithLock);
+
+    static inline const logger::LogSeverity kLogLevel = logger::LogSeverity::Debug(1);
+
+    // Guards the mutable state below.
+    Mutex _mutex;
+    // Address of the monitored server.
+    ServerAddress _host;
+    // Receives the heartbeat succeeded / failure events.
+    TopologyEventsPublisherPtr _eventListener;
+    // Runs the timers and remote commands; reset by shutdown().
+    std::shared_ptr<executor::TaskExecutor> _executor;
+    // Refresh period used while not in expedited mode.
+    Milliseconds _heartbeatFrequencyMS;
+    // Timeout passed with each isMaster request.
+    Milliseconds _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS;
+
+    // Executor time at which the last isMaster response was processed, if any.
+    boost::optional<Date_t> _lastIsMasterAt;
+    // True while a remote isMaster command is in flight.
+    bool _isMasterOutstanding = false;
+    // True while checks run at SdamConfiguration::kMinHeartbeatFrequencyMS.
+    bool _isExpedited = false;
+    // Handle of the timer that triggers the next isMaster.
+    executor::TaskExecutor::CallbackHandle _nextIsMasterHandle;
+    // Handle of the in-flight remote command.
+    executor::TaskExecutor::CallbackHandle _remoteCommandHandle;
+
+    // Set by the constructor and by shutdown(); cleared by init().
+    bool _isShutdown;
+    // Replica set URI; its SSL mode is applied to each request.
+    MongoURI _setUri;
+};
+using SingleServerIsMasterMonitorPtr = std::shared_ptr<SingleServerIsMasterMonitor>;
+
+
+/**
+ * Owns one SingleServerIsMasterMonitor per server of a topology and keeps that set in sync
+ * with the topology descriptions it receives as a TopologyListener.
+ */
+class ServerIsMasterMonitor : public TopologyListener {
+public:
+    ServerIsMasterMonitor(const MongoURI& setUri,
+                          const SdamConfiguration& sdamConfiguration,
+                          TopologyEventsPublisherPtr eventsPublisher,
+                          TopologyDescriptionPtr initialTopologyDescription,
+                          std::shared_ptr<executor::TaskExecutor> executor = nullptr);
+
+    virtual ~ServerIsMasterMonitor() {}
+
+    /**
+     * Shuts down all single-server monitors. Subsequent calls are no-ops.
+     */
+    void shutdown();
+
+    /**
+     * Request an immediate check of each member in the replica set.
+     */
+    void requestImmediateCheck();
+
+    /**
+     * Add/Remove Single Monitors based on the current topology membership.
+     */
+    void onTopologyDescriptionChangedEvent(UUID topologyId,
+                                           TopologyDescriptionPtr previousDescription,
+                                           TopologyDescriptionPtr newDescription) override;
+
+private:
+    /**
+     * If the provided executor exists, use that one (for testing). Otherwise create a new one.
+     */
+    std::shared_ptr<executor::TaskExecutor> _setupExecutor(
+        const std::shared_ptr<executor::TaskExecutor>& executor);
+
+    /**
+     * Turns expedited checking off on every single-server monitor.
+     */
+    void _disableExpeditedChecking(WithLock);
+
+    static inline const logger::LogSeverity kLogLevel = logger::LogSeverity::Debug(0);
+
+    // Guards _singleMonitors and _isShutdown.
+    Mutex _mutex;
+    // Source of the heartbeat frequency handed to each single-server monitor.
+    SdamConfiguration _sdamConfiguration;
+    // Passed to each single-server monitor as its event listener.
+    TopologyEventsPublisherPtr _eventPublisher;
+    // Shared by all single-server monitors.
+    std::shared_ptr<executor::TaskExecutor> _executor;
+    // One monitor per server address currently in the topology.
+    std::unordered_map<ServerAddress, SingleServerIsMasterMonitorPtr> _singleMonitors;
+    bool _isShutdown;
+    MongoURI _setUri;
+};
+using ServerIsMasterMonitorPtr = std::shared_ptr<ServerIsMasterMonitor>;
+} // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 0056a98c567..f497648c82a 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -27,82 +27,565 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/platform/basic.h"
#include "mongo/client/streamable_replica_set_monitor.h"
-#include <functional>
-#include <memory>
-#include <set>
-#include <string>
+#include <algorithm>
+#include <limits>
-#include "mongo/client/mongo_uri.h"
-#include "mongo/client/replica_set_change_notifier.h"
-#include "mongo/client/replica_set_monitor.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/util/concurrency/with_lock.h"
-#include "mongo/util/duration.h"
-#include "mongo/util/net/hostandport.h"
-#include "mongo/util/time_support.h"
+#include "mongo/bson/simple_bsonelement_comparator.h"
+#include "mongo/client/connpool.h"
+#include "mongo/client/global_conn_pool.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/client/streamable_replica_set_monitor_query_processor.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/bson_extract_optime.h"
+#include "mongo/db/server_options.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/log.h"
+#include "mongo/util/string_map.h"
+#include "mongo/util/timer.h"
namespace mongo {
+using namespace mongo::sdam;
-StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri) {}
-void StreamableReplicaSetMonitor::init() {}
+using std::numeric_limits;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::vector;
-void StreamableReplicaSetMonitor::drop() {}
+namespace {
+// Pull nested types to top-level scope
+using executor::TaskExecutor;
+using CallbackArgs = TaskExecutor::CallbackArgs;
+using CallbackHandle = TaskExecutor::CallbackHandle;
-SemiFuture<HostAndPort> StreamableReplicaSetMonitor::getHostOrRefresh(
- const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
- MONGO_UNREACHABLE;
+const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
+
+// Utility functions to use when finding servers
+// Strict ordering of server descriptions by their minimum wire version.
+bool minWireCompare(const ServerDescriptionPtr& a, const ServerDescriptionPtr& b) {
+    const auto lhs = a->getMinWireVersion();
+    const auto rhs = b->getMinWireVersion();
+    return lhs < rhs;
+}
+
+// Strict ordering of server descriptions by their maximum wire version.
+bool maxWireCompare(const ServerDescriptionPtr& a, const ServerDescriptionPtr& b) {
+    const auto lhs = a->getMaxWireVersion();
+    const auto rhs = b->getMaxWireVersion();
+    return lhs < rhs;
+}
+
+// True when the server's type is kRSSecondary.
+bool secondaryPredicate(const ServerDescriptionPtr& server) {
+    const auto serverType = server->getType();
+    return serverType == ServerType::kRSSecondary;
+}
+
+// Renders a read preference as a BSON string; a "minOpTime" field is appended when the
+// preference's minOpTime is not null.
+std::string readPrefToStringWithMinOpTime(const ReadPreferenceSetting& readPref) {
+    BSONObjBuilder bob;
+    readPref.toInnerBSON(&bob);
+
+    const bool hasMinOpTime = !readPref.minOpTime.isNull();
+    if (hasMinOpTime) {
+        bob.append("minOpTime", readPref.minOpTime.toBSON());
+    }
+
+    return bob.obj().toString();
+}
+
+// TODO: remove
+// Renders the hosts as "host; host; ..." for log output; an unset optional yields "".
+// The list is taken and iterated by const reference so neither the vector nor its elements
+// are copied just to be printed.
+std::string hostListToString(const boost::optional<std::vector<HostAndPort>>& x) {
+    std::stringstream s;
+    if (x) {
+        for (const auto& h : *x) {
+            s << h.toString() << "; ";
+        }
+    }
+    return s.str();
+}
+
+// Returns the server's RTT in whole milliseconds, saturated at the largest int32_t value.
+// A server without an RTT reports that maximum.
+//
+// The cap has to be the int32_t maximum: capping at Milliseconds::max() (a 64-bit count) and
+// then narrowing the count to int32_t would not preserve the value.
+int32_t pingTimeMillis(const ServerDescriptionPtr& serverDescription) {
+    static const Milliseconds maxLatency{std::numeric_limits<int32_t>::max()};
+    const auto& serverRtt = serverDescription->getRtt();
+    const auto latencyMillis = (serverRtt) ? duration_cast<Milliseconds>(*serverRtt) : maxLatency;
+    return static_cast<int32_t>(std::min(latencyMillis, maxLatency).count());
+}
+
+constexpr auto kZeroMs = Milliseconds(0);
+} // namespace
+
+/**
+ * Stores the URI and executor, creates the server selector and query processor, seeds the
+ * random generator, and builds the SDAM configuration from the URI's server list.
+ */
+StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri,
+                                                         std::shared_ptr<TaskExecutor> executor)
+    : _serverSelector(std::make_unique<SdamServerSelector>(kServerSelectionConfig)),
+      _queryProcessor(std::make_shared<StreamableReplicaSetMonitorQueryProcessor>()),
+      _uri(uri),
+      _executor(executor),
+      _random(PseudoRandom(SecureRandom().nextInt64())) {
+
+    // TODO: sdam should use the HostAndPort type for ServerAddress
+    std::vector<ServerAddress> seedAddresses;
+    for (const auto& seedHost : uri.getServers()) {
+        seedAddresses.push_back(seedHost.toString());
+    }
+
+    _sdamConfig = SdamConfiguration(seedAddresses);
+}
+
+// Factory: constructs the monitor inside a shared_ptr and calls init() on it before returning.
+ReplicaSetMonitorPtr StreamableReplicaSetMonitor::make(const MongoURI& uri,
+                                                       std::shared_ptr<TaskExecutor> executor) {
+    auto monitor = std::make_shared<StreamableReplicaSetMonitor>(uri, executor);
+    monitor->init();
+    return monitor;
+}
+
+/**
+ * Creates the events publisher, the topology manager and the isMaster monitor, registers this
+ * object and the isMaster monitor as topology listeners, clears the dropped flag and notifies
+ * the ReplicaSetMonitorManager's notifier that the set was found.
+ */
+void StreamableReplicaSetMonitor::init() {
+    stdx::lock_guard lk(_mutex);
+    LOG(kDefaultLogLevel) << _logPrefix() << "Starting Replica Set Monitor with uri: " << _uri;
+
+    _eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(_executor);
+
+    auto* const clockSource = getGlobalServiceContext()->getPreciseClockSource();
+    _topologyManager =
+        std::make_unique<TopologyManager>(_sdamConfig, clockSource, _eventsPublisher);
+
+    // The isMaster monitor starts from the topology manager's current description.
+    _isMasterMonitor = std::make_unique<ServerIsMasterMonitor>(
+        _uri, _sdamConfig, _eventsPublisher, _topologyManager->getTopologyDescription(), _executor);
+
+    _eventsPublisher->registerListener(shared_from_this());
+    _eventsPublisher->registerListener(_isMasterMonitor);
+    _isDropped.store(false);
+
+    ReplicaSetMonitorManager::get()->getNotifier().onFoundSet(getName());
+}
+
+/**
+ * Shuts the monitor down once: closes the events publisher, stops the query processor and the
+ * isMaster monitor, fails all outstanding host queries with ShutdownInProgress and notifies
+ * the ReplicaSetMonitorManager's notifier that the set was dropped. Later calls return early.
+ */
+void StreamableReplicaSetMonitor::drop() {
+    stdx::lock_guard lk(_mutex);
+    if (_isDropped.load()) {
+        return;
+    }
+    _isDropped.store(true);
+
+    LOG(kDefaultLogLevel) << _logPrefix() << "Closing Replica Set Monitor";
+
+    _eventsPublisher->close();
+    _queryProcessor->shutdown();
+    _isMasterMonitor->shutdown();
+
+    const Status shutdownStatus{ErrorCodes::ShutdownInProgress,
+                                "the ReplicaSetMonitor is shutting down"};
+    _failOutstandingWithStatus(lk, shutdownStatus);
+
+    ReplicaSetMonitorManager::get()->getNotifier().onDroppedSet(getName());
+
+    LOG(kDefaultLogLevel) << _logPrefix() << "Done closing Replica Set Monitor";
+}
+
+// Resolves to one host, picked at random from the hosts getHostsOrRefresh() yields for
+// `criteria`. The pick runs on _executor.
+SemiFuture<HostAndPort> StreamableReplicaSetMonitor::getHostOrRefresh(
+    const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+    return getHostsOrRefresh(criteria, maxWait)
+        .thenRunOn(_executor)
+        .then([self = shared_from_this()](const std::vector<HostAndPort>& candidates) {
+            invariant(candidates.size());
+            const auto pick = self->_random.nextInt64(candidates.size());
+            return candidates[pick];
+        })
+        .semi();
+}
+
+// Converts each server description's address into a HostAndPort, preserving order.
+std::vector<HostAndPort> StreamableReplicaSetMonitor::_extractHosts(
+    const std::vector<ServerDescriptionPtr>& serverDescriptions) {
+    std::vector<HostAndPort> hosts;
+    for (const auto& description : serverDescriptions) {
+        hosts.push_back(HostAndPort(description->getAddress()));
+    }
+    return hosts;
}
SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefresh(
- const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
- MONGO_UNREACHABLE;
+    const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+    // Returns the hosts matching `criteria`. If the current topology already satisfies the
+    // criteria the result is ready immediately; otherwise an immediate isMaster check is
+    // requested and the query is queued until `maxWait` has elapsed.
+
+    // In the fast case (stable topology), we avoid mutex acquisition.
+    if (_isDropped.load()) {
+        return _makeReplicaSetMonitorRemovedError();
+    }
+
+    // start counting from the beginning of the operation; a negative maxWait counts as zero
+    const auto effectiveWait = (maxWait > kZeroMs) ? maxWait : kZeroMs;
+    const auto deadline = _executor->now() + effectiveWait;
+
+    // try to satisfy query immediately
+    auto immediateResult = _getHosts(criteria);
+    if (immediateResult) {
+        LOG(kLowerLogLevel) << _logPrefix()
+                            << "getHosts: " << readPrefToStringWithMinOpTime(criteria) << " -> "
+                            << hostListToString(immediateResult);
+        return {*immediateResult};
+    }
+
+    _isMasterMonitor->requestImmediateCheck();
+    LOG(kDefaultLogLevel) << _logPrefix()
+                          << "start getHosts: " << readPrefToStringWithMinOpTime(criteria);
+
+    // fail fast on timeout
+    const Date_t now = _executor->now();
+    if (deadline <= now) {
+        return _makeUnsatisfiedReadPrefError(criteria);
+    }
+
+    stdx::lock_guard lk(_mutex);
+
+    // We check if we are dropped under the mutex here since someone could have called
+    // drop() concurrently with the code above.
+    if (_isDropped.load()) {
+        return _makeReplicaSetMonitorRemovedError();
+    }
+
+    return _enqueueOutstandingQuery(lk, criteria, deadline);
+}
+
+/**
+ * Queues a host query that could not be answered from the current topology.
+ *
+ * A HostQuery holding the criteria, the deadline and the promise is appended to
+ * _outstandingQueries, and a callback is scheduled at `deadline` that fails the promise
+ * (with the callback's error status, or with an unsatisfied-read-preference error) unless the
+ * query was already marked done. The query processor is registered as a topology listener so
+ * that topology changes can satisfy the promise.
+ *
+ * Returns the future of the query's promise, or a ready future carrying the scheduling error
+ * when the deadline callback could not be scheduled.
+ */
+SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutstandingQuery(
+    WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline) {
+    using HostAndPortList = std::vector<HostAndPort>;
+
+    auto query = std::make_shared<HostQuery>();
+    query->criteria = criteria;
+    query->deadline = deadline;
+
+    auto pf = makePromiseFuture<HostAndPortList>();
+    query->promise = std::move(pf.promise);
+
+    // `self` keeps the monitor alive until the deadline callback has run.
+    auto deadlineCb =
+        [this, query, self = shared_from_this()](const TaskExecutor::CallbackArgs& cbArgs) {
+            stdx::lock_guard lock(_mutex);
+            if (query->done) {
+                return;
+            }
+
+            const auto cbStatus = cbArgs.status;
+            if (!cbStatus.isOK()) {
+                query->promise.setError(cbStatus);
+                query->done = true;
+                return;
+            }
+
+            const auto errorStatus = _makeUnsatisfiedReadPrefError(query->criteria);
+            query->promise.setError(errorStatus);
+            query->done = true;
+            LOG(kDefaultLogLevel) << _logPrefix()
+                                  << "host selection timeout: " << errorStatus.toString();
+        };
+    auto swDeadlineHandle = _executor->scheduleWorkAt(query->deadline, deadlineCb);
+
+    if (!swDeadlineHandle.isOK()) {
+        log() << "error scheduling deadline handler: " << swDeadlineHandle.getStatus();
+        return SemiFuture<HostAndPortList>::makeReady(swDeadlineHandle.getStatus());
+    }
+    query->deadlineHandle = swDeadlineHandle.getValue();
+    _outstandingQueries.push_back(query);
+
+    // Send topology changes to the query processor to satisfy the future.
+    // It will be removed as a listener when all waiting queries have been satisfied.
+    _eventsPublisher->registerListener(_queryProcessor);
+
+    return std::move(pf.future).semi();
+}
+
+// Runs server selection against `topology`; returns the selected hosts, or boost::none when
+// the selector returns no result.
+boost::optional<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_getHosts(
+    const TopologyDescriptionPtr& topology, const ReadPreferenceSetting& criteria) {
+    auto selected = _serverSelector->selectServers(topology, criteria);
+    if (selected) {
+        return _extractHosts(*selected);
+    }
+    return boost::none;
+}
+
+// Convenience overload: selects hosts from the current topology.
+boost::optional<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_getHosts(
+    const ReadPreferenceSetting& criteria) {
+    const auto topology = _currentTopology();
+    return _getHosts(topology, criteria);
}
HostAndPort StreamableReplicaSetMonitor::getMasterOrUassert() {
- MONGO_UNREACHABLE;
+    // Blocks on a primary-only host lookup and returns its result.
+    return getHostOrRefresh(kPrimaryOnlyReadPreference).get();
}
-void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {}
+
+// Reports a failed host with an empty isMaster reply.
+void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
+    const BSONObj emptyReply;
+    failedHost(host, emptyReply, status);
+}
+
+// Feeds the failure into the topology manager as an isMaster outcome built from the host, the
+// reply document and the status text.
+void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host,
+                                             BSONObj bson,
+                                             const Status& status) {
+    IsMasterOutcome failureOutcome(host.toString(), bson, status.toString());
+    _topologyManager->onServerDescription(failureOutcome);
+}
+
+// Primary of the current topology, as reported by TopologyDescription::getPrimary().
+boost::optional<ServerDescriptionPtr> StreamableReplicaSetMonitor::_currentPrimary() const {
+    const auto topology = _currentTopology();
+    return topology->getPrimary();
+}
+
+// Returns true only when a current primary exists and its address equals host.toString().
bool StreamableReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
- MONGO_UNREACHABLE;
+    const auto primary = _currentPrimary();
+    if (!primary) {
+        return false;
+    }
+    return (*primary)->getAddress() == host.toString();
}
+// Returns true when the current topology has an entry for 'host' whose type is anything other
+// than ServerType::kUnknown.
bool StreamableReplicaSetMonitor::isHostUp(const HostAndPort& host) const {
- MONGO_UNREACHABLE;
+    const auto topology = _currentTopology();
+    const auto& maybeServer = topology->findServerByAddress(host.toString());
+    if (!maybeServer) {
+        return false;
+    }
+    return (*maybeServer)->getType() != ServerType::kUnknown;
}
+// Returns getMinWireVersion() of the server picked by std::min_element with minWireCompare
+// over the current topology's servers, or 0 when the topology has no servers.
int StreamableReplicaSetMonitor::getMinWireVersion() const {
- MONGO_UNREACHABLE;
+    const auto topology = _currentTopology();
+    const std::vector<ServerDescriptionPtr>& servers = topology->getServers();
+    if (servers.empty()) {
+        return 0;
+    }
+
+    const auto chosen = std::min_element(servers.begin(), servers.end(), minWireCompare);
+    return (*chosen)->getMinWireVersion();
}
+// Returns getMaxWireVersion() of the server picked by std::max_element with maxWireCompare
+// over the current topology's servers, or the largest int when the topology has no servers.
int StreamableReplicaSetMonitor::getMaxWireVersion() const {
- MONGO_UNREACHABLE;
+    const auto topology = _currentTopology();
+    const std::vector<ServerDescriptionPtr>& servers = topology->getServers();
+    if (servers.empty()) {
+        return std::numeric_limits<int>::max();
+    }
+
+    const auto chosen = std::max_element(servers.begin(), servers.end(), maxWireCompare);
+    return (*chosen)->getMaxWireVersion();
}
+// Returns the set name taken from the URI this monitor was constructed with (_uri).
std::string StreamableReplicaSetMonitor::getName() const {
- MONGO_UNREACHABLE;
+ return _uri.getSetName();
}
+// Builds "<setName>/<addr1>,<addr2>,..." from the URI's set name and the addresses of the
+// servers in the current topology. With no servers the result is "<setName>/".
std::string StreamableReplicaSetMonitor::getServerAddress() const {
- MONGO_UNREACHABLE;
+    // Hold the topology for the whole computation so 'servers' stays valid.
+    const auto topologyDescription = _currentTopology();
+    const auto& servers = topologyDescription->getServers();
+
+    std::stringstream output;
+    output << _uri.getSetName() << "/";
+
+    // Emit the separator before every address except the first.
+    const char* separator = "";
+    for (const auto& server : servers) {
+        output << separator << server->getAddress();
+        separator = ",";
+    }
+
+    return output.str();
}
+// Returns a reference to the MongoURI stored in _uri.
const MongoURI& StreamableReplicaSetMonitor::getOriginalUri() const {
- MONGO_UNREACHABLE;
-};
-bool StreamableReplicaSetMonitor::contains(const HostAndPort& server) const {
- MONGO_UNREACHABLE;
+ return _uri;
}
-void StreamableReplicaSetMonitor::appendInfo(BSONObjBuilder& b, bool forFTDC) const {
- MONGO_UNREACHABLE;
+// Returns true when the current topology has an entry whose address is host.toString().
+bool StreamableReplicaSetMonitor::contains(const HostAndPort& host) const {
+    const auto topology = _currentTopology();
+    const auto& maybeServer = topology->findServerByAddress(host.toString());
+    return static_cast<bool>(maybeServer);
+}
+
+/**
+ * Appends a sub-object named after this set (getName()) to 'bsonObjBuilder'.
+ *
+ * When 'forFTDC' is true the sub-object holds one number per server: address -> ping time in
+ * milliseconds. Otherwise it holds a "hosts" array with one entry per server containing
+ * "addr", "ok", "ismaster", "hidden", "secondary", "pingTimeMillis" and, when the server has
+ * tags, a "tags" sub-object.
+ */
+void StreamableReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const {
+    const auto topologyDescription = _currentTopology();
+
+    BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName()));
+    if (forFTDC) {
+        // Iterate by const reference, as the loop below does, to avoid copying each
+        // server description pointer.
+        for (const auto& serverDescription : topologyDescription->getServers()) {
+            monitorInfo.appendNumber(serverDescription->getAddress(),
+                                     pingTimeMillis(serverDescription));
+        }
+        return;
+    }
+
+    // NOTE: the format here must be consistent for backwards compatibility
+    BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts"));
+    for (const auto& serverDescription : topologyDescription->getServers()) {
+        bool isUp = false;
+        bool isMaster = false;
+        bool isSecondary = false;
+        bool isHidden = false;
+
+        switch (serverDescription->getType()) {
+            case ServerType::kRSPrimary:
+                isUp = true;
+                isMaster = true;
+                break;
+            case ServerType::kRSSecondary:
+                isUp = true;
+                isSecondary = true;
+                break;
+            case ServerType::kStandalone:
+            case ServerType::kMongos:
+                isUp = true;
+                break;
+            case ServerType::kRSGhost:
+            case ServerType::kRSArbiter:
+                isHidden = true;
+                break;
+            default:
+                // Every other type is reported with all flags false.
+                break;
+        }
+
+        BSONObjBuilder builder(hosts.subobjStart());
+        builder.append("addr", serverDescription->getAddress());
+        builder.append("ok", isUp);
+        builder.append("ismaster", isMaster);  // intentionally not camelCase
+        builder.append("hidden", isHidden);
+        builder.append("secondary", isSecondary);
+        builder.append("pingTimeMillis", pingTimeMillis(serverDescription));
+
+        if (serverDescription->getTags().size()) {
+            BSONObjBuilder tagsBuilder(builder.subobjStart("tags"));
+            serverDescription->appendBsonTags(tagsBuilder);
+        }
+    }
}
+// Returns true when _currentPrimary() yields a primary.
bool StreamableReplicaSetMonitor::isKnownToHaveGoodPrimary() const {
- MONGO_UNREACHABLE;
+    const auto primary = _currentPrimary();
+    return static_cast<bool>(primary);
+}
+
+// Returns the topology description currently held by the topology manager.
+sdam::TopologyDescriptionPtr StreamableReplicaSetMonitor::_currentTopology() const {
+ return _topologyManager->getTopologyDescription();
+}
+
+/**
+ * Topology listener callback. When _hasMembershipChange() reports a difference between the
+ * previous and new descriptions, logs the new topology and notifies the replica set change
+ * notifier: onConfirmedSet() when the new description has a primary, onPossibleSet() otherwise.
+ */
+void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent(
+    UUID topologyId,
+    TopologyDescriptionPtr previousDescription,
+    TopologyDescriptionPtr newDescription) {
+
+    // External components are only notified about membership changes.
+    if (!_hasMembershipChange(previousDescription, newDescription)) {
+        return;
+    }
+
+    LOG(kDefaultLogLevel) << _logPrefix() << "Topology Change: " << newDescription->toString();
+
+    // TODO: remove when HostAndPort conversion is done.
+    std::vector<HostAndPort> servers = _extractHosts(newDescription->getServers());
+    auto connectionString = ConnectionString::forReplicaSet(getName(), servers);
+
+    auto maybePrimary = newDescription->getPrimary();
+    if (!maybePrimary) {
+        ReplicaSetMonitorManager::get()->getNotifier().onPossibleSet(connectionString);
+        return;
+    }
+
+    // TODO: remove need for HostAndPort conversion
+    auto hostList = _extractHosts(newDescription->findServers(secondaryPredicate));
+    std::set<HostAndPort> secondaries(hostList.begin(), hostList.end());
+
+    auto primaryAddress = HostAndPort((*maybePrimary)->getAddress());
+    ReplicaSetMonitorManager::get()->getNotifier().onConfirmedSet(
+        connectionString, primaryAddress, secondaries);
+}
+
+// Topology listener callback for a successful heartbeat: wraps the server address, the reply
+// and the measured duration in an IsMasterOutcome and passes it to the topology manager.
+void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) {
+ IsMasterOutcome outcome(hostAndPort, reply, durationMs);
+ _topologyManager->onServerDescription(outcome);
}
+// Topology listener callback for a failed heartbeat: wraps the server address, the reply and
+// the textual form of 'errorStatus' in an IsMasterOutcome and passes it to the topology
+// manager. 'durationMs' is not used here.
+void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
+ Status errorStatus,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) {
+ IsMasterOutcome outcome(hostAndPort, reply, errorStatus.toString());
+ _topologyManager->onServerDescription(outcome);
+}
+
+// Topology listener callback for a failed ping: converts the address to a HostAndPort and
+// reports it through failedHost() together with 'status'.
+void StreamableReplicaSetMonitor::onServerPingFailedEvent(const ServerAddress& hostAndPort,
+                                                          const Status& status) {
+    const HostAndPort failedServer(hostAndPort);
+    failedHost(failedServer, status);
+}
+
+// Topology listener callback for a successful ping: forwards the server address and the
+// measured duration to the topology manager's onServerRTTUpdated().
+void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT durationMS,
+ const ServerAddress& hostAndPort) {
+ _topologyManager->onServerRTTUpdated(hostAndPort, durationMS);
+}
+
+// Builds the prefix used on this monitor's log lines: kLogPrefix, then the set name in
+// square brackets, then a trailing space.
+std::string StreamableReplicaSetMonitor::_logPrefix() {
+ return str::stream() << kLogPrefix << " [" << getName() << "] ";
+}
+
+/**
+ * Fails every outstanding query that is not already done with 'status': marks it done, cancels
+ * its deadline callback and sets the error on its promise. The list of outstanding queries is
+ * emptied afterwards. The WithLock parameter indicates the caller already holds the lock.
+ */
+void StreamableReplicaSetMonitor::_failOutstandingWithStatus(WithLock, Status status) {
+    for (const auto& pending : _outstandingQueries) {
+        if (!pending->done) {
+            pending->done = true;
+            _executor->cancel(pending->deadlineHandle);
+            pending->promise.setError(status);
+        }
+    }
+    _outstandingQueries.clear();
+}
+
+/**
+ * Returns true when the two descriptions differ in membership: a different number of servers,
+ * a server of one description missing (by address) from the other, or a server present in
+ * both whose ServerDescription compares unequal.
+ */
+bool StreamableReplicaSetMonitor::_hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription,
+                                                       sdam::TopologyDescriptionPtr newDescription) {
+
+    const auto& oldServers = oldDescription->getServers();
+    const auto& newServers = newDescription->getServers();
+    if (oldServers.size() != newServers.size())
+        return true;
+
+    // An old server that disappeared, or whose description changed.
+    const bool oldServerChanged =
+        std::any_of(oldServers.begin(), oldServers.end(), [&](const auto& server) {
+            const auto counterpart = newDescription->findServerByAddress(server->getAddress());
+            if (!counterpart)
+                return true;
+            const ServerDescription& before = *server;
+            const ServerDescription& after = **counterpart;
+            return before != after;
+        });
+    if (oldServerChanged)
+        return true;
+
+    // A new server that was not part of the old description.
+    return std::any_of(newServers.begin(), newServers.end(), [&](const auto& server) {
+        const auto counterpart = oldDescription->findServerByAddress(server->getAddress());
+        return !counterpart;
+    });
+}
+
+/**
+ * Tries to satisfy each outstanding query against 'topologyDescription' while holding _mutex.
+ *
+ * Queries already marked done are dropped from the list. A query for which _getHosts() yields
+ * hosts has its deadline callback cancelled, is marked done, has its promise fulfilled and is
+ * dropped as well. If queries remain afterwards an immediate isMaster check is requested;
+ * otherwise the query processor is removed as a topology listener.
+ */
+void StreamableReplicaSetMonitor::_processOutstanding(const TopologyDescriptionPtr& topologyDescription) {
+    // TODO: refactor so that we don't call _getHost(s) for every outstanding query
+    // since there might be duplicates.
+    stdx::lock_guard lock(_mutex);
+
+    for (auto it = _outstandingQueries.begin(); it != _outstandingQueries.end();) {
+        auto& query = *it;
+        bool finished = query->done;
+
+        if (!finished) {
+            if (auto hosts = _getHosts(topologyDescription, query->criteria)) {
+                _executor->cancel(query->deadlineHandle);
+                query->done = true;
+                query->promise.emplaceValue(std::move(*hosts));
+                LOG(kDefaultLogLevel)
+                    << _logPrefix()
+                    << "finish getHosts: " << readPrefToStringWithMinOpTime(query->criteria) << " ("
+                    << _executor->now() - query->start << ")";
+                finished = true;
+            }
+        }
+
+        if (finished) {
+            it = _outstandingQueries.erase(it);
+        } else {
+            ++it;
+        }
+    }
+
+    if (_outstandingQueries.empty()) {
+        // if no more outstanding queries, no need to listen for topology changes in
+        // this monitor.
+        _eventsPublisher->removeListener(_queryProcessor);
+    } else {
+        // enable expedited mode
+        _isMasterMonitor->requestImmediateCheck();
+    }
+}
+
+// Builds the FailedToSatisfyReadPreference status for 'criteria', naming this set in the
+// message.
+Status StreamableReplicaSetMonitor::_makeUnsatisfiedReadPrefError(
+    const ReadPreferenceSetting& criteria) const {
+    const std::string reason = str::stream()
+        << "Could not find host matching read preference " << criteria.toString() << " for set "
+        << getName();
+    return Status(ErrorCodes::FailedToSatisfyReadPreference, reason);
+}
+
+// Builds the ReplicaSetMonitorRemoved status, naming this set in the message.
+Status StreamableReplicaSetMonitor::_makeReplicaSetMonitorRemovedError() const {
+    const std::string reason = str::stream()
+        << "ReplicaSetMonitor for set " << getName() << " is removed";
+    return Status(ErrorCodes::ReplicaSetMonitorRemoved, reason);
+}
} // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index 8c27301660e..9fbdf858f6e 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -38,7 +38,10 @@
#include "mongo/client/mongo_uri.h"
#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/client/sdam/sdam.h"
+#include "mongo/client/server_is_master_monitor.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/logger/log_component.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/duration.h"
#include "mongo/util/net/hostandport.h"
@@ -46,48 +49,162 @@
namespace mongo {
-class StreamableReplicaSetMonitor : public ReplicaSetMonitor {
- StreamableReplicaSetMonitor(const StreamableReplicaSetMonitor&) = delete;
- StreamableReplicaSetMonitor& operator=(const StreamableReplicaSetMonitor&) = delete;
+class BSONObj;
+class ReplicaSetMonitor;
+class ReplicaSetMonitorTest;
+struct ReadPreferenceSetting;
+using ReplicaSetMonitorPtr = std::shared_ptr<ReplicaSetMonitor>;
+
+/**
+ * Replica set monitor implementation backed by the classes in the mongo::sdam namespace.
+ *
+ * All methods perform the required synchronization to allow callers from multiple threads.
+ */
+class StreamableReplicaSetMonitor
+    : public ReplicaSetMonitor,
+      public sdam::TopologyListener,
+      public std::enable_shared_from_this<StreamableReplicaSetMonitor> {
+
+    // Not copyable. The deleted operations name this class so that they are the copy
+    // constructor and copy assignment operator rather than conversions from the base type.
+    StreamableReplicaSetMonitor(const StreamableReplicaSetMonitor&) = delete;
+    StreamableReplicaSetMonitor& operator=(const StreamableReplicaSetMonitor&) = delete;
public:
- StreamableReplicaSetMonitor(const MongoURI& uri);
+    class Refresher;
+
+    static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500);
+    static constexpr auto kCheckTimeout = Seconds(5);
+
+    StreamableReplicaSetMonitor(const MongoURI& uri,
+                                std::shared_ptr<executor::TaskExecutor> executor);
+
+    void init() override;
- void init() override;
+    void drop() override;
- void drop() override;
+    static ReplicaSetMonitorPtr make(const MongoURI& uri,
+                                     std::shared_ptr<executor::TaskExecutor> executor = nullptr);
- SemiFuture<HostAndPort> getHostOrRefresh(
- const ReadPreferenceSetting& readPref,
- Milliseconds maxWait = kDefaultFindHostTimeout) override;
+    SemiFuture<HostAndPort> getHostOrRefresh(
+        const ReadPreferenceSetting& readPref,
+        Milliseconds maxWait = kDefaultFindHostTimeout) override;
SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
- const ReadPreferenceSetting& readPref,
- Milliseconds maxWait = kDefaultFindHostTimeout) override;
+        const ReadPreferenceSetting& readPref,
+        Milliseconds maxWait = kDefaultFindHostTimeout) override;
+
+    HostAndPort getMasterOrUassert() override;
+
+    void failedHost(const HostAndPort& host, const Status& status) override;
+    // Overload that also carries the server reply associated with the failure.
+    void failedHost(const HostAndPort& host, BSONObj bson, const Status& status);
+
+    bool isPrimary(const HostAndPort& host) const override;
+
+    bool isHostUp(const HostAndPort& host) const override;
+
+    int getMinWireVersion() const override;
+
+    int getMaxWireVersion() const override;
+
+    std::string getName() const override;
+
+    std::string getServerAddress() const override;
+
+    const MongoURI& getOriginalUri() const override;
+
+    bool contains(const HostAndPort& server) const override;
+
+    void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override;
+
+    bool isKnownToHaveGoodPrimary() const override;
+
+private:
+    class StreamableReplicaSetMonitorQueryProcessor;
+    using StreamableReplicaSetMontiorQueryProcessorPtr =
+        std::shared_ptr<StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor>;
+
+    // A pending host-selection request waiting for a topology that satisfies 'criteria'.
+    struct HostQuery {
+        Date_t deadline;
+        // Handle of the callback scheduled on the executor for 'deadline'.
+        executor::TaskExecutor::CallbackHandle deadlineHandle;
+        ReadPreferenceSetting criteria;
+        Date_t start = Date_t::now();
+        // Set once the promise has been fulfilled or failed.
+        bool done = false;
+        Promise<std::vector<HostAndPort>> promise;
+    };
+    using HostQueryPtr = std::shared_ptr<HostQuery>;
+
+    SemiFuture<std::vector<HostAndPort>> _enqueueOutstandingQuery(
+        WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline);
+
+    std::vector<HostAndPort> _extractHosts(
+        const std::vector<sdam::ServerDescriptionPtr>& serverDescriptions);
+    boost::optional<std::vector<HostAndPort>> _getHosts(
+        const sdam::TopologyDescriptionPtr& topology, const ReadPreferenceSetting& criteria);
+    boost::optional<std::vector<HostAndPort>> _getHosts(const ReadPreferenceSetting& criteria);
+
+    void onTopologyDescriptionChangedEvent(UUID topologyId,
+                                           sdam::TopologyDescriptionPtr previousDescription,
+                                           sdam::TopologyDescriptionPtr newDescription) override;
+
+    void onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs,
+                                         const sdam::ServerAddress& hostAndPort,
+                                         const BSONObj reply) override;
+
+    void onServerHeartbeatFailureEvent(sdam::IsMasterRTT durationMs,
+                                       Status errorStatus,
+                                       const sdam::ServerAddress& hostAndPort,
+                                       const BSONObj reply) override;
+
+    void onServerPingFailedEvent(const sdam::ServerAddress& hostAndPort,
+                                 const Status& status) override;
+
+    void onServerPingSucceededEvent(sdam::IsMasterRTT durationMS,
+                                    const sdam::ServerAddress& hostAndPort) override;
+
+    // Get a pointer to the current primary's ServerDescription
+    // To ensure a consistent view of the Topology either _currentPrimary or _currentTopology should
+    // be called (not both) since the topology can change between the function invocations.
+    boost::optional<sdam::ServerDescriptionPtr> _currentPrimary() const;
- HostAndPort getMasterOrUassert() override;
+    // Get the current TopologyDescription
+    // Note that most functions will want to save the result of this function once per computation
+    // so that we are operating on a consistent read-only view of the topology.
+    sdam::TopologyDescriptionPtr _currentTopology() const;
- void failedHost(const HostAndPort& host, const Status& status) override;
+    std::string _logPrefix();
- bool isPrimary(const HostAndPort& host) const override;
+    void _failOutstandingWithStatus(WithLock, Status status);
+    bool _hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription,
+                              sdam::TopologyDescriptionPtr newDescription);
- bool isHostUp(const HostAndPort& host) const override;
+    Status _makeUnsatisfiedReadPrefError(const ReadPreferenceSetting& criteria) const;
+    Status _makeReplicaSetMonitorRemovedError() const;
- int getMinWireVersion() const override;
+    // Try to satisfy the outstanding queries for this instance with the given topology information.
+    void _processOutstanding(const sdam::TopologyDescriptionPtr& topologyDescription);
- int getMaxWireVersion() const override;
+    sdam::SdamConfiguration _sdamConfig;
+    sdam::TopologyManagerPtr _topologyManager;
+    sdam::ServerSelectorPtr _serverSelector;
+    sdam::TopologyEventsPublisherPtr _eventsPublisher;
+    ServerIsMasterMonitorPtr _isMasterMonitor;
- std::string getName() const override;
+    // This object will be registered as a TopologyListener if there are
+    // any outstanding queries for this RSM instance.
+    StreamableReplicaSetMontiorQueryProcessorPtr _queryProcessor;
- std::string getServerAddress() const override;
+    const MongoURI _uri;
- const MongoURI& getOriginalUri() const override;
+    std::shared_ptr<executor::TaskExecutor> _executor;
- bool contains(const HostAndPort& server) const override;
+    // Starts out true.
+    // NOTE(review): presumably cleared by init() and set again by drop() - confirm.
+    AtomicWord<bool> _isDropped{true};
- void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override;
+    mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicaSetMonitor");
+    // Host-selection requests that have not been satisfied yet.
+    std::vector<HostQueryPtr> _outstandingQueries;
+    mutable PseudoRandom _random;
- bool isKnownToHaveGoodPrimary() const override;
+    static inline const auto kServerSelectionConfig =
+        sdam::ServerSelectionConfiguration::defaultConfiguration();
+    static inline const auto kDefaultLogLevel = logger::LogSeverity::Debug(1);
+    static inline const auto kLowerLogLevel = kDefaultLogLevel.lessSevere();
+    static constexpr auto kLogPrefix = "[ReplicaSetMonitor]";
};
} // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp b/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp
new file mode 100644
index 00000000000..5cadf92266f
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+#include "mongo/client/streamable_replica_set_monitor_query_processor.h"
+
+#include <memory>
+
+#include "mongo/client/global_conn_pool.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+// Marks this query processor as shut down while holding _mutex; topology change events
+// received afterwards return without doing any work.
+void StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor::shutdown() {
+ stdx::lock_guard lock(_mutex);
+ _isShutdown = true;
+}
+
+/**
+ * Topology listener callback. Unless this processor has been shut down, looks up the monitor
+ * registered under the new description's set name and asks it to process its outstanding
+ * queries against the new description.
+ */
+void StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor::
+    onTopologyDescriptionChangedEvent(UUID topologyId,
+                                      sdam::TopologyDescriptionPtr previousDescription,
+                                      sdam::TopologyDescriptionPtr newDescription) {
+    {
+        stdx::lock_guard lock(_mutex);
+        if (_isShutdown)
+            return;
+    }
+
+    const auto& setName = newDescription->getSetName();
+    if (!setName) {
+        // No set name occurs when there is an error monitoring isMaster replies (e.g.
+        // HostUnreachable). There is nothing to do in that case.
+        return;
+    }
+
+    auto monitor = std::static_pointer_cast<StreamableReplicaSetMonitor>(
+        globalRSMonitorManager.getMonitor(*setName));
+    if (monitor) {
+        monitor->_processOutstanding(newDescription);
+        return;
+    }
+
+    LOG(kLogLevel) << "could not find rsm instance " << *setName
+                   << " for query processing.";
+}
+}; // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor_query_processor.h b/src/mongo/client/streamable_replica_set_monitor_query_processor.h
new file mode 100644
index 00000000000..bac2d42cc45
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor_query_processor.h
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/client/streamable_replica_set_monitor.h"
+#include "mongo/client/sdam/sdam.h"
+
+namespace mongo {
+// Topology listener that, on each topology description change, asks the monitor registered
+// for the description's set name to process its outstanding host queries.
+class StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor final : public sdam::TopologyListener {
+public:
+ // Stops further processing: later topology change events are ignored.
+ void shutdown();
+
+ void onTopologyDescriptionChangedEvent(UUID topologyId,
+ sdam::TopologyDescriptionPtr previousDescription,
+ sdam::TopologyDescriptionPtr newDescription) override;
+
+private:
+ // Severity used for this class's log messages.
+ static inline const logger::LogSeverity kLogLevel = logger::LogSeverity::Debug(1);
+
+ // _mutex is held whenever _isShutdown is read or written.
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicaSetMonitorQueryProcessor");
+ bool _isShutdown = false;
+};
+} // namespace mongo