diff options
author | LaMont Nelson <lamont.nelson@mongodb.com> | 2021-05-18 00:50:07 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-28 19:14:14 +0000 |
commit | 8ddcaf878b5600fdac322929cd337c6f4563bf19 (patch) | |
tree | 00e31e4a692f1fb2af772b1db3279bbfaacfcb46 | |
parent | 605f0cc7bbbc9bce090e3316e3db22ec043e64af (diff) | |
download | mongo-8ddcaf878b5600fdac322929cd337c6f4563bf19.tar.gz |
SERVER-56854: Use executor and enforce timeout when making hello requests in the ReplicaSetMonitor
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/client/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.cpp | 7 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs_test.cpp | 11 | ||||
-rw-r--r-- | src/mongo/client/remote_command_targeter_rs.cpp | 3 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp | 92 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.h | 32 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.cpp | 60 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.h | 12 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_test.cpp | 54 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_transport.cpp | 152 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_transport.h | 101 | ||||
-rw-r--r-- | src/mongo/dbtests/replica_set_monitor_test.cpp | 12 |
13 files changed, 446 insertions, 94 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 3b34dc578aa..8c36abbdba7 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -485,6 +485,7 @@ if not has_option('noshell') and usemozjs: 'shell/mongojs', 'transport/message_compressor', 'transport/transport_layer_manager', + 'util/concurrency/thread_pool', 'util/net/network', 'util/options_parser/options_parser_init', 'util/processinfo', diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index c5a54ee2b5c..0f8b225c8ab 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -186,6 +186,7 @@ clientDriverEnv.Library( 'global_conn_pool.cpp', 'replica_set_monitor.cpp', 'replica_set_monitor_manager.cpp', + 'replica_set_monitor_transport.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/write_concern_options', @@ -194,6 +195,7 @@ clientDriverEnv.Library( '$BUILD_DIR/mongo/executor/network_interface_thread_pool', '$BUILD_DIR/mongo/executor/thread_pool_task_executor', '$BUILD_DIR/mongo/util/background_job', + '$BUILD_DIR/mongo/util/concurrency/thread_pool', '$BUILD_DIR/mongo/util/md5', '$BUILD_DIR/mongo/util/net/network', 'clientdriver_minimal', @@ -295,6 +297,7 @@ env.CppUnitTest('dbclient_rs_test', LIBDEPS=[ 'clientdriver_network', '$BUILD_DIR/mongo/dbtests/mocklib', + '$BUILD_DIR/mongo/db/service_context_test_fixture', ], ) diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 7a84a98210d..57ebdd08940 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -139,11 +139,12 @@ DBClientReplicaSet::DBClientReplicaSet(const string& name, _applicationName(applicationName.toString()), _so_timeout(so_timeout), _uri(std::move(uri)) { + auto rsmTransport = std::make_unique<ReplicaSetMonitorDbClientTransport>(); if (_uri.isValid()) { - _rsm = ReplicaSetMonitor::createIfNeeded(_uri); + _rsm = ReplicaSetMonitor::createIfNeeded(_uri, std::move(rsmTransport)); } else { - _rsm = ReplicaSetMonitor::createIfNeeded(name, 
- set<HostAndPort>(servers.begin(), servers.end())); + _rsm = ReplicaSetMonitor::createIfNeeded( + name, set<HostAndPort>(servers.begin(), servers.end()), std::move(rsmTransport)); } } diff --git a/src/mongo/client/dbclient_rs_test.cpp b/src/mongo/client/dbclient_rs_test.cpp index d3ae470d914..6bb72a5cf98 100644 --- a/src/mongo/client/dbclient_rs_test.cpp +++ b/src/mongo/client/dbclient_rs_test.cpp @@ -47,6 +47,7 @@ #include "mongo/client/dbclientinterface.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/jsobj.h" +#include "mongo/db/service_context_test_fixture.h" #include "mongo/dbtests/mock/mock_conn_registry.h" #include "mongo/dbtests/mock/mock_replica_set.h" #include "mongo/stdx/unordered_set.h" @@ -78,7 +79,7 @@ BSONObj makeMetadata(ReadPreference rp, TagSet tagSet) { /** * Basic fixture with one primary and one secondary. */ -class BasicRS : public unittest::Test { +class BasicRS : public ServiceContextTest { protected: void setUp() { ReplicaSetMonitor::cleanup(); @@ -202,7 +203,7 @@ TEST_F(BasicRS, CommandSecondaryPreferred) { /** * Setup for 2 member replica set will all of the nodes down. */ -class AllNodesDown : public unittest::Test { +class AllNodesDown : public ServiceContextTest { protected: void setUp() { ReplicaSetMonitor::cleanup(); @@ -310,7 +311,7 @@ TEST_F(AllNodesDown, CommandNearest) { /** * Setup for 2 member replica set with the primary down. */ -class PrimaryDown : public unittest::Test { +class PrimaryDown : public ServiceContextTest { protected: void setUp() { ReplicaSetMonitor::cleanup(); @@ -416,7 +417,7 @@ TEST_F(PrimaryDown, Nearest) { /** * Setup for 2 member replica set with the secondary down. 
*/ -class SecondaryDown : public unittest::Test { +class SecondaryDown : public ServiceContextTest { protected: void setUp() { ReplicaSetMonitor::cleanup(); @@ -527,7 +528,7 @@ TEST_F(SecondaryDown, CommandNearest) { * Warning: Tests running this fixture cannot be run in parallel with other tests * that uses ConnectionString::setConnectionHook */ -class TaggedFiveMemberRS : public unittest::Test { +class TaggedFiveMemberRS : public ServiceContextTest { protected: void setUp() { // Tests for pinning behavior require this. diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp index c8b7f8b551a..7ba719cc35f 100644 --- a/src/mongo/client/remote_command_targeter_rs.cpp +++ b/src/mongo/client/remote_command_targeter_rs.cpp @@ -36,6 +36,7 @@ #include "mongo/base/status_with.h" #include "mongo/client/connection_string.h" +#include "mongo/client/global_conn_pool.h" #include "mongo/client/read_preference.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/operation_context.h" @@ -51,7 +52,7 @@ RemoteCommandTargeterRS::RemoteCommandTargeterRS(const std::string& rsName, : _rsName(rsName) { std::set<HostAndPort> seedServers(seedHosts.begin(), seedHosts.end()); - _rsMonitor = ReplicaSetMonitor::createIfNeeded(rsName, seedServers); + _rsMonitor = ReplicaSetMonitor::createIfNeeded(rsName, seedServers, nullptr); LOG(1) << "Started targeter for " << ConnectionString::forReplicaSet( diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index c935ef00872..7d01c32b2c5 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -46,6 +46,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/bson_extract_optime.h" #include "mongo/db/server_options.h" +#include "mongo/s/is_mongos.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" @@ -71,6 +72,9 @@ 
MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook); // Failpoint for changing the default refresh period MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod); +// Failpoint for changing the default socket timeout for Hello command. +MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorHelloTimeout); + namespace { // Pull nested types to top-level scope @@ -97,6 +101,8 @@ static constexpr int kRsmVerbosityThresholdTimeoutSec = 20; // When 'is master' reply latency is over 2 sec, it will be logged. static constexpr int kSlowIsMasterThresholdMicros = 2L * 1000 * 1000; +static Seconds kHelloTimeout = Seconds{10}; + const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet()); const Milliseconds kFindHostMaxBackOffTime(500); AtomicBool areRefreshRetriesDisabledForTest{false}; // Only true in tests. @@ -192,22 +198,43 @@ const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15); // Defaults to random selection as required by the spec bool ReplicaSetMonitor::useDeterministicHostSelection = false; -Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() { +Seconds ReplicaSetMonitor::getRefreshPeriod() { MONGO_FAIL_POINT_BLOCK_IF(modifyReplicaSetMonitorDefaultRefreshPeriod, data, [&](const BSONObj& data) { return data.hasField("period"); }) { - return Seconds{data.getData().getIntField("period")}; + auto result = Seconds{data.getData().getIntField("period")}; + log() << "Modified the default replica set monitor refresh period via failpoint to " + << result; + return result; } return kDefaultRefreshPeriod; } -ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds) +Milliseconds ReplicaSetMonitor::getHelloTimeout() { + MONGO_FAIL_POINT_BLOCK_IF(modifyReplicaSetMonitorHelloTimeout, data, [&](const BSONObj& data) { + return data.hasField("period"); + }) { + auto result = Milliseconds{data.getData().getIntField("period")}; + log() << "Modified the replica set monitor request timeout via failpoint to " 
<< result; + return result; + } + + return duration_cast<Milliseconds>(kHelloTimeout); +} + +ReplicaSetMonitor::ReplicaSetMonitor(StringData name, + const std::set<HostAndPort>& seeds, + ReplicaSetMonitorTransportPtr transport) : _state(std::make_shared<SetState>(name, seeds)), - _executor(globalRSMonitorManager.getExecutor()) {} + _executor(globalRSMonitorManager.getExecutor()), + _rsmTransport(std::move(transport)) {} -ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri) - : _state(std::make_shared<SetState>(uri)), _executor(globalRSMonitorManager.getExecutor()) {} +ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri, + std::unique_ptr<ReplicaSetMonitorTransport> transport) + : _state(std::make_shared<SetState>(uri)), + _executor(globalRSMonitorManager.getExecutor()), + _rsmTransport(std::move(transport)) {} void ReplicaSetMonitor::init() { _scheduleRefresh(_executor->now()); @@ -325,7 +352,7 @@ HostAndPort ReplicaSetMonitor::getMasterOrUassert() { Refresher ReplicaSetMonitor::startOrContinueRefresh() { stdx::lock_guard<stdx::mutex> lk(_state->mutex); - Refresher out(_state); + Refresher out(_state, _executor, _rsmTransport.get()); DEV _state->checkInvariants(); return out; } @@ -394,14 +421,16 @@ bool ReplicaSetMonitor::contains(const HostAndPort& host) const { return _state->seedNodes.count(host); } -shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const string& name, - const set<HostAndPort>& servers) { +shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded( + const string& name, const set<HostAndPort>& servers, ReplicaSetMonitorTransportPtr transport) { return globalRSMonitorManager.getOrCreateMonitor( - ConnectionString::forReplicaSet(name, vector<HostAndPort>(servers.begin(), servers.end()))); + ConnectionString::forReplicaSet(name, vector<HostAndPort>(servers.begin(), servers.end())), + std::move(transport)); } -shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const MongoURI& uri) { - return 
globalRSMonitorManager.getOrCreateMonitor(uri); +shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded( + const MongoURI& uri, ReplicaSetMonitorTransportPtr transport) { + return globalRSMonitorManager.getOrCreateMonitor(uri, std::move(transport)); } shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::get(const std::string& name) { @@ -490,7 +519,10 @@ void ReplicaSetMonitor::markAsRemoved() { _isRemovedFromManager.store(true); } -Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) { +Refresher::Refresher(const SetStatePtr& setState, + executor::TaskExecutor* executor, + ReplicaSetMonitorTransport* transport) + : _set(setState), _scan(setState->currentScan), _executor(executor), _rsmTransport(transport) { if (_scan) return; // participate in in-progress scan @@ -865,32 +897,18 @@ HostAndPort Refresher::_refreshUntilMatches(const ReadPreferenceSetting* criteri continue; case NextStep::CONTACT_HOST: { - StatusWith<BSONObj> isMasterReplyStatus{ErrorCodes::InternalError, - "Uninitialized variable"}; + StatusWith<BSONObj> helloReplyStatus{ErrorCodes::InternalError, + "Uninitialized variable"}; int64_t pingMicros = 0; - MongoURI targetURI; - - if (_set->setUri.isValid()) { - targetURI = _set->setUri.cloneURIForServer(ns.host, ""); - targetURI.setUser(""); - targetURI.setPassword(""); - } else { - targetURI = MongoURI(ConnectionString(ns.host)); - } // Do not do network calls while holding a mutex lk.unlock(); - try { - ScopedDbConnection conn(targetURI, socketTimeoutSecs); - bool ignoredOutParam = false; + { Timer timer; - BSONObj reply; - conn->isMaster(ignoredOutParam, &reply); - isMasterReplyStatus = reply; + auto helloFuture = _rsmTransport->sayHello( + ns.host, _set->name, _set->setUri, getHelloTimeout()); + helloReplyStatus = helloFuture.getNoThrow(); pingMicros = timer.micros(); - conn.done(); // return to pool on success. 
- } catch (const DBException& ex) { - isMasterReplyStatus = ex.toStatus(); } lk.lock(); @@ -900,10 +918,10 @@ HostAndPort Refresher::_refreshUntilMatches(const ReadPreferenceSetting* criteri return criteria ? _set->getMatchingHost(*criteria) : HostAndPort(); } - if (isMasterReplyStatus.isOK()) - receivedIsMaster(ns.host, pingMicros, isMasterReplyStatus.getValue(), verbose); + if (helloReplyStatus.isOK()) + receivedIsMaster(ns.host, pingMicros, helloReplyStatus.getValue(), verbose); else - failedHost(ns.host, isMasterReplyStatus.getStatus()); + failedHost(ns.host, helloReplyStatus.getStatus()); } } @@ -1069,7 +1087,7 @@ SetState::SetState(StringData name, const std::set<HostAndPort>& seedNodes, Mong rand(std::random_device()()), roundRobin(0), setUri(std::move(uri)), - refreshPeriod(getDefaultRefreshPeriod()) { + refreshPeriod(getRefreshPeriod()) { uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty()); if (name.empty()) diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h index 4b5d811a673..115696c04ca 100644 --- a/src/mongo/client/replica_set_monitor.h +++ b/src/mongo/client/replica_set_monitor.h @@ -39,11 +39,13 @@ #include "mongo/base/disallow_copying.h" #include "mongo/base/string_data.h" #include "mongo/client/mongo_uri.h" +#include "mongo/client/replica_set_monitor_transport.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/functional.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" +#include "mongo/util/timer.h" namespace mongo { @@ -70,9 +72,11 @@ public: * * seeds must not be empty. 
*/ - ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds); + ReplicaSetMonitor(StringData name, + const std::set<HostAndPort>& seeds, + ReplicaSetMonitorTransportPtr transport); - ReplicaSetMonitor(const MongoURI& uri); + ReplicaSetMonitor(const MongoURI& uri, ReplicaSetMonitorTransportPtr transport); /** * Schedules the initial refresh task into task executor. @@ -186,10 +190,13 @@ public: /** * Creates a new ReplicaSetMonitor, if it doesn't already exist. */ - static std::shared_ptr<ReplicaSetMonitor> createIfNeeded(const std::string& name, - const std::set<HostAndPort>& servers); + static std::shared_ptr<ReplicaSetMonitor> createIfNeeded( + const std::string& name, + const std::set<HostAndPort>& servers, + ReplicaSetMonitorTransportPtr transport); - static std::shared_ptr<ReplicaSetMonitor> createIfNeeded(const MongoURI& uri); + static std::shared_ptr<ReplicaSetMonitor> createIfNeeded( + const MongoURI& uri, ReplicaSetMonitorTransportPtr transport); /** * gets a cached Monitor per name. If the monitor is not found and createFromSeed is false, @@ -249,7 +256,12 @@ public: /** * Returns the refresh period that is given to all new SetStates. */ - static Seconds getDefaultRefreshPeriod(); + static Seconds getRefreshPeriod(); + + /** + * Returns the timeout for a single hello request. + */ + static Milliseconds getHelloTimeout(); // // internal types (defined in replica_set_monitor_internal.h) @@ -300,6 +312,7 @@ private: const SetStatePtr _state; executor::TaskExecutor* _executor; AtomicBool _isRemovedFromManager{false}; + ReplicaSetMonitorTransportPtr _rsmTransport; }; @@ -342,7 +355,9 @@ public: * * If no scan is in-progress, this function is responsible for setting up a new scan. 
*/ - explicit Refresher(const SetStatePtr& setState); + explicit Refresher(const SetStatePtr& setState, + executor::TaskExecutor* executor, + ReplicaSetMonitorTransport* transport); struct NextStep { enum StepKind { @@ -418,6 +433,9 @@ private: // Both pointers are never NULL SetStatePtr _set; ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started. + + executor::TaskExecutor* _executor; + ReplicaSetMonitorTransport* _rsmTransport; }; } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index d6de2153486..7345fd95fc8 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -38,6 +38,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/mongo_uri.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/db/client.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" @@ -47,6 +48,7 @@ #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/log.h" #include "mongo/util/map_util.h" @@ -63,6 +65,11 @@ using executor::TaskExecutorPool; using executor::TaskExecutor; using executor::ThreadPoolTaskExecutor; +namespace { +const int kMaxRsmThreads = 1024; +const int kMaxRsmConnectionsPerHost = 128; +} + ReplicaSetMonitorManager::ReplicaSetMonitorManager() {} ReplicaSetMonitorManager::~ReplicaSetMonitorManager() { @@ -79,17 +86,46 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData se } } +auto makeThreadPool(const std::string& poolName) { + ThreadPool::Options threadPoolOptions; + threadPoolOptions.poolName = poolName; + + // Two threads are for the Scan and the hello request. 
+ threadPoolOptions.minThreads = 2; + + // This setting is a hedge against the issue described in + // SERVER-56854. Generally an RSM instance will use 1 thread + // to make the hello request. If there are delays in the network + // interface delivering the replies, the RSM will time out and + // spawn additional threads to make progress. + threadPoolOptions.maxThreads = kMaxRsmThreads; + + threadPoolOptions.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; + return stdx::make_unique<ThreadPool>(threadPoolOptions); +} + void ReplicaSetMonitorManager::_setupTaskExecutorInLock(const std::string& name) { auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); // do not restart taskExecutor if is in shutdown if (!_taskExecutor && !_isShutdown) { // construct task executor + auto threadName = "ReplicaSetMonitor-TaskExecutor"; + + // This is to limit the number of threads that a failed host can consume + // when there is a TCP blackhole. The RSM will attempt to contact the host repeatedly + // spawning a new thread on each attempt if there is no response from the last attempt. + // Eventually, the connections will time out according to TCP keepalive settings. + // See SERVER-56854 for more information. 
+ executor::ConnectionPool::Options connPoolOptions; + connPoolOptions.maxConnections = kMaxRsmConnectionsPerHost; + auto net = executor::makeNetworkInterface( - "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList)); - auto netPtr = net.get(); - _taskExecutor = stdx::make_unique<ThreadPoolTaskExecutor>( - stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); + threadName, nullptr, std::move(hookList), connPoolOptions); + _taskExecutor = + stdx::make_unique<ThreadPoolTaskExecutor>(makeThreadPool(threadName), std::move(net)); LOG(1) << "Starting up task executor for monitoring replica sets in response to request to " "monitor set: " << redact(name); @@ -104,7 +140,7 @@ void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode } shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( - const ConnectionString& connStr) { + const ConnectionString& connStr, ReplicaSetMonitorTransportPtr transport) { invariant(connStr.type() == ConnectionString::SET); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -120,13 +156,17 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( log() << "Starting new replica set monitor for " << connStr.toString(); - auto newMonitor = std::make_shared<ReplicaSetMonitor>(setName, servers); + if (!transport) { + transport = makeRsmTransport(); + } + auto newMonitor = std::make_shared<ReplicaSetMonitor>(setName, servers, std::move(transport)); _monitors[setName] = newMonitor; newMonitor->init(); return newMonitor; } -shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const MongoURI& uri) { +shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( + const MongoURI& uri, ReplicaSetMonitorTransportPtr transport) { invariant(uri.type() == ConnectionString::SET); stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -140,7 +180,7 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const log() << "Starting 
new replica set monitor for " << uri.toString(); - auto newMonitor = std::make_shared<ReplicaSetMonitor>(uri); + auto newMonitor = std::make_shared<ReplicaSetMonitor>(uri, std::move(transport)); _monitors[setName] = newMonitor; newMonitor->init(); return newMonitor; @@ -230,4 +270,8 @@ TaskExecutor* ReplicaSetMonitorManager::getExecutor() { return _taskExecutor.get(); } + +ReplicaSetMonitorTransportPtr ReplicaSetMonitorManager::makeRsmTransport() { + return std::make_unique<ReplicaSetMonitorExecutorTransport>(getExecutor()); +} } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h index c4ee1b05b7a..89bee98cfc8 100644 --- a/src/mongo/client/replica_set_monitor_manager.h +++ b/src/mongo/client/replica_set_monitor_manager.h @@ -34,6 +34,7 @@ #include <vector> #include "mongo/base/disallow_copying.h" +#include "mongo/client/replica_set_monitor_transport.h" #include "mongo/executor/task_executor.h" #include "mongo/stdx/mutex.h" #include "mongo/util/string_map.h" @@ -60,8 +61,10 @@ public: * nullptr if there is no monitor registered for the particular replica set. */ std::shared_ptr<ReplicaSetMonitor> getMonitor(StringData setName); - std::shared_ptr<ReplicaSetMonitor> getOrCreateMonitor(const ConnectionString& connStr); - std::shared_ptr<ReplicaSetMonitor> getOrCreateMonitor(const MongoURI& uri); + std::shared_ptr<ReplicaSetMonitor> getOrCreateMonitor(const ConnectionString& connStr, + ReplicaSetMonitorTransportPtr transport); + std::shared_ptr<ReplicaSetMonitor> getOrCreateMonitor(const MongoURI& uri, + ReplicaSetMonitorTransportPtr transport); /** * Retrieves the names of all sets tracked by this manager. 
@@ -96,6 +99,11 @@ public: */ executor::TaskExecutor* getExecutor(); + /* + * Returns a transport that uses the ReplicaSetMonitorManager's executor to run commands + */ + ReplicaSetMonitorTransportPtr makeRsmTransport(); + private: using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>; diff --git a/src/mongo/client/replica_set_monitor_test.cpp b/src/mongo/client/replica_set_monitor_test.cpp index 02269794d79..7540854d9ce 100644 --- a/src/mongo/client/replica_set_monitor_test.cpp +++ b/src/mongo/client/replica_set_monitor_test.cpp @@ -347,7 +347,7 @@ TEST(ReplicaSetMonitor, IsMasterSecondaryWithTags) { TEST(ReplicaSetMonitor, CheckAllSeedsSerial) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); set<HostAndPort> seen; @@ -395,7 +395,7 @@ TEST(ReplicaSetMonitor, CheckAllSeedsSerial) { TEST(ReplicaSetMonitor, CheckAllSeedsParallel) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); set<HostAndPort> seen; @@ -453,7 +453,7 @@ TEST(ReplicaSetMonitor, CheckAllSeedsParallel) { TEST(ReplicaSetMonitor, NoMasterInitAllUp) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); set<HostAndPort> seen; @@ -500,7 +500,7 @@ TEST(ReplicaSetMonitor, NoMasterInitAllUp) { TEST(ReplicaSetMonitor, MasterNotInSeeds_NoPrimaryInIsMaster) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); set<HostAndPort> seen; @@ -577,7 +577,7 @@ TEST(ReplicaSetMonitor, MasterNotInSeeds_NoPrimaryInIsMaster) { TEST(ReplicaSetMonitor, MasterNotInSeeds_PrimaryInIsMaster) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, 
nullptr, nullptr); set<HostAndPort> seen; @@ -643,7 +643,7 @@ TEST(ReplicaSetMonitor, SlavesUsableEvenIfNoMaster) { std::set<HostAndPort> seeds; seeds.insert(HostAndPort("a")); SetStatePtr state = std::make_shared<SetState>("name", seeds); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet()); @@ -685,7 +685,7 @@ TEST(ReplicaSetMonitor, SlavesUsableEvenIfNoMaster) { // Test multiple nodes that claim to be master (we use a last-wins policy) TEST(ReplicaSetMonitor, MultipleMasterLastNodeWins) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); set<HostAndPort> seen; @@ -748,7 +748,7 @@ TEST(ReplicaSetMonitor, MultipleMasterLastNodeWins) { // Test nodes disagree about who is in the set, master is source of truth TEST(ReplicaSetMonitor, MasterIsSourceOfTruth) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); BSONArray primaryHosts = BSON_ARRAY("a" << "b" @@ -789,7 +789,7 @@ TEST(ReplicaSetMonitor, MasterIsSourceOfTruth) { // Test multiple master nodes that disagree about set membership TEST(ReplicaSetMonitor, MultipleMastersDisagree) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); BSONArray hostsForSeed[3]; hostsForSeed[0] = BSON_ARRAY("a" @@ -887,7 +887,7 @@ TEST(ReplicaSetMonitor, MultipleMastersDisagree) { // Ensure getMatchingHost returns hosts even if scan is ongoing TEST(ReplicaSetMonitor, GetMatchingDuringScan) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); const ReadPreferenceSetting primaryOnly(ReadPreference::PrimaryOnly, TagSet()); const 
ReadPreferenceSetting secondaryOnly(ReadPreference::SecondaryOnly, TagSet()); @@ -990,7 +990,7 @@ TEST(ReplicaSetMonitor, OutOfBandFailedHost) { // Newly elected primary with electionId >= maximum electionId seen by the Refresher TEST(ReplicaSetMonitorTests, NewPrimaryWithMaxElectionId) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); set<HostAndPort> seen; @@ -1056,7 +1056,7 @@ TEST(ReplicaSetMonitorTests, NewPrimaryWithMaxElectionId) { // Ignore electionId of secondaries TEST(ReplicaSetMonitorTests, IgnoreElectionIdFromSecondaries) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); set<HostAndPort> seen; @@ -1103,7 +1103,7 @@ TEST(ReplicaSetMonitorTests, IgnoreElectionIdFromSecondaries) { // Stale Primary with obsolete electionId TEST(ReplicaSetMonitorTests, StalePrimaryWithObsoleteElectionId) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); const OID firstElectionId = OID::gen(); const OID secondElectionId = OID::gen(); @@ -1233,7 +1233,7 @@ TEST(ReplicaSetMonitor, PrimaryIsUpCheck) { */ TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasNewerConfigVersion) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); auto ns = refresher.getNextStep(); ASSERT_EQUALS(ns.step, NextStep::CONTACT_HOST); @@ -1295,7 +1295,7 @@ TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasNewerConfigVersion) { */ TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasOlderConfigVersion) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); auto ns = refresher.getNextStep(); ASSERT_EQUALS(ns.step, 
NextStep::CONTACT_HOST); @@ -1355,7 +1355,7 @@ TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasOlderConfigVersion) { */ TEST(ReplicaSetMonitor, MaxStalenessMSMatch) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(100)); @@ -1408,7 +1408,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSMatch) { */ TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); @@ -1460,7 +1460,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) { */ TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); @@ -1515,7 +1515,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) { */ TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); @@ -1569,7 +1569,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) { */ TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime 
opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); @@ -1622,7 +1622,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) { */ TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); @@ -1674,7 +1674,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) { */ TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleSecondaryMatched) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime opTime{Timestamp{10, 10}, 10}; const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); @@ -1727,7 +1727,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleSecondaryMatched) { */ TEST(ReplicaSetMonitor, MaxStalenessMSNoLastWrite) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200)); BSONArray hosts = BSON_ARRAY("a" @@ -1769,7 +1769,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoLastWrite) { */ TEST(ReplicaSetMonitor, MaxStalenessMSZeroNoLastWrite) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(0)); BSONArray hosts = BSON_ARRAY("a" @@ -1811,7 +1811,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSZeroNoLastWrite) { */ TEST(ReplicaSetMonitor, MinOpTimeMatched) { SetStatePtr state = 
std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10}; repl::OpTime opTimeNonStale{Timestamp{10, 10}, 11}; @@ -1856,7 +1856,7 @@ TEST(ReplicaSetMonitor, MinOpTimeMatched) { */ TEST(ReplicaSetMonitor, MinOpTimeNotMatched) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10}; repl::OpTime opTimeNonStale{Timestamp{10, 10}, 11}; @@ -1901,7 +1901,7 @@ TEST(ReplicaSetMonitor, MinOpTimeNotMatched) { */ TEST(ReplicaSetMonitor, MinOpTimeIgnored) { SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet); - Refresher refresher(state); + Refresher refresher(state, nullptr, nullptr); repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10}; repl::OpTime opTimeStale{Timestamp{10, 10}, 9}; diff --git a/src/mongo/client/replica_set_monitor_transport.cpp b/src/mongo/client/replica_set_monitor_transport.cpp new file mode 100644 index 00000000000..58cc7fa5671 --- /dev/null +++ b/src/mongo/client/replica_set_monitor_transport.cpp @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/client/replica_set_monitor_transport.h" + +#include "mongo/client/connpool.h" +#include "mongo/executor/task_executor.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/future.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { +const int kLogLevel = 2; +} + +ReplicaSetMonitorTransport::~ReplicaSetMonitorTransport() {} + + +Future<BSONObj> ReplicaSetMonitorDbClientTransport::sayHello(HostAndPort host, + const std::string& setName, + const MongoURI& setUri, + Milliseconds timeout) noexcept { + MongoURI targetURI; + const auto& hostStr = host.toString(); + Timer timer; + try { + if (setUri.isValid()) { + targetURI = setUri.cloneURIForServer(host, ""); + targetURI.setUser(""); + targetURI.setPassword(""); + } else { + targetURI = MongoURI(ConnectionString(host)); + } + + LOG(kLogLevel) << "ReplicaSetMonitor " << setName << " sending hello request to " + << hostStr; + ScopedDbConnection conn(targetURI, durationCount<Milliseconds>(timeout) / 1000.0); // seconds as a double; don't truncate sub-second timeouts to 0 + bool ignoredOutParam = false; + BSONObj reply; + conn->isMaster(ignoredOutParam, &reply); + conn.done(); // return to
pool on success. + LOG(kLogLevel) << "ReplicaSetMonitor " << setName << " received reply from " << hostStr + << ": " << reply.toString() << "(" + << durationCount<Milliseconds>(Microseconds{timer.micros()}) << " ms)"; + return Future<BSONObj>(reply); + } catch (const DBException& ex) { + severe() << "ReplicaSetMonitor " << setName << " received error while monitoring " + << hostStr << ": " << ex.toStatus().toString() << "(" + << durationCount<Milliseconds>(Microseconds{timer.micros()}) << " ms)"; + return Future<BSONObj>(ex.toStatus()); + } +} + + +ReplicaSetMonitorExecutorTransport::ReplicaSetMonitorExecutorTransport( + executor::TaskExecutor* executor) + : _executor(executor) {} + +Future<BSONObj> ReplicaSetMonitorExecutorTransport::sayHello(HostAndPort host, + const std::string& setName, + const MongoURI& setUri, + Milliseconds timeout) noexcept { + try { + auto pf = makePromiseFuture<BSONObj>(); + BSONObjBuilder bob; + bob.append("isMaster", 1); + + const auto& wireSpec = WireSpec::instance(); + if (wireSpec.isInternalClient) { + WireSpec::appendInternalClientWireVersion(wireSpec.outgoing, &bob); + } + + auto request = executor::RemoteCommandRequest(host, "admin", bob.obj(), nullptr, timeout); + request.sslMode = setUri.getSSLMode(); + + LOG(kLogLevel) << "Replica set monitor " << setName << " sending hello request to " << host; + auto swCbHandle = _executor->scheduleRemoteCommand(std::move(request), [ + this, + setName, + requestState = std::make_shared<HelloRequest>(host, std::move(pf.promise)) + ](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + LOG(kLogLevel) << "Replica set monitor " << setName << " received reply from " + << requestState->host.toString() << ": " + << (result.response.isOK() ?
result.response.data.toString() + : result.response.status.toString()) + << "(" << durationCount<Milliseconds>( + Microseconds{requestState->timer.micros()}) + << " ms)"; + + _haltIfIncompatibleServer(result.response.status); + + if (result.response.isOK()) { + requestState->promise.emplaceValue(result.response.data); + } else { + requestState->promise.setError(result.response.status); + } + }); + + if (!swCbHandle.isOK()) { + severe() << "Replica set monitor " << setName << " error while scheduling request to " + << host << ": " << swCbHandle.getStatus(); + return swCbHandle.getStatus(); + } + + return std::move(pf.future); + } catch (const DBException& ex) { + severe() << "ReplicaSetMonitor " << setName << " unexpected error while monitoring " << host + << ": " << ex.toString(); + return Status(ErrorCodes::InternalError, ex.toString()); + } +} + +void ReplicaSetMonitorExecutorTransport::_haltIfIncompatibleServer(Status status) { + if (mongo::isMongos() && status == ErrorCodes::IncompatibleWithUpgradedServer) { + severe() << "This mongos server must be upgraded. It is attempting to " + "communicate with an upgraded cluster with which it is " + "incompatible. " + << "Error: '" << status << "' " + << "Crashing in order to bring attention to the incompatibility, " + "rather than erroring endlessly."; + fassertNoTrace(5685401, false); + } +} +} diff --git a/src/mongo/client/replica_set_monitor_transport.h b/src/mongo/client/replica_set_monitor_transport.h new file mode 100644 index 00000000000..cf6d3607dc2 --- /dev/null +++ b/src/mongo/client/replica_set_monitor_transport.h @@ -0,0 +1,101 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc.
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/platform/basic.h" + +#include "mongo/client/mongo_uri.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/is_mongos.h" +#include "mongo/transport/transport_layer.h" +#include "mongo/util/duration.h" +#include "mongo/util/future.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/timer.h" + +namespace mongo { + +/** + * Interface for the ReplicaSetMonitorTransport. Implementations of this interface obtain a response + * to the 'hello' command with a provided timeout. + */ +class ReplicaSetMonitorTransport { +public: + /** + * Runs the 'hello' command on 'host' and returns the result. If the timeout value is reached, + * the Future result will contain the error.
+ */ + virtual Future<BSONObj> sayHello(HostAndPort host, + const std::string& setName, + const MongoURI& setUri, + Milliseconds timeout) noexcept = 0; + virtual ~ReplicaSetMonitorTransport(); +}; +using ReplicaSetMonitorTransportPtr = std::unique_ptr<ReplicaSetMonitorTransport>; + +/** + * This class uses the DBClient object to make hello requests. It is utilized in unit tests + * to accommodate mocking the hello responses. + */ +class ReplicaSetMonitorDbClientTransport : public ReplicaSetMonitorTransport { +public: + Future<BSONObj> sayHello(HostAndPort host, + const std::string& setName, + const MongoURI& setUri, + Milliseconds timeout) noexcept override; +}; + +/** + * This class is used in the production version of the server to obtain hello responses. + * It uses an executor to run the hello commands with an appropriate timeout value. The + * executor should be multithreaded to accommodate TCP requests that do not get a reply in a + * timely fashion. + */ +class ReplicaSetMonitorExecutorTransport : public ReplicaSetMonitorTransport { +public: + explicit ReplicaSetMonitorExecutorTransport(executor::TaskExecutor* executor); + Future<BSONObj> sayHello(HostAndPort host, + const std::string& setName, + const MongoURI& setUri, + Milliseconds timeout) noexcept override; + +private: + void _haltIfIncompatibleServer(Status status); + + struct HelloRequest { + HelloRequest(HostAndPort h, Promise<BSONObj>&& p) + : host(h), timer(Timer()), promise(std::move(p)) {} + HostAndPort host; + Timer timer; + Promise<BSONObj> promise; + }; + + executor::TaskExecutor* const _executor; +}; +} diff --git a/src/mongo/dbtests/replica_set_monitor_test.cpp b/src/mongo/dbtests/replica_set_monitor_test.cpp index 2e3f2cda773..199a4e3e056 100644 --- a/src/mongo/dbtests/replica_set_monitor_test.cpp +++ b/src/mongo/dbtests/replica_set_monitor_test.cpp @@ -94,7 +94,8 @@ TEST_F(ReplicaSetMonitorTest, SeedWithPriOnlySecDown) { const string replSetName(replSet->getSetName()); set<HostAndPort>
seedList; seedList.insert(HostAndPort(replSet->getPrimary())); - auto monitor = ReplicaSetMonitor::createIfNeeded(replSetName, seedList); + auto monitor = ReplicaSetMonitor::createIfNeeded( + replSetName, seedList, std::make_unique<ReplicaSetMonitorDbClientTransport>()); replSet->kill(replSet->getPrimary()); @@ -149,7 +150,8 @@ TEST(ReplicaSetMonitorTest, PrimaryRemovedFromSetStress) { const string replSetName(replSet.getSetName()); set<HostAndPort> seedList; seedList.insert(HostAndPort(replSet.getPrimary())); - auto replMonitor = ReplicaSetMonitor::createIfNeeded(replSetName, seedList); + auto replMonitor = ReplicaSetMonitor::createIfNeeded( + replSetName, seedList, std::make_unique<ReplicaSetMonitorDbClientTransport>()); const repl::ReplSetConfig& origConfig = replSet.getReplConfig(); @@ -261,7 +263,8 @@ TEST_F(TwoNodeWithTags, SecDownRetryNoTag) { set<HostAndPort> seedList; seedList.insert(HostAndPort(replSet->getPrimary())); - auto monitor = ReplicaSetMonitor::createIfNeeded(replSet->getSetName(), seedList); + auto monitor = ReplicaSetMonitor::createIfNeeded( + replSet->getSetName(), seedList, std::make_unique<ReplicaSetMonitorDbClientTransport>()); const string secHost(replSet->getSecondaries().front()); replSet->kill(secHost); @@ -287,7 +290,8 @@ TEST_F(TwoNodeWithTags, SecDownRetryWithTag) { set<HostAndPort> seedList; seedList.insert(HostAndPort(replSet->getPrimary())); - auto monitor = ReplicaSetMonitor::createIfNeeded(replSet->getSetName(), seedList); + auto monitor = ReplicaSetMonitor::createIfNeeded( + replSet->getSetName(), seedList, std::make_unique<ReplicaSetMonitorDbClientTransport>()); const string secHost(replSet->getSecondaries().front()); replSet->kill(secHost); |