From 50da4a76ed2f3abbdf9d8d40ad57c5fb5169b5fb Mon Sep 17 00:00:00 2001 From: Lamont Nelson Date: Fri, 21 Feb 2020 17:12:29 -0500 Subject: SERVER-43332: new RSM implementation based on sdam --- .../replica_set_connection_error_codes.js | 17 +- src/mongo/client/SConscript | 4 + src/mongo/client/replica_set_monitor_manager.cpp | 305 ++++++------ src/mongo/client/replica_set_monitor_manager.h | 4 +- src/mongo/client/sdam/SConscript | 14 + src/mongo/client/sdam/sdam.h | 37 ++ src/mongo/client/sdam/sdam_configuration.cpp | 95 ++++ src/mongo/client/sdam/sdam_configuration.h | 100 ++++ src/mongo/client/sdam/sdam_datatypes.h | 6 + src/mongo/client/sdam/server_description.cpp | 27 +- src/mongo/client/sdam/server_description.h | 7 + src/mongo/client/sdam/server_selector.cpp | 267 ++++++++++ src/mongo/client/sdam/server_selector.h | 193 +++++++ src/mongo/client/sdam/server_selector_test.cpp | 463 +++++++++++++++++ src/mongo/client/sdam/topology_description.cpp | 65 +-- src/mongo/client/sdam/topology_description.h | 72 +-- .../client/sdam/topology_description_test.cpp | 90 ++-- src/mongo/client/sdam/topology_listener.cpp | 163 ++++++ src/mongo/client/sdam/topology_listener.h | 77 ++- src/mongo/client/sdam/topology_manager.cpp | 68 ++- src/mongo/client/sdam/topology_manager.h | 30 +- src/mongo/client/sdam/topology_state_machine.h | 1 + src/mongo/client/server_is_master_monitor.cpp | 375 ++++++++++++++ src/mongo/client/server_is_master_monitor.h | 135 +++++ .../client/streamable_replica_set_monitor.cpp | 553 +++++++++++++++++++-- src/mongo/client/streamable_replica_set_monitor.h | 163 +++++- ...eamable_replica_set_monitor_query_processor.cpp | 67 +++ ...treamable_replica_set_monitor_query_processor.h | 49 ++ 28 files changed, 3060 insertions(+), 387 deletions(-) create mode 100644 src/mongo/client/sdam/sdam.h create mode 100644 src/mongo/client/sdam/sdam_configuration.cpp create mode 100644 src/mongo/client/sdam/sdam_configuration.h create mode 100644 src/mongo/client/sdam/server_selector.cpp create mode 100644 src/mongo/client/sdam/server_selector.h create mode 100644 src/mongo/client/sdam/server_selector_test.cpp create mode 100644 src/mongo/client/sdam/topology_listener.cpp create mode 100644 src/mongo/client/server_is_master_monitor.cpp create mode 100644 src/mongo/client/server_is_master_monitor.h create mode 100644 src/mongo/client/streamable_replica_set_monitor_query_processor.cpp create mode 100644 src/mongo/client/streamable_replica_set_monitor_query_processor.h diff --git a/jstests/noPassthrough/replica_set_connection_error_codes.js b/jstests/noPassthrough/replica_set_connection_error_codes.js index d431415ee6d..835d712a45e 100644 --- a/jstests/noPassthrough/replica_set_connection_error_codes.js +++ b/jstests/noPassthrough/replica_set_connection_error_codes.js @@ -56,14 +56,15 @@ const awaitShell = stepDownPrimary(rst); rst.getPrimary(); rst.awaitNodesAgreeOnPrimary(); -// DBClientRS will continue to send command requests to the node it believed to be primary even -// after it stepped down so long as it hasn't closed its connection. -assert.commandFailedWithCode(rsConn.getDB("test").runCommand({create: "mycoll"}), - ErrorCodes.NotMaster); - -// However, once the server responds back with a ErrorCodes.NotMaster error, DBClientRS will -// cause the ReplicaSetMonitor to attempt to discover the current primary. -assert.commandWorked(rsConn.getDB("test").runCommand({create: "mycoll"})); +// DBClientRS should discover the current primary eventually and get NotMaster errors in the +// meantime. +assert.soon(() => { + const res = rsConn.getDB("test").runCommand({create: "mycoll"}); + if (!res.ok) { + assert(res.code == ErrorCodes.NotMaster); + } + return res.ok; +}); try { assert.commandWorked(directConn.adminCommand({configureFailPoint: failpoint, mode: "off"})); diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index b06b7d6f20d..dd7c410279c 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -199,6 +199,9 @@ clientDriverEnv.Library( 'replica_set_monitor_manager.cpp', env.Idlc('replica_set_monitor_params.idl')[0], 'server_ping_monitor.cpp', + 'server_is_master_monitor.cpp', + 'streamable_replica_set_monitor.cpp', + 'streamable_replica_set_monitor_query_processor.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/write_concern_options', @@ -210,6 +213,7 @@ clientDriverEnv.Library( '$BUILD_DIR/mongo/util/background_job', '$BUILD_DIR/mongo/util/md5', '$BUILD_DIR/mongo/util/net/network', + '$BUILD_DIR/mongo/client/sdam/sdam', 'clientdriver_minimal', 'read_preference', ], diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index 1e81bbbb688..193a799806d 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -38,8 +38,8 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/connection_string.h" #include "mongo/client/mongo_uri.h" -#include "mongo/client/replica_set_monitor_params_gen.h" #include "mongo/client/scanning_replica_set_monitor.h" +#include "mongo/client/replica_set_monitor.h" #include "mongo/client/streamable_replica_set_monitor.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface_factory.h" @@ -47,201 +47,196 @@ #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/logv2/log.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/util/map_util.h" +#include "mongo/client/replica_set_monitor_params_gen.h" -namespace mongo { + namespace mongo { -using std::set; -using std::shared_ptr; -using std::string; -using std::vector; + using std::set; + using std::shared_ptr; + using std::string; + using std::vector; -using executor::NetworkInterface; -using executor::NetworkInterfaceThreadPool; -using executor::TaskExecutor; -using executor::TaskExecutorPool; -using executor::ThreadPoolTaskExecutor; + using executor::NetworkInterface; + using executor::NetworkInterfaceThreadPool; + using executor::TaskExecutor; + using executor::TaskExecutorPool; + using executor::ThreadPoolTaskExecutor; -namespace { -const auto getGlobalRSMMonitorManager = - ServiceContext::declareDecoration(); -} // namespace + namespace { + const auto getGlobalRSMMonitorManager = + ServiceContext::declareDecoration(); + } // namespace -ReplicaSetMonitorManager::~ReplicaSetMonitorManager() { - shutdown(); -} + ReplicaSetMonitorManager::~ReplicaSetMonitorManager() { + shutdown(); + } -ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() { - return &getGlobalRSMMonitorManager(getGlobalServiceContext()); -} + ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() { + return &getGlobalRSMMonitorManager(getGlobalServiceContext()); + } -shared_ptr ReplicaSetMonitorManager::getMonitor(StringData setName) { - stdx::lock_guard lk(_mutex); + shared_ptr ReplicaSetMonitorManager::getMonitor(StringData setName) { + stdx::lock_guard lk(_mutex); - if (auto monitor = _monitors[setName].lock()) { - return monitor; - } else { - return shared_ptr(); + if (auto monitor = _monitors[setName].lock()) { + return monitor; + } else { + return shared_ptr(); + } } -} -void ReplicaSetMonitorManager::_setupTaskExecutorInLock() { - if (_isShutdown || _taskExecutor) { - // do not restart taskExecutor if is in shutdown - return; - } + void ReplicaSetMonitorManager::_setupTaskExecutorInLock() { + if (_isShutdown || _taskExecutor) { + // do not restart taskExecutor if is in shutdown + return; + } - // construct task executor - auto hookList = std::make_unique(); - auto net = executor::makeNetworkInterface( - "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList)); - auto pool = std::make_unique(net.get()); - _taskExecutor = std::make_unique(std::move(pool), std::move(net)); - _taskExecutor->startup(); -} - -namespace { -void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) { - uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b); -} -} // namespace - -shared_ptr ReplicaSetMonitorManager::getOrCreateMonitor( - const ConnectionString& connStr) { - return getOrCreateMonitor(MongoURI(connStr)); -} - -shared_ptr ReplicaSetMonitorManager::getOrCreateMonitor(const MongoURI& uri) { - invariant(uri.type() == ConnectionString::SET); - - stdx::lock_guard lk(_mutex); - uassert(ErrorCodes::ShutdownInProgress, - str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown", - !_isShutdown); - - _setupTaskExecutorInLock(); - const auto& setName = uri.getSetName(); - auto monitor = _monitors[setName].lock(); - if (monitor) { - uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode()); - return monitor; + // construct task executor + auto hookList = std::make_unique(); + auto net = executor::makeNetworkInterface( + "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList)); + auto pool = std::make_unique(net.get()); + _taskExecutor = std::make_shared(std::move(pool), std::move(net)); + _taskExecutor->startup(); } - LOGV2(20186, "Starting new replica set monitor for {uri}", "uri"_attr = uri.toString()); + namespace { + void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) { + uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b); + } + } // namespace - if (disableStreamableReplicaSetMonitor.load()) { - auto newMonitor = std::make_shared(uri); - _monitors[setName] = newMonitor; - newMonitor->init(); - return newMonitor; - } else { - uasserted(31451, "StreamableReplicaSetMonitor is not yet implemented"); + shared_ptr ReplicaSetMonitorManager::getOrCreateMonitor( + const ConnectionString& connStr) { + return getOrCreateMonitor(MongoURI(connStr)); } -} -vector ReplicaSetMonitorManager::getAllSetNames() { - vector allNames; + shared_ptr ReplicaSetMonitorManager::getOrCreateMonitor( + const MongoURI& uri) { + invariant(uri.type() == ConnectionString::SET); + stdx::lock_guard lk(_mutex); + uassert(ErrorCodes::ShutdownInProgress, + str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown", + !_isShutdown); + + _setupTaskExecutorInLock(); + const auto& setName = uri.getSetName(); + auto monitor = _monitors[setName].lock(); + if (monitor) { + uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode()); + return monitor; + } - stdx::lock_guard lk(_mutex); + log() << "Starting new replica set monitor for " << uri.toString(); - for (const auto& entry : _monitors) { - allNames.push_back(entry.first); + std::shared_ptr newMonitor; + if (disableStreamableReplicaSetMonitor.load()) { + newMonitor = std::make_shared(uri); + newMonitor->init(); + } else { + newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor()); + } + _monitors[setName] = newMonitor; + return newMonitor; } - return allNames; -} + vector ReplicaSetMonitorManager::getAllSetNames() { + vector allNames; + + stdx::lock_guard lk(_mutex); -void ReplicaSetMonitorManager::removeMonitor(StringData setName) { - stdx::lock_guard lk(_mutex); - ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName); - if (it != _monitors.end()) { - if (auto monitor = it->second.lock()) { - monitor->drop(); + for (const auto& entry : _monitors) { + allNames.push_back(entry.first); } - _monitors.erase(it); - LOGV2( - 20187, "Removed ReplicaSetMonitor for replica set {setName}", "setName"_attr = setName); + + return allNames; } -} -void ReplicaSetMonitorManager::shutdown() { - decltype(_monitors) monitors; - decltype(_taskExecutor) taskExecutor; - { + void ReplicaSetMonitorManager::removeMonitor(StringData setName) { stdx::lock_guard lk(_mutex); - if (std::exchange(_isShutdown, true)) { - return; + ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName); + if (it != _monitors.end()) { + if (auto monitor = it->second.lock()) { + monitor->drop(); + } + _monitors.erase(it); + log() << "Removed ReplicaSetMonitor for replica set " << setName; } - - monitors = std::exchange(_monitors, {}); - taskExecutor = std::exchange(_taskExecutor, {}); } - if (taskExecutor) { - LOGV2_DEBUG(20188, 1, "Shutting down task executor used for monitoring replica sets"); - taskExecutor->shutdown(); - } + void ReplicaSetMonitorManager::shutdown() { + decltype(_monitors) monitors; + decltype(_taskExecutor) taskExecutor; + { + stdx::lock_guard lk(_mutex); + if (std::exchange(_isShutdown, true)) { + return; + } + + monitors = std::exchange(_monitors, {}); + taskExecutor = std::exchange(_taskExecutor, {}); + } - if (monitors.size()) { - LOGV2(20189, "Dropping all ongoing scans against replica sets"); - } - for (auto& [name, monitor] : monitors) { - auto anchor = monitor.lock(); - if (!anchor) { - continue; + if (taskExecutor) { + LOG(1) << "Shutting down task executor used for monitoring replica sets"; + taskExecutor->shutdown(); } - anchor->drop(); - } + for (auto& [name, monitor] : monitors) { + auto anchor = monitor.lock(); + if (!anchor) { + continue; + } + anchor->drop(); + } - if (taskExecutor) { - taskExecutor->join(); + if (taskExecutor) { + taskExecutor->join(); + } } -} -void ReplicaSetMonitorManager::removeAllMonitors() { - shutdown(); + void ReplicaSetMonitorManager::removeAllMonitors() { + shutdown(); - { - stdx::lock_guard lk(_mutex); - _isShutdown = false; - } -} - -void ReplicaSetMonitorManager::report(BSONObjBuilder* builder, bool forFTDC) { - // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the - // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and - // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook potentially - // calling ShardRegistry::updateConfigServerConnectionString. - auto setNames = getAllSetNames(); - - BSONObjBuilder setStats( - builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets")); - - for (const auto& setName : setNames) { - auto monitor = getMonitor(setName); - if (!monitor) { - continue; + { + stdx::lock_guard lk(_mutex); + _isShutdown = false; } - monitor->appendInfo(setStats, forFTDC); } -} -TaskExecutor* ReplicaSetMonitorManager::getExecutor() { - invariant(_taskExecutor); - return _taskExecutor.get(); -} + void ReplicaSetMonitorManager::report(BSONObjBuilder * builder, bool forFTDC) { + // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the + // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and + // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook + // potentially calling ShardRegistry::updateConfigServerConnectionString. + auto setNames = getAllSetNames(); + + BSONObjBuilder setStats( + builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets")); + + for (const auto& setName : setNames) { + auto monitor = getMonitor(setName); + if (!monitor) { + continue; + } + monitor->appendInfo(setStats, forFTDC); + } + } -ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() { - return _notifier; -} + std::shared_ptr ReplicaSetMonitorManager::getExecutor() { + invariant(_taskExecutor); + return _taskExecutor; + } -bool ReplicaSetMonitorManager::isShutdown() const { - stdx::lock_guard lk(_mutex); - return _isShutdown; -} + ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() { + return _notifier; + } + bool ReplicaSetMonitorManager::isShutdown() const { + stdx::lock_guard lk(_mutex); + return _isShutdown; + } } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h index 10f8d5b4d19..7a19071014f 100644 --- a/src/mongo/client/replica_set_monitor_manager.h +++ b/src/mongo/client/replica_set_monitor_manager.h @@ -97,7 +97,7 @@ public: /** * Returns an executor for running RSM tasks. */ - executor::TaskExecutor* getExecutor(); + std::shared_ptr getExecutor(); ReplicaSetChangeNotifier& getNotifier(); @@ -111,7 +111,7 @@ private: MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "ReplicaSetMonitorManager::_mutex"); // Executor for monitoring replica sets. - std::unique_ptr _taskExecutor; + std::shared_ptr _taskExecutor; // Widget to notify listeners when a RSM notices a change ReplicaSetChangeNotifier _notifier; diff --git a/src/mongo/client/sdam/SConscript b/src/mongo/client/sdam/SConscript index 0a10d22332f..6cc5c642ad6 100644 --- a/src/mongo/client/sdam/SConscript +++ b/src/mongo/client/sdam/SConscript @@ -7,16 +7,21 @@ env = env.Clone() env.Library( target='sdam', source=[ + 'sdam_configuration.cpp', 'sdam_datatypes.cpp', 'server_description.cpp', 'topology_description.cpp', + 'topology_listener.cpp', 'topology_state_machine.cpp', 'topology_manager.cpp', + 'server_selector.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/util/clock_sources', + '$BUILD_DIR/mongo/client/read_preference', + '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/db/wire_version', '$BUILD_DIR/mongo/rpc/metadata', ], @@ -67,6 +72,15 @@ env.CppUnitTest( LIBDEPS=['sdam', 'sdam_test'], ) +env.CppUnitTest( + target='server_selector_test', + source=['server_selector_test.cpp'], + LIBDEPS=[ + 'sdam', + 'sdam_test', + ], +) + env.CppUnitTest( target='topology_state_machine_test', source=['topology_state_machine_test.cpp'], diff --git a/src/mongo/client/sdam/sdam.h b/src/mongo/client/sdam/sdam.h new file mode 100644 index 00000000000..16645e2418a --- /dev/null +++ b/src/mongo/client/sdam/sdam.h @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/client/sdam/sdam.h" +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/client/sdam/server_description.h" +#include "mongo/client/sdam/server_selector.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_listener.h" +#include "mongo/client/sdam/topology_manager.h" diff --git a/src/mongo/client/sdam/sdam_configuration.cpp b/src/mongo/client/sdam/sdam_configuration.cpp new file mode 100644 index 00000000000..d9852ddae94 --- /dev/null +++ b/src/mongo/client/sdam/sdam_configuration.cpp @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "sdam_configuration.h" + +namespace mongo::sdam { +SdamConfiguration::SdamConfiguration(boost::optional> seedList, + TopologyType initialType, + mongo::Milliseconds heartBeatFrequencyMs, + boost::optional setName) + : _seedList(seedList), + _initialType(initialType), + _heartBeatFrequencyMs(heartBeatFrequencyMs), + _setName(setName) { + uassert(ErrorCodes::InvalidSeedList, + "seed list size must be >= 1", + !seedList || (*seedList).size() >= 1); + + uassert(ErrorCodes::InvalidSeedList, + "TopologyType Single must have exactly one entry in the seed list.", + _initialType != TopologyType::kSingle || (*seedList).size() == 1); + + uassert( + ErrorCodes::InvalidTopologyType, + "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.", + !_setName || + (_initialType == TopologyType::kReplicaSetNoPrimary || + _initialType == TopologyType::kSingle)); + + uassert(ErrorCodes::TopologySetNameRequired, + "setName is required for ReplicaSetNoPrimary", + _initialType != TopologyType::kReplicaSetNoPrimary || _setName); + + uassert(ErrorCodes::InvalidHeartBeatFrequency, + "topology heartbeat must be >= 500ms", + _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS); +} + +const boost::optional>& SdamConfiguration::getSeedList() const { + return _seedList; +} + +TopologyType SdamConfiguration::getInitialType() const { + return _initialType; +} + +Milliseconds SdamConfiguration::getHeartBeatFrequency() const { + return _heartBeatFrequencyMs; +} + +const boost::optional& SdamConfiguration::getSetName() const { + return _setName; +} + + +ServerSelectionConfiguration::ServerSelectionConfiguration( + const Milliseconds localThresholdMs, const Milliseconds serverSelectionTimeoutMs) + : _localThresholdMs(localThresholdMs), _serverSelectionTimeoutMs(serverSelectionTimeoutMs) {} + +Milliseconds ServerSelectionConfiguration::getLocalThresholdMs() const { + return _localThresholdMs; +} + +Milliseconds ServerSelectionConfiguration::getServerSelectionTimeoutMs() const { + return _serverSelectionTimeoutMs; +} +Milliseconds ServerSelectionConfiguration::getHeartBeatFrequencyMs() const { + return _heartBeatFrequencyMs; +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/sdam_configuration.h b/src/mongo/client/sdam/sdam_configuration.h new file mode 100644 index 00000000000..895e52f4d87 --- /dev/null +++ b/src/mongo/client/sdam/sdam_configuration.h @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/client/sdam/sdam_datatypes.h" + +namespace mongo::sdam { +class SdamConfiguration { +public: + SdamConfiguration() : SdamConfiguration(boost::none){}; + + /** + * Initialize the TopologyDescription. This constructor may uassert if the provided + * configuration options are not valid according to the Server Discovery & Monitoring Spec. + * + * Initial Servers + * initial servers may be set to a seed list of one or more server addresses. + * + * Initial TopologyType + * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary. + * + * Initial setName + * The client's initial replica set name is required in order to initially configure the + * topology type as ReplicaSetNoPrimary. + * + * Allowed configuration combinations + * TopologyType Single cannot be used with multiple seeds. + * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single, are + * allowed. + */ + explicit SdamConfiguration(boost::optional> seedList, + TopologyType initialType = TopologyType::kUnknown, + Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs, + boost::optional setName = boost::none); + + const boost::optional>& getSeedList() const; + TopologyType getInitialType() const; + Milliseconds getHeartBeatFrequency() const; + const boost::optional& getSetName() const; + + static constexpr Milliseconds kDefaultHeartbeatFrequencyMs = Seconds(10); + static constexpr Milliseconds kMinHeartbeatFrequencyMS = Milliseconds(500); + static constexpr Milliseconds kDefaultConnectTimeoutMS = Milliseconds(100); + +private: + boost::optional> _seedList; + TopologyType _initialType; + Milliseconds _heartBeatFrequencyMs; + boost::optional _setName; +}; + +class ServerSelectionConfiguration { +public: + explicit ServerSelectionConfiguration(const Milliseconds localThresholdMs, + const Milliseconds serverSelectionTimeoutMs); + + Milliseconds getLocalThresholdMs() const; + Milliseconds getServerSelectionTimeoutMs() const; + Milliseconds getHeartBeatFrequencyMs() const; + + static constexpr Milliseconds kDefaultLocalThresholdMS = Milliseconds(15); + static constexpr Milliseconds kDefaultServerSelectionTimeoutMs = Milliseconds(30000); + + static ServerSelectionConfiguration defaultConfiguration() { + return ServerSelectionConfiguration{kDefaultLocalThresholdMS, + kDefaultServerSelectionTimeoutMs}; + } + +private: + Milliseconds _localThresholdMs = kDefaultLocalThresholdMS; + Milliseconds _serverSelectionTimeoutMs = kDefaultServerSelectionTimeoutMs; + Milliseconds _heartBeatFrequencyMs = SdamConfiguration::kDefaultHeartbeatFrequencyMs; +}; +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h index d3f7e7f3b50..520d517da87 100644 --- a/src/mongo/client/sdam/sdam_datatypes.h +++ b/src/mongo/client/sdam/sdam_datatypes.h @@ -126,4 +126,10 @@ using ServerDescriptionPtr = std::shared_ptr; class TopologyDescription; using TopologyDescriptionPtr = std::shared_ptr; + +class TopologyManager; +using TopologyManagerPtr = std::unique_ptr; + +class TopologyListener; +using TopologyListenerPtr = std::shared_ptr; }; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_description.cpp b/src/mongo/client/sdam/server_description.cpp index 1674840a5a6..42dd37d0cdc 100644 --- a/src/mongo/client/sdam/server_description.cpp +++ b/src/mongo/client/sdam/server_description.cpp @@ -26,9 +26,9 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/client/sdam/server_description.h" +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include #include @@ -137,6 +137,14 @@ void ServerDescription::saveTags(BSONObj tagsObj) { } } +void ServerDescription::appendBsonTags(BSONObjBuilder& builder) const { + for (const auto& pair : _tags) { + const auto& key = pair.first; + const auto& value = pair.second; + builder.append(key, value); + } +} + void ServerDescription::saveElectionId(BSONElement electionId) { if (electionId.type() == jstOID) { _electionId = electionId.OID(); @@ -399,6 +407,11 @@ BSONObj ServerDescription::toBson() const { bson.append("arbiters", _arbiters); bson.append("passives", _passives); + if (getTags().size()) { + BSONObjBuilder tagsBuilder(bson.subobjStart("tags")); + appendBsonTags(tagsBuilder); + } + return bson.obj(); } @@ -414,6 +427,18 @@ std::string ServerDescription::toString() const { return toBson().toString(); } +ServerDescriptionPtr ServerDescription::cloneWithRTT(IsMasterRTT rtt) { + auto newServerDescription = std::make_shared(*this); + newServerDescription->_rtt = rtt; + return newServerDescription; +} + +const boost::optional ServerDescription::getTopologyDescription() { + return (_topologyDescription) + ? boost::optional(_topologyDescription->lock()) + : boost::none; +} + bool operator==(const mongo::sdam::ServerDescription& a, const mongo::sdam::ServerDescription& b) { return a.isEquivalent(b); diff --git a/src/mongo/client/sdam/server_description.h b/src/mongo/client/sdam/server_description.h index 6fe02a5a9d8..7d95331fa0e 100644 --- a/src/mongo/client/sdam/server_description.h +++ b/src/mongo/client/sdam/server_description.h @@ -78,6 +78,7 @@ public: const boost::optional& getMe() const; const boost::optional& getSetName() const; const std::map& getTags() const; + void appendBsonTags(BSONObjBuilder& builder) const; // network attributes const boost::optional& getError() const; @@ -104,9 +105,11 @@ public: const boost::optional& getSetVersion() const; const boost::optional& getElectionId() const; const boost::optional& getTopologyVersion() const; + const boost::optional getTopologyDescription(); BSONObj toBson() const; std::string toString() const; + ServerDescriptionPtr cloneWithRTT(IsMasterRTT rtt); private: /** @@ -202,6 +205,10 @@ private: // pool for server. Incremented on network error or timeout. int _poolResetCounter = 0; + // The topology description of that we are a part of + boost::optional> _topologyDescription; + + friend class TopologyDescription; friend class ServerDescriptionBuilder; }; diff --git a/src/mongo/client/sdam/server_selector.cpp b/src/mongo/client/sdam/server_selector.cpp new file mode 100644 index 00000000000..31bacb1b43e --- /dev/null +++ b/src/mongo/client/sdam/server_selector.cpp @@ -0,0 +1,267 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "server_selector.h" + +#include + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/client/sdam/topology_description.h" +#include "mongo/platform/random.h" +#include "mongo/util/log.h" + +namespace mongo::sdam { +ServerSelector::~ServerSelector() {} + +SdamServerSelector::SdamServerSelector(const ServerSelectionConfiguration& config) + : _config(config), _random(PseudoRandom(SecureRandom().nextInt64())) {} + +void SdamServerSelector::_getCandidateServers(std::vector* result, + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria) { + // when querying the primary we don't need to consider tags + bool shouldTagFilter = true; + + // TODO: check to see if we want to enforce minOpTime at all since + // it was effectively optional in the original implementation. + // TODO: the old version of the RSM does this, and many of + // the tests seem to rely on this behavior for correctness. + if (!criteria.minOpTime.isNull()) { + auto eligibleServers = topologyDescription->findServers([](const ServerDescriptionPtr& s) { + return (s->getType() == ServerType::kRSPrimary || + s->getType() == ServerType::kRSSecondary); + }); + + auto beginIt = eligibleServers.begin(); + auto endIt = eligibleServers.end(); + auto maxIt = std::max_element(beginIt, + endIt, + [topologyDescription](const ServerDescriptionPtr& left, + const ServerDescriptionPtr& right) { + return left->getOpTime() < right->getOpTime(); + }); + if (maxIt != endIt) { + auto maxOpTime = (*maxIt)->getOpTime(); + if (maxOpTime && maxOpTime < criteria.minOpTime) { + // ignore minOpTime + const_cast(criteria) = ReadPreferenceSetting(criteria.pref); + log() << "ignoring minOpTime for " << criteria.toString(); + } + } + } + + switch (criteria.pref) { + case ReadPreference::Nearest: + *result = topologyDescription->findServers(nearestFilter(criteria)); + break; + + case ReadPreference::SecondaryOnly: + *result = topologyDescription->findServers(secondaryFilter(criteria)); + break; + + case ReadPreference::PrimaryOnly: { + const auto primaryCriteria = ReadPreferenceSetting(criteria.pref); + *result = topologyDescription->findServers(primaryFilter(primaryCriteria)); + shouldTagFilter = false; + break; + } + + case ReadPreference::PrimaryPreferred: { + // ignore tags and max staleness for primary query + auto primaryCriteria = ReadPreferenceSetting(ReadPreference::PrimaryOnly); + _getCandidateServers(result, topologyDescription, primaryCriteria); + if (result->size()) { + shouldTagFilter = false; + break; + } + + // keep tags and maxStaleness for secondary query + auto secondaryCriteria = criteria; + secondaryCriteria.pref = ReadPreference::SecondaryOnly; + _getCandidateServers(result, topologyDescription, secondaryCriteria); + break; + } + + case ReadPreference::SecondaryPreferred: { + // keep tags and maxStaleness for secondary query + auto secondaryCriteria = criteria; + secondaryCriteria.pref = ReadPreference::SecondaryOnly; + _getCandidateServers(result, topologyDescription, secondaryCriteria); + if (result->size()) { + break; + } + + // ignore tags and maxStaleness for primary query + shouldTagFilter = false; + auto primaryCriteria = ReadPreferenceSetting(ReadPreference::PrimaryOnly); + _getCandidateServers(result, topologyDescription, primaryCriteria); + break; + } + + default: + MONGO_UNREACHABLE + } + + if (shouldTagFilter) { + filterTags(result, criteria.tags); + } +} + +boost::optional> SdamServerSelector::selectServers( + const TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) { + + // If the topology wire version is invalid, raise an error + if (!topologyDescription->isWireVersionCompatible()) { + uasserted(ErrorCodes::IncompatibleServerVersion, + *topologyDescription->getWireVersionCompatibleError()); + } + + if (topologyDescription->getType() == TopologyType::kUnknown) { + return boost::none; + } + + if (topologyDescription->getType() == TopologyType::kSingle) { + auto servers = topologyDescription->getServers(); + return (servers.size() && servers[0]->getType() != ServerType::kUnknown) + ? boost::optional>{{servers[0]}} + : boost::none; + } + + std::vector results; + _getCandidateServers(&results, topologyDescription, criteria); + + if (results.size()) { + ServerDescriptionPtr minServer = + *std::min_element(results.begin(), results.end(), LatencyWindow::rttCompareFn); + + invariant(minServer->getRtt()); + auto latencyWindow = LatencyWindow(*minServer->getRtt(), _config.getLocalThresholdMs()); + latencyWindow.filterServers(&results); + + // latency window should always leave at least one result + invariant(results.size()); + + return results; + } + + return boost::none; +} + +ServerDescriptionPtr SdamServerSelector::_randomSelect( + const std::vector& servers) const { + return servers[_random.nextInt64(servers.size())]; +} + +boost::optional SdamServerSelector::selectServer( + const TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) { + auto servers = selectServers(topologyDescription, criteria); + return servers ? boost::optional(_randomSelect(*servers)) : boost::none; +} + +bool SdamServerSelector::_containsAllTags(ServerDescriptionPtr server, const BSONObj& tags) { + auto serverTags = server->getTags(); + for (auto& checkTag : tags) { + auto checkKey = checkTag.fieldName(); + auto checkValue = checkTag.String(); + auto pos = serverTags.find(checkKey); + if (pos == serverTags.end() || pos->second != checkValue) { + return false; + } + } + return true; +} + +void SdamServerSelector::filterTags(std::vector* servers, + const TagSet& tagSet) { + const auto& checkTags = tagSet.getTagBSON(); + + if (checkTags.nFields() == 0) + return; + + const auto predicate = [&](const ServerDescriptionPtr& s) { + auto it = checkTags.begin(); + while (it != checkTags.end()) { + if (it->isABSONObj()) { + const BSONObj& tags = it->Obj(); + if (_containsAllTags(s, tags)) { + // found a match -- don't remove the server + return false; + } + } else { + log() << "invalid tags specified for server selection; tags should be specified as " + "a bson Obj: " + << it->toString(); + } + ++it; + } + + // remove the server + return true; + }; + + servers->erase(std::remove_if(servers->begin(), servers->end(), predicate), servers->end()); +} + +bool SdamServerSelector::recencyFilter(const ReadPreferenceSetting& readPref, + const ServerDescriptionPtr& s) { + bool result = true; + + // TODO: check to see if we want to enforce minOpTime at all since + // it was effectively optional in the original implementation. + if (!readPref.minOpTime.isNull()) { + result = result && (s->getOpTime() >= readPref.minOpTime); + } + + if (readPref.maxStalenessSeconds.count()) { + auto topologyDescription = s->getTopologyDescription(); + invariant(topologyDescription); + auto staleness = _calculateStaleness(*topologyDescription, s); + result = result && (staleness <= readPref.maxStalenessSeconds); + } + + return result; +} + + +void LatencyWindow::filterServers(std::vector* servers) { + servers->erase(std::remove_if(servers->begin(), + servers->end(), + [&](const ServerDescriptionPtr& s) { + // Servers that have made it to this stage are not ServerType + // == kUnknown, so they must have an associated latency. + invariant(s->getType() != ServerType::kUnknown); + invariant(s->getRtt()); + return !this->isWithinWindow(*s->getRtt()); + }), + servers->end()); +} + +bool LatencyWindow::isWithinWindow(IsMasterRTT latency) { + return lower <= latency && latency <= upper; +} +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_selector.h b/src/mongo/client/sdam/server_selector.h new file mode 100644 index 00000000000..9cb676bd3b6 --- /dev/null +++ b/src/mongo/client/sdam/server_selector.h @@ -0,0 +1,193 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once +#include +#include + +#include "mongo/client/read_preference.h" +#include "mongo/client/sdam/sdam_configuration.h" +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/client/sdam/server_description.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/platform/random.h" + +namespace mongo::sdam { +/** + * This is the interface that allows one to select a server to satisfy a DB operation given a + * TopologyDescription and a ReadPreferenceSetting. + */ +class ServerSelector { +public: + /** + * Finds a list of candidate servers according to the ReadPreferenceSetting. + */ + virtual boost::optional> selectServers( + TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) = 0; + + /** + * Select a single server according to the ReadPreference and latency of the + * ServerDescription(s). The server is selected randomly from those that match the criteria. + */ + virtual boost::optional selectServer( + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria) = 0; + + virtual ~ServerSelector(); +}; +using ServerSelectorPtr = std::unique_ptr; + +class SdamServerSelector : public ServerSelector { +public: + explicit SdamServerSelector(const ServerSelectionConfiguration& config); + + boost::optional> selectServers( + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria) override; + + boost::optional selectServer( + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria) override; + + // remove servers that do not match the TagSet + void filterTags(std::vector* servers, const TagSet& tagSet); + +private: + void _getCandidateServers(std::vector* result, + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria); + + bool _containsAllTags(ServerDescriptionPtr server, const BSONObj& tags); + + ServerDescriptionPtr _randomSelect(const std::vector& servers) const; + + // staleness for a ServerDescription is defined here: + // https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#maxstalenessseconds + Milliseconds _calculateStaleness(const TopologyDescriptionPtr& topologyDescription, + const ServerDescriptionPtr& serverDescription) { + if (serverDescription->getType() != ServerType::kRSSecondary) + return Milliseconds(0); + + const Date_t& lastWriteDate = serverDescription->getLastWriteDate() + ? *serverDescription->getLastWriteDate() + : Date_t::min(); + + if (topologyDescription->getType() == TopologyType::kReplicaSetWithPrimary) { + // (S.lastUpdateTime - S.lastWriteDate) - (P.lastUpdateTime - P.lastWriteDate) + + // heartbeatFrequencyMS + + // topologyType == kReplicaSetWithPrimary implies the validity of the primary server + // description. + invariant(topologyDescription->getPrimary()); + const auto& primaryDescription = *topologyDescription->getPrimary(); + + const auto& primaryLastWriteDate = primaryDescription->getLastWriteDate() + ? *primaryDescription->getLastWriteDate() + : Date_t::min(); + + auto result = (serverDescription->getLastUpdateTime() - lastWriteDate) - + (primaryDescription->getLastUpdateTime() - primaryLastWriteDate) + + _config.getHeartBeatFrequencyMs(); + return duration_cast(result); + } else if (topologyDescription->getType() == TopologyType::kReplicaSetNoPrimary) { + // SMax.lastWriteDate - S.lastWriteDate + heartbeatFrequencyMS + Date_t maxLastWriteDate = Date_t::min(); + + // identify secondary with max last write date. + for (const auto& s : topologyDescription->getServers()) { + if (s->getType() != ServerType::kRSSecondary) + continue; + + const auto& sLastWriteDate = + s->getLastWriteDate() ? *s->getLastWriteDate() : Date_t::min(); + + if (sLastWriteDate > maxLastWriteDate) { + maxLastWriteDate = sLastWriteDate; + } + } + + auto result = (maxLastWriteDate - lastWriteDate) + _config.getHeartBeatFrequencyMs(); + return duration_cast(result); + } else { + // Not a replica set + return Milliseconds(0); + } + } + + bool recencyFilter(const ReadPreferenceSetting& readPref, const ServerDescriptionPtr& s); + + // A SelectionFilter is a higher order function used to filter out servers from the current + // Topology. It's return value is used as input to the TopologyDescription::findServers + // function, and is a function that takes a ServerDescriptionPtr and returns a bool indicating + // whether to keep this server or not based on the ReadPreference, server type, and recency + // metrics of the server. + using SelectionFilter = unique_function( + const ReadPreferenceSetting&)>; + + const SelectionFilter secondaryFilter = [this](const ReadPreferenceSetting& readPref) { + return [&](const ServerDescriptionPtr& s) { + return (s->getType() == ServerType::kRSSecondary) && recencyFilter(readPref, s); + }; + }; + + const SelectionFilter primaryFilter = [this](const ReadPreferenceSetting& readPref) { + return [&](const ServerDescriptionPtr& s) { + return (s->getType() == ServerType::kRSPrimary) && recencyFilter(readPref, s); + }; + }; + + const SelectionFilter nearestFilter = [this](const ReadPreferenceSetting& readPref) { + return [&](const ServerDescriptionPtr& s) { + return (s->getType() == ServerType::kRSPrimary || + s->getType() == ServerType::kRSSecondary) && + recencyFilter(readPref, s); + }; + }; + + ServerSelectionConfiguration _config; + mutable PseudoRandom _random; +}; + +// This is used to filter out servers based on their current latency measurements. +struct LatencyWindow { + const IsMasterRTT lower; + const IsMasterRTT upper; + + explicit LatencyWindow(const IsMasterRTT lowerBound, const IsMasterRTT windowWidth) + : lower(lowerBound), upper(lowerBound + windowWidth) {} + + bool isWithinWindow(IsMasterRTT latency); + + // remove servers not in the latency window in-place. + void filterServers(std::vector* servers); + + static bool rttCompareFn(const ServerDescriptionPtr& a, const ServerDescriptionPtr& b) { + return a->getRtt() < b->getRtt(); + } +}; +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_selector_test.cpp b/src/mongo/client/sdam/server_selector_test.cpp new file mode 100644 index 00000000000..233c00b32a5 --- /dev/null +++ b/src/mongo/client/sdam/server_selector_test.cpp @@ -0,0 +1,463 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/sdam/server_selector.h" + +#include + +#include "mongo/client/sdam/sdam_test_base.h" +#include "mongo/client/sdam/server_description_builder.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_manager.h" +#include "mongo/db/wire_version.h" +#include "mongo/util/system_clock_source.h" + +namespace mongo::sdam { + +class ServerSelectorTestFixture : public SdamTestFixture { +public: + static inline const auto clockSource = SystemClockSource::get(); + static inline const auto sdamConfiguration = SdamConfiguration({{"s0"}}); + static inline const auto selectionConfig = + ServerSelectionConfiguration(Milliseconds(10), Milliseconds(10)); + + static constexpr auto SET_NAME = "set"; + static constexpr int NUM_ITERATIONS = 1000; + + struct TagSets { + static inline const auto eastProduction = BSON("dc" + << "east" + << "usage" + << "production"); + static inline const auto westProduction = BSON("dc" + << "west" + << "usage" + << "production"); + static inline const auto northTest = BSON("dc" + << "north" + << "usage" + << "test"); + static inline const auto northProduction = BSON("dc" + << "north" + << "usage" + << "production"); + static inline const auto production = BSON("usage" + << "production"); + + static inline const auto test = BSON("usage" + << "test"); + + static inline const auto integration = BSON("usage" + << "integration"); + + static inline const auto primary = BSON("tag" + << "primary"); + static inline const auto secondary = BSON("tag" + << "secondary"); + + static inline const auto emptySet = TagSet{BSONArray(BSONObj())}; + static inline const auto eastOrWestProductionSet = + TagSet(BSON_ARRAY(eastProduction << westProduction)); + static inline const auto westProductionSet = TagSet(BSON_ARRAY(westProduction)); + static inline const auto productionSet = TagSet(BSON_ARRAY(production)); + static inline const auto testSet = TagSet(BSON_ARRAY(test)); + static inline const auto integrationOrTestSet = TagSet(BSON_ARRAY(integration << test)); + static inline const auto integrationSet = TagSet(BSON_ARRAY(integration)); + + static inline const auto primarySet = TagSet(BSON_ARRAY(primary)); + static inline const auto secondarySet = TagSet(BSON_ARRAY(secondary)); + }; + + static ServerDescriptionPtr make_with_latency(IsMasterRTT latency, + ServerAddress address, + ServerType serverType = ServerType::kRSPrimary, + std::map tags = {}) { + auto builder = ServerDescriptionBuilder() + .withType(serverType) + .withAddress(address) + .withSetName(SET_NAME) + .withRtt(latency) + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastUpdateTime(Date_t::now()); + + for (auto it = tags.begin(); it != tags.end(); ++it) { + builder.withTag(it->first, it->second); + } + + return builder.instance(); + } + + static auto makeServerDescriptionList() { + return std::vector{ + make_with_latency(Milliseconds(1), + "s1", + ServerType::kRSSecondary, + {{"dc", "east"}, {"usage", "production"}}), + make_with_latency(Milliseconds(1), + "s1-test", + ServerType::kRSSecondary, + {{"dc", "east"}, {"usage", "test"}}), + make_with_latency(Milliseconds(1), + "s2", + ServerType::kRSSecondary, + {{"dc", "west"}, {"usage", "production"}}), + make_with_latency(Milliseconds(1), + "s2-test", + ServerType::kRSSecondary, + {{"dc", "west"}, {"usage", "test"}}), + make_with_latency(Milliseconds(1), + "s3", + ServerType::kRSSecondary, + {{"dc", "north"}, {"usage", "production"}})}; + }; + + SdamServerSelector selector = SdamServerSelector(selectionConfig); +}; + +TEST_F(ServerSelectorTestFixture, ShouldFilterCorrectlyByLatencyWindow) { + const auto delta = Milliseconds(10); + const auto windowWidth = Milliseconds(100); + const auto lowerBound = Milliseconds(100); + + auto window = LatencyWindow(lowerBound, windowWidth); + + std::vector servers = { + make_with_latency(window.lower - delta, "less"), + make_with_latency(window.lower, "boundary-lower"), + make_with_latency(window.lower + delta, "within"), + make_with_latency(window.upper, "boundary-upper"), + make_with_latency(window.upper + delta, "greater")}; + + window.filterServers(&servers); + + ASSERT_EQ(3, servers.size()); + ASSERT_EQ("boundary-lower", servers[0]->getAddress()); + ASSERT_EQ("within", servers[1]->getAddress()); + ASSERT_EQ("boundary-upper", servers[2]->getAddress()); +} + +TEST_F(ServerSelectorTestFixture, ShouldThrowOnWireError) { + auto topologyDescription = std::make_shared(sdamConfiguration); + auto oldServer = ServerDescriptionBuilder() + .withAddress(topologyDescription->getServers().back()->getAddress()) + .withType(ServerType::kRSPrimary) + .withMaxWireVersion(WireVersion::RELEASE_2_4_AND_BEFORE) + .withMinWireVersion(WireVersion::RELEASE_2_4_AND_BEFORE) + .instance(); + topologyDescription->installServerDescription(oldServer); + + ASSERT(!topologyDescription->isWireVersionCompatible()); + ASSERT_THROWS_CODE(selector.selectServers(topologyDescription, ReadPreferenceSetting()), + DBException, + ErrorCodes::IncompatibleServerVersion); +} + +TEST_F(ServerSelectorTestFixture, ShouldReturnNoneIfTopologyUnknown) { + auto topologyDescription = std::make_shared(sdamConfiguration); + ASSERT_EQ(TopologyType::kUnknown, topologyDescription->getType()); + ASSERT_EQ(boost::none, selector.selectServers(topologyDescription, ReadPreferenceSetting())); +} + +TEST_F(ServerSelectorTestFixture, ShouldSelectRandomlyWhenMultipleOptionsAreAvailable) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared(sdamConfiguration); + + const auto s0Latency = Milliseconds(1); + auto primary = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kRSPrimary) + .withRtt(s0Latency) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withHost("s3") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .instance(); + stateMachine.onServerDescription(*topologyDescription, primary); + + const auto s1Latency = Milliseconds((s0Latency + selectionConfig.getLocalThresholdMs()) / 2); + auto secondaryInLatencyWindow = make_with_latency(s1Latency, "s1", ServerType::kRSSecondary); + stateMachine.onServerDescription(*topologyDescription, secondaryInLatencyWindow); + + // s2 is on the boundary of the latency window + const auto s2Latency = s0Latency + selectionConfig.getLocalThresholdMs(); + auto secondaryOnBoundaryOfLatencyWindow = + make_with_latency(s2Latency, "s2", ServerType::kRSSecondary); + stateMachine.onServerDescription(*topologyDescription, secondaryOnBoundaryOfLatencyWindow); + + // s3 should not be selected + const auto s3Latency = s2Latency + Milliseconds(10); + auto secondaryTooFar = make_with_latency(s3Latency, "s3", ServerType::kRSSecondary); + stateMachine.onServerDescription(*topologyDescription, secondaryTooFar); + + std::map frequencyInfo{{"s0", 0}, {"s1", 0}, {"s2", 0}, {"s3", 0}}; + for (int i = 0; i < NUM_ITERATIONS; i++) { + auto server = selector.selectServer(topologyDescription, + ReadPreferenceSetting(ReadPreference::Nearest)); + if (server) { + frequencyInfo[(*server)->getAddress()]++; + } + } + + ASSERT(frequencyInfo["s0"]); + ASSERT(frequencyInfo["s1"]); + ASSERT(frequencyInfo["s2"]); + ASSERT_FALSE(frequencyInfo["s3"]); +} + +TEST_F(ServerSelectorTestFixture, ShouldFilterByLastWriteTime) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared(sdamConfiguration); + + const int MAX_STALENESS = 60; + const auto sixtySeconds = Seconds(MAX_STALENESS); + const auto now = Date_t::now(); + + + const auto d0 = now - Milliseconds(1000); + const auto s0 = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kRSPrimary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0); + + const auto d1 = now - Milliseconds(1000 * 5); + const auto s1 = ServerDescriptionBuilder() + .withAddress("s1") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d1) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s1); + + // d2 is stale, so s2 should not be selected. + const auto d2 = now - sixtySeconds - sixtySeconds; + const auto s2 = ServerDescriptionBuilder() + .withAddress("s2") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d2) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s2); + + const auto readPref = + ReadPreferenceSetting(ReadPreference::Nearest, TagSets::emptySet, sixtySeconds); + + std::map frequencyInfo{{"s0", 0}, {"s1", 0}, {"s2", 0}}; + for (int i = 0; i < NUM_ITERATIONS; i++) { + auto server = selector.selectServer(topologyDescription, readPref); + + if (server) { + frequencyInfo[(*server)->getAddress()]++; + } + } + + ASSERT(frequencyInfo["s0"]); + ASSERT(frequencyInfo["s1"]); + ASSERT_FALSE(frequencyInfo["s2"]); +} + +TEST_F(ServerSelectorTestFixture, ShouldSelectPreferredIfAvailable) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared(sdamConfiguration); + + const int MAX_STALENESS = 60; + const auto sixtySeconds = Seconds(MAX_STALENESS); + const auto now = Date_t::now(); + + + const auto d0 = now - Milliseconds(1000); + const auto s0 = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kRSPrimary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .withTag("tag", "primary") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0); + + const auto s1 = ServerDescriptionBuilder() + .withAddress("s1") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .withTag("tag", "secondary") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s1); + + const auto primaryPreferredTagSecondary = + ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSets::secondarySet); + auto result1 = selector.selectServer(topologyDescription, primaryPreferredTagSecondary); + ASSERT(result1 != boost::none); + ASSERT_EQ("s0", (*result1)->getAddress()); + + const auto secondaryPreferredWithTag = + ReadPreferenceSetting(ReadPreference::SecondaryPreferred, TagSets::secondarySet); + auto result2 = selector.selectServer(topologyDescription, secondaryPreferredWithTag); + ASSERT(result2 != boost::none); + ASSERT_EQ("s1", (*result2)->getAddress()); + + const auto secondaryPreferredNoTag = ReadPreferenceSetting(ReadPreference::SecondaryPreferred); + auto result3 = selector.selectServer(topologyDescription, secondaryPreferredNoTag); + ASSERT(result3 != boost::none); + ASSERT_EQ("s1", (*result2)->getAddress()); +} + +TEST_F(ServerSelectorTestFixture, ShouldSelectTaggedSecondaryIfPreferredPrimaryNotAvailable) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared(sdamConfiguration); + + const int MAX_STALENESS = 60; + const auto sixtySeconds = Seconds(MAX_STALENESS); + const auto now = Date_t::now(); + + const auto d0 = now - Milliseconds(1000); + + const auto s0 = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kRSPrimary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .withTag("tag", "primary") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0); + + // old primary unavailable + const auto s0_failed = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kUnknown) + .withSetName("set") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0_failed); + + const auto s1 = ServerDescriptionBuilder() + .withAddress("s1") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .withTag("tag", "secondary") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s1); + + const auto s2 = ServerDescriptionBuilder() + .withAddress("s2") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s2); + + const auto primaryPreferredTagSecondary = + ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSets::secondarySet); + auto result1 = selector.selectServer(topologyDescription, primaryPreferredTagSecondary); + ASSERT(result1 != boost::none); + ASSERT_EQ("s1", (*result1)->getAddress()); +} + +TEST_F(ServerSelectorTestFixture, ShouldFilterByTags) { + auto tags = TagSets::productionSet; + auto servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(3, servers.size()); + + tags = TagSets::eastOrWestProductionSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(2, servers.size()); + + tags = TagSets::testSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(2, servers.size()); + + tags = TagSets::integrationOrTestSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(2, servers.size()); + + tags = TagSets::westProductionSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(1, servers.size()); + + tags = TagSets::integrationSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(0, servers.size()); + + tags = TagSets::emptySet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(makeServerDescriptionList().size(), servers.size()); +} +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_description.cpp b/src/mongo/client/sdam/topology_description.cpp index d3783719864..f236f388e90 100644 --- a/src/mongo/client/sdam/topology_description.cpp +++ b/src/mongo/client/sdam/topology_description.cpp @@ -107,6 +107,9 @@ const boost::optional TopologyDescription::findServerByAdd boost::optional TopologyDescription::installServerDescription( const ServerDescriptionPtr& newServerDescription) { + LOG(2) << "(" << getSetName() << ") install ServerDescription " + << newServerDescription->toString(); + boost::optional previousDescription; if (getType() == TopologyType::kSingle) { // For Single, there is always one ServerDescription in TopologyDescription.servers; @@ -131,6 +134,8 @@ boost::optional TopologyDescription::installServerDescript } } + newServerDescription->_topologyDescription = shared_from_this(); + checkWireCompatibilityVersions(); calculateLogicalSessionTimeout(); return previousDescription; @@ -174,13 +179,12 @@ void TopologyDescription::checkWireCompatibilityVersions() { break; } } - _compatibleError = (_compatible) ? boost::none : boost::make_optional(errorOss.str()); } const std::string TopologyDescription::minimumRequiredMongoVersionString(int version) { switch (version) { - case RESUMABLE_INITIAL_SYNC: + case RESUMABLE_INITIAL_SYNC: return "4.4"; case SHARDED_TRANSACTIONS: return "4.2"; @@ -270,54 +274,15 @@ std::string TopologyDescription::toString() { return toBSON().toString(); } -//////////////////////// -// SdamConfiguration -//////////////////////// -SdamConfiguration::SdamConfiguration(boost::optional> seedList, - TopologyType initialType, - mongo::Milliseconds heartBeatFrequencyMs, - boost::optional setName) - : _seedList(seedList), - _initialType(initialType), - _heartBeatFrequencyMs(heartBeatFrequencyMs), - _setName(setName) { - uassert(ErrorCodes::InvalidSeedList, - "seed list size must be >= 1", - !seedList || (*seedList).size() >= 1); - - uassert(ErrorCodes::InvalidSeedList, - "TopologyType Single must have exactly one entry in the seed list.", - _initialType != TopologyType::kSingle || (*seedList).size() == 1); - - uassert( - ErrorCodes::InvalidTopologyType, - "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.", - !_setName || - (_initialType == TopologyType::kReplicaSetNoPrimary || - _initialType == TopologyType::kSingle)); - - uassert(ErrorCodes::TopologySetNameRequired, - "setName is required for ReplicaSetNoPrimary", - _initialType != TopologyType::kReplicaSetNoPrimary || _setName); - - uassert(ErrorCodes::InvalidHeartBeatFrequency, - "topology heartbeat must be >= 500ms", - _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS); -} -const boost::optional>& SdamConfiguration::getSeedList() const { - return _seedList; -} - -TopologyType SdamConfiguration::getInitialType() const { - return _initialType; -} - -Milliseconds SdamConfiguration::getHeartBeatFrequency() const { - return _heartBeatFrequencyMs; -} +boost::optional TopologyDescription::getPrimary() { + if (getType() != TopologyType::kReplicaSetWithPrimary) { + return boost::none; + } -const boost::optional& SdamConfiguration::getSetName() const { - return _setName; + auto foundPrimaries = findServers( + [](const ServerDescriptionPtr& s) { return s->getType() == ServerType::kRSPrimary; }); + invariant(foundPrimaries.size() == 1); + return foundPrimaries[0]; } -}; // namespace mongo::sdam +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_description.h b/src/mongo/client/sdam/topology_description.h index 5894ad0d20f..a0469eb118b 100644 --- a/src/mongo/client/sdam/topology_description.h +++ b/src/mongo/client/sdam/topology_description.h @@ -36,55 +36,13 @@ #include "mongo/bson/oid.h" #include "mongo/client/read_preference.h" +#include "mongo/client/sdam/sdam_configuration.h" #include "mongo/client/sdam/sdam_datatypes.h" #include "mongo/client/sdam/server_description.h" #include "mongo/platform/basic.h" namespace mongo::sdam { -class SdamConfiguration { -public: - SdamConfiguration() : SdamConfiguration(boost::none){}; - - /** - * Initialize the TopologyDescription. This constructor may uassert if the provided - * configuration options are not valid according to the Server Discovery & Monitoring Spec. - * - * Initial Servers - * initial servers may be set to a seed list of one or more server addresses. - * - * Initial TopologyType - * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary. - * - * Initial setName - * The client's initial replica set name is required in order to initially configure the - * topology type as ReplicaSetNoPrimary. - * - * Allowed configuration combinations - * TopologyType Single cannot be used with multiple seeds. - * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single, are - * allowed. - */ - SdamConfiguration(boost::optional> seedList, - TopologyType initialType = TopologyType::kUnknown, - mongo::Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs, - boost::optional setName = boost::none); - - const boost::optional>& getSeedList() const; - TopologyType getInitialType() const; - Milliseconds getHeartBeatFrequency() const; - const boost::optional& getSetName() const; - - static inline const mongo::Milliseconds kDefaultHeartbeatFrequencyMs = mongo::Seconds(10); - static inline const mongo::Milliseconds kMinHeartbeatFrequencyMS = mongo::Milliseconds(500); - -private: - boost::optional> _seedList; - TopologyType _initialType; - mongo::Milliseconds _heartBeatFrequencyMs; - boost::optional _setName; -}; - -class TopologyDescription { +class TopologyDescription : public std::enable_shared_from_this { public: TopologyDescription() : TopologyDescription(SdamConfiguration()) {} TopologyDescription(const TopologyDescription& source) = default; @@ -113,6 +71,7 @@ public: bool containsServerAddress(const ServerAddress& address) const; std::vector findServers( std::function predicate) const; + boost::optional getPrimary(); /** * Adds the given ServerDescription or swaps it with an existing one @@ -129,12 +88,29 @@ public: std::string toString(); private: + friend bool operator==(const TopologyDescription& lhs, const TopologyDescription& rhs) { + return std::tie(lhs._setName, + lhs._type, + lhs._maxSetVersion, + lhs._maxElectionId, + lhs._servers, + lhs._compatible, + lhs._logicalSessionTimeoutMinutes) == + std::tie(rhs._setName, + rhs._type, + rhs._maxSetVersion, + rhs._maxElectionId, + rhs._servers, + rhs._compatible, + rhs._logicalSessionTimeoutMinutes); + } + /** * Checks if all server descriptions are compatible with this server's WireVersion. If an - * incompatible description is found, we set the topologyDescription's _compatible flag to false - * and store an error message in _compatibleError. A ServerDescription which is not Unknown is - * incompatible if: - * minWireVersion > serverMaxWireVersion, or maxWireVersion < serverMinWireVersion + * incompatible description is found, we set the topologyDescription's _compatible flag to + * false and store an error message in _compatibleError. A ServerDescription which is not + * Unknown is incompatible if: minWireVersion > serverMaxWireVersion, or maxWireVersion < + * serverMinWireVersion */ void checkWireCompatibilityVersions(); diff --git a/src/mongo/client/sdam/topology_description_test.cpp b/src/mongo/client/sdam/topology_description_test.cpp index 9f7a6a2dbee..d43be9642b8 100644 --- a/src/mongo/client/sdam/topology_description_test.cpp +++ b/src/mongo/client/sdam/topology_description_test.cpp @@ -26,8 +26,6 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - #include "mongo/client/sdam/sdam_test_base.h" #include "mongo/client/sdam/topology_description.h" @@ -36,7 +34,6 @@ #include "mongo/client/sdam/server_description.h" #include "mongo/client/sdam/server_description_builder.h" #include "mongo/db/wire_version.h" -#include "mongo/logv2/log.h" #include "mongo/unittest/death_test.h" namespace mongo { @@ -107,9 +104,10 @@ TEST_F(TopologyDescriptionTestFixture, ShouldAllowTypeSingleWithASingleSeed) { } TEST_F(TopologyDescriptionTestFixture, DoesNotAllowMultipleSeedsWithSingle) { - ASSERT_THROWS_CODE(TopologyDescription({kTwoServersNormalCase, TopologyType::kSingle}), - DBException, - ErrorCodes::InvalidSeedList); + ASSERT_THROWS_CODE( + TopologyDescription(SdamConfiguration(kTwoServersNormalCase, TopologyType::kSingle)), + DBException, + ErrorCodes::InvalidSeedList); } TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) { @@ -121,10 +119,10 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) { } TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowSettingTheReplicaSetNameWithWrongType) { - ASSERT_THROWS_CODE( - TopologyDescription({kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName}), - DBException, - ErrorCodes::InvalidTopologyType); + ASSERT_THROWS_CODE(TopologyDescription(SdamConfiguration( + kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName)), + DBException, + ErrorCodes::InvalidTopologyType); } TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowTopologyTypeRSNoPrimaryWithoutSetName) { @@ -146,9 +144,8 @@ TEST_F(TopologyDescriptionTestFixture, ShouldOnlyAllowSingleAndRsNoPrimaryWithSe topologyTypes.end()); for (const auto topologyType : topologyTypes) { - LOGV2(20217, - "Check TopologyType {topologyType} with setName value.", - "topologyType"_attr = toString(topologyType)); + unittest::log() << "Check TopologyType " << toString(topologyType) + << " with setName value."; ASSERT_THROWS_CODE( SdamConfiguration(kOneServer, topologyType, mongo::Seconds(10), kSetName), DBException, @@ -180,7 +177,7 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetWireCompatibilityErrorForMinWireVersionWhenMinWireVersionIsGreater) { const auto outgoingMaxWireVersion = WireSpec::instance().outgoing.maxWireVersion; const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared(config); const auto serverDescriptionMinVersion = ServerDescriptionBuilder() .withAddress(kOneServer[0]) .withMe(kOneServer[0]) @@ -188,16 +185,16 @@ TEST_F(TopologyDescriptionTestFixture, .withMinWireVersion(outgoingMaxWireVersion + 1) .instance(); - ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); - topologyDescription.installServerDescription(serverDescriptionMinVersion); - ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); + topologyDescription->installServerDescription(serverDescriptionMinVersion); + ASSERT_NOT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); } TEST_F(TopologyDescriptionTestFixture, ShouldSetWireCompatibilityErrorForMinWireVersionWhenMaxWireVersionIsLess) { const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion; const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared(config); const auto serverDescriptionMaxVersion = ServerDescriptionBuilder() .withAddress(kOneServer[0]) .withMe(kOneServer[0]) @@ -205,31 +202,31 @@ TEST_F(TopologyDescriptionTestFixture, .withMaxWireVersion(outgoingMinWireVersion - 1) .instance(); - ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); - topologyDescription.installServerDescription(serverDescriptionMaxVersion); - ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); + topologyDescription->installServerDescription(serverDescriptionMaxVersion); + ASSERT_NOT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); } TEST_F(TopologyDescriptionTestFixture, ShouldNotSetWireCompatibilityErrorWhenServerTypeIsUnknown) { const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion; const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared(config); const auto serverDescriptionMaxVersion = ServerDescriptionBuilder().withMaxWireVersion(outgoingMinWireVersion - 1).instance(); - ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); - topologyDescription.installServerDescription(serverDescriptionMaxVersion); - ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); + topologyDescription->installServerDescription(serverDescriptionMaxVersion); + ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); } TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllServerDescriptions) { const auto config = SdamConfiguration(kThreeServers); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared(config); const auto logicalSessionTimeouts = std::vector{300, 100, 200}; auto timeoutIt = logicalSessionTimeouts.begin(); const auto serverDescriptionsWithTimeouts = map( - topologyDescription.getServers(), [&timeoutIt](const ServerDescriptionPtr& description) { + topologyDescription->getServers(), [&timeoutIt](const ServerDescriptionPtr& description) { auto newInstanceBuilder = ServerDescriptionBuilder() .withType(ServerType::kRSSecondary) .withAddress(description->getAddress()) @@ -240,26 +237,26 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllS }); for (auto description : serverDescriptionsWithTimeouts) { - topologyDescription.installServerDescription(description); + topologyDescription->installServerDescription(description); } int expectedLogicalSessionTimeout = *std::min_element(logicalSessionTimeouts.begin(), logicalSessionTimeouts.end()); ASSERT_EQUALS(expectedLogicalSessionTimeout, - topologyDescription.getLogicalSessionTimeoutMinutes()); + topologyDescription->getLogicalSessionTimeoutMinutes()); } TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToNoneIfAnyServerDescriptionHasNone) { const auto config = SdamConfiguration(kThreeServers); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared(config); const auto logicalSessionTimeouts = std::vector{300, 100, 200}; auto timeoutIt = logicalSessionTimeouts.begin(); const auto serverDescriptionsWithTimeouts = map( - topologyDescription.getServers(), [&](const ServerDescriptionPtr& description) { + topologyDescription->getServers(), [&](const ServerDescriptionPtr& description) { auto timeoutValue = (timeoutIt == logicalSessionTimeouts.begin()) ? boost::none : boost::make_optional(*timeoutIt); @@ -275,19 +272,19 @@ TEST_F(TopologyDescriptionTestFixture, }); for (auto description : serverDescriptionsWithTimeouts) { - topologyDescription.installServerDescription(description); + topologyDescription->installServerDescription(description); } - ASSERT_EQUALS(boost::none, topologyDescription.getLogicalSessionTimeoutMinutes()); + ASSERT_EQUALS(boost::none, topologyDescription->getLogicalSessionTimeoutMinutes()); } TEST_F(TopologyDescriptionTestFixture, ShouldUpdateTopologyVersionOnSuccess) { const auto config = SdamConfiguration(kThreeServers); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared(config); // Deafult topologyVersion is null - ASSERT_EQUALS(topologyDescription.getServers().size(), 3); - auto serverDescription = topologyDescription.getServers()[1]; + ASSERT_EQUALS(topologyDescription->getServers().size(), 3); + auto serverDescription = topologyDescription->getServers()[1]; ASSERT(serverDescription->getTopologyVersion() == boost::none); // Create new serverDescription with topologyVersion, topologyDescription should have the new @@ -300,20 +297,19 @@ TEST_F(TopologyDescriptionTestFixture, ShouldUpdateTopologyVersionOnSuccess) { .withTopologyVersion(TopologyVersion(processId, 1)) .instance(); - topologyDescription.installServerDescription(newDescription); - ASSERT_EQUALS(topologyDescription.getServers().size(), 3); - auto topologyVersion = topologyDescription.getServers()[1]->getTopologyVersion(); - ASSERT(topologyVersion->getProcessId() == processId); - ASSERT(topologyVersion->getCounter() == 1); + topologyDescription->installServerDescription(newDescription); + ASSERT_EQUALS(topologyDescription->getServers().size(), 3); + auto topologyVersion = topologyDescription->getServers()[1]->getTopologyVersion(); + ASSERT(topologyVersion == TopologyVersion(processId, 1)); } TEST_F(TopologyDescriptionTestFixture, ShouldNotUpdateTopologyVersionOnError) { const auto config = SdamConfiguration(kThreeServers); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared(config); // Deafult topologyVersion is null - ASSERT_EQUALS(topologyDescription.getServers().size(), 3); - auto serverDescription = topologyDescription.getServers()[1]; + ASSERT_EQUALS(topologyDescription->getServers().size(), 3); + auto serverDescription = topologyDescription->getServers()[1]; ASSERT(serverDescription->getTopologyVersion() == boost::none); auto newDescription = ServerDescriptionBuilder() @@ -321,9 +317,9 @@ TEST_F(TopologyDescriptionTestFixture, ShouldNotUpdateTopologyVersionOnError) { .withError("error") .instance(); - topologyDescription.installServerDescription(newDescription); - ASSERT_EQUALS(topologyDescription.getServers().size(), 3); - auto topologyVersion = topologyDescription.getServers()[1]->getTopologyVersion(); + topologyDescription->installServerDescription(newDescription); + ASSERT_EQUALS(topologyDescription->getServers().size(), 3); + auto topologyVersion = topologyDescription->getServers()[1]->getTopologyVersion(); ASSERT(topologyVersion == boost::none); } }; // namespace sdam diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp new file mode 100644 index 00000000000..995685b71b9 --- /dev/null +++ b/src/mongo/client/sdam/topology_listener.cpp @@ -0,0 +1,163 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/client/sdam/topology_listener.h" + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/util/log.h" + +namespace mongo::sdam { +void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) { + stdx::lock_guard lock(_mutex); + _listeners.push_back(listener); +} + +void TopologyEventsPublisher::removeListener(TopologyListenerPtr listener) { + stdx::lock_guard lock(_mutex); + _listeners.erase(std::remove(_listeners.begin(), _listeners.end(), listener), _listeners.end()); +} + +void TopologyEventsPublisher::close() { + stdx::lock_guard lock(_mutex); + _listeners.clear(); + _isClosed = true; +} + +void TopologyEventsPublisher::onTopologyDescriptionChangedEvent( + UUID topologyId, + TopologyDescriptionPtr previousDescription, + TopologyDescriptionPtr newDescription) { + { + stdx::lock_guard lock(_eventQueueMutex); + EventPtr event = std::make_unique(); + event->type = EventType::TOPOLOGY_DESCRIPTION_CHANGED; + event->topologyId = std::move(topologyId); + event->previousDescription = previousDescription; + event->newDescription = newDescription; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} + +void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, + const ServerAddress& hostAndPort, + const BSONObj reply) { + { + stdx::lock_guard lock(_eventQueueMutex); + EventPtr event = std::make_unique(); + event->type = EventType::HEARTBEAT_SUCCESS; + event->duration = duration_cast(durationMs); + event->hostAndPort = hostAndPort; + event->reply = reply; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} + +void TopologyEventsPublisher::onServerHeartbeatFailureEvent(IsMasterRTT durationMs, + Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply) { + { + stdx::lock_guard lock(_eventQueueMutex); + EventPtr event = std::make_unique(); + event->type = EventType::HEARTBEAT_FAILURE; + event->duration = duration_cast(durationMs); + event->hostAndPort = hostAndPort; + event->reply = reply; + event->status = errorStatus; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} + +void TopologyEventsPublisher::_scheduleNextDelivery() { + // run nextDelivery async + _executor->schedule( + [self = shared_from_this()](const Status& status) { self->_nextDelivery(); }); +} + +void TopologyEventsPublisher::onServerPingFailedEvent(const ServerAddress& hostAndPort, + const Status& status) {} + +void TopologyEventsPublisher::onServerPingSucceededEvent(IsMasterRTT durationMS, + const ServerAddress& hostAndPort) {} + +// TODO: this could be done in batches if this is a bottleneck. +void TopologyEventsPublisher::_nextDelivery() { + // get the next event to send + EventPtr nextEvent; + { + stdx::lock_guard lock(_eventQueueMutex); + if (!_eventQueue.size()) { + return; + } + nextEvent = std::move(_eventQueue.front()); + _eventQueue.pop_front(); + } + + // release the lock before sending to avoid deadlock in the case there + // are events generated by sending the current one. + std::vector listeners; + { + stdx::lock_guard lock(_mutex); + if (_isClosed) { + return; + } + listeners = _listeners; + } + + // send to the listeners outside of the lock. + for (auto listener : listeners) { + _sendEvent(listener, *nextEvent); + } +} + +void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Event& event) { + switch (event.type) { + case EventType::HEARTBEAT_SUCCESS: + listener->onServerHeartbeatSucceededEvent( + duration_cast(event.duration), event.hostAndPort, event.reply); + break; + case EventType::HEARTBEAT_FAILURE: + listener->onServerHeartbeatFailureEvent(duration_cast(event.duration), + event.status, + event.hostAndPort, + event.reply); + break; + case EventType::TOPOLOGY_DESCRIPTION_CHANGED: + // TODO: fix uuid or just remove + listener->onTopologyDescriptionChangedEvent( + UUID::gen(), event.previousDescription, event.newDescription); + break; + default: + MONGO_UNREACHABLE; + } +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h index 0cf38ddf68d..dde1ae3b683 100644 --- a/src/mongo/client/sdam/topology_listener.h +++ b/src/mongo/client/sdam/topology_listener.h @@ -26,10 +26,13 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ - #pragma once +#include +#include +#include #include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/executor/task_executor.h" #include "mongo/util/uuid.h" namespace mongo::sdam { @@ -49,13 +52,18 @@ public: TopologyDescriptionPtr previousDescription, TopologyDescriptionPtr newDescription){}; + virtual void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, + Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply){}; /** * Called when a ServerHeartBeatSucceededEvent is published - A heartbeat sent to the server at * hostAndPort succeeded. durationMS is the execution time of the event, including the time it * took to send the message and recieve the reply from the server. */ virtual void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, - const ServerAddress& hostAndPort){}; + const ServerAddress& hostAndPort, + const BSONObj reply){}; /* * Called when a ServerPingFailedEvent is published - A monitoring ping to the server at @@ -70,4 +78,69 @@ public: virtual void onServerPingSucceededEvent(IsMasterRTT durationMS, const ServerAddress& hostAndPort){}; }; + +/** + * This class publishes TopologyListener events to a group of registered listeners. + * + * To publish an event to all registered listeners call the corresponding event function on the + * TopologyEventsPublisher instance. + */ +class TopologyEventsPublisher final : public TopologyListener, + public std::enable_shared_from_this { +public: + TopologyEventsPublisher(std::shared_ptr executor) + : _executor(executor){}; + void registerListener(TopologyListenerPtr listener); + void removeListener(TopologyListenerPtr listener); + void close(); + + void onTopologyDescriptionChangedEvent(UUID topologyId, + TopologyDescriptionPtr previousDescription, + TopologyDescriptionPtr newDescription) override; + void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, + const ServerAddress& hostAndPort, + const BSONObj reply) override; + void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, + Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply) override; + void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override; + void onServerPingSucceededEvent(IsMasterRTT durationMS, + const ServerAddress& hostAndPort) override; + + +private: + enum class EventType { + HEARTBEAT_SUCCESS, + HEARTBEAT_FAILURE, + PING_SUCCESS, + PING_FAILURE, + TOPOLOGY_DESCRIPTION_CHANGED + }; + struct Event { + EventType type; + ServerAddress hostAndPort; + IsMasterRTT duration; + BSONObj reply; + TopologyDescriptionPtr previousDescription; + TopologyDescriptionPtr newDescription; + boost::optional topologyId; + Status status = Status::OK(); + }; + using EventPtr = std::unique_ptr; + + void _sendEvent(TopologyListenerPtr listener, const TopologyEventsPublisher::Event& event); + void _nextDelivery(); + void _scheduleNextDelivery(); + + // Lock acquisition order to avoid deadlock is _eventQueueMutex -> _mutex + Mutex _eventQueueMutex; + std::deque _eventQueue; + + Mutex _mutex; + bool _isClosed = false; + std::shared_ptr _executor; + std::vector _listeners; +}; +using TopologyEventsPublisherPtr = std::shared_ptr; } // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp index 7897e6948f7..64d74e114d9 100644 --- a/src/mongo/client/sdam/topology_manager.cpp +++ b/src/mongo/client/sdam/topology_manager.cpp @@ -32,9 +32,9 @@ #include "mongo/client/sdam/topology_state_machine.h" #include "mongo/logv2/log.h" +#include "mongo/rpc/topology_version_gen.h" namespace mongo::sdam { - namespace { /* Compare topologyVersions to determine if the isMaster response's topologyVersion is stale @@ -57,16 +57,19 @@ bool isStaleTopologyVersion(boost::optional lastTopologyVersion return false; } - } // namespace -TopologyManager::TopologyManager(SdamConfiguration config, ClockSource* clockSource) + +TopologyManager::TopologyManager(SdamConfiguration config, + ClockSource* clockSource, + TopologyEventsPublisherPtr eventsPublisher) : _config(std::move(config)), _clockSource(clockSource), _topologyDescription(std::make_unique(_config)), - _topologyStateMachine(std::make_unique(_config)) {} + _topologyStateMachine(std::make_unique(_config)), + _topologyEventsPublisher(eventsPublisher) {} -void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) { +bool TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) { stdx::lock_guard lock(_mutex); boost::optional lastRTT; @@ -85,11 +88,11 @@ void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome if (isStaleTopologyVersion(lastTopologyVersion, newTopologyVersion)) { LOGV2( 23930, - "Ignoring this isMaster response because our topologyVersion: {lastTopologyVersion}is " + "Ignoring this isMaster response because our topologyVersion: {lastTopologyVersion} is " "fresher than the provided topologyVersion: {newTopologyVersion}", "lastTopologyVersion"_attr = lastTopologyVersion->toBSON(), "newTopologyVersion"_attr = newTopologyVersion->toBSON()); - return; + return false; } boost::optional poolResetCounter = lastPoolResetCounter; @@ -98,17 +101,60 @@ void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome poolResetCounter = ++lastPoolResetCounter.get(); } - // newTopologyVersion will be null if the isMaster response did not provide one. auto newServerDescription = std::make_shared( _clockSource, isMasterOutcome, lastRTT, newTopologyVersion, poolResetCounter); - auto newTopologyDescription = std::make_unique(*_topologyDescription); - _topologyStateMachine->onServerDescription(*newTopologyDescription, newServerDescription); - _topologyDescription = std::move(newTopologyDescription); + auto oldTopologyDescription = _topologyDescription; + _topologyDescription = std::make_shared(*_topologyDescription); + + // if we are equal to the old description, just install the new description without + // performing any actions on the state machine. + auto isEqualToOldServerDescription = + (lastServerDescription && (*lastServerDescription->get()) == *newServerDescription); + if (isEqualToOldServerDescription) { + _topologyDescription->installServerDescription(newServerDescription); + } else { + _topologyStateMachine->onServerDescription(*_topologyDescription, newServerDescription); + } + + _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription); + return true; } const std::shared_ptr TopologyManager::getTopologyDescription() const { stdx::lock_guard lock(_mutex); return _topologyDescription; } + +void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt) { + stdx::lock_guard lock(_mutex); + + auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort); + if (oldServerDescription) { + auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt); + + auto oldTopologyDescription = _topologyDescription; + _topologyDescription = std::make_shared(*_topologyDescription); + _topologyDescription->installServerDescription(newServerDescription); + + _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription); + + return; + } + + // otherwise, the server was removed from the topology. Nothing to do. + LOGV2(433301, + str::stream() << "Not updating RTT. Server {server}" << hostAndPort + << " does not exist in ", + "server"_attr = hostAndPort, + "setName"_attr = getTopologyDescription()->getSetName()); +} + +void TopologyManager::_publishTopologyDescriptionChanged( + const TopologyDescriptionPtr& oldTopologyDescription, + const TopologyDescriptionPtr& newTopologyDescription) const { + if (_topologyEventsPublisher) + _topologyEventsPublisher->onTopologyDescriptionChangedEvent( + newTopologyDescription->getId(), oldTopologyDescription, newTopologyDescription); +} }; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_manager.h b/src/mongo/client/sdam/topology_manager.h index 292d48b6e8e..5ae64d94644 100644 --- a/src/mongo/client/sdam/topology_manager.h +++ b/src/mongo/client/sdam/topology_manager.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-present MongoDB, Inc. + * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -31,6 +31,7 @@ #include "mongo/client/sdam/sdam_datatypes.h" #include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_listener.h" #include "mongo/client/sdam/topology_state_machine.h" namespace mongo::sdam { @@ -44,7 +45,9 @@ class TopologyManager { TopologyManager(const TopologyManager&) = delete; public: - TopologyManager(SdamConfiguration config, ClockSource* clockSource); + explicit TopologyManager(SdamConfiguration config, + ClockSource* clockSource, + TopologyEventsPublisherPtr eventsPublisher = nullptr); /** * This function atomically: @@ -57,7 +60,19 @@ public: * IsMasterOutcomes serially, as required by: * https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#process-one-ismaster-outcome-at-a-time */ - void onServerDescription(const IsMasterOutcome& isMasterOutcome); + bool onServerDescription(const IsMasterOutcome& isMasterOutcome); + + + /** + * This function updates the RTT value for a server without executing any state machine actions. + * It atomically: + * 1. Clones the current TopologyDescription + * 2. Clones the ServerDescription corresponding to hostAndPort such that it contains the new + * RTT value. + * 3. Installs the cloned ServerDescription into the TopologyDescription from step 1 + * 4. Installs the cloned TopologyDescription as the current one. + */ + void onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt); /** * Get the current TopologyDescription. This is safe to call from multiple threads. @@ -65,10 +80,15 @@ public: const TopologyDescriptionPtr getTopologyDescription() const; private: + void _publishTopologyDescriptionChanged( + const TopologyDescriptionPtr& oldTopologyDescription, + const TopologyDescriptionPtr& newTopologyDescription) const; + mutable mongo::Mutex _mutex = MONGO_MAKE_LATCH("TopologyManager"); const SdamConfiguration _config; ClockSource* _clockSource; - std::shared_ptr _topologyDescription; - std::unique_ptr _topologyStateMachine; + TopologyDescriptionPtr _topologyDescription; + TopologyStateMachinePtr _topologyStateMachine; + TopologyEventsPublisherPtr _topologyEventsPublisher; }; } // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_state_machine.h b/src/mongo/client/sdam/topology_state_machine.h index abed9bc854f..fcb5b7e0c99 100644 --- a/src/mongo/client/sdam/topology_state_machine.h +++ b/src/mongo/client/sdam/topology_state_machine.h @@ -101,4 +101,5 @@ private: static inline auto kLogPrefix = "sdam : "; }; +using TopologyStateMachinePtr = std::unique_ptr; } // namespace mongo::sdam diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp new file mode 100644 index 00000000000..8ae85924058 --- /dev/null +++ b/src/mongo/client/server_is_master_monitor.cpp @@ -0,0 +1,375 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/server_is_master_monitor.h" + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault +#include "mongo/client/sdam/sdam.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/network_interface_thread_pool.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +const BSONObj IS_MASTER_BSON = BSON("isMaster" << 1); + +using executor::NetworkInterface; +using executor::NetworkInterfaceThreadPool; +using executor::TaskExecutor; +using executor::ThreadPoolTaskExecutor; + +const Milliseconds kZeroMs = Milliseconds{0}; +} // namespace + +SingleServerIsMasterMonitor::SingleServerIsMasterMonitor( + const MongoURI& setUri, + const sdam::ServerAddress& host, + Milliseconds heartbeatFrequencyMS, + sdam::TopologyEventsPublisherPtr eventListener, + std::shared_ptr executor) + : _host(host), + _eventListener(eventListener), + _executor(executor), + _heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)), + _isShutdown(true), + _setUri(setUri) { + LOG(kLogLevel.lessSevere()) << "Created Replica Set SingleServerIsMasterMonitor for host " + << host; +} + +void SingleServerIsMasterMonitor::init() { + stdx::lock_guard lock(_mutex); + _isShutdown = false; + _scheduleNextIsMaster(lock, Milliseconds(0)); +} + +void SingleServerIsMasterMonitor::requestImmediateCheck() { + Milliseconds delayUntilNextCheck; + stdx::lock_guard lock(_mutex); + if (_isShutdown) + return; + + // remain in expedited mode until the replica set recovers + if (!_isExpedited) { + // save some log lines. + LOG(kLogLevel) << "[SingleServerIsMasterMonitor] Monitoring " << _host + << " in expedited mode until we detect a primary."; + _isExpedited = true; + } + + // .. but continue with rescheduling the next request. + + if (_isMasterOutstanding) { + LOG(kLogLevel) << "[SingleServerIsMasterMonitor] immediate isMaster check requested, but " + "there is already an " + "outstanding request."; + return; + } + + const auto currentRefreshPeriod = _currentRefreshPeriod(lock); + + const Milliseconds timeSinceLastCheck = + (_lastIsMasterAt) ? _executor->now() - *_lastIsMasterAt : Milliseconds::max(); + + delayUntilNextCheck = (_lastIsMasterAt && (timeSinceLastCheck < currentRefreshPeriod)) + ? currentRefreshPeriod - timeSinceLastCheck + : kZeroMs; + + // if our calculated delay is less than the next scheduled call, then run the check sooner. + // Otherwise, do nothing. Three cases to cancel existing request: + // 1. refresh period has changed to expedited, so (currentRefreshPeriod - timeSinceLastCheck) is + // < 0 + // 2. calculated delay is less then next scheduled isMaster + // 3. isMaster was never scheduled. + if (((currentRefreshPeriod - timeSinceLastCheck) < kZeroMs) || + (delayUntilNextCheck < (currentRefreshPeriod - timeSinceLastCheck)) || + timeSinceLastCheck == Milliseconds::max()) { + _cancelOutstandingRequest(lock); + } else { + return; + } + + LOG(kLogLevel) << "[SingleServerIsMasterMonitor] Rescheduling next isMaster check for " + << this->_host << " in " << delayUntilNextCheck; + _scheduleNextIsMaster(lock, delayUntilNextCheck); +} + +void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds delay) { + if (_isShutdown) + return; + + invariant(!_isMasterOutstanding); + + Timer timer; + auto swCbHandle = _executor->scheduleWorkAt( + _executor->now() + delay, + [self = shared_from_this()](const executor::TaskExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; + } + self->_doRemoteCommand(); + }); + + if (!swCbHandle.isOK()) { + Microseconds latency(timer.micros()); + _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); + return; + } + + _nextIsMasterHandle = swCbHandle.getValue(); +} + +void SingleServerIsMasterMonitor::_doRemoteCommand() { + auto request = executor::RemoteCommandRequest( + HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS); + request.sslMode = _setUri.getSSLMode(); + + stdx::lock_guard lock(_mutex); + if (_isShutdown) + return; + + Timer timer; + auto swCbHandle = _executor->scheduleRemoteCommand( + std::move(request), + [self = shared_from_this(), + timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + Milliseconds nextRefreshPeriod; + { + stdx::lock_guard lk(self->_mutex); + self->_isMasterOutstanding = false; + + if (self->_isShutdown || ErrorCodes::isCancelationError(result.response.status)) { + LOG(kLogLevel) << "[SingleServerIsMasterMonitor] not processing response: " + << result.response.status; + return; + } + + self->_lastIsMasterAt = self->_executor->now(); + nextRefreshPeriod = self->_currentRefreshPeriod(lk); + LOG(kLogLevel.lessSevere()) + << "next refresh period in " + nextRefreshPeriod.toString(); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } + + Microseconds latency(timer.micros()); + if (result.response.isOK()) { + self->_onIsMasterSuccess(latency, result.response.data); + } else { + self->_onIsMasterFailure(latency, result.response.status, result.response.data); + } + }); + + if (!swCbHandle.isOK()) { + Microseconds latency(timer.micros()); + _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); + uasserted(31448, swCbHandle.getStatus().toString()); + } + + _isMasterOutstanding = true; + _remoteCommandHandle = swCbHandle.getValue(); +} + +void SingleServerIsMasterMonitor::shutdown() { + stdx::lock_guard lock(_mutex); + LOG(kLogLevel.lessSevere()) << "Closing Replica Set SingleServerIsMasterMonitor for host " + << _host; + _isShutdown = true; + + _cancelOutstandingRequest(lock); + + _executor = nullptr; + LOG(kLogLevel.lessSevere()) << "Done Closing Replica Set SingleServerIsMasterMonitor for host " + << _host; +} + +void SingleServerIsMasterMonitor::_cancelOutstandingRequest(WithLock) { + if (_nextIsMasterHandle.isValid()) { + _executor->cancel(_nextIsMasterHandle); + } + + if (_remoteCommandHandle.isValid()) { + _executor->cancel(_remoteCommandHandle); + } + + _isMasterOutstanding = false; +} + +void SingleServerIsMasterMonitor::_onIsMasterSuccess(sdam::IsMasterRTT latency, + const BSONObj bson) { + LOG(kLogLevel.lessSevere()) << "received successful isMaster for server " << _host << " (" + << latency << ")" + << "; " << bson.toString(); + _eventListener->onServerHeartbeatSucceededEvent( + duration_cast(latency), _host, bson); +} + +void SingleServerIsMasterMonitor::_onIsMasterFailure(sdam::IsMasterRTT latency, + const Status& status, + const BSONObj bson) { + LOG(kLogLevel) << "received failed isMaster for server " << _host << ": " << status.toString() + << " (" << latency << ")" + << "; " << bson.toString(); + _eventListener->onServerHeartbeatFailureEvent( + duration_cast(latency), status, _host, bson); +} + +Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds original) { + Milliseconds r = original; + static constexpr auto kPeriodField = "period"_sd; + if (auto modifyReplicaSetMonitorDefaultRefreshPeriod = globalFailPointRegistry().find("modifyReplicaSetMonitorDefaultRefreshPeriod")) { + modifyReplicaSetMonitorDefaultRefreshPeriod->executeIf( + [&r](const BSONObj& data) { + r = duration_cast(Seconds{data.getIntField(kPeriodField)}); + }, + [](const BSONObj& data) { return data.hasField(kPeriodField); }); + } + return r; +} + +Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) { + return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS + : _heartbeatFrequencyMS; +} + +void SingleServerIsMasterMonitor::disableExpeditedChecking() { + stdx::lock_guard lock(_mutex); + _isExpedited = false; +} + + +ServerIsMasterMonitor::ServerIsMasterMonitor( + const MongoURI& setUri, + const sdam::SdamConfiguration& sdamConfiguration, + sdam::TopologyEventsPublisherPtr eventsPublisher, + sdam::TopologyDescriptionPtr initialTopologyDescription, + std::shared_ptr executor) + : _sdamConfiguration(sdamConfiguration), + _eventPublisher(eventsPublisher), + _executor(_setupExecutor(executor)), + _isShutdown(false), + _setUri(setUri) { + LOG(kLogLevel) << "Starting Replica Set IsMaster monitor with " + << initialTopologyDescription->getServers().size() << " members."; + onTopologyDescriptionChangedEvent( + initialTopologyDescription->getId(), nullptr, initialTopologyDescription); +} + +void ServerIsMasterMonitor::shutdown() { + stdx::lock_guard lock(_mutex); + if (_isShutdown) + return; + + _isShutdown = true; + for (auto singleMonitor : _singleMonitors) { + singleMonitor.second->shutdown(); + } +} + +void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent( + UUID topologyId, + sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription) { + stdx::lock_guard lock(_mutex); + if (_isShutdown) + return; + + const auto newType = newDescription->getType(); + using sdam::TopologyType; + + if (newType == TopologyType::kSingle || newType == TopologyType::kReplicaSetWithPrimary || + newType == TopologyType::kSharded) { + _disableExpeditedChecking(lock); + } + + // remove monitors that are missing from the topology + auto it = _singleMonitors.begin(); + while (it != _singleMonitors.end()) { + const auto& serverAddress = it->first; + if (newDescription->findServerByAddress(serverAddress) == boost::none) { + auto& singleMonitor = _singleMonitors[serverAddress]; + singleMonitor->shutdown(); + LOG(kLogLevel) << serverAddress << " was removed from the topology."; + it = _singleMonitors.erase(it); + } else { + ++it; + } + } + + // add new monitors + newDescription->findServers([this](const sdam::ServerDescriptionPtr& serverDescription) { + const auto& serverAddress = serverDescription->getAddress(); + bool isMissing = + _singleMonitors.find(serverDescription->getAddress()) == _singleMonitors.end(); + if (isMissing) { + LOG(kLogLevel) << serverAddress << " was added to the topology."; + _singleMonitors[serverAddress] = std::make_shared( + _setUri, + serverAddress, + _sdamConfiguration.getHeartBeatFrequency(), + _eventPublisher, + _executor); + _singleMonitors[serverAddress]->init(); + } + return isMissing; + }); +} + +std::shared_ptr ServerIsMasterMonitor::_setupExecutor( + const std::shared_ptr& executor) { + if (executor) + return executor; + + auto hookList = std::make_unique(); + auto net = executor::makeNetworkInterface( + "ServerIsMasterMonitor-TaskExecutor", nullptr, std::move(hookList)); + auto pool = std::make_unique(net.get()); + auto result = std::make_shared(std::move(pool), std::move(net)); + result->startup(); + return result; +} + +void ServerIsMasterMonitor::requestImmediateCheck() { + stdx::lock_guard lock(_mutex); + if (_isShutdown) + return; + + for (auto& addressAndMonitor : _singleMonitors) { + addressAndMonitor.second->requestImmediateCheck(); + } +} + +void ServerIsMasterMonitor::_disableExpeditedChecking(WithLock) { + for (auto& addressAndMonitor : _singleMonitors) { + addressAndMonitor.second->disableExpeditedChecking(); + } +} +} // namespace mongo diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h new file mode 100644 index 00000000000..14e2e202ba4 --- /dev/null +++ b/src/mongo/client/server_is_master_monitor.h @@ -0,0 +1,135 @@ + +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/mongo_uri.h" +#include "mongo/client/sdam/sdam.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/unordered_map.h" +#include "mongo/util/net/hostandport.h" + +namespace mongo { +using namespace sdam; + +class SingleServerIsMasterMonitor + : public std::enable_shared_from_this { +public: + explicit SingleServerIsMasterMonitor(const MongoURI& setUri, + const ServerAddress& host, + Milliseconds heartbeatFrequencyMS, + TopologyEventsPublisherPtr eventListener, + std::shared_ptr executor); + + void init(); + void shutdown(); + + /** + * Request an immediate check. The server will be checked immediately if we haven't completed + * an isMaster less than SdamConfiguration::kMinHeartbeatFrequencyMS ago. Otherwise, + * we schedule a check that runs after SdamConfiguration::kMinHeartbeatFrequencyMS since + * the last isMaster. + */ + void requestImmediateCheck(); + void disableExpeditedChecking(); + +private: + void _scheduleNextIsMaster(WithLock, Milliseconds delay); + void _doRemoteCommand(); + + void _onIsMasterSuccess(IsMasterRTT latency, const BSONObj bson); + void _onIsMasterFailure(IsMasterRTT latency, const Status& status, const BSONObj bson); + + Milliseconds _overrideRefreshPeriod(Milliseconds original); + Milliseconds _currentRefreshPeriod(WithLock); + void _cancelOutstandingRequest(WithLock); + + static inline const logger::LogSeverity kLogLevel = logger::LogSeverity::Debug(1); + + Mutex _mutex; + ServerAddress _host; + TopologyEventsPublisherPtr _eventListener; + std::shared_ptr _executor; + Milliseconds _heartbeatFrequencyMS; + Milliseconds _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS; + + boost::optional _lastIsMasterAt; + bool _isMasterOutstanding = false; + bool _isExpedited = false; + executor::TaskExecutor::CallbackHandle _nextIsMasterHandle; + executor::TaskExecutor::CallbackHandle _remoteCommandHandle; + + bool _isShutdown; + MongoURI _setUri; +}; +using SingleServerIsMasterMonitorPtr = std::shared_ptr; + + +class ServerIsMasterMonitor : public TopologyListener { +public: + ServerIsMasterMonitor(const MongoURI& setUri, + const SdamConfiguration& sdamConfiguration, + TopologyEventsPublisherPtr eventsPublisher, + TopologyDescriptionPtr initialTopologyDescription, + std::shared_ptr executor = nullptr); + + virtual ~ServerIsMasterMonitor() {} + + void shutdown(); + + /** + * Request an immediate check of each member in the replica set. + */ + void requestImmediateCheck(); + + /** + * Add/Remove Single Monitors based on the current topology membership. + */ + void onTopologyDescriptionChangedEvent(UUID topologyId, + TopologyDescriptionPtr previousDescription, + TopologyDescriptionPtr newDescription) override; + +private: + /** + * If the provided executor exists, use that one (for testing). Otherwise create a new one. + */ + std::shared_ptr _setupExecutor( + const std::shared_ptr& executor); + void _disableExpeditedChecking(WithLock); + + static inline const logger::LogSeverity kLogLevel = logger::LogSeverity::Debug(0); + + Mutex _mutex; + SdamConfiguration _sdamConfiguration; + TopologyEventsPublisherPtr _eventPublisher; + std::shared_ptr _executor; + std::unordered_map _singleMonitors; + bool _isShutdown; + MongoURI _setUri; +}; +using ServerIsMasterMonitorPtr = std::shared_ptr; +} // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 0056a98c567..f497648c82a 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -27,82 +27,565 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/client/streamable_replica_set_monitor.h" -#include -#include -#include -#include +#include +#include -#include "mongo/client/mongo_uri.h" -#include "mongo/client/replica_set_change_notifier.h" -#include "mongo/client/replica_set_monitor.h" -#include "mongo/executor/task_executor.h" -#include "mongo/util/concurrency/with_lock.h" -#include "mongo/util/duration.h" -#include "mongo/util/net/hostandport.h" -#include "mongo/util/time_support.h" +#include "mongo/bson/simple_bsonelement_comparator.h" +#include "mongo/client/connpool.h" +#include "mongo/client/global_conn_pool.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/streamable_replica_set_monitor_query_processor.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/bson_extract_optime.h" +#include "mongo/db/server_options.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/rpc/metadata/egress_metadata_hook_list.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/log.h" +#include "mongo/util/string_map.h" +#include "mongo/util/timer.h" namespace mongo { +using namespace mongo::sdam; -StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri) {} -void StreamableReplicaSetMonitor::init() {} +using std::numeric_limits; +using std::set; +using std::shared_ptr; +using std::string; +using std::vector; -void StreamableReplicaSetMonitor::drop() {} +namespace { +// Pull nested types to top-level scope +using executor::TaskExecutor; +using CallbackArgs = TaskExecutor::CallbackArgs; +using CallbackHandle = TaskExecutor::CallbackHandle; -SemiFuture StreamableReplicaSetMonitor::getHostOrRefresh( - const ReadPreferenceSetting& readPref, Milliseconds maxWait) { - MONGO_UNREACHABLE; +const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet()); + +// Utility functions to use when finding servers +bool minWireCompare(const ServerDescriptionPtr& a, const ServerDescriptionPtr& b) { + return a->getMinWireVersion() < b->getMinWireVersion(); +} + +bool maxWireCompare(const ServerDescriptionPtr& a, const ServerDescriptionPtr& b) { + return a->getMaxWireVersion() < b->getMaxWireVersion(); +} + +bool secondaryPredicate(const ServerDescriptionPtr& server) { + return server->getType() == ServerType::kRSSecondary; +} + +std::string readPrefToStringWithMinOpTime(const ReadPreferenceSetting& readPref) { + BSONObjBuilder builder; + readPref.toInnerBSON(&builder); + if (!readPref.minOpTime.isNull()) { + builder.append("minOpTime", readPref.minOpTime.toBSON()); + } + return builder.obj().toString(); +} + +// TODO: remove +std::string hostListToString(boost::optional> x) { + std::stringstream s; + if (x) { + for (auto h : *x) { + s << h.toString() << "; "; + } + } + return s.str(); +} + +int32_t pingTimeMillis(const ServerDescriptionPtr& serverDescription) { + static const Milliseconds maxLatency = Milliseconds::max(); + const auto& serverRtt = serverDescription->getRtt(); + auto latencyMillis = (serverRtt) ? duration_cast(*serverRtt) : maxLatency; + return std::min(latencyMillis, maxLatency).count(); +} + +constexpr auto kZeroMs = Milliseconds(0); +} // namespace + +StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri, std::shared_ptr executor) : + _serverSelector(std::make_unique(kServerSelectionConfig)), + _queryProcessor(std::make_shared()), + _uri(uri), + _executor(executor), + _random(PseudoRandom(SecureRandom().nextInt64())) { + + // TODO: sdam should use the HostAndPort type for ServerAddress + std::vector seeds; + for (const auto& seed : uri.getServers()) { + seeds.push_back(seed.toString()); + } + + _sdamConfig = SdamConfiguration(seeds); +} + +ReplicaSetMonitorPtr StreamableReplicaSetMonitor::make(const MongoURI& uri, + std::shared_ptr executor) { + auto result = std::make_shared(uri, executor); + result->init(); + return result; +} + +void StreamableReplicaSetMonitor::init() { + stdx::lock_guard lock(_mutex); + LOG(kDefaultLogLevel) << _logPrefix() << "Starting Replica Set Monitor with uri: " << _uri; + + _eventsPublisher = std::make_shared(_executor); + _topologyManager = std::make_unique( + _sdamConfig, getGlobalServiceContext()->getPreciseClockSource(), _eventsPublisher); + _isMasterMonitor = std::make_unique( + _uri, _sdamConfig, _eventsPublisher, _topologyManager->getTopologyDescription(), _executor); + + _eventsPublisher->registerListener(shared_from_this()); + _eventsPublisher->registerListener(_isMasterMonitor); + _isDropped.store(false); + + ReplicaSetMonitorManager::get()->getNotifier().onFoundSet(getName()); +} + +void StreamableReplicaSetMonitor::drop() { + stdx::lock_guard lock(_mutex); + if (_isDropped.load()) + return; + + _isDropped.store(true); + LOG(kDefaultLogLevel) << _logPrefix() << "Closing Replica Set Monitor"; + _eventsPublisher->close(); + _queryProcessor->shutdown(); + _isMasterMonitor->shutdown(); + _failOutstandingWithStatus( + lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"}); + + ReplicaSetMonitorManager::get()->getNotifier().onDroppedSet(getName()); + + LOG(kDefaultLogLevel) << _logPrefix() << "Done closing Replica Set Monitor"; +} + +SemiFuture StreamableReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, + Milliseconds maxWait) { + return getHostsOrRefresh(criteria, maxWait) + .thenRunOn(_executor) + .then([self = shared_from_this()](const std::vector& result) { + invariant(result.size()); + return result[self->_random.nextInt64(result.size())]; + }) + .semi(); +} + +std::vector StreamableReplicaSetMonitor::_extractHosts( + const std::vector& serverDescriptions) { + std::vector result; + for (const auto& server : serverDescriptions) { + result.push_back(HostAndPort(server->getAddress())); + } + return result; } SemiFuture> StreamableReplicaSetMonitor::getHostsOrRefresh( - const ReadPreferenceSetting& readPref, Milliseconds maxWait) { - MONGO_UNREACHABLE; + const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + // In the fast case (stable topology), we avoid mutex acquisition. + if (_isDropped.load()) { + return _makeReplicaSetMonitorRemovedError(); + } + + // start counting from the beginning of the operation + const auto deadline = _executor->now() + ((maxWait > kZeroMs) ? maxWait : kZeroMs); + + // try to satisfy query immediately + auto immediateResult = _getHosts(criteria); + if (immediateResult) { + LOG(kLowerLogLevel) << _logPrefix() + << "getHosts: " << readPrefToStringWithMinOpTime(criteria) << " -> " + << hostListToString(immediateResult); + return {*immediateResult}; + } + + _isMasterMonitor->requestImmediateCheck(); + LOG(kDefaultLogLevel) << _logPrefix() + << "start getHosts: " << readPrefToStringWithMinOpTime(criteria); + + // fail fast on timeout + const Date_t& now = _executor->now(); + if (deadline <= now) { + return _makeUnsatisfiedReadPrefError(criteria); + } + + { + stdx::lock_guard lk(_mutex); + + // We check if we are closed under the mutex here since someone could have called + // close() concurrently with the code above. + if (_isDropped.load()) { + return _makeReplicaSetMonitorRemovedError(); + } + + return _enqueueOutstandingQuery(lk, criteria, deadline); + } +} + +SemiFuture> StreamableReplicaSetMonitor::_enqueueOutstandingQuery( + WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline) { + using HostAndPortList = std::vector; + Future result; + + auto query = std::make_shared(); + query->criteria = criteria; + query->deadline = deadline; + + auto pf = makePromiseFuture(); + query->promise = std::move(pf.promise); + + auto deadlineCb = + [this, query, self = shared_from_this()](const TaskExecutor::CallbackArgs& cbArgs) { + stdx::lock_guard lock(_mutex); + if (query->done) { + return; + } + + const auto cbStatus = cbArgs.status; + if (!cbStatus.isOK()) { + query->promise.setError(cbStatus); + query->done = true; + return; + } + + const auto errorStatus = _makeUnsatisfiedReadPrefError(query->criteria); + query->promise.setError(errorStatus); + query->done = true; + LOG(kDefaultLogLevel) << _logPrefix() + << "host selection timeout: " << errorStatus.toString(); + }; + auto swDeadlineHandle = _executor->scheduleWorkAt(query->deadline, deadlineCb); + + if (!swDeadlineHandle.isOK()) { + log() << "error scheduling deadline handler: " << swDeadlineHandle.getStatus(); + return SemiFuture::makeReady(swDeadlineHandle.getStatus()); + } + query->deadlineHandle = swDeadlineHandle.getValue(); + _outstandingQueries.push_back(query); + + // Send topology changes to the query processor to satisfy the future. + // It will be removed as a listener when all waiting queries have been satisfied. + _eventsPublisher->registerListener(_queryProcessor); + + return std::move(pf.future).semi(); +} + +boost::optional> StreamableReplicaSetMonitor::_getHosts( + const TopologyDescriptionPtr& topology, const ReadPreferenceSetting& criteria) { + auto result = _serverSelector->selectServers(topology, criteria); + if (!result) + return boost::none; + return _extractHosts(*result); +} + +boost::optional> StreamableReplicaSetMonitor::_getHosts( + const ReadPreferenceSetting& criteria) { + return _getHosts(_currentTopology(), criteria); } HostAndPort StreamableReplicaSetMonitor::getMasterOrUassert() { - MONGO_UNREACHABLE; + return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); } -void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {} + +void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) { + failedHost(host, BSONObj(), status); +} + +void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, BSONObj bson, const Status& status) { + IsMasterOutcome outcome(host.toString(), bson, status.toString()); + _topologyManager->onServerDescription(outcome); +} + +boost::optional StreamableReplicaSetMonitor::_currentPrimary() const { + return _currentTopology()->getPrimary(); +} + bool StreamableReplicaSetMonitor::isPrimary(const HostAndPort& host) const { - MONGO_UNREACHABLE; + const auto currentPrimary = _currentPrimary(); + return (currentPrimary ? (*currentPrimary)->getAddress() == host.toString() : false); } bool StreamableReplicaSetMonitor::isHostUp(const HostAndPort& host) const { - MONGO_UNREACHABLE; + auto currentTopology = _currentTopology(); + const auto& serverDescription = currentTopology->findServerByAddress(host.toString()); + return serverDescription && (*serverDescription)->getType() != ServerType::kUnknown; } int StreamableReplicaSetMonitor::getMinWireVersion() const { - MONGO_UNREACHABLE; + auto currentTopology = _currentTopology(); + const std::vector& servers = currentTopology->getServers(); + if (servers.size() > 0) { + const auto& serverDescription = + *std::min_element(servers.begin(), servers.end(), minWireCompare); + return serverDescription->getMinWireVersion(); + } else { + return 0; + } } int StreamableReplicaSetMonitor::getMaxWireVersion() const { - MONGO_UNREACHABLE; + auto currentTopology = _currentTopology(); + const std::vector& servers = currentTopology->getServers(); + if (servers.size() > 0) { + const auto& serverDescription = + *std::max_element(servers.begin(), servers.end(), maxWireCompare); + return serverDescription->getMaxWireVersion(); + } else { + return std::numeric_limits::max(); + } } std::string StreamableReplicaSetMonitor::getName() const { - MONGO_UNREACHABLE; + return _uri.getSetName(); } std::string StreamableReplicaSetMonitor::getServerAddress() const { - MONGO_UNREACHABLE; + const auto topologyDescription = _currentTopology(); + const auto servers = topologyDescription->getServers(); + + std::stringstream output; + output << _uri.getSetName() << "/"; + + for (const auto& server : servers) { + output << server->getAddress(); + if (&server != &servers.back()) + output << ","; + } + + auto result = output.str(); + return output.str(); } const MongoURI& StreamableReplicaSetMonitor::getOriginalUri() const { - MONGO_UNREACHABLE; -}; -bool StreamableReplicaSetMonitor::contains(const HostAndPort& server) const { - MONGO_UNREACHABLE; + return _uri; } -void StreamableReplicaSetMonitor::appendInfo(BSONObjBuilder& b, bool forFTDC) const { - MONGO_UNREACHABLE; +bool StreamableReplicaSetMonitor::contains(const HostAndPort& host) const { + return static_cast(_currentTopology()->findServerByAddress(host.toString())); +} + +void StreamableReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const { + auto topologyDescription = _currentTopology(); + + BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName())); + if (forFTDC) { + for (auto serverDescription : topologyDescription->getServers()) { + monitorInfo.appendNumber(serverDescription->getAddress(), + pingTimeMillis(serverDescription)); + } + return; + } + + // NOTE: the format here must be consistent for backwards compatibility + BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts")); + for (const auto& serverDescription : topologyDescription->getServers()) { + bool isUp = false; + bool isMaster = false; + bool isSecondary = false; + bool isHidden = false; + + switch (serverDescription->getType()) { + case ServerType::kRSPrimary: + isUp = true; + isMaster = true; + break; + case ServerType::kRSSecondary: + isUp = true; + isSecondary = true; + break; + case ServerType::kStandalone: + isUp = true; + break; + case ServerType::kMongos: + isUp = true; + break; + case ServerType::kRSGhost: + isHidden = true; + break; + case ServerType::kRSArbiter: + isHidden = true; + break; + default: + break; + } + + BSONObjBuilder builder(hosts.subobjStart()); + builder.append("addr", serverDescription->getAddress()); + builder.append("ok", isUp); + builder.append("ismaster", isMaster); // intentionally not camelCase + builder.append("hidden", isHidden); + builder.append("secondary", isSecondary); + builder.append("pingTimeMillis", pingTimeMillis(serverDescription)); + + if (serverDescription->getTags().size()) { + BSONObjBuilder tagsBuilder(builder.subobjStart("tags")); + serverDescription->appendBsonTags(tagsBuilder); + } + } } bool StreamableReplicaSetMonitor::isKnownToHaveGoodPrimary() const { - MONGO_UNREACHABLE; + return static_cast(_currentPrimary()); +} + +sdam::TopologyDescriptionPtr StreamableReplicaSetMonitor::_currentTopology() const { + return _topologyManager->getTopologyDescription(); +} + +void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( + UUID topologyId, + TopologyDescriptionPtr previousDescription, + TopologyDescriptionPtr newDescription) { + + // notify external components, if there are membership + // changes in the topology. + if (_hasMembershipChange(previousDescription, newDescription)) { + LOG(kDefaultLogLevel) << _logPrefix() << "Topology Change: " << newDescription->toString(); + + // TODO: remove when HostAndPort conversion is done. + std::vector servers = _extractHosts(newDescription->getServers()); + + auto connectionString = ConnectionString::forReplicaSet(getName(), servers); + auto maybePrimary = newDescription->getPrimary(); + if (maybePrimary) { + // TODO: remove need for HostAndPort conversion + auto hostList = _extractHosts(newDescription->findServers(secondaryPredicate)); + std::set secondaries(hostList.begin(), hostList.end()); + + auto primaryAddress = HostAndPort((*maybePrimary)->getAddress()); + ReplicaSetMonitorManager::get()->getNotifier().onConfirmedSet( + connectionString, primaryAddress, secondaries); + } else { + ReplicaSetMonitorManager::get()->getNotifier().onPossibleSet(connectionString); + } + } +} + +void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs, + const ServerAddress& hostAndPort, + const BSONObj reply) { + IsMasterOutcome outcome(hostAndPort, reply, durationMs); + _topologyManager->onServerDescription(outcome); } +void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT durationMs, + Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply) { + IsMasterOutcome outcome(hostAndPort, reply, errorStatus.toString()); + _topologyManager->onServerDescription(outcome); +} + +void StreamableReplicaSetMonitor::onServerPingFailedEvent(const ServerAddress& hostAndPort, + const Status& status) { + failedHost(HostAndPort(hostAndPort), status); +} + +void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT durationMS, + const ServerAddress& hostAndPort) { + _topologyManager->onServerRTTUpdated(hostAndPort, durationMS); +} + +std::string StreamableReplicaSetMonitor::_logPrefix() { + return str::stream() << kLogPrefix << " [" << getName() << "] "; +} + +void StreamableReplicaSetMonitor::_failOutstandingWithStatus(WithLock, Status status) { + for (const auto& query : _outstandingQueries) { + if (query->done) + continue; + + query->done = true; + _executor->cancel(query->deadlineHandle); + query->promise.setError(status); + } + _outstandingQueries.clear(); +} + +bool StreamableReplicaSetMonitor::_hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription, + sdam::TopologyDescriptionPtr newDescription) { + + if (oldDescription->getServers().size() != newDescription->getServers().size()) + return true; + + for (const auto& server : oldDescription->getServers()) { + const auto newServer = newDescription->findServerByAddress(server->getAddress()); + if (!newServer) + return true; + const ServerDescription& s = *server; + const ServerDescription& ns = **newServer; + if (s != ns) + return true; + } + + for (const auto& server : newDescription->getServers()) { + auto oldServer = oldDescription->findServerByAddress(server->getAddress()); + if (!oldServer) + return true; + } + + return false; +} + +void StreamableReplicaSetMonitor::_processOutstanding(const TopologyDescriptionPtr& topologyDescription) { + // TODO: refactor so that we don't call _getHost(s) for every outstanding query + // since there might be duplicates. + stdx::lock_guard lock(_mutex); + + bool shouldRemove; + auto it = _outstandingQueries.begin(); + while (it != _outstandingQueries.end()) { + auto& query = *it; + shouldRemove = false; + + if (query->done) { + shouldRemove = true; + } else { + auto result = _getHosts(topologyDescription, query->criteria); + if (result) { + _executor->cancel(query->deadlineHandle); + query->done = true; + query->promise.emplaceValue(std::move(*result)); + LOG(kDefaultLogLevel) + << _logPrefix() + << "finish getHosts: " << readPrefToStringWithMinOpTime(query->criteria) << " (" + << _executor->now() - query->start << ")"; + shouldRemove = true; + } + } + + it = (shouldRemove) ? _outstandingQueries.erase(it) : ++it; + } + + if (_outstandingQueries.size()) { + // enable expedited mode + _isMasterMonitor->requestImmediateCheck(); + } else { + // if no more outstanding queries, no need to listen for topology changes in + // this monitor. + _eventsPublisher->removeListener(_queryProcessor); + } +} + +Status StreamableReplicaSetMonitor::_makeUnsatisfiedReadPrefError( + const ReadPreferenceSetting& criteria) const { + return Status(ErrorCodes::FailedToSatisfyReadPreference, + str::stream() << "Could not find host matching read preference " + << criteria.toString() << " for set " << getName()); +} + +Status StreamableReplicaSetMonitor::_makeReplicaSetMonitorRemovedError() const { + return Status(ErrorCodes::ReplicaSetMonitorRemoved, + str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"); +} } // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 8c27301660e..9fbdf858f6e 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -38,7 +38,10 @@ #include "mongo/client/mongo_uri.h" #include "mongo/client/replica_set_change_notifier.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/client/sdam/sdam.h" +#include "mongo/client/server_is_master_monitor.h" #include "mongo/executor/task_executor.h" +#include "mongo/logger/log_component.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/duration.h" #include "mongo/util/net/hostandport.h" @@ -46,48 +49,162 @@ namespace mongo { -class StreamableReplicaSetMonitor : public ReplicaSetMonitor { - StreamableReplicaSetMonitor(const StreamableReplicaSetMonitor&) = delete; - StreamableReplicaSetMonitor& operator=(const StreamableReplicaSetMonitor&) = delete; +class BSONObj; +class ReplicaSetMonitor; +class ReplicaSetMonitorTest; +struct ReadPreferenceSetting; +using ReplicaSetMonitorPtr = std::shared_ptr; + +/** + * Replica set monitor implementation backed by the classes in the mongo::sdam namespace. + * + * All methods perform the required synchronization to allow callers from multiple threads. + */ +class StreamableReplicaSetMonitor : + public ReplicaSetMonitor, + public sdam::TopologyListener, + public std::enable_shared_from_this { + + StreamableReplicaSetMonitor(const ReplicaSetMonitor&) = delete; + StreamableReplicaSetMonitor& operator=(const ReplicaSetMonitor&) = delete; public: - StreamableReplicaSetMonitor(const MongoURI& uri); + class Refresher; + + static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500); + static constexpr auto kCheckTimeout = Seconds(5); + + StreamableReplicaSetMonitor(const MongoURI& uri, + std::shared_ptr executor); + + void init(); - void init() override; + void drop(); - void drop() override; + static ReplicaSetMonitorPtr make(const MongoURI& uri, + std::shared_ptr executor = nullptr); - SemiFuture getHostOrRefresh( - const ReadPreferenceSetting& readPref, - Milliseconds maxWait = kDefaultFindHostTimeout) override; + SemiFuture getHostOrRefresh(const ReadPreferenceSetting& readPref, + Milliseconds maxWait = kDefaultFindHostTimeout); SemiFuture> getHostsOrRefresh( - const ReadPreferenceSetting& readPref, - Milliseconds maxWait = kDefaultFindHostTimeout) override; + const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout); + + HostAndPort getMasterOrUassert(); + + void failedHost(const HostAndPort& host, const Status& status); + void failedHost(const HostAndPort& host, BSONObj bson, const Status& status); + + bool isPrimary(const HostAndPort& host) const; + + bool isHostUp(const HostAndPort& host) const; + + int getMinWireVersion() const; + + int getMaxWireVersion() const; + + std::string getName() const; + + std::string getServerAddress() const; + + const MongoURI& getOriginalUri() const; + + bool contains(const HostAndPort& server) const; + + void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const; + + bool isKnownToHaveGoodPrimary() const; + +private: + class StreamableReplicaSetMonitorQueryProcessor; + using StreamableReplicaSetMontiorQueryProcessorPtr = + std::shared_ptr; + + struct HostQuery { + Date_t deadline; + executor::TaskExecutor::CallbackHandle deadlineHandle; + ReadPreferenceSetting criteria; + Date_t start = Date_t::now(); + bool done = false; + Promise> promise; + }; + using HostQueryPtr = std::shared_ptr; + + SemiFuture> _enqueueOutstandingQuery( + WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline); + + std::vector _extractHosts( + const std::vector& serverDescriptions); + boost::optional> _getHosts(const TopologyDescriptionPtr& topology, + const ReadPreferenceSetting& criteria); + boost::optional> _getHosts(const ReadPreferenceSetting& criteria); + + void onTopologyDescriptionChangedEvent(UUID topologyId, + sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription) override; + + void onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs, + const sdam::ServerAddress& hostAndPort, + const BSONObj reply) override; + + void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, + Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply) override; + + void onServerPingFailedEvent(const sdam::ServerAddress& hostAndPort, + const Status& status) override; + + void onServerPingSucceededEvent(sdam::IsMasterRTT durationMS, + const sdam::ServerAddress& hostAndPort) override; + + // Get a pointer to the current primary's ServerDescription + // To ensure a consistent view of the Topology either _currentPrimary or _currentTopology should + // be called (not both) since the topology can change between the function invocations. + boost::optional _currentPrimary() const; - HostAndPort getMasterOrUassert() override; + // Get the current TopologyDescription + // Note that most functions will want to save the result of this function once per computation + // so that we are operating on a consistent read-only view of the topology. + sdam::TopologyDescriptionPtr _currentTopology() const; - void failedHost(const HostAndPort& host, const Status& status) override; + std::string _logPrefix(); - bool isPrimary(const HostAndPort& host) const override; + void _failOutstandingWithStatus(WithLock, Status status); + bool _hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription, + sdam::TopologyDescriptionPtr newDescription); - bool isHostUp(const HostAndPort& host) const override; + Status _makeUnsatisfiedReadPrefError(const ReadPreferenceSetting& criteria) const; + Status _makeReplicaSetMonitorRemovedError() const; - int getMinWireVersion() const override; + // Try to satisfy the outstanding queries for this instance with the given topology information. + void _processOutstanding(const TopologyDescriptionPtr& topologyDescription); - int getMaxWireVersion() const override; + sdam::SdamConfiguration _sdamConfig; + sdam::TopologyManagerPtr _topologyManager; + sdam::ServerSelectorPtr _serverSelector; + sdam::TopologyEventsPublisherPtr _eventsPublisher; + ServerIsMasterMonitorPtr _isMasterMonitor; - std::string getName() const override; + // This object will be registered as a TopologyListener if there are + // any outstanding queries for this RSM instance. + StreamableReplicaSetMontiorQueryProcessorPtr _queryProcessor; - std::string getServerAddress() const override; + const MongoURI _uri; - const MongoURI& getOriginalUri() const override; + std::shared_ptr _executor; - bool contains(const HostAndPort& server) const override; + AtomicWord _isDropped{true}; - void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override; + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicaSetMonitor"); + std::vector _outstandingQueries; + mutable PseudoRandom _random; - bool isKnownToHaveGoodPrimary() const override; + static inline const auto kServerSelectionConfig = + sdam::ServerSelectionConfiguration::defaultConfiguration(); + static inline const auto kDefaultLogLevel = logger::LogSeverity::Debug(1); + static inline const auto kLowerLogLevel = kDefaultLogLevel.lessSevere(); + static constexpr auto kLogPrefix = "[ReplicaSetMonitor]"; }; } // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp b/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp new file mode 100644 index 00000000000..5cadf92266f --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/client/streamable_replica_set_monitor_query_processor.h" + +#include + +#include "mongo/client/global_conn_pool.h" +#include "mongo/util/log.h" + +namespace mongo { +void StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor::shutdown() { + stdx::lock_guard lock(_mutex); + _isShutdown = true; +} + +void StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor:: + onTopologyDescriptionChangedEvent(UUID topologyId, + sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription) { + { + stdx::lock_guard lock(_mutex); + if (_isShutdown) + return; + } + + const auto& setName = newDescription->getSetName(); + if (setName) { + auto replicaSetMonitor = std::static_pointer_cast(globalRSMonitorManager.getMonitor(*setName)); + if (!replicaSetMonitor) { + LOG(kLogLevel) << "could not find rsm instance " << *setName + << " for query processing."; + return; + } + replicaSetMonitor->_processOutstanding(newDescription); + } + + // No set name occurs when there is an error monitoring isMaster replies (e.g. HostUnreachable). + // There is nothing to do in that case. +} +}; // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor_query_processor.h b/src/mongo/client/streamable_replica_set_monitor_query_processor.h new file mode 100644 index 00000000000..bac2d42cc45 --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor_query_processor.h @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/client/streamable_replica_set_monitor.h" +#include "mongo/client/sdam/sdam.h" + +namespace mongo { +class StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor final : public sdam::TopologyListener { +public: + void shutdown(); + + void onTopologyDescriptionChangedEvent(UUID topologyId, + sdam::TopologyDescriptionPtr previousDescription, + sdam::TopologyDescriptionPtr newDescription) override; + +private: + static inline const logger::LogSeverity kLogLevel = logger::LogSeverity::Debug(1); + + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicaSetMonitorQueryProcessor"); + bool _isShutdown = false; +}; +} // namespace mongo -- cgit v1.2.1