summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: LaMont Nelson <lamont.nelson@mongodb.com> 2021-05-18 00:50:07 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-05-28 19:14:14 +0000
commit: 8ddcaf878b5600fdac322929cd337c6f4563bf19 (patch)
tree: 00e31e4a692f1fb2af772b1db3279bbfaacfcb46
parent: 605f0cc7bbbc9bce090e3316e3db22ec043e64af (diff)
download: mongo-8ddcaf878b5600fdac322929cd337c6f4563bf19.tar.gz
SERVER-56854: Use executor and enforce timeout when making hello requests in the ReplicaSetMonitor
-rw-r--r-- src/mongo/SConscript 1
-rw-r--r-- src/mongo/client/SConscript 3
-rw-r--r-- src/mongo/client/dbclient_rs.cpp 7
-rw-r--r-- src/mongo/client/dbclient_rs_test.cpp 11
-rw-r--r-- src/mongo/client/remote_command_targeter_rs.cpp 3
-rw-r--r-- src/mongo/client/replica_set_monitor.cpp 92
-rw-r--r-- src/mongo/client/replica_set_monitor.h 32
-rw-r--r-- src/mongo/client/replica_set_monitor_manager.cpp 60
-rw-r--r-- src/mongo/client/replica_set_monitor_manager.h 12
-rw-r--r-- src/mongo/client/replica_set_monitor_test.cpp 54
-rw-r--r-- src/mongo/client/replica_set_monitor_transport.cpp 152
-rw-r--r-- src/mongo/client/replica_set_monitor_transport.h 101
-rw-r--r-- src/mongo/dbtests/replica_set_monitor_test.cpp 12
13 files changed, 446 insertions, 94 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 3b34dc578aa..8c36abbdba7 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -485,6 +485,7 @@ if not has_option('noshell') and usemozjs:
'shell/mongojs',
'transport/message_compressor',
'transport/transport_layer_manager',
+ 'util/concurrency/thread_pool',
'util/net/network',
'util/options_parser/options_parser_init',
'util/processinfo',
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index c5a54ee2b5c..0f8b225c8ab 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -186,6 +186,7 @@ clientDriverEnv.Library(
'global_conn_pool.cpp',
'replica_set_monitor.cpp',
'replica_set_monitor_manager.cpp',
+ 'replica_set_monitor_transport.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/write_concern_options',
@@ -194,6 +195,7 @@ clientDriverEnv.Library(
'$BUILD_DIR/mongo/executor/network_interface_thread_pool',
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
'$BUILD_DIR/mongo/util/background_job',
+ '$BUILD_DIR/mongo/util/concurrency/thread_pool',
'$BUILD_DIR/mongo/util/md5',
'$BUILD_DIR/mongo/util/net/network',
'clientdriver_minimal',
@@ -295,6 +297,7 @@ env.CppUnitTest('dbclient_rs_test',
LIBDEPS=[
'clientdriver_network',
'$BUILD_DIR/mongo/dbtests/mocklib',
+ '$BUILD_DIR/mongo/db/service_context_test_fixture',
],
)
diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp
index 7a84a98210d..57ebdd08940 100644
--- a/src/mongo/client/dbclient_rs.cpp
+++ b/src/mongo/client/dbclient_rs.cpp
@@ -139,11 +139,12 @@ DBClientReplicaSet::DBClientReplicaSet(const string& name,
_applicationName(applicationName.toString()),
_so_timeout(so_timeout),
_uri(std::move(uri)) {
+ auto rsmTransport = std::make_unique<ReplicaSetMonitorDbClientTransport>();
if (_uri.isValid()) {
- _rsm = ReplicaSetMonitor::createIfNeeded(_uri);
+ _rsm = ReplicaSetMonitor::createIfNeeded(_uri, std::move(rsmTransport));
} else {
- _rsm = ReplicaSetMonitor::createIfNeeded(name,
- set<HostAndPort>(servers.begin(), servers.end()));
+ _rsm = ReplicaSetMonitor::createIfNeeded(
+ name, set<HostAndPort>(servers.begin(), servers.end()), std::move(rsmTransport));
}
}
diff --git a/src/mongo/client/dbclient_rs_test.cpp b/src/mongo/client/dbclient_rs_test.cpp
index d3ae470d914..6bb72a5cf98 100644
--- a/src/mongo/client/dbclient_rs_test.cpp
+++ b/src/mongo/client/dbclient_rs_test.cpp
@@ -47,6 +47,7 @@
#include "mongo/client/dbclientinterface.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/service_context_test_fixture.h"
#include "mongo/dbtests/mock/mock_conn_registry.h"
#include "mongo/dbtests/mock/mock_replica_set.h"
#include "mongo/stdx/unordered_set.h"
@@ -78,7 +79,7 @@ BSONObj makeMetadata(ReadPreference rp, TagSet tagSet) {
/**
* Basic fixture with one primary and one secondary.
*/
-class BasicRS : public unittest::Test {
+class BasicRS : public ServiceContextTest {
protected:
void setUp() {
ReplicaSetMonitor::cleanup();
@@ -202,7 +203,7 @@ TEST_F(BasicRS, CommandSecondaryPreferred) {
/**
* Setup for 2 member replica set with all of the nodes down.
*/
-class AllNodesDown : public unittest::Test {
+class AllNodesDown : public ServiceContextTest {
protected:
void setUp() {
ReplicaSetMonitor::cleanup();
@@ -310,7 +311,7 @@ TEST_F(AllNodesDown, CommandNearest) {
/**
* Setup for 2 member replica set with the primary down.
*/
-class PrimaryDown : public unittest::Test {
+class PrimaryDown : public ServiceContextTest {
protected:
void setUp() {
ReplicaSetMonitor::cleanup();
@@ -416,7 +417,7 @@ TEST_F(PrimaryDown, Nearest) {
/**
* Setup for 2 member replica set with the secondary down.
*/
-class SecondaryDown : public unittest::Test {
+class SecondaryDown : public ServiceContextTest {
protected:
void setUp() {
ReplicaSetMonitor::cleanup();
@@ -527,7 +528,7 @@ TEST_F(SecondaryDown, CommandNearest) {
* Warning: Tests running this fixture cannot be run in parallel with other tests
* that use ConnectionString::setConnectionHook
*/
-class TaggedFiveMemberRS : public unittest::Test {
+class TaggedFiveMemberRS : public ServiceContextTest {
protected:
void setUp() {
// Tests for pinning behavior require this.
diff --git a/src/mongo/client/remote_command_targeter_rs.cpp b/src/mongo/client/remote_command_targeter_rs.cpp
index c8b7f8b551a..7ba719cc35f 100644
--- a/src/mongo/client/remote_command_targeter_rs.cpp
+++ b/src/mongo/client/remote_command_targeter_rs.cpp
@@ -36,6 +36,7 @@
#include "mongo/base/status_with.h"
#include "mongo/client/connection_string.h"
+#include "mongo/client/global_conn_pool.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/operation_context.h"
@@ -51,7 +52,7 @@ RemoteCommandTargeterRS::RemoteCommandTargeterRS(const std::string& rsName,
: _rsName(rsName) {
std::set<HostAndPort> seedServers(seedHosts.begin(), seedHosts.end());
- _rsMonitor = ReplicaSetMonitor::createIfNeeded(rsName, seedServers);
+ _rsMonitor = ReplicaSetMonitor::createIfNeeded(rsName, seedServers, nullptr);
LOG(1) << "Started targeter for "
<< ConnectionString::forReplicaSet(
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index c935ef00872..7d01c32b2c5 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/db/server_options.h"
+#include "mongo/s/is_mongos.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
@@ -71,6 +72,9 @@ MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook);
// Failpoint for changing the default refresh period
MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod);
+// Failpoint for changing the default socket timeout for Hello command.
+MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorHelloTimeout);
+
namespace {
// Pull nested types to top-level scope
@@ -97,6 +101,8 @@ static constexpr int kRsmVerbosityThresholdTimeoutSec = 20;
// When 'is master' reply latency is over 2 sec, it will be logged.
static constexpr int kSlowIsMasterThresholdMicros = 2L * 1000 * 1000;
+static Seconds kHelloTimeout = Seconds{10};
+
const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
const Milliseconds kFindHostMaxBackOffTime(500);
AtomicBool areRefreshRetriesDisabledForTest{false}; // Only true in tests.
@@ -192,22 +198,43 @@ const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15);
// Defaults to random selection as required by the spec
bool ReplicaSetMonitor::useDeterministicHostSelection = false;
-Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() {
+Seconds ReplicaSetMonitor::getRefreshPeriod() {
MONGO_FAIL_POINT_BLOCK_IF(modifyReplicaSetMonitorDefaultRefreshPeriod,
data,
[&](const BSONObj& data) { return data.hasField("period"); }) {
- return Seconds{data.getData().getIntField("period")};
+ auto result = Seconds{data.getData().getIntField("period")};
+ log() << "Modified the default replica set monitor refresh period via failpoint to "
+ << result;
+ return result;
}
return kDefaultRefreshPeriod;
}
-ReplicaSetMonitor::ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds)
+Milliseconds ReplicaSetMonitor::getHelloTimeout() {
+ MONGO_FAIL_POINT_BLOCK_IF(modifyReplicaSetMonitorHelloTimeout, data, [&](const BSONObj& data) {
+ return data.hasField("period");
+ }) {
+ auto result = Milliseconds{data.getData().getIntField("period")};
+ log() << "Modified the replica set monitor request timeout via failpoint to " << result;
+ return result;
+ }
+
+ return duration_cast<Milliseconds>(kHelloTimeout);
+}
+
+ReplicaSetMonitor::ReplicaSetMonitor(StringData name,
+ const std::set<HostAndPort>& seeds,
+ ReplicaSetMonitorTransportPtr transport)
: _state(std::make_shared<SetState>(name, seeds)),
- _executor(globalRSMonitorManager.getExecutor()) {}
+ _executor(globalRSMonitorManager.getExecutor()),
+ _rsmTransport(std::move(transport)) {}
-ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
- : _state(std::make_shared<SetState>(uri)), _executor(globalRSMonitorManager.getExecutor()) {}
+ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri,
+ std::unique_ptr<ReplicaSetMonitorTransport> transport)
+ : _state(std::make_shared<SetState>(uri)),
+ _executor(globalRSMonitorManager.getExecutor()),
+ _rsmTransport(std::move(transport)) {}
void ReplicaSetMonitor::init() {
_scheduleRefresh(_executor->now());
@@ -325,7 +352,7 @@ HostAndPort ReplicaSetMonitor::getMasterOrUassert() {
Refresher ReplicaSetMonitor::startOrContinueRefresh() {
stdx::lock_guard<stdx::mutex> lk(_state->mutex);
- Refresher out(_state);
+ Refresher out(_state, _executor, _rsmTransport.get());
DEV _state->checkInvariants();
return out;
}
@@ -394,14 +421,16 @@ bool ReplicaSetMonitor::contains(const HostAndPort& host) const {
return _state->seedNodes.count(host);
}
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const string& name,
- const set<HostAndPort>& servers) {
+shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(
+ const string& name, const set<HostAndPort>& servers, ReplicaSetMonitorTransportPtr transport) {
return globalRSMonitorManager.getOrCreateMonitor(
- ConnectionString::forReplicaSet(name, vector<HostAndPort>(servers.begin(), servers.end())));
+ ConnectionString::forReplicaSet(name, vector<HostAndPort>(servers.begin(), servers.end())),
+ std::move(transport));
}
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const MongoURI& uri) {
- return globalRSMonitorManager.getOrCreateMonitor(uri);
+shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(
+ const MongoURI& uri, ReplicaSetMonitorTransportPtr transport) {
+ return globalRSMonitorManager.getOrCreateMonitor(uri, std::move(transport));
}
shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::get(const std::string& name) {
@@ -490,7 +519,10 @@ void ReplicaSetMonitor::markAsRemoved() {
_isRemovedFromManager.store(true);
}
-Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) {
+Refresher::Refresher(const SetStatePtr& setState,
+ executor::TaskExecutor* executor,
+ ReplicaSetMonitorTransport* transport)
+ : _set(setState), _scan(setState->currentScan), _executor(executor), _rsmTransport(transport) {
if (_scan)
return; // participate in in-progress scan
@@ -865,32 +897,18 @@ HostAndPort Refresher::_refreshUntilMatches(const ReadPreferenceSetting* criteri
continue;
case NextStep::CONTACT_HOST: {
- StatusWith<BSONObj> isMasterReplyStatus{ErrorCodes::InternalError,
- "Uninitialized variable"};
+ StatusWith<BSONObj> helloReplyStatus{ErrorCodes::InternalError,
+ "Uninitialized variable"};
int64_t pingMicros = 0;
- MongoURI targetURI;
-
- if (_set->setUri.isValid()) {
- targetURI = _set->setUri.cloneURIForServer(ns.host, "");
- targetURI.setUser("");
- targetURI.setPassword("");
- } else {
- targetURI = MongoURI(ConnectionString(ns.host));
- }
// Do not do network calls while holding a mutex
lk.unlock();
- try {
- ScopedDbConnection conn(targetURI, socketTimeoutSecs);
- bool ignoredOutParam = false;
+ {
Timer timer;
- BSONObj reply;
- conn->isMaster(ignoredOutParam, &reply);
- isMasterReplyStatus = reply;
+ auto helloFuture = _rsmTransport->sayHello(
+ ns.host, _set->name, _set->setUri, getHelloTimeout());
+ helloReplyStatus = helloFuture.getNoThrow();
pingMicros = timer.micros();
- conn.done(); // return to pool on success.
- } catch (const DBException& ex) {
- isMasterReplyStatus = ex.toStatus();
}
lk.lock();
@@ -900,10 +918,10 @@ HostAndPort Refresher::_refreshUntilMatches(const ReadPreferenceSetting* criteri
return criteria ? _set->getMatchingHost(*criteria) : HostAndPort();
}
- if (isMasterReplyStatus.isOK())
- receivedIsMaster(ns.host, pingMicros, isMasterReplyStatus.getValue(), verbose);
+ if (helloReplyStatus.isOK())
+ receivedIsMaster(ns.host, pingMicros, helloReplyStatus.getValue(), verbose);
else
- failedHost(ns.host, isMasterReplyStatus.getStatus());
+ failedHost(ns.host, helloReplyStatus.getStatus());
}
}
@@ -1069,7 +1087,7 @@ SetState::SetState(StringData name, const std::set<HostAndPort>& seedNodes, Mong
rand(std::random_device()()),
roundRobin(0),
setUri(std::move(uri)),
- refreshPeriod(getDefaultRefreshPeriod()) {
+ refreshPeriod(getRefreshPeriod()) {
uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
if (name.empty())
diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h
index 4b5d811a673..115696c04ca 100644
--- a/src/mongo/client/replica_set_monitor.h
+++ b/src/mongo/client/replica_set_monitor.h
@@ -39,11 +39,13 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/client/mongo_uri.h"
+#include "mongo/client/replica_set_monitor_transport.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
+#include "mongo/util/timer.h"
namespace mongo {
@@ -70,9 +72,11 @@ public:
*
* seeds must not be empty.
*/
- ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds);
+ ReplicaSetMonitor(StringData name,
+ const std::set<HostAndPort>& seeds,
+ ReplicaSetMonitorTransportPtr transport);
- ReplicaSetMonitor(const MongoURI& uri);
+ ReplicaSetMonitor(const MongoURI& uri, ReplicaSetMonitorTransportPtr transport);
/**
* Schedules the initial refresh task into task executor.
@@ -186,10 +190,13 @@ public:
/**
* Creates a new ReplicaSetMonitor, if it doesn't already exist.
*/
- static std::shared_ptr<ReplicaSetMonitor> createIfNeeded(const std::string& name,
- const std::set<HostAndPort>& servers);
+ static std::shared_ptr<ReplicaSetMonitor> createIfNeeded(
+ const std::string& name,
+ const std::set<HostAndPort>& servers,
+ ReplicaSetMonitorTransportPtr transport);
- static std::shared_ptr<ReplicaSetMonitor> createIfNeeded(const MongoURI& uri);
+ static std::shared_ptr<ReplicaSetMonitor> createIfNeeded(
+ const MongoURI& uri, ReplicaSetMonitorTransportPtr transport);
/**
* gets a cached Monitor per name. If the monitor is not found and createFromSeed is false,
@@ -249,7 +256,12 @@ public:
/**
* Returns the refresh period that is given to all new SetStates.
*/
- static Seconds getDefaultRefreshPeriod();
+ static Seconds getRefreshPeriod();
+
+ /**
+ * Returns the timeout for a single hello request.
+ */
+ static Milliseconds getHelloTimeout();
//
// internal types (defined in replica_set_monitor_internal.h)
@@ -300,6 +312,7 @@ private:
const SetStatePtr _state;
executor::TaskExecutor* _executor;
AtomicBool _isRemovedFromManager{false};
+ ReplicaSetMonitorTransportPtr _rsmTransport;
};
@@ -342,7 +355,9 @@ public:
*
* If no scan is in-progress, this function is responsible for setting up a new scan.
*/
- explicit Refresher(const SetStatePtr& setState);
+ explicit Refresher(const SetStatePtr& setState,
+ executor::TaskExecutor* executor,
+ ReplicaSetMonitorTransport* transport);
struct NextStep {
enum StepKind {
@@ -418,6 +433,9 @@ private:
// Both pointers are never NULL
SetStatePtr _set;
ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started.
+
+ executor::TaskExecutor* _executor;
+ ReplicaSetMonitorTransport* _rsmTransport;
};
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index d6de2153486..7345fd95fc8 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -38,6 +38,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/mongo_uri.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/client.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
@@ -47,6 +48,7 @@
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/log.h"
#include "mongo/util/map_util.h"
@@ -63,6 +65,11 @@ using executor::TaskExecutorPool;
using executor::TaskExecutor;
using executor::ThreadPoolTaskExecutor;
+namespace {
+const int kMaxRsmThreads = 1024;
+const int kMaxRsmConnectionsPerHost = 128;
+}
+
ReplicaSetMonitorManager::ReplicaSetMonitorManager() {}
ReplicaSetMonitorManager::~ReplicaSetMonitorManager() {
@@ -79,17 +86,46 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData se
}
}
+auto makeThreadPool(const std::string& poolName) {
+ ThreadPool::Options threadPoolOptions;
+ threadPoolOptions.poolName = poolName;
+
+ // Two threads are for the Scan and the hello request.
+ threadPoolOptions.minThreads = 2;
+
+ // This setting is a hedge against the issue described in
+ // SERVER-56854. Generally an RSM instance will use 1 thread
+ // to make the hello request. If there are delays in the network
+ // interface delivering the replies, the RSM will time out and
+ // spawn additional threads to make progress.
+ threadPoolOptions.maxThreads = kMaxRsmThreads;
+
+ threadPoolOptions.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+ return stdx::make_unique<ThreadPool>(threadPoolOptions);
+}
+
void ReplicaSetMonitorManager::_setupTaskExecutorInLock(const std::string& name) {
auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
// do not restart taskExecutor if it is in shutdown
if (!_taskExecutor && !_isShutdown) {
// construct task executor
+ auto threadName = "ReplicaSetMonitor-TaskExecutor";
+
+ // This is to limit the number of threads that a failed host can consume
+ // when there is a TCP blackhole. The RSM will attempt to contact the host repeatedly,
+ // spawning a new thread on each attempt if there is no response from the last attempt.
+ // Eventually, the connections will time out according to TCP keepalive settings.
+ // See SERVER-56854 for more information.
+ executor::ConnectionPool::Options connPoolOptions;
+ connPoolOptions.maxConnections = kMaxRsmConnectionsPerHost;
+
auto net = executor::makeNetworkInterface(
- "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList));
- auto netPtr = net.get();
- _taskExecutor = stdx::make_unique<ThreadPoolTaskExecutor>(
- stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
+ threadName, nullptr, std::move(hookList), connPoolOptions);
+ _taskExecutor =
+ stdx::make_unique<ThreadPoolTaskExecutor>(makeThreadPool(threadName), std::move(net));
LOG(1) << "Starting up task executor for monitoring replica sets in response to request to "
"monitor set: "
<< redact(name);
@@ -104,7 +140,7 @@ void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode
}
shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
- const ConnectionString& connStr) {
+ const ConnectionString& connStr, ReplicaSetMonitorTransportPtr transport) {
invariant(connStr.type() == ConnectionString::SET);
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -120,13 +156,17 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
log() << "Starting new replica set monitor for " << connStr.toString();
- auto newMonitor = std::make_shared<ReplicaSetMonitor>(setName, servers);
+ if (!transport) {
+ transport = makeRsmTransport();
+ }
+ auto newMonitor = std::make_shared<ReplicaSetMonitor>(setName, servers, std::move(transport));
_monitors[setName] = newMonitor;
newMonitor->init();
return newMonitor;
}
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const MongoURI& uri) {
+shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
+ const MongoURI& uri, ReplicaSetMonitorTransportPtr transport) {
invariant(uri.type() == ConnectionString::SET);
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -140,7 +180,7 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const
log() << "Starting new replica set monitor for " << uri.toString();
- auto newMonitor = std::make_shared<ReplicaSetMonitor>(uri);
+ auto newMonitor = std::make_shared<ReplicaSetMonitor>(uri, std::move(transport));
_monitors[setName] = newMonitor;
newMonitor->init();
return newMonitor;
@@ -230,4 +270,8 @@ TaskExecutor* ReplicaSetMonitorManager::getExecutor() {
return _taskExecutor.get();
}
+
+ReplicaSetMonitorTransportPtr ReplicaSetMonitorManager::makeRsmTransport() {
+ return std::make_unique<ReplicaSetMonitorExecutorTransport>(getExecutor());
+}
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index c4ee1b05b7a..89bee98cfc8 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -34,6 +34,7 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
+#include "mongo/client/replica_set_monitor_transport.h"
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/string_map.h"
@@ -60,8 +61,10 @@ public:
* nullptr if there is no monitor registered for the particular replica set.
*/
std::shared_ptr<ReplicaSetMonitor> getMonitor(StringData setName);
- std::shared_ptr<ReplicaSetMonitor> getOrCreateMonitor(const ConnectionString& connStr);
- std::shared_ptr<ReplicaSetMonitor> getOrCreateMonitor(const MongoURI& uri);
+ std::shared_ptr<ReplicaSetMonitor> getOrCreateMonitor(const ConnectionString& connStr,
+ ReplicaSetMonitorTransportPtr transport);
+ std::shared_ptr<ReplicaSetMonitor> getOrCreateMonitor(const MongoURI& uri,
+ ReplicaSetMonitorTransportPtr transport);
/**
* Retrieves the names of all sets tracked by this manager.
@@ -96,6 +99,11 @@ public:
*/
executor::TaskExecutor* getExecutor();
+ /*
+ * Returns a transport that uses the ReplicaSetMonitorManager's executor to run commands
+ */
+ ReplicaSetMonitorTransportPtr makeRsmTransport();
+
private:
using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>;
diff --git a/src/mongo/client/replica_set_monitor_test.cpp b/src/mongo/client/replica_set_monitor_test.cpp
index 02269794d79..7540854d9ce 100644
--- a/src/mongo/client/replica_set_monitor_test.cpp
+++ b/src/mongo/client/replica_set_monitor_test.cpp
@@ -347,7 +347,7 @@ TEST(ReplicaSetMonitor, IsMasterSecondaryWithTags) {
TEST(ReplicaSetMonitor, CheckAllSeedsSerial) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
set<HostAndPort> seen;
@@ -395,7 +395,7 @@ TEST(ReplicaSetMonitor, CheckAllSeedsSerial) {
TEST(ReplicaSetMonitor, CheckAllSeedsParallel) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
set<HostAndPort> seen;
@@ -453,7 +453,7 @@ TEST(ReplicaSetMonitor, CheckAllSeedsParallel) {
TEST(ReplicaSetMonitor, NoMasterInitAllUp) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
set<HostAndPort> seen;
@@ -500,7 +500,7 @@ TEST(ReplicaSetMonitor, NoMasterInitAllUp) {
TEST(ReplicaSetMonitor, MasterNotInSeeds_NoPrimaryInIsMaster) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
set<HostAndPort> seen;
@@ -577,7 +577,7 @@ TEST(ReplicaSetMonitor, MasterNotInSeeds_NoPrimaryInIsMaster) {
TEST(ReplicaSetMonitor, MasterNotInSeeds_PrimaryInIsMaster) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
set<HostAndPort> seen;
@@ -643,7 +643,7 @@ TEST(ReplicaSetMonitor, SlavesUsableEvenIfNoMaster) {
std::set<HostAndPort> seeds;
seeds.insert(HostAndPort("a"));
SetStatePtr state = std::make_shared<SetState>("name", seeds);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet());
@@ -685,7 +685,7 @@ TEST(ReplicaSetMonitor, SlavesUsableEvenIfNoMaster) {
// Test multiple nodes that claim to be master (we use a last-wins policy)
TEST(ReplicaSetMonitor, MultipleMasterLastNodeWins) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
set<HostAndPort> seen;
@@ -748,7 +748,7 @@ TEST(ReplicaSetMonitor, MultipleMasterLastNodeWins) {
// Test nodes disagree about who is in the set, master is source of truth
TEST(ReplicaSetMonitor, MasterIsSourceOfTruth) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
BSONArray primaryHosts = BSON_ARRAY("a"
<< "b"
@@ -789,7 +789,7 @@ TEST(ReplicaSetMonitor, MasterIsSourceOfTruth) {
// Test multiple master nodes that disagree about set membership
TEST(ReplicaSetMonitor, MultipleMastersDisagree) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
BSONArray hostsForSeed[3];
hostsForSeed[0] = BSON_ARRAY("a"
@@ -887,7 +887,7 @@ TEST(ReplicaSetMonitor, MultipleMastersDisagree) {
// Ensure getMatchingHost returns hosts even if scan is ongoing
TEST(ReplicaSetMonitor, GetMatchingDuringScan) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
const ReadPreferenceSetting primaryOnly(ReadPreference::PrimaryOnly, TagSet());
const ReadPreferenceSetting secondaryOnly(ReadPreference::SecondaryOnly, TagSet());
@@ -990,7 +990,7 @@ TEST(ReplicaSetMonitor, OutOfBandFailedHost) {
// Newly elected primary with electionId >= maximum electionId seen by the Refresher
TEST(ReplicaSetMonitorTests, NewPrimaryWithMaxElectionId) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
set<HostAndPort> seen;
@@ -1056,7 +1056,7 @@ TEST(ReplicaSetMonitorTests, NewPrimaryWithMaxElectionId) {
// Ignore electionId of secondaries
TEST(ReplicaSetMonitorTests, IgnoreElectionIdFromSecondaries) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
set<HostAndPort> seen;
@@ -1103,7 +1103,7 @@ TEST(ReplicaSetMonitorTests, IgnoreElectionIdFromSecondaries) {
// Stale Primary with obsolete electionId
TEST(ReplicaSetMonitorTests, StalePrimaryWithObsoleteElectionId) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
const OID firstElectionId = OID::gen();
const OID secondElectionId = OID::gen();
@@ -1233,7 +1233,7 @@ TEST(ReplicaSetMonitor, PrimaryIsUpCheck) {
*/
TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasNewerConfigVersion) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
auto ns = refresher.getNextStep();
ASSERT_EQUALS(ns.step, NextStep::CONTACT_HOST);
@@ -1295,7 +1295,7 @@ TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasNewerConfigVersion) {
*/
TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasOlderConfigVersion) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
auto ns = refresher.getNextStep();
ASSERT_EQUALS(ns.step, NextStep::CONTACT_HOST);
@@ -1355,7 +1355,7 @@ TEST(ReplicaSetMonitorTests, TwoPrimaries2ndHasOlderConfigVersion) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSMatch) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(100));
@@ -1408,7 +1408,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSMatch) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
@@ -1460,7 +1460,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoMatch) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
@@ -1515,7 +1515,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoPrimaryMatch) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
@@ -1569,7 +1569,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllFailed) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
@@ -1622,7 +1622,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSAllButPrimaryFailed) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
@@ -1674,7 +1674,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSOneSecondaryFailed) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleSecondaryMatched) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime opTime{Timestamp{10, 10}, 10};
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
@@ -1727,7 +1727,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNonStaleSecondaryMatched) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSNoLastWrite) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(200));
BSONArray hosts = BSON_ARRAY("a"
@@ -1769,7 +1769,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSNoLastWrite) {
*/
TEST(ReplicaSetMonitor, MaxStalenessMSZeroNoLastWrite) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
const ReadPreferenceSetting secondary(ReadPreference::SecondaryOnly, TagSet(), Seconds(0));
BSONArray hosts = BSON_ARRAY("a"
@@ -1811,7 +1811,7 @@ TEST(ReplicaSetMonitor, MaxStalenessMSZeroNoLastWrite) {
*/
TEST(ReplicaSetMonitor, MinOpTimeMatched) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10};
repl::OpTime opTimeNonStale{Timestamp{10, 10}, 11};
@@ -1856,7 +1856,7 @@ TEST(ReplicaSetMonitor, MinOpTimeMatched) {
*/
TEST(ReplicaSetMonitor, MinOpTimeNotMatched) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10};
repl::OpTime opTimeNonStale{Timestamp{10, 10}, 11};
@@ -1901,7 +1901,7 @@ TEST(ReplicaSetMonitor, MinOpTimeNotMatched) {
*/
TEST(ReplicaSetMonitor, MinOpTimeIgnored) {
SetStatePtr state = std::make_shared<SetState>("name", basicSeedsSet);
- Refresher refresher(state);
+ Refresher refresher(state, nullptr, nullptr);
repl::OpTime minOpTimeSetting{Timestamp{10, 10}, 10};
repl::OpTime opTimeStale{Timestamp{10, 10}, 9};
diff --git a/src/mongo/client/replica_set_monitor_transport.cpp b/src/mongo/client/replica_set_monitor_transport.cpp
new file mode 100644
index 00000000000..58cc7fa5671
--- /dev/null
+++ b/src/mongo/client/replica_set_monitor_transport.cpp
@@ -0,0 +1,152 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/client/replica_set_monitor_transport.h"
+
+#include "mongo/client/connpool.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/future.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+// Verbosity level passed to every LOG() statement in this file.
+constexpr int kLogLevel = 2;
+}  // namespace
+
+// Out-of-line definition of the interface's virtual destructor; it has no work to do.
+ReplicaSetMonitorTransport::~ReplicaSetMonitorTransport() = default;
+
+
+/**
+ * Sends an isMaster ("hello") request to 'host' over a ScopedDbConnection and returns the reply
+ * as an already-completed Future.
+ *
+ * When 'setUri' is valid, the connection targets a per-host clone of it with the user and
+ * password cleared; otherwise a URI is built from 'host' alone. 'setName' is only used in log
+ * messages. 'timeout' is handed to ScopedDbConnection as its socket timeout.
+ *
+ * A DBException raised while connecting or running the command is logged and returned as an
+ * error Future instead of propagating out of this noexcept function.
+ */
+Future<BSONObj> ReplicaSetMonitorDbClientTransport::sayHello(HostAndPort host,
+                                                             const std::string& setName,
+                                                             const MongoURI& setUri,
+                                                             Milliseconds timeout) noexcept {
+    MongoURI targetURI;
+    const auto hostStr = host.toString();
+    Timer timer;
+    try {
+        if (setUri.isValid()) {
+            targetURI = setUri.cloneURIForServer(host, "");
+            targetURI.setUser("");
+            targetURI.setPassword("");
+        } else {
+            targetURI = MongoURI(ConnectionString(host));
+        }
+
+        LOG(kLogLevel) << "ReplicaSetMonitor " << setName << " sending hello request to "
+                       << hostStr;
+
+        // Convert from milliseconds rather than through whole Seconds: an integral Seconds count
+        // truncates, so e.g. a 500ms timeout would be passed as 0.
+        // NOTE(review): assumes ScopedDbConnection's second argument is a floating-point number
+        // of seconds and that 0 disables the timeout -- confirm against connpool.h.
+        const double socketTimeoutSecs = durationCount<Milliseconds>(timeout) / 1000.0;
+        ScopedDbConnection conn(targetURI, socketTimeoutSecs);
+        bool ignoredOutParam = false;
+        BSONObj reply;
+        conn->isMaster(ignoredOutParam, &reply);
+        conn.done();  // return to pool on success.
+        LOG(kLogLevel) << "ReplicaSetMonitor " << setName << " received reply from " << hostStr
+                       << ": " << reply.toString() << "("
+                       << durationCount<Milliseconds>(Microseconds{timer.micros()}) << " ms)";
+        return Future<BSONObj>(reply);
+    } catch (const DBException& ex) {
+        severe() << "ReplicaSetMonitor " << setName << " recieved error while monitoring "
+                 << hostStr << ": " << ex.toStatus().toString() << "("
+                 << durationCount<Milliseconds>(Microseconds{timer.micros()}) << " ms)";
+        return Future<BSONObj>(ex.toStatus());
+    }
+}
+
+
+// Remembers the executor pointer on which sayHello() schedules its remote commands.
+ReplicaSetMonitorExecutorTransport::ReplicaSetMonitorExecutorTransport(
+    executor::TaskExecutor* executor)
+    : _executor{executor} {}
+
+/**
+ * Schedules an "isMaster" command against 'host' on the executor and returns a Future that the
+ * executor callback completes: with the response document when the response is OK, otherwise
+ * with the response status. 'timeout' is passed to the RemoteCommandRequest, the SSL mode is
+ * taken from 'setUri', and 'setName' is only used in log messages.
+ *
+ * If scheduling fails, the failure is logged and returned as the Future's error. A DBException
+ * thrown along the way is logged and reported as ErrorCodes::InternalError.
+ *
+ * NOTE(review): the callback captures 'this'; presumably this transport outlives every request
+ * it schedules -- confirm against the object's owner.
+ */
+Future<BSONObj> ReplicaSetMonitorExecutorTransport::sayHello(HostAndPort host,
+                                                             const std::string& setName,
+                                                             const MongoURI& setUri,
+                                                             Milliseconds timeout) noexcept {
+    try {
+        auto promiseAndFuture = makePromiseFuture<BSONObj>();
+
+        // Build the command document; internal clients also advertise their wire version.
+        BSONObjBuilder helloCmd;
+        helloCmd.append("isMaster", 1);
+        const auto& spec = WireSpec::instance();
+        if (spec.isInternalClient) {
+            WireSpec::appendInternalClientWireVersion(spec.outgoing, &helloCmd);
+        }
+
+        executor::RemoteCommandRequest helloRequest(
+            host, "admin", helloCmd.obj(), nullptr, timeout);
+        helloRequest.sslMode = setUri.getSSLMode();
+
+        LOG(kLogLevel) << "Replica set monitor " << setName << " sending hello request to " << host;
+
+        // The per-request state (host, timer, promise) is held through a shared_ptr captured by
+        // the callback. It is created here, after the "sending" log line, so the timer starts at
+        // the same point as before.
+        auto onReply = [
+            this,
+            setName,
+            pending = std::make_shared<HelloRequest>(host, std::move(promiseAndFuture.promise))
+        ](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbArgs) mutable {
+            const auto& response = cbArgs.response;
+            LOG(kLogLevel) << "Replica set monitor " << setName << " received reply from "
+                           << pending->host.toString() << ": "
+                           << (response.isOK() ? response.data.toString()
+                                               : response.status.toString())
+                           << "("
+                           << durationCount<Milliseconds>(Microseconds{pending->timer.micros()})
+                           << " ms)";
+
+            _haltIfIncompatibleServer(response.status);
+
+            if (!response.isOK()) {
+                pending->promise.setError(response.status);
+                return;
+            }
+            pending->promise.emplaceValue(response.data);
+        };
+
+        auto scheduled =
+            _executor->scheduleRemoteCommand(std::move(helloRequest), std::move(onReply));
+        if (scheduled.isOK()) {
+            return std::move(promiseAndFuture.future);
+        }
+
+        severe() << "Replica set monitor " << setName << " error while scheduling request to "
+                 << host << ": " << scheduled.getStatus();
+        return scheduled.getStatus();
+    } catch (const DBException& ex) {
+        severe() << "ReplicaSetMonitor " << setName << " unexpected error while monitoring " << host
+                 << ": " << ex.toString();
+        return Status(ErrorCodes::InternalError, ex.toString());
+    }
+}
+
+/**
+ * When this process is a mongos and 'status' is IncompatibleWithUpgradedServer, logs a severe
+ * message and calls fassertNoTrace(5685401, false). Does nothing for any other combination.
+ */
+void ReplicaSetMonitorExecutorTransport::_haltIfIncompatibleServer(Status status) {
+    const bool mustHalt =
+        mongo::isMongos() && status == ErrorCodes::IncompatibleWithUpgradedServer;
+    if (!mustHalt) {
+        return;
+    }
+
+    severe() << "This mongos server must be upgraded. It is attempting to "
+                "communicate with an upgraded cluster with which it is "
+                "incompatible. "
+             << "Error: '" << status << "' "
+             << "Crashing in order to bring attention to the incompatibility, "
+                "rather than erroring endlessly.";
+    fassertNoTrace(5685401, false);
+}
+}
diff --git a/src/mongo/client/replica_set_monitor_transport.h b/src/mongo/client/replica_set_monitor_transport.h
new file mode 100644
index 00000000000..cf6d3607dc2
--- /dev/null
+++ b/src/mongo/client/replica_set_monitor_transport.h
@@ -0,0 +1,101 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/mongo_uri.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/is_mongos.h"
+#include "mongo/transport/transport_layer.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/future.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/timer.h"
+
+namespace mongo {
+
+/**
+ * Interface for the ReplicaSetMonitorTransport. Implementations of this interface obtain a
+ * response to the 'hello' command with a provided timeout.
+ */
+class ReplicaSetMonitorTransport {
+public:
+ /**
+ * Runs the 'hello' command on 'host' and returns the result. If the timeout value is reached,
+ * the Future result will contain the error.
+ *
+ * 'setName' is the replica set name; the implementations in this change use it only in log
+ * messages. 'setUri' is the URI of the set; the implementations in this change read connection
+ * settings from it (a per-host cloned URI, or the SSL mode).
+ */
+ virtual Future<BSONObj> sayHello(HostAndPort host,
+ const std::string& setName,
+ const MongoURI& setUri,
+ Milliseconds timeout) noexcept = 0;
+ virtual ~ReplicaSetMonitorTransport();
+};
+// Owning pointer to a transport implementation.
+using ReplicaSetMonitorTransportPtr = std::unique_ptr<ReplicaSetMonitorTransport>;
+
+/**
+ * This class uses the DBClient object to make hello requests. It is utilized in unit tests
+ * to accommodate mocking the hello responses.
+ */
+class ReplicaSetMonitorDbClientTransport : public ReplicaSetMonitorTransport {
+public:
+ // Runs the request inside the call itself and returns a Future that is already completed with
+ // either the reply or the error status.
+ Future<BSONObj> sayHello(HostAndPort host,
+ const std::string& setName,
+ const MongoURI& setUri,
+ Milliseconds timeout) noexcept override;
+};
+
+/**
+ * This class is used in the production version of the server to obtain hello responses.
+ * It uses an executor to run the hello commands with an appropriate timeout value. The
+ * executor should be multithreaded to accommodate TCP requests that do not get a reply in a
+ * timely fashion.
+ */
+class ReplicaSetMonitorExecutorTransport : public ReplicaSetMonitorTransport {
+public:
+    /**
+     * Stores 'executor' as a raw pointer; this class never deletes it.
+     */
+    explicit ReplicaSetMonitorExecutorTransport(executor::TaskExecutor* executor);
+
+    /**
+     * Schedules the hello command for 'host' on the executor and returns a Future that is
+     * completed with the response document, or with the error status on failure.
+     */
+    Future<BSONObj> sayHello(HostAndPort host,
+                             const std::string& setName,
+                             const MongoURI& setUri,
+                             Milliseconds timeout) noexcept override;
+
+private:
+    // Checks 'status' for the incompatible-server condition; see the definition for details.
+    void _haltIfIncompatibleServer(Status status);
+
+    /**
+     * State kept for one outstanding hello request: the target host, a timer that starts when
+     * the request state is constructed, and the promise to fulfil with the outcome.
+     */
+    struct HelloRequest {
+        // 'h' is taken by value, so move it into place rather than copying it a second time.
+        // 'timer' is default-constructed, which is what the previous Timer() temporary did.
+        HelloRequest(HostAndPort h, Promise<BSONObj>&& p)
+            : host(std::move(h)), promise(std::move(p)) {}
+        HostAndPort host;
+        Timer timer;
+        Promise<BSONObj> promise;
+    };
+
+    executor::TaskExecutor* const _executor;
+};
+}
diff --git a/src/mongo/dbtests/replica_set_monitor_test.cpp b/src/mongo/dbtests/replica_set_monitor_test.cpp
index 2e3f2cda773..199a4e3e056 100644
--- a/src/mongo/dbtests/replica_set_monitor_test.cpp
+++ b/src/mongo/dbtests/replica_set_monitor_test.cpp
@@ -94,7 +94,8 @@ TEST_F(ReplicaSetMonitorTest, SeedWithPriOnlySecDown) {
const string replSetName(replSet->getSetName());
set<HostAndPort> seedList;
seedList.insert(HostAndPort(replSet->getPrimary()));
- auto monitor = ReplicaSetMonitor::createIfNeeded(replSetName, seedList);
+ auto monitor = ReplicaSetMonitor::createIfNeeded(
+ replSetName, seedList, std::make_unique<ReplicaSetMonitorDbClientTransport>());
replSet->kill(replSet->getPrimary());
@@ -149,7 +150,8 @@ TEST(ReplicaSetMonitorTest, PrimaryRemovedFromSetStress) {
const string replSetName(replSet.getSetName());
set<HostAndPort> seedList;
seedList.insert(HostAndPort(replSet.getPrimary()));
- auto replMonitor = ReplicaSetMonitor::createIfNeeded(replSetName, seedList);
+ auto replMonitor = ReplicaSetMonitor::createIfNeeded(
+ replSetName, seedList, std::make_unique<ReplicaSetMonitorDbClientTransport>());
const repl::ReplSetConfig& origConfig = replSet.getReplConfig();
@@ -261,7 +263,8 @@ TEST_F(TwoNodeWithTags, SecDownRetryNoTag) {
set<HostAndPort> seedList;
seedList.insert(HostAndPort(replSet->getPrimary()));
- auto monitor = ReplicaSetMonitor::createIfNeeded(replSet->getSetName(), seedList);
+ auto monitor = ReplicaSetMonitor::createIfNeeded(
+ replSet->getSetName(), seedList, std::make_unique<ReplicaSetMonitorDbClientTransport>());
const string secHost(replSet->getSecondaries().front());
replSet->kill(secHost);
@@ -287,7 +290,8 @@ TEST_F(TwoNodeWithTags, SecDownRetryWithTag) {
set<HostAndPort> seedList;
seedList.insert(HostAndPort(replSet->getPrimary()));
- auto monitor = ReplicaSetMonitor::createIfNeeded(replSet->getSetName(), seedList);
+ auto monitor = ReplicaSetMonitor::createIfNeeded(
+ replSet->getSetName(), seedList, std::make_unique<ReplicaSetMonitorDbClientTransport>());
const string secHost(replSet->getSecondaries().front());
replSet->kill(secHost);