summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2020-02-12 18:10:07 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-24 17:28:39 +0000
commit5c833792316fe542445683b865a66a64eb793a4a (patch)
tree7fabcf6e73d58641cfa51e6fd07e613426e80811
parentd000968754302bd8ebbeaee3f7fbed6b25778593 (diff)
downloadmongo-5c833792316fe542445683b865a66a64eb793a4a.tar.gz
SERVER-45962 feature flag to opt out of new RSM implementation
-rw-r--r--src/mongo/client/SConscript14
-rw-r--r--src/mongo/client/dbclient_connection.cpp2
-rw-r--r--src/mongo/client/disable_streamable_rsm_flag_test.cpp128
-rw-r--r--src/mongo/client/replica_set_monitor.cpp1438
-rw-r--r--src/mongo/client/replica_set_monitor.h272
-rw-r--r--src/mongo/client/replica_set_monitor_interface.h169
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp16
-rw-r--r--src/mongo/client/replica_set_monitor_params.idl39
-rw-r--r--src/mongo/client/scanning_replica_set_monitor.cpp1424
-rw-r--r--src/mongo/client/scanning_replica_set_monitor.h213
-rw-r--r--src/mongo/client/scanning_replica_set_monitor_internal.h (renamed from src/mongo/client/replica_set_monitor_internal.h)15
-rw-r--r--src/mongo/client/scanning_replica_set_monitor_internal_test.cpp (renamed from src/mongo/client/replica_set_monitor_internal_test.cpp)8
-rw-r--r--src/mongo/client/scanning_replica_set_monitor_read_preference_test.cpp (renamed from src/mongo/client/replica_set_monitor_read_preference_test.cpp)4
-rw-r--r--src/mongo/client/scanning_replica_set_monitor_scan_test.cpp (renamed from src/mongo/client/replica_set_monitor_scan_test.cpp)16
-rw-r--r--src/mongo/client/scanning_replica_set_monitor_test_concurrent.cpp (renamed from src/mongo/client/replica_set_monitor_test_concurrent.cpp)27
-rw-r--r--src/mongo/client/scanning_replica_set_monitor_test_fixture.cpp (renamed from src/mongo/client/replica_set_monitor_test_fixture.cpp)12
-rw-r--r--src/mongo/client/scanning_replica_set_monitor_test_fixture.h (renamed from src/mongo/client/replica_set_monitor_test_fixture.h)18
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.cpp108
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.h93
-rw-r--r--src/mongo/dbtests/replica_set_monitor_test.cpp2
-rw-r--r--src/mongo/s/client/parallel.cpp2
-rw-r--r--src/mongo/shell/shell_utils.cpp2
22 files changed, 2270 insertions, 1752 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 6707bfa3494..b06b7d6f20d 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -194,7 +194,10 @@ clientDriverEnv.Library(
env.Idlc('global_conn_pool.idl')[0],
'replica_set_change_notifier.cpp',
'replica_set_monitor.cpp',
+ 'scanning_replica_set_monitor.cpp',
+ 'streamable_replica_set_monitor.cpp',
'replica_set_monitor_manager.cpp',
+ env.Idlc('replica_set_monitor_params.idl')[0],
'server_ping_monitor.cpp',
],
LIBDEPS=[
@@ -304,16 +307,17 @@ env.CppUnitTest(
'authenticate_test.cpp',
'connection_string_test.cpp',
'dbclient_cursor_test.cpp',
+ 'disable_streamable_rsm_flag_test.cpp',
'fetcher_test.cpp',
'index_spec_test.cpp',
'mongo_uri_test.cpp',
'read_preference_test.cpp',
'remote_command_retry_scheduler_test.cpp',
- 'replica_set_monitor_internal_test.cpp',
- 'replica_set_monitor_read_preference_test.cpp',
- 'replica_set_monitor_scan_test.cpp',
- 'replica_set_monitor_test_concurrent.cpp',
- 'replica_set_monitor_test_fixture.cpp',
+ 'scanning_replica_set_monitor_internal_test.cpp',
+ 'scanning_replica_set_monitor_read_preference_test.cpp',
+ 'scanning_replica_set_monitor_scan_test.cpp',
+ 'scanning_replica_set_monitor_test_concurrent.cpp',
+ 'scanning_replica_set_monitor_test_fixture.cpp',
'server_ping_monitor_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp
index e39fcec61c7..c7170733b1b 100644
--- a/src/mongo/client/dbclient_connection.cpp
+++ b/src/mongo/client/dbclient_connection.cpp
@@ -801,7 +801,7 @@ void DBClientConnection::handleNotMasterResponse(const BSONObj& replyBody,
return;
}
- ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get(_parentReplSetName);
+ auto monitor = ReplicaSetMonitor::get(_parentReplSetName);
if (monitor) {
monitor->failedHost(_serverAddress,
{ErrorCodes::NotMaster,
diff --git a/src/mongo/client/disable_streamable_rsm_flag_test.cpp b/src/mongo/client/disable_streamable_rsm_flag_test.cpp
new file mode 100644
index 00000000000..0cad97280d5
--- /dev/null
+++ b/src/mongo/client/disable_streamable_rsm_flag_test.cpp
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/client/replica_set_monitor_params_gen.h"
+#include "mongo/client/scanning_replica_set_monitor.h"
+#include "mongo/client/streamable_replica_set_monitor.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace {
+
+class RSMDisableStreamableFlagTestFixture : public unittest::Test {
+protected:
+ void setUp() {
+ setGlobalServiceContext(ServiceContext::make());
+ ReplicaSetMonitor::cleanup();
+ }
+
+ void tearDown() {
+ unsetParameter();
+ }
+
+ /**
+ * Sets the data of the disableStreamableReplicaSetMonitor parameter to flagValue.
+ */
+ void setParameter(bool flagValue) {
+ const BSONObj newFlagParameter = BSON(kDisableStreamableFlagName << flagValue);
+ BSONObjIterator parameterIterator(newFlagParameter);
+ BSONElement newParameter = parameterIterator.next();
+ const auto foundParameter = findDisableStreamableServerParameter();
+
+ uassertStatusOK(foundParameter->second->set(newParameter));
+ ASSERT_EQ(flagValue, disableStreamableReplicaSetMonitor.load());
+ }
+
+ /**
+ * Restores the disableStreamableReplicaSetMonitor parameter to its default value.
+ */
+ void unsetParameter() {
+ const auto defaultParameter = kDefaultParameter[kDisableStreamableFlagName];
+ const auto foundParameter = findDisableStreamableServerParameter();
+
+ uassertStatusOK(foundParameter->second->set(defaultParameter));
+ }
+
+ /**
+ * Finds the disableStreamableReplicaSetMonitor ServerParameter.
+ */
+ ServerParameter::Map::const_iterator findDisableStreamableServerParameter() {
+ const ServerParameter::Map& parameterMap = ServerParameterSet::getGlobal()->getMap();
+ return parameterMap.find(kDisableStreamableFlagName);
+ }
+
+ static inline const std::string kDisableStreamableFlagName =
+ "disableStreamableReplicaSetMonitor";
+
+ /**
+ * A BSONObj containing the default for the disableStreamableReplicaSetMonitor flag.
+ */
+ static inline const BSONObj kDefaultParameter =
+ BSON(kDisableStreamableFlagName << disableStreamableReplicaSetMonitor.load());
+};
+
+/**
+ * Checks that a ScanningReplicaSetMonitor is created when the disableStreamableReplicaSetMonitor
+ * flag is set to true.
+ */
+TEST_F(RSMDisableStreamableFlagTestFixture, checkIsScanningIfDisableStreamableIsTrue) {
+ setParameter(true);
+ auto uri = MongoURI::parse("mongodb://a,b,c/?replicaSet=name");
+ ASSERT_OK(uri.getStatus());
+ auto createdMonitor = ReplicaSetMonitor::createIfNeeded(uri.getValue());
+
+ // If the created monitor does not point to a ScanningReplicaSetMonitor, the cast returns a
+ // nullptr.
+ auto scanningMonitorCast = dynamic_cast<ScanningReplicaSetMonitor*>(createdMonitor.get());
+ ASSERT(scanningMonitorCast);
+
+ auto streamableMonitorCast = dynamic_cast<StreamableReplicaSetMonitor*>(createdMonitor.get());
+ ASSERT_FALSE(streamableMonitorCast);
+}
+
+/**
+ * Checks that a StreamableReplicaSetMonitor is created when the
+ * disableStreamableReplicaSetMonitor flag is set to false.
+ *
+ * TODO SERVER-43332: Once the StreamableReplicaSetMonitor is integrated into the codebase, this
+ * test should mirror the logic in checkIsScanningIfDisableStreamableIsTrue accordingly.
+ */
+TEST_F(RSMDisableStreamableFlagTestFixture, checkIsStreamableIfDisableStreamableIsFalse) {
+ setParameter(false);
+ auto uri = MongoURI::parse("mongodb://a,b,c/?replicaSet=name");
+ ASSERT_OK(uri.getStatus());
+ ASSERT_THROWS_CODE(ReplicaSetMonitor::createIfNeeded(uri.getValue()), DBException, 31451);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 2110d43cd8a..57f43547cc8 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -41,11 +41,9 @@
#include "mongo/client/connpool.h"
#include "mongo/client/global_conn_pool.h"
#include "mongo/client/read_preference.h"
-#include "mongo/client/replica_set_monitor_internal.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/bson_extract_optime.h"
#include "mongo/db/server_options.h"
-#include "mongo/logv2/log.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
@@ -67,371 +65,9 @@ using std::vector;
// Failpoint for changing the default refresh period
MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod);
-namespace {
-
-// Pull nested types to top-level scope
-typedef ReplicaSetMonitor::IsMasterReply IsMasterReply;
-typedef ReplicaSetMonitor::ScanState ScanState;
-typedef ReplicaSetMonitor::ScanStatePtr ScanStatePtr;
-typedef ReplicaSetMonitor::SetState SetState;
-typedef ReplicaSetMonitor::SetStatePtr SetStatePtr;
-typedef ReplicaSetMonitor::Refresher Refresher;
-typedef ScanState::UnconfirmedReplies UnconfirmedReplies;
-typedef SetState::Node Node;
-typedef SetState::Nodes Nodes;
-using executor::TaskExecutor;
-using CallbackArgs = TaskExecutor::CallbackArgs;
-using CallbackHandle = TaskExecutor::CallbackHandle;
-
-// Intentionally chosen to compare worse than all known latencies.
-const int64_t unknownLatency = numeric_limits<int64_t>::max();
-
-const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
-const Milliseconds kExpeditedRefreshPeriod(500);
-AtomicWord<bool> areRefreshRetriesDisabledForTest{false}; // Only true in tests.
-
-//
-// Helpers for stl algorithms
-//
-
-bool isMaster(const Node& node) {
- return node.isMaster;
-}
-
-bool opTimeGreater(const Node* lhs, const Node* rhs) {
- return lhs->opTime > rhs->opTime;
-}
-
-bool compareLatencies(const Node* lhs, const Node* rhs) {
- // NOTE: this automatically compares Node::unknownLatency worse than all others.
- return lhs->latencyMicros < rhs->latencyMicros;
-}
-
-bool hostsEqual(const Node& lhs, const HostAndPort& rhs) {
- return lhs.host == rhs;
-}
-
-// Allows comparing two Nodes, or a HostAndPort and a Node.
-// NOTE: the two HostAndPort overload is only needed to support extra checks in some STL
-// implementations. For simplicity, no comparator should be used with collections of just
-// HostAndPort.
-struct CompareHosts {
- bool operator()(const Node& lhs, const Node& rhs) {
- return lhs.host < rhs.host;
- }
- bool operator()(const Node& lhs, const HostAndPort& rhs) {
- return lhs.host < rhs;
- }
- bool operator()(const HostAndPort& lhs, const Node& rhs) {
- return lhs < rhs.host;
- }
- bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) {
- return lhs < rhs;
- }
-} compareHosts; // like an overloaded function, but able to pass to stl algorithms
-
-// The following structs should be treated as functions returning a UnaryPredicate.
-// Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost));
-// They all hold their constructor argument by reference.
-
-struct HostIs {
- explicit HostIs(const HostAndPort& host) : _host(host) {}
- bool operator()(const HostAndPort& host) {
- return host == _host;
- }
- bool operator()(const Node& node) {
- return node.host == _host;
- }
- const HostAndPort& _host;
-};
-
-struct HostNotIn {
- explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {}
- bool operator()(const HostAndPort& host) {
- return !_hosts.count(host);
- }
- bool operator()(const Node& node) {
- return !_hosts.count(node.host);
- }
- const std::set<HostAndPort>& _hosts;
-};
-
-int32_t pingTimeMillis(const Node& node) {
- auto latencyMillis = node.latencyMicros / 1000;
- if (latencyMillis > numeric_limits<int32_t>::max()) {
- // In particular, Node::unknownLatency does not fit in an int32.
- return numeric_limits<int32_t>::max();
- }
- return latencyMillis;
-}
-
-/**
- * Replica set refresh period on the task executor.
- */
-const Seconds kDefaultRefreshPeriod(30);
-} // namespace
-
-// If we cannot find a host after 15 seconds of refreshing, give up
-const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15);
-
// Defaults to random selection as required by the spec
bool ReplicaSetMonitor::useDeterministicHostSelection = false;
-Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() {
- Seconds r = kDefaultRefreshPeriod;
- static constexpr auto kPeriodField = "period"_sd;
- modifyReplicaSetMonitorDefaultRefreshPeriod.executeIf(
- [&r](const BSONObj& data) { r = Seconds{data.getIntField(kPeriodField)}; },
- [](const BSONObj& data) { return data.hasField(kPeriodField); });
- return r;
-}
-
-ReplicaSetMonitor::ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {}
-
-ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
- : ReplicaSetMonitor(
- std::make_shared<SetState>(uri,
- &ReplicaSetMonitorManager::get()->getNotifier(),
- ReplicaSetMonitorManager::get()->getExecutor())) {}
-
-void ReplicaSetMonitor::init() {
- if (areRefreshRetriesDisabledForTest.load()) {
- // This is for MockReplicaSet. Those tests want to control when scanning happens.
- LOGV2_WARNING(20180,
- "*** Not starting background refresh because refresh retries are disabled.");
- return;
- }
-
- {
- stdx::lock_guard lk(_state->mutex);
- _state->init();
- }
-}
-
-void ReplicaSetMonitor::drop() {
- {
- stdx::lock_guard lk(_state->mutex);
- _state->drop();
- }
-}
-
-ReplicaSetMonitor::~ReplicaSetMonitor() {
- drop();
-}
-
-template <typename Callback>
-auto ReplicaSetMonitor::SetState::scheduleWorkAt(Date_t when, Callback&& cb) const {
- auto wrappedCallback = [cb = std::forward<Callback>(cb),
- anchor = shared_from_this()](const CallbackArgs& cbArgs) mutable {
- if (ErrorCodes::isCancelationError(cbArgs.status)) {
- // Do no more work if we're removed or canceled
- return;
- }
- invariant(cbArgs.status);
-
- stdx::lock_guard lk(anchor->mutex);
- if (anchor->isDropped) {
- return;
- }
-
- cb(cbArgs);
- };
- return executor->scheduleWorkAt(std::move(when), std::move(wrappedCallback));
-}
-
-void ReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) {
- // Reschedule the refresh
-
- if (!executor || isMocked) {
- // Without an executor, we can't do refreshes -- we're in a test
- return;
- }
-
- if (isDropped) { // already removed so no need to refresh
- LOGV2_DEBUG(20162,
- 1,
- "Stopping refresh for replica set {name} because it's removed",
- "name"_attr = name);
- return;
- }
-
- Milliseconds period = refreshPeriod;
- if (isExpedited) {
- period = std::min<Milliseconds>(period, kExpeditedRefreshPeriod);
- }
-
- auto currentTime = now();
- auto possibleNextScanTime = currentTime + period;
- if (refresherHandle && //
- (strategy == SchedulingStrategy::kKeepEarlyScan) && //
- (nextScanTime > currentTime) && //
- (possibleNextScanTime >= nextScanTime)) {
- // If the next scan would be sooner than our desired, why cancel?
- return;
- }
-
- // Cancel out the last refresh
- if (auto currentHandle = std::exchange(refresherHandle, {})) {
- executor->cancel(currentHandle);
- }
-
- nextScanTime = possibleNextScanTime;
- LOGV2_DEBUG(20163,
- 1,
- "Next replica set scan scheduled for {nextScanTime}",
- "nextScanTime"_attr = nextScanTime);
- auto swHandle = scheduleWorkAt(nextScanTime, [this](const CallbackArgs& cbArgs) {
- if (cbArgs.myHandle != refresherHandle)
- return; // We've been replaced!
-
- // It is possible that a waiter will have already expired by the point of this rescan.
- // Thus we notify here to trigger that logic.
- notify();
-
- _ensureScanInProgress(shared_from_this());
-
- // And now we set up the next one
- rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan);
- });
-
- if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) {
- LOGV2_DEBUG(20164,
- 1,
- "Cant schedule refresh for {name}. Executor shutdown in progress",
- "name"_attr = name);
- return;
- }
-
- if (!swHandle.isOK()) {
- LOGV2_FATAL(20184,
- "Can't continue refresh for replica set {name} due to {swHandle_getStatus}",
- "name"_attr = name,
- "swHandle_getStatus"_attr = redact(swHandle.getStatus()));
- fassertFailed(40140);
- }
-
- refresherHandle = std::move(swHandle.getValue());
-}
-
-SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria,
- Milliseconds maxWait) {
- return _getHostsOrRefresh(criteria, maxWait)
- .then([](const auto& hosts) {
- invariant(hosts.size());
- return hosts[0];
- })
- .semi();
-}
-
-SemiFuture<std::vector<HostAndPort>> ReplicaSetMonitor::getHostsOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
- return _getHostsOrRefresh(criteria, maxWait).semi();
-}
-
-Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh(
- const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
-
- stdx::lock_guard<Latch> lk(_state->mutex);
- if (_state->isDropped) {
- return Status(ErrorCodes::ReplicaSetMonitorRemoved,
- str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed");
- }
-
- auto out = _state->getMatchingHosts(criteria);
- if (!out.empty())
- return {std::move(out)};
-
- // Fail early without doing any more work if the timeout has already expired.
- if (maxWait <= Milliseconds(0))
- return _state->makeUnsatisfedReadPrefError(criteria);
-
- // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is
- // dealing with maxWait.
- auto pf = makePromiseFuture<decltype(out)>();
- _state->waiters.emplace_back(
- SetState::Waiter{_state->now() + maxWait, criteria, std::move(pf.promise)});
-
- // This must go after we set up the wait state to correctly handle unittests using
- // MockReplicaSet.
- _ensureScanInProgress(_state);
-
- // Switch to expedited scanning.
- _state->isExpedited = true;
- _state->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan);
-
- return std::move(pf.future);
-}
-HostAndPort ReplicaSetMonitor::getMasterOrUassert() {
- return getHostOrRefresh(kPrimaryOnlyReadPreference).get();
-}
-
-void ReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- if (node)
- node->markFailed(status);
- if (kDebugBuild)
- _state->checkInvariants();
-}
-
-bool ReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- return node ? node->isMaster : false;
-}
-
-bool ReplicaSetMonitor::isHostUp(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- return node ? node->isUp : false;
-}
-
-int ReplicaSetMonitor::getMinWireVersion() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- int minVersion = 0;
- for (const auto& host : _state->nodes) {
- if (host.isUp) {
- minVersion = std::max(minVersion, host.minWireVersion);
- }
- }
-
- return minVersion;
-}
-
-int ReplicaSetMonitor::getMaxWireVersion() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- int maxVersion = std::numeric_limits<int>::max();
- for (const auto& host : _state->nodes) {
- if (host.isUp) {
- maxVersion = std::min(maxVersion, host.maxWireVersion);
- }
- }
-
- return maxVersion;
-}
-
-std::string ReplicaSetMonitor::getName() const {
- // name is const so don't need to lock
- return _state->name;
-}
-
-std::string ReplicaSetMonitor::getServerAddress() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- // We return our setUri until first confirmation
- return _state->seedConnStr.isValid() ? _state->seedConnStr.toString()
- : _state->setUri.connectionString().toString();
-}
-
-const MongoURI& ReplicaSetMonitor::getOriginalUri() const {
- // setUri is const so no need to lock.
- return _state->setUri;
-}
-
-bool ReplicaSetMonitor::contains(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- return _state->seedNodes.count(host);
-}
-
shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const string& name,
const set<HostAndPort>& servers) {
return ReplicaSetMonitorManager::get()->getOrCreateMonitor(
@@ -458,40 +94,6 @@ ReplicaSetChangeNotifier& ReplicaSetMonitor::getNotifier() {
return ReplicaSetMonitorManager::get()->getNotifier();
}
-// TODO move to correct order with non-statics before pushing
-void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
-
- BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName()));
- if (forFTDC) {
- for (size_t i = 0; i < _state->nodes.size(); i++) {
- const Node& node = _state->nodes[i];
- monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node));
- }
- return;
- }
-
- // NOTE: the format here must be consistent for backwards compatibility
- BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts"));
- for (size_t i = 0; i < _state->nodes.size(); i++) {
- const Node& node = _state->nodes[i];
-
- BSONObjBuilder builder;
- builder.append("addr", node.host.toString());
- builder.append("ok", node.isUp);
- builder.append("ismaster", node.isMaster); // intentionally not camelCase
- builder.append("hidden", false); // we don't keep hidden nodes in the set
- builder.append("secondary", node.isUp && !node.isMaster);
- builder.append("pingTimeMillis", pingTimeMillis(node));
-
- if (!node.tags.isEmpty()) {
- builder.append("tags", node.tags);
- }
-
- hosts.append(builder.obj());
- }
-}
-
void ReplicaSetMonitor::shutdown() {
ReplicaSetMonitorManager::get()->shutdown();
}
@@ -500,1041 +102,15 @@ void ReplicaSetMonitor::cleanup() {
ReplicaSetMonitorManager::get()->removeAllMonitors();
}
-void ReplicaSetMonitor::disableRefreshRetries_forTest() {
- areRefreshRetriesDisabledForTest.store(true);
-}
-
-bool ReplicaSetMonitor::isKnownToHaveGoodPrimary() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
-
- for (const auto& node : _state->nodes) {
- if (node.isMaster) {
- return true;
- }
- }
-
- return false;
-}
-
-void ReplicaSetMonitor::runScanForMockReplicaSet() {
- stdx::lock_guard<Latch> lk(_state->mutex);
- _ensureScanInProgress(_state);
-
- // This function should only be called from tests using MockReplicaSet and they should use the
- // synchronous path to complete before returning.
- invariant(_state->currentScan == nullptr);
-}
-
-void ReplicaSetMonitor::_ensureScanInProgress(const SetStatePtr& state) {
- Refresher(state).scheduleNetworkRequests();
-}
-
-Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) {
- if (_scan) {
- _set->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan);
- _scan->retryAllTriedHosts(_set->rand);
- return; // participate in in-progress scan
- }
-
- startNewScan();
-}
-
-void Refresher::scheduleNetworkRequests() {
- for (auto ns = getNextStep(); ns.step == NextStep::CONTACT_HOST; ns = getNextStep()) {
- if (!_set->executor || _set->isMocked) {
- // If we're mocked, just schedule an isMaster
- scheduleIsMaster(ns.host);
- continue;
- }
-
- // cancel any scheduled isMaster calls that haven't yet been called
- Node* node = _set->findOrCreateNode(ns.host);
- if (auto handle = std::exchange(node->scheduledIsMasterHandle, {})) {
- _set->executor->cancel(handle);
- }
-
- // ensure that the call to isMaster is scheduled at most every 500ms
- auto swHandle =
- _set->scheduleWorkAt(node->nextPossibleIsMasterCall,
- [*this, host = ns.host](const CallbackArgs& cbArgs) mutable {
- scheduleIsMaster(host);
- });
-
- if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) {
- _scan->markHostsToScanAsTried();
- break;
- }
-
- if (!swHandle.isOK()) {
- LOGV2_FATAL(
- 20185,
- "Can't continue scan for replica set {set_name} due to {swHandle_getStatus}",
- "set_name"_attr = _set->name,
- "swHandle_getStatus"_attr = redact(swHandle.getStatus()));
- fassertFailed(31176);
- }
-
- node->scheduledIsMasterHandle = uassertStatusOK(std::move(swHandle));
- }
-
- if (kDebugBuild)
- _set->checkInvariants();
-}
-
-void Refresher::scheduleIsMaster(const HostAndPort& host) {
- if (_set->isMocked) {
- // MockReplicaSet only works with DBClient-style access since it injects itself into the
- // ScopedDbConnection pool connection creation.
- try {
- ScopedDbConnection conn(ConnectionString(host), kCheckTimeout.count());
-
- auto timer = Timer();
- auto reply = BSONObj();
- bool ignoredOutParam = false;
- conn->isMaster(ignoredOutParam, &reply);
- conn.done(); // return to pool on success.
-
- receivedIsMaster(host, timer.micros(), reply);
- } catch (DBException& ex) {
- failedHost(host, ex.toStatus());
- }
-
- return;
- }
-
- auto request = executor::RemoteCommandRequest(
- host, "admin", BSON("isMaster" << 1), nullptr, kCheckTimeout);
- request.sslMode = _set->setUri.getSSLMode();
- auto status =
- _set->executor
- ->scheduleRemoteCommand(
- std::move(request),
- [copy = *this, host, timer = Timer()](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
- stdx::lock_guard lk(copy._set->mutex);
- // Ignore the reply and return if we are no longer the current scan. This might
- // happen if it was decided that the host we were contacting isn't part of the
- // set.
- if (copy._scan != copy._set->currentScan) {
- return;
- }
-
- // ensure that isMaster calls occur at most 500ms after the previous call ended
- if (auto node = copy._set->findNode(host)) {
- node->nextPossibleIsMasterCall =
- copy._set->executor->now() + Milliseconds(500);
- }
-
- if (result.response.isOK()) {
- // Not using result.response.elapsedMillis because higher precision is
- // useful for computing the rolling average.
- copy.receivedIsMaster(host, timer.micros(), result.response.data);
- } else {
- copy.failedHost(host, result.response.status);
- }
-
- // This reply may have discovered new hosts to contact so we need to schedule
- // them.
- copy.scheduleNetworkRequests();
- })
- .getStatus();
-
- if (!status.isOK()) {
- failedHost(host, status);
- // This is only called from scheduleNetworkRequests() which will still be looping, so we
- // don't need to call it here after updating the state.
- }
-}
-
-Refresher::NextStep Refresher::getNextStep() {
- // No longer the current scan
- if (_scan != _set->currentScan) {
- return NextStep(NextStep::DONE);
- }
-
- // Wait for all dispatched hosts to return before trying any fallback hosts.
- if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) {
- return NextStep(NextStep::WAIT);
- }
-
- // If we haven't yet found a master, try contacting unconfirmed hosts
- if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) {
- _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand);
- _scan->possibleNodes.clear();
- }
-
- if (_scan->hostsToScan.empty()) {
- // We've tried all hosts we can, so nothing more to do in this round.
- if (!_scan->foundUpMaster) {
- LOGV2_WARNING(
- 20181, "Unable to reach primary for set {set_name}", "set_name"_attr = _set->name);
-
- // Since we've talked to everyone we could but still didn't find a primary, we
- // do the best we can, and assume all unconfirmedReplies are actually from nodes
- // in the set (we've already confirmed that they think they are). This is
- // important since it allows us to bootstrap to a usable state even if we are
- // unable to talk to a master while starting up. As soon as we are able to
- // contact a master, we will remove any nodes that it doesn't think are part of
- // the set, undoing the damage we cause here.
-
- // NOTE: we don't modify seedNodes or notify about set membership change in this
- // case since it hasn't been confirmed by a master.
- for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
- it != _scan->unconfirmedReplies.end();
- ++it) {
- _set->findOrCreateNode(it->first)->update(it->second);
- }
-
- auto connStr = _set->possibleConnectionString();
- if (connStr != _set->workingConnStr) {
- _set->workingConnStr = std::move(connStr);
- _set->notifier->onPossibleSet(_set->workingConnStr);
- }
-
- // If at some point we care about lacking a primary, on it here
- _set->lastSeenMaster = {};
- }
-
- if (_scan->foundAnyUpNodes) {
- _set->consecutiveFailedScans = 0;
- } else {
- auto nScans = _set->consecutiveFailedScans++;
- if (nScans <= 10 || nScans % 10 == 0) {
- LOGV2(20165,
- "Cannot reach any nodes for set {set_name}. Please check network "
- "connectivity and the status of the set. This has happened for "
- "{set_consecutiveFailedScans} checks in a row.",
- "set_name"_attr = _set->name,
- "set_consecutiveFailedScans"_attr = _set->consecutiveFailedScans);
- }
- }
-
- // Makes sure all other Refreshers in this round return DONE
- _set->currentScan.reset();
- _set->notify();
-
- LOGV2_DEBUG(20166,
- 1,
- "Refreshing replica set {set_name} took {scan_timer_millis}ms",
- "set_name"_attr = _set->name,
- "scan_timer_millis"_attr = _scan->timer.millis());
-
- return NextStep(NextStep::DONE);
- }
-
- // Pop and return the next hostToScan.
- HostAndPort host = _scan->hostsToScan.front();
- _scan->hostsToScan.pop_front();
- _scan->waitingFor.insert(host);
- _scan->triedHosts.insert(host);
-
- return NextStep(NextStep::CONTACT_HOST, host);
-}
-
-void Refresher::receivedIsMaster(const HostAndPort& from,
- int64_t latencyMicros,
- const BSONObj& replyObj) {
- _scan->waitingFor.erase(from);
-
- const IsMasterReply reply(from, latencyMicros, replyObj);
-
- // Handle various failure cases
- if (!reply.ok) {
- failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"});
- return;
- }
-
- if (reply.setName != _set->name) {
- if (reply.raw["isreplicaset"].trueValue()) {
- // The reply came from a node in the state referred to as RSGhost in the SDAM
- // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event,
- // if a reply from a ghost offers a list of possible other members of the replica set,
- // and if this refresher has yet to find the replica set master, we add hosts listed in
- // the reply to the list of possible replica set members.
- if (!_scan->foundUpMaster) {
- _scan->possibleNodes.insert(reply.members.begin(), reply.members.end());
- }
- } else {
- LOGV2_ERROR(20183,
- "replset name mismatch: expected \"{set_name}\", but remote node {from} "
- "has replset name \"{reply_setName}\", ismaster: {replyObj}",
- "set_name"_attr = _set->name,
- "from"_attr = from,
- "reply_setName"_attr = reply.setName,
- "replyObj"_attr = replyObj);
- }
-
- failedHost(from,
- {ErrorCodes::InconsistentReplicaSetNames,
- str::stream() << "Target replica set name " << reply.setName
- << " does not match the monitored set name " << _set->name});
- return;
- }
-
- if (reply.isMaster) {
- Status status = receivedIsMasterFromMaster(from, reply);
- if (!status.isOK()) {
- failedHost(from, status);
- return;
- }
- }
-
- if (_scan->foundUpMaster) {
- // We only update a Node if a master has confirmed it is in the set.
- _set->updateNodeIfInNodes(reply);
- } else {
- // Populate possibleNodes.
- _scan->possibleNodes.insert(reply.members.begin(), reply.members.end());
- _scan->unconfirmedReplies[from] = reply;
- }
-
- // _set->nodes may still not have any nodes with isUp==true, but we have at least found a
- // connectible host that is that claims to be in the set.
- _scan->foundAnyUpNodes = true;
-
- _set->notify();
-
- if (kDebugBuild)
- _set->checkInvariants();
-}
-
-void Refresher::failedHost(const HostAndPort& host, const Status& status) {
- _scan->waitingFor.erase(host);
-
- Node* node = _set->findNode(host);
- if (node)
- node->markFailed(status);
-
- if (_scan->waitingFor.empty()) {
- // If this was the last host that needed a response, we should notify the SetState so that
- // we can fail any waiters that have timed out.
- _set->notify();
- }
-}
-
-void Refresher::startNewScan() {
- // The heuristics we use in deciding the order to contact hosts are designed to find a
- // master as quickly as possible. This is because we can't use any hosts we find until
- // we either get the latest set of members from a master or talk to all possible hosts
- // without finding a master.
-
- // TODO It might make sense to check down nodes first if the last seen master is still
- // marked as up.
-
- _scan = std::make_shared<ScanState>();
- _set->currentScan = _scan;
-
- int upNodes = 0;
- for (Nodes::const_iterator it(_set->nodes.begin()), end(_set->nodes.end()); it != end; ++it) {
- if (it->isUp) {
- // _scan the nodes we think are up first
- _scan->hostsToScan.push_front(it->host);
- upNodes++;
- } else {
- _scan->hostsToScan.push_back(it->host);
- }
- }
-
- // shuffle the queue, but keep "up" nodes at the front
- std::shuffle(
- _scan->hostsToScan.begin(), _scan->hostsToScan.begin() + upNodes, _set->rand.urbg());
- std::shuffle(_scan->hostsToScan.begin() + upNodes, _scan->hostsToScan.end(), _set->rand.urbg());
-
- if (!_set->lastSeenMaster.empty()) {
- // move lastSeenMaster to front of queue
- std::stable_partition(
- _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(_set->lastSeenMaster));
- }
-}
-
-Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) {
- invariant(reply.isMaster);
-
- // Reject if config version is older. This is for backwards compatibility with nodes in pv0
- // since they don't have the same ordering with pv1 electionId.
- if (reply.configVersion < _set->configVersion) {
- return {
- ErrorCodes::NotMaster,
- str::stream() << "Node " << from << " believes it is primary, but its config version "
- << reply.configVersion << " is older than the most recent config version "
- << _set->configVersion};
- }
-
- if (reply.electionId.isSet()) {
- // ElectionIds are only comparable if they are of the same protocol version. However, since
- // isMaster has no protocol version field, we use the configVersion instead. This works
- // because configVersion needs to be incremented whenever the protocol version is changed.
- if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() &&
- _set->maxElectionId.compare(reply.electionId) > 0) {
- return {
- ErrorCodes::NotMaster,
- str::stream() << "Node " << from << " believes it is primary, but its election id "
- << reply.electionId << " is older than the most recent election id "
- << _set->maxElectionId};
- }
-
- _set->maxElectionId = reply.electionId;
- }
-
- _set->configVersion = reply.configVersion;
-
- // Mark all nodes as not master. We will mark ourself as master before releasing the lock.
- // NOTE: we use a "last-wins" policy if multiple hosts claim to be master.
- for (size_t i = 0; i < _set->nodes.size(); i++) {
- _set->nodes[i].isMaster = false;
- }
-
- // Check if the master agrees with our current list of nodes.
- // REMINDER: both _set->nodes and reply.members are sorted.
- if (_set->nodes.size() != reply.members.size() ||
- !std::equal(_set->nodes.begin(), _set->nodes.end(), reply.members.begin(), hostsEqual)) {
- LOGV2_DEBUG(20167,
- 2,
- "Adjusting nodes in our view of replica set {set_name} based on master reply: "
- "{reply_raw}",
- "set_name"_attr = _set->name,
- "reply_raw"_attr = redact(reply.raw));
-
- // remove non-members from _set->nodes
- _set->nodes.erase(
- std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.members)),
- _set->nodes.end());
-
- // add new members to _set->nodes
- for (auto& host : reply.members) {
- _set->findOrCreateNode(host);
- }
-
- // replace hostToScan queue with untried normal hosts. can both add and remove
- // hosts from the queue.
- _scan->hostsToScan.clear();
- _scan->enqueAllUntriedHosts(reply.members, _set->rand);
-
- if (!_scan->waitingFor.empty()) {
- // make sure we don't wait for any hosts that aren't considered members
- std::set<HostAndPort> newWaitingFor;
- std::set_intersection(reply.members.begin(),
- reply.members.end(),
- _scan->waitingFor.begin(),
- _scan->waitingFor.end(),
- std::inserter(newWaitingFor, newWaitingFor.end()));
- _scan->waitingFor.swap(newWaitingFor);
- }
- }
-
- bool changedHosts = reply.members != _set->seedNodes;
- bool changedPrimary = reply.host != _set->lastSeenMaster;
- if (changedHosts || changedPrimary) {
- ++_set->seedGen;
- _set->seedNodes = reply.members;
- _set->seedConnStr = _set->confirmedConnectionString();
-
- // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare
- // and we want to record our changes
- LOGV2(20168,
- "Confirmed replica set for {set_name} is {set_seedConnStr}",
- "set_name"_attr = _set->name,
- "set_seedConnStr"_attr = _set->seedConnStr);
-
- _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host, reply.passives);
- }
-
- // Update our working string
- _set->workingConnStr = _set->seedConnStr;
-
- // Update other nodes's information based on replies we've already seen
- for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
- it != _scan->unconfirmedReplies.end();
- ++it) {
- // this ignores replies from hosts not in _set->nodes (as modified above)
- _set->updateNodeIfInNodes(it->second);
- }
- _scan->unconfirmedReplies.clear();
-
- _scan->foundUpMaster = true;
- _set->lastSeenMaster = reply.host;
-
- return Status::OK();
-}
-
-void IsMasterReply::parse(const BSONObj& obj) {
- try {
- raw = obj.getOwned(); // don't use obj again after this line
-
- ok = raw["ok"].trueValue();
- if (!ok)
- return;
-
- setName = raw["setName"].str();
- hidden = raw["hidden"].trueValue();
- secondary = raw["secondary"].trueValue();
-
- minWireVersion = raw["minWireVersion"].numberInt();
- maxWireVersion = raw["maxWireVersion"].numberInt();
-
- // hidden nodes can't be master, even if they claim to be.
- isMaster = !hidden && raw["ismaster"].trueValue();
-
- if (isMaster && raw.hasField("electionId")) {
- electionId = raw["electionId"].OID();
- }
-
- configVersion = raw["setVersion"].numberInt();
-
- const string primaryString = raw["primary"].str();
- primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString);
-
- // both hosts and passives, but not arbiters, are considered "normal hosts"
- members.clear();
- BSONForEach(host, raw.getObjectField("hosts")) {
- members.insert(HostAndPort(host.String()));
- }
- BSONForEach(host, raw.getObjectField("passives")) {
- members.insert(HostAndPort(host.String()));
- passives.insert(HostAndPort(host.String()));
- }
-
- tags = raw.getObjectField("tags");
- BSONObj lastWriteField = raw.getObjectField("lastWrite");
- if (!lastWriteField.isEmpty()) {
- if (auto lastWrite = lastWriteField["lastWriteDate"]) {
- lastWriteDate = lastWrite.date();
- }
-
- uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime));
- }
- } catch (const std::exception& e) {
- ok = false;
- LOGV2(20169,
- "exception while parsing isMaster reply: {e_what} {obj}",
- "e_what"_attr = e.what(),
- "obj"_attr = obj);
- }
-}
-
-Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {}
-
-void Node::markFailed(const Status& status) {
- if (isUp) {
- LOGV2(20170,
- "Marking host {host} as failed{causedBy_status}",
- "host"_attr = host,
- "causedBy_status"_attr = causedBy(redact(status)));
-
- isUp = false;
- }
-
- isMaster = false;
-}
-
-bool Node::matches(const ReadPreference pref) const {
- if (!isUp) {
- LOGV2_DEBUG(20171, 3, "Host {host} is not up", "host"_attr = host);
- return false;
- }
-
- LOGV2_DEBUG(20172,
- 3,
- "Host {host} is {isMaster_primary_not_primary}",
- "host"_attr = host,
- "isMaster_primary_not_primary"_attr = (isMaster ? "primary" : "not primary"));
- if (pref == ReadPreference::PrimaryOnly) {
- return isMaster;
- }
-
- if (pref == ReadPreference::SecondaryOnly) {
- return !isMaster;
- }
-
- return true;
-}
-
-bool Node::matches(const BSONObj& tag) const {
- BSONForEach(tagCriteria, tag) {
- if (SimpleBSONElementComparator::kInstance.evaluate(
- this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) {
- return false;
- }
- }
-
- return true;
-}
-
-void Node::update(const IsMasterReply& reply) {
- invariant(host == reply.host);
- invariant(reply.ok);
-
- LOGV2_DEBUG(20173,
- 3,
- "Updating host {host} based on ismaster reply: {reply_raw}",
- "host"_attr = host,
- "reply_raw"_attr = reply.raw);
-
- // Nodes that are hidden or neither master or secondary are considered down since we can't
- // send any operations to them.
- isUp = !reply.hidden && (reply.isMaster || reply.secondary);
- isMaster = reply.isMaster;
-
- minWireVersion = reply.minWireVersion;
- maxWireVersion = reply.maxWireVersion;
-
- // save a copy if unchanged
- if (!tags.binaryEqual(reply.tags))
- tags = reply.tags.getOwned();
-
- if (reply.latencyMicros >= 0) { // TODO upper bound?
- if (latencyMicros == unknownLatency) {
- latencyMicros = reply.latencyMicros;
- } else {
- // update latency with smoothed moving average (1/4th the delta)
- latencyMicros += (reply.latencyMicros - latencyMicros) / 4;
- }
- }
-
- LOGV2_DEBUG(20174,
- 3,
- "Updating {host} lastWriteDate to {reply_lastWriteDate}",
- "host"_attr = host,
- "reply_lastWriteDate"_attr = reply.lastWriteDate);
- lastWriteDate = reply.lastWriteDate;
-
- LOGV2_DEBUG(20175,
- 3,
- "Updating {host} opTime to {reply_opTime}",
- "host"_attr = host,
- "reply_opTime"_attr = reply.opTime);
- opTime = reply.opTime;
- lastWriteDateUpdateTime = Date_t::now();
-}
-
-SetState::SetState(const MongoURI& uri,
- ReplicaSetChangeNotifier* notifier,
- executor::TaskExecutor* executor)
- : setUri(std::move(uri)),
- name(setUri.getSetName()),
- notifier(notifier),
- executor(executor),
- seedNodes(setUri.getServers().begin(), setUri.getServers().end()),
- latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)),
- rand(std::random_device()()),
- refreshPeriod(getDefaultRefreshPeriod()) {
- uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
-
- if (name.empty())
- LOGV2_WARNING(20182,
- "Replica set name empty, first node: {seedNodes_begin}",
- "seedNodes_begin"_attr = *(seedNodes.begin()));
-
- // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a
- // scan until we start a scan and either find a master or contact all hosts without finding
- // one.
- // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to
- // sort nodes after this loop.
- for (auto&& addr : seedNodes) {
- nodes.push_back(Node(addr));
-
- if (addr.host()[0] == '$') {
- invariant(isMocked || &addr == &*seedNodes.begin()); // Can't mix and match.
- isMocked = true;
- } else {
- invariant(!isMocked); // Can't mix and match.
- }
- }
-
- if (kDebugBuild)
- checkInvariants();
-}
-
-HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const {
- auto hosts = getMatchingHosts(criteria);
-
- if (hosts.empty()) {
- return HostAndPort();
- }
-
- return hosts[0];
-}
-
-std::vector<HostAndPort> SetState::getMatchingHosts(const ReadPreferenceSetting& criteria) const {
- switch (criteria.pref) {
- // "Prefered" read preferences are defined in terms of other preferences
- case ReadPreference::PrimaryPreferred: {
- std::vector<HostAndPort> out =
- getMatchingHosts(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
- // NOTE: the spec says we should use the primary even if tags don't match
- if (!out.empty())
- return out;
- return getMatchingHosts(ReadPreferenceSetting(
- ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
- }
-
- case ReadPreference::SecondaryPreferred: {
- std::vector<HostAndPort> out = getMatchingHosts(ReadPreferenceSetting(
- ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
- if (!out.empty())
- return out;
- // NOTE: the spec says we should use the primary even if tags don't match
- return getMatchingHosts(
- ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
- }
-
- case ReadPreference::PrimaryOnly: {
- // NOTE: isMaster implies isUp
- Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster);
- if (it == nodes.end())
- return {};
- return {it->host};
- }
-
- // The difference between these is handled by Node::matches
- case ReadPreference::SecondaryOnly:
- case ReadPreference::Nearest: {
- std::function<bool(const Node&)> matchNode = [](const Node& node) -> bool {
- return true;
- };
- // build comparator
- if (criteria.maxStalenessSeconds.count()) {
- auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster);
- if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) {
- auto writeDateCmp = [](const Node* a, const Node* b) -> bool {
- return a->lastWriteDate < b->lastWriteDate;
- };
- // use only non failed nodes
- std::vector<const Node*> upNodes;
- for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) {
- if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) {
- upNodes.push_back(&(*nodeIt));
- }
- }
- auto latestSecNode =
- std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp);
- if (latestSecNode == upNodes.end()) {
- matchNode = [](const Node& node) -> bool { return false; };
- } else {
- Date_t maxWriteTime = (*latestSecNode)->lastWriteDate;
- matchNode = [=](const Node& node) -> bool {
- return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) +
- refreshPeriod <=
- criteria.maxStalenessSeconds;
- };
- }
- } else {
- Seconds primaryStaleness = duration_cast<Seconds>(
- masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate);
- matchNode = [=](const Node& node) -> bool {
- return duration_cast<Seconds>(node.lastWriteDateUpdateTime -
- node.lastWriteDate) -
- primaryStaleness + refreshPeriod <=
- criteria.maxStalenessSeconds;
- };
- }
- }
-
- std::vector<const Node*> allMatchingNodes;
- BSONForEach(tagElem, criteria.tags.getTagBSON()) {
- uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj());
- BSONObj tag = tagElem.Obj();
-
- std::vector<const Node*> matchingNodes;
- for (size_t i = 0; i < nodes.size(); i++) {
- if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) &&
- matchNode(nodes[i])) {
- matchingNodes.push_back(&nodes[i]);
- }
- }
-
- // Only consider nodes that satisfy the minOpTime
- if (!criteria.minOpTime.isNull()) {
- std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater);
- for (size_t i = 0; i < matchingNodes.size(); i++) {
- if (matchingNodes[i]->opTime < criteria.minOpTime) {
- if (i == 0) {
- // If no nodes satisfy the minOpTime criteria, we ignore the
- // minOpTime requirement.
- break;
- }
- matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
- break;
- }
- }
- }
-
- allMatchingNodes.insert(
- allMatchingNodes.end(), matchingNodes.begin(), matchingNodes.end());
- }
-
- // don't do more complicated selection if not needed
- if (allMatchingNodes.empty()) {
- return {};
- }
- if (allMatchingNodes.size() == 1) {
- return {allMatchingNodes.front()->host};
- }
-
- // If there are multiple nodes satisfying the minOpTime, next order by latency
- // and don't consider hosts further than a threshold from the closest.
- std::sort(allMatchingNodes.begin(), allMatchingNodes.end(), compareLatencies);
- for (size_t i = 1; i < allMatchingNodes.size(); i++) {
- int64_t distance =
- allMatchingNodes[i]->latencyMicros - allMatchingNodes[0]->latencyMicros;
- if (distance >= latencyThresholdMicros) {
- // this node and all remaining ones are too far away
- allMatchingNodes.erase(allMatchingNodes.begin() + i, allMatchingNodes.end());
- break;
- }
- }
-
- std::vector<HostAndPort> hosts;
- std::transform(allMatchingNodes.begin(),
- allMatchingNodes.end(),
- std::back_inserter(hosts),
- [](const auto& node) { return node->host; });
-
- // Note that the host list is only deterministic (or random) for the first node.
- // The rest of the list is in matchingNodes order (latency) with one element swapped
- // for the first element.
- if (auto bestHostIdx = ReplicaSetMonitor::useDeterministicHostSelection
- ? roundRobin++ % hosts.size()
- : rand.nextInt32(hosts.size())) {
- using std::swap;
- swap(hosts[0], hosts[bestHostIdx]);
- }
-
- return hosts;
- }
-
- default:
- uassert(16337, "Unknown read preference", false);
- break;
- }
-}
-
-Node* SetState::findNode(const HostAndPort& host) {
- const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
- if (it == nodes.end() || it->host != host)
- return nullptr;
-
- return &(*it);
-}
-
-Node* SetState::findOrCreateNode(const HostAndPort& host) {
- // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class
- // must function correctly even with more nodes). If we lift that restriction, we may need
- // to consider alternate algorithms.
- Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
- if (it == nodes.end() || it->host != host) {
- LOGV2_DEBUG(20176,
- 2,
- "Adding node {host} to our view of replica set {name}",
- "host"_attr = host,
- "name"_attr = name);
- it = nodes.insert(it, Node(host));
- }
- return &(*it);
-}
-
-void SetState::updateNodeIfInNodes(const IsMasterReply& reply) {
- Node* node = findNode(reply.host);
- if (!node) {
- LOGV2_DEBUG(20177,
- 2,
- "Skipping application of ismaster reply from {reply_host} since it isn't a "
- "confirmed member of set {name}",
- "reply_host"_attr = reply.host,
- "name"_attr = name);
- return;
- }
-
- node->update(reply);
-}
-
-ConnectionString SetState::confirmedConnectionString() const {
- std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes));
-
- return ConnectionString::forReplicaSet(name, std::move(hosts));
-}
-
-ConnectionString SetState::possibleConnectionString() const {
- std::vector<HostAndPort> hosts;
- hosts.reserve(nodes.size());
-
- for (auto& node : nodes) {
- hosts.push_back(node.host);
- }
-
- return ConnectionString::forReplicaSet(name, std::move(hosts));
-}
-
-void SetState::notify() {
- if (!waiters.size()) {
- return;
- }
-
- const auto cachedNow = now();
- auto shouldQuickFail = areRefreshRetriesDisabledForTest.load() && !currentScan;
-
- for (auto it = waiters.begin(); it != waiters.end();) {
- if (isDropped) {
- it->promise.setError({ErrorCodes::ShutdownInProgress,
- str::stream() << "ReplicaSetMonitor is shutting down"});
- waiters.erase(it++);
- continue;
- }
-
- auto match = getMatchingHosts(it->criteria);
- if (!match.empty()) {
- // match;
- it->promise.emplaceValue(std::move(match));
- waiters.erase(it++);
- } else if (it->deadline <= cachedNow) {
- LOGV2_DEBUG(
- 20178,
- 1,
- "Unable to statisfy read preference {it_criteria} by deadline {it_deadline}",
- "it_criteria"_attr = it->criteria,
- "it_deadline"_attr = it->deadline);
- it->promise.setError(makeUnsatisfedReadPrefError(it->criteria));
- waiters.erase(it++);
- } else if (shouldQuickFail) {
- LOGV2_DEBUG(20179, 1, "Unable to statisfy read preference because tests fail quickly");
- it->promise.setError(makeUnsatisfedReadPrefError(it->criteria));
- waiters.erase(it++);
- } else {
- it++;
- }
- }
-
- if (waiters.empty()) {
- // No current waiters so we can stop the expedited scanning.
- isExpedited = false;
- rescheduleRefresh(SchedulingStrategy::kCancelPreviousScan);
- }
-}
-
-Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const {
- return Status(ErrorCodes::FailedToSatisfyReadPreference,
- str::stream() << "Could not find host matching read preference "
- << criteria.toString() << " for set " << name);
-}
-
-void SetState::init() {
- rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan);
- notifier->onFoundSet(name);
-}
-
-void SetState::drop() {
- if (std::exchange(isDropped, true)) {
- // If a SetState calls drop() from destruction after the RSMM calls shutdown(), then the
- // RSMM's executor may no longer exist. Thus, only drop once.
- return;
- }
-
- currentScan.reset();
- notify();
-
- if (auto handle = std::exchange(refresherHandle, {})) {
- // Cancel our refresh on the way out
- executor->cancel(handle);
- }
-
- for (auto& node : nodes) {
- if (auto handle = std::exchange(node.scheduledIsMasterHandle, {})) {
- // Cancel any isMasters we had scheduled
- executor->cancel(handle);
- }
- }
-
- // No point in notifying if we never started
- if (workingConnStr.isValid()) {
- notifier->onDroppedSet(name);
- }
-}
-
-void SetState::checkInvariants() const {
- bool foundMaster = false;
- for (size_t i = 0; i < nodes.size(); i++) {
- // no empty hosts
- invariant(!nodes[i].host.empty());
-
- if (nodes[i].isMaster) {
- // masters must be up
- invariant(nodes[i].isUp);
-
- // at most one master
- invariant(!foundMaster);
- foundMaster = true;
-
- // if we have a master it should be the same as lastSeenMaster
- invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster);
- }
-
- // should never end up with negative latencies
- invariant(nodes[i].latencyMicros >= 0);
-
- // nodes must be sorted by host with no-dupes
- invariant(i == 0 || (nodes[i - 1].host < nodes[i].host));
- }
-
- // nodes should be a (non-strict) superset of the seedNodes
- invariant(std::includes(
- nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts));
-
- if (currentScan) {
- // hostsToScan can't have dups or hosts already in triedHosts.
- std::set<HostAndPort> cantSee = currentScan->triedHosts;
- for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin();
- it != currentScan->hostsToScan.end();
- ++it) {
- invariant(!cantSee.count(*it));
- cantSee.insert(*it); // make sure we don't see this again
- }
-
- // We should only be waitingFor hosts that are in triedHosts
- invariant(std::includes(currentScan->triedHosts.begin(),
- currentScan->triedHosts.end(),
- currentScan->waitingFor.begin(),
- currentScan->waitingFor.end()));
-
- // We should only have unconfirmedReplies if we haven't found a master yet
- invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty());
- }
-}
-
-template <typename Container>
-void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) {
- invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue.
-
- // no std::copy_if before c++11
- for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end;
- ++it) {
- if (!triedHosts.count(*it)) {
- hostsToScan.push_back(*it);
- }
- }
- std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg());
-}
+namespace {
+AtomicWord<bool> refreshRetriesDisabledForTest{false};
+} // namespace
-void ScanState::retryAllTriedHosts(PseudoRandom& rand) {
- invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue.
- // Move hosts that are in triedHosts but not in waitingFor from triedHosts to hostsToScan.
- std::set_difference(triedHosts.begin(),
- triedHosts.end(),
- waitingFor.begin(),
- waitingFor.end(),
- std::inserter(hostsToScan, hostsToScan.end()));
- std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg());
- triedHosts = waitingFor;
+void ReplicaSetMonitor::disableRefreshRetries_forTest() {
+ refreshRetriesDisabledForTest.store(true);
}
-void ScanState::markHostsToScanAsTried() noexcept {
- while (!hostsToScan.empty()) {
- auto host = hostsToScan.front();
- hostsToScan.pop_front();
- /**
- * Mark the popped host as tried to avoid deleting hosts in multiple points.
- * This emulates the final effect of Refresher::getNextStep() on the set.
- */
- triedHosts.insert(host);
- }
+bool ReplicaSetMonitor::areRefreshRetriesDisabledForTest() {
+ return refreshRetriesDisabledForTest.load();
}
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h
index 44b9a2504d1..1fa5f737ae9 100644
--- a/src/mongo/client/replica_set_monitor.h
+++ b/src/mongo/client/replica_set_monitor.h
@@ -34,9 +34,9 @@
#include <set>
#include <string>
-#include "mongo/base/string_data.h"
#include "mongo/client/mongo_uri.h"
#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/client/replica_set_monitor_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/duration.h"
@@ -44,136 +44,24 @@
#include "mongo/util/time_support.h"
namespace mongo {
-
-class BSONObj;
-class ReplicaSetMonitor;
-class ReplicaSetMonitorTest;
-struct ReadPreferenceSetting;
-typedef std::shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr;
-
/**
- * Holds state about a replica set and provides a means to refresh the local view.
- * All methods perform the required synchronization to allow callers from multiple threads.
+ * An abstract class, defines the external interface for static ReplicaSetMonitor methods and
+ * provides a means to refresh the local view.
+ * A ReplicaSetMonitor holds a state about the replica set and provides a means to refresh the local
+ * view. All methods perform the required synchronization to allow callers from multiple threads.
*/
-class ReplicaSetMonitor {
- ReplicaSetMonitor(const ReplicaSetMonitor&) = delete;
- ReplicaSetMonitor& operator=(const ReplicaSetMonitor&) = delete;
-
+class ReplicaSetMonitor : public ReplicaSetMonitorInterface {
public:
- class Refresher;
-
- static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500);
- static constexpr auto kCheckTimeout = Seconds(5);
-
- /**
- * Initializes local state from a MongoURI.
- */
- ReplicaSetMonitor(const MongoURI& uri);
-
- /**
- * Schedules the initial refresh task into task executor.
- */
- void init();
-
- /**
- * Ends any ongoing refreshes.
- */
- void drop();
+ virtual ~ReplicaSetMonitor() = default;
/**
- * Returns a host matching the given read preference or an error, if no host matches.
- *
- * @param readPref Read preference to match against
- * @param maxWait If no host is readily available, which matches the specified read preference,
- * wait for one to become available for up to the specified time and periodically refresh
- * the view of the set. The call may return with an error earlier than the specified value,
- * if none of the known hosts for the set are reachable within some number of attempts.
- * Note that if a maxWait of 0ms is specified, this method may still attempt to contact
- * every host in the replica set up to one time.
- *
- * Known errors are:
- * FailedToSatisfyReadPreference, if node cannot be found, which matches the read preference.
- */
- SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
- Milliseconds maxWait = kDefaultFindHostTimeout);
-
- SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
- const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout);
-
- /**
- * Returns the host we think is the current master or uasserts.
- *
- * This is a thin wrapper around getHostOrRefresh so this will also refresh our view if we
- * don't think there is a master at first. The main difference is that this will uassert
- * rather than returning an empty HostAndPort.
- */
- HostAndPort getMasterOrUassert();
-
- /**
- * Notifies this Monitor that a host has failed because of the specified error 'status' and
- * should be considered down.
+ * Defaults to false, meaning that if multiple hosts meet a criteria we pick one at random.
+ * This is required by the replica set driver spec. Set this to true in tests that need host
+ * selection to be deterministic.
*
- * Call this when you get a connection error. If you get an error while trying to refresh our
- * view of a host, call Refresher::failedHost instead because it bypasses taking the monitor's
- * mutex.
- */
- void failedHost(const HostAndPort& host, const Status& status);
-
- /**
- * Returns true if this node is the master based ONLY on local data. Be careful, return may
- * be stale.
- */
- bool isPrimary(const HostAndPort& host) const;
-
- /**
- * Returns true if host is part of this set and is considered up (meaning it can accept
- * queries).
- */
- bool isHostUp(const HostAndPort& host) const;
-
- /**
- * Returns the minimum wire version supported across the replica set.
- */
- int getMinWireVersion() const;
-
- /**
- * Returns the maximum wire version supported across the replica set.
- */
- int getMaxWireVersion() const;
-
- /**
- * The name of the set.
- */
- std::string getName() const;
-
- /**
- * Returns a std::string with the format name/server1,server2.
- * If name is empty, returns just comma-separated list of servers.
- * It IS updated to reflect the current members of the set.
- */
- std::string getServerAddress() const;
-
- /**
- * Returns the URI that was used to construct this monitor.
- * It IS NOT updated to reflect the current members of the set.
- */
- const MongoURI& getOriginalUri() const;
-
- /**
- * Is server part of this set? Uses only cached information.
- */
- bool contains(const HostAndPort& server) const;
-
- /**
- * Writes information about our cached view of the set to a BSONObjBuilder. If
- * forFTDC, trim to minimize its size for full-time diagnostic data capture.
- */
- void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const;
-
- /**
- * Returns true if the monitor knows a usable primary from it's interal view.
+ * NOTE: Used by unit-tests only.
*/
- bool isKnownToHaveGoodPrimary() const;
+ static bool useDeterministicHostSelection;
/**
* Creates a new ReplicaSetMonitor, if it doesn't already exist.
@@ -210,145 +98,13 @@ public:
static void cleanup();
/**
- * Use these to speed up tests by disabling the sleep-and-retry loops and cause errors to be
- * reported immediately.
- */
- static void disableRefreshRetries_forTest();
-
- /**
* Permanently stops all monitoring on replica sets.
*/
static void shutdown();
- /**
- * Returns the refresh period that is given to all new SetStates.
- */
- static Seconds getDefaultRefreshPeriod();
-
- //
- // internal types (defined in replica_set_monitor_internal.h)
- //
-
- struct IsMasterReply;
- struct ScanState;
- struct SetState;
- typedef std::shared_ptr<ScanState> ScanStatePtr;
- typedef std::shared_ptr<SetState> SetStatePtr;
-
- /**
- * Allows tests to set initial conditions and introspect the current state.
- */
- explicit ReplicaSetMonitor(const SetStatePtr& initialState);
- ~ReplicaSetMonitor();
-
- /**
- * The default timeout, which will be used for finding a replica set host if the caller does
- * not explicitly specify it.
- */
- static const Seconds kDefaultFindHostTimeout;
-
- /**
- * Defaults to false, meaning that if multiple hosts meet a criteria we pick one at random.
- * This is required by the replica set driver spec. Set this to true in tests that need host
- * selection to be deterministic.
- *
- * NOTE: Used by unit-tests only.
- */
- static bool useDeterministicHostSelection;
-
- /**
- * This is for use in tests using MockReplicaSet to ensure that a full scan completes before
- * continuing.
- */
- void runScanForMockReplicaSet();
-
-private:
- Future<std::vector<HostAndPort>> _getHostsOrRefresh(const ReadPreferenceSetting& readPref,
- Milliseconds maxWait);
- /**
- * If no scan is in-progress, this function is responsible for setting up a new scan. Otherwise,
- * does nothing.
- */
- static void _ensureScanInProgress(const SetStatePtr&);
-
- const SetStatePtr _state;
-};
-
-
-/**
- * Refreshes the local view of a replica set.
- *
- * All logic related to choosing the hosts to contact and updating the SetState based on replies
- * lives in this class. Use of this class should always be guarded by SetState::mutex unless in
- * single-threaded use by ReplicaSetMonitorTest.
- */
-class ReplicaSetMonitor::Refresher {
-public:
- explicit Refresher(const SetStatePtr& setState);
-
- struct NextStep {
- enum StepKind {
- CONTACT_HOST, /// Contact the returned host
- WAIT, /// Wait on condition variable and try again.
- DONE, /// No more hosts to contact in this Refresh round
- };
-
- explicit NextStep(StepKind step, const HostAndPort& host = HostAndPort())
- : step(step), host(host) {}
-
- StepKind step;
- HostAndPort host;
- };
-
- /**
- * Returns the next step to take.
- *
- * By calling this, you promise to call receivedIsMaster or failedHost if the NextStep is
- * CONTACT_HOST.
- */
- NextStep getNextStep();
-
- /**
- * Call this if a host returned from getNextStep successfully replied to an isMaster call.
- * Negative latencyMicros are ignored.
- */
- void receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& reply);
-
- /**
- * Call this if a host returned from getNextStep failed to reply to an isMaster call.
- */
- void failedHost(const HostAndPort& host, const Status& status);
-
- /**
- * Starts a new scan over the hosts in set.
- */
- void startNewScan();
-
- /**
- * First, checks that the "reply" is not from a stale primary by comparing the electionId of
- * "reply" to the maxElectionId recorded by the SetState and returns OK status if "reply"
- * belongs to a non-stale primary. Otherwise returns a failed status.
- *
- * The 'from' parameter specifies the node from which the response is received.
- *
- * Updates _set and _scan based on set-membership information from a master.
- * Applies _scan->unconfirmedReplies to confirmed nodes.
- * Does not update this host's node in _set->nodes.
- */
- Status receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply);
-
- /**
- * Schedules isMaster requests to all hosts that currently need to be contacted.
- * Does nothing if requests have already been sent to all known hosts.
- */
- void scheduleNetworkRequests();
-
- void scheduleIsMaster(const HostAndPort& host);
+ static void disableRefreshRetries_forTest();
-private:
- // Both pointers are never NULL
- SetStatePtr _set;
- ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started.
+ static bool areRefreshRetriesDisabledForTest();
};
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_interface.h b/src/mongo/client/replica_set_monitor_interface.h
new file mode 100644
index 00000000000..3826154dfe2
--- /dev/null
+++ b/src/mongo/client/replica_set_monitor_interface.h
@@ -0,0 +1,169 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+
+#include "mongo/client/mongo_uri.h"
+#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+struct ReadPreferenceSetting;
+
+class ReplicaSetMonitorInterface {  // every operation except the destructor is pure virtual
+public:
+ virtual ~ReplicaSetMonitorInterface() = default;
+
+ /**
+ * The default timeout, which will be used for finding a replica set host if the caller does
+ * not explicitly specify it.
+ */
+ static constexpr Seconds kDefaultFindHostTimeout{15};
+
+ /**
+ * Schedules the initial refresh task into task executor.
+ */
+ virtual void init() = 0;
+
+ /**
+ * Ends any ongoing refreshes.
+ */
+ virtual void drop() = 0;
+
+ /**
+ * Returns a host matching the given read preference or an error, if no host matches.
+ * getHostsOrRefresh takes the same arguments but yields a vector of hosts instead.
+ *
+ * @param readPref Read preference to match against
+ * @param maxWait If no host is readily available that matches the specified read preference,
+ * wait for one to become available for up to the specified time and periodically refresh
+ * the view of the set. The call may return with an error earlier than the specified value,
+ * if none of the known hosts for the set are reachable within some number of attempts.
+ * Note that if a maxWait of 0ms is specified, this method may still attempt to contact
+ * every host in the replica set up to one time.
+ *
+ * Known error: FailedToSatisfyReadPreference, if no node matching the read preference is found.
+ */
+ virtual SemiFuture<HostAndPort> getHostOrRefresh(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout) = 0;
+
+ virtual SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout) = 0;
+
+ /**
+ * Returns the host the RSM thinks is the current master or uasserts.
+ *
+ * This is a thin wrapper around getHostOrRefresh and will also refresh the view if a primary
+ * does not exist. The main difference is that this will uassert rather than returning an empty
+ * HostAndPort.
+ */
+ virtual HostAndPort getMasterOrUassert() = 0;
+
+ /**
+ * Notifies this Monitor that a host has failed because of the specified error 'status' and
+ * should be considered down.
+ *
+ * Call this when you get a connection error. If you get an error while trying to refresh our
+ * view of a host, call Refresher::failedHost instead because it bypasses taking the monitor's
+ * mutex.
+ */
+ virtual void failedHost(const HostAndPort& host, const Status& status) = 0;
+
+ /**
+ * Returns true if this node is the master based ONLY on local data. Be careful, return may
+ * be stale.
+ */
+ virtual bool isPrimary(const HostAndPort& host) const = 0;
+
+ /**
+ * Returns true if host is part of this set and is considered up (meaning it can accept
+ * queries).
+ */
+ virtual bool isHostUp(const HostAndPort& host) const = 0;
+
+ /**
+ * Returns the minimum wire version supported across the replica set.
+ */
+ virtual int getMinWireVersion() const = 0;
+
+ /**
+ * Returns the maximum wire version supported across the replica set.
+ */
+ virtual int getMaxWireVersion() const = 0;
+
+ /**
+ * The name of the set.
+ */
+ virtual std::string getName() const = 0;
+
+ /**
+ * Returns a std::string with the format name/server1,server2.
+ * If name is empty, returns just comma-separated list of servers.
+ * It IS updated to reflect the current members of the set.
+ */
+ virtual std::string getServerAddress() const = 0;
+
+ /**
+ * Returns the URI that was used to construct this monitor.
+ * It IS NOT updated to reflect the current members of the set.
+ */
+ virtual const MongoURI& getOriginalUri() const = 0;
+
+ /**
+ * Is server part of this set? Uses only cached information.
+ */
+ virtual bool contains(const HostAndPort& server) const = 0;
+
+ /**
+ * Writes information about our cached view of the set to a BSONObjBuilder. If
+ * forFTDC, trim to minimize its size for full-time diagnostic data capture.
+ */
+ virtual void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const = 0;
+
+ /**
+ * Returns true if the monitor knows a usable primary from its internal view.
+ */
+ virtual bool isKnownToHaveGoodPrimary() const = 0;
+
+ /**
+ * This is for use in tests using MockReplicaSet to ensure that a full scan completes before
+ * continuing.
+ */
+ virtual void runScanForMockReplicaSet() = 0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index fad377e0368..1e81bbbb688 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -38,7 +38,9 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/mongo_uri.h"
-#include "mongo/client/replica_set_monitor.h"
+#include "mongo/client/replica_set_monitor_params_gen.h"
+#include "mongo/client/scanning_replica_set_monitor.h"
+#include "mongo/client/streamable_replica_set_monitor.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
@@ -130,10 +132,14 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const
LOGV2(20186, "Starting new replica set monitor for {uri}", "uri"_attr = uri.toString());
- auto newMonitor = std::make_shared<ReplicaSetMonitor>(uri);
- _monitors[setName] = newMonitor;
- newMonitor->init();
- return newMonitor;
+ if (disableStreamableReplicaSetMonitor.load()) {
+ auto newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri);
+ _monitors[setName] = newMonitor;
+ newMonitor->init();
+ return newMonitor;
+ } else {
+ uasserted(31451, "StreamableReplicaSetMonitor is not yet implemented");
+ }
}
vector<string> ReplicaSetMonitorManager::getAllSetNames() {
diff --git a/src/mongo/client/replica_set_monitor_params.idl b/src/mongo/client/replica_set_monitor_params.idl
new file mode 100644
index 00000000000..f5b0e571305
--- /dev/null
+++ b/src/mongo/client/replica_set_monitor_params.idl
@@ -0,0 +1,39 @@
+# Copyright (C) 2020-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+global:
+ cpp_namespace: mongo
+
+server_parameters:
+ disableStreamableReplicaSetMonitor:
+ description: >-
+ Disable the StreamableReplicaSetMonitor and revert to the prior behavior with the
+ ScanningReplicaSetMonitor
+ set_at: startup
+ cpp_vartype: AtomicWord<bool>
+ cpp_varname: disableStreamableReplicaSetMonitor
+ default: true
diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp
new file mode 100644
index 00000000000..d29259d4bb7
--- /dev/null
+++ b/src/mongo/client/scanning_replica_set_monitor.cpp
@@ -0,0 +1,1424 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/scanning_replica_set_monitor.h"
+
+#include <algorithm>
+#include <limits>
+#include <random>
+
+#include "mongo/bson/simple_bsonelement_comparator.h"
+#include "mongo/client/connpool.h"
+#include "mongo/client/global_conn_pool.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/client/scanning_replica_set_monitor_internal.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/bson_extract_optime.h"
+#include "mongo/db/server_options.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/util/background.h"
+#include "mongo/util/debug_util.h"
+#include "mongo/util/exit.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/log.h"
+#include "mongo/util/string_map.h"
+#include "mongo/util/timer.h"
+
+namespace mongo {
+
+using std::numeric_limits;
+using std::set;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+
+namespace {
+
+// Pull nested types to top-level scope
+typedef ScanningReplicaSetMonitor::IsMasterReply IsMasterReply;
+typedef ScanningReplicaSetMonitor::ScanState ScanState;
+typedef ScanningReplicaSetMonitor::ScanStatePtr ScanStatePtr;
+typedef ScanningReplicaSetMonitor::SetState SetState;
+typedef ScanningReplicaSetMonitor::SetStatePtr SetStatePtr;
+typedef ScanningReplicaSetMonitor::Refresher Refresher;
+typedef ScanState::UnconfirmedReplies UnconfirmedReplies;
+typedef SetState::Node Node;
+typedef SetState::Nodes Nodes;
+using executor::TaskExecutor;
+using CallbackArgs = TaskExecutor::CallbackArgs;
+using CallbackHandle = TaskExecutor::CallbackHandle;
+
+// Intentionally chosen to compare worse than all known latencies.
+const int64_t unknownLatency = numeric_limits<int64_t>::max();
+
+const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
+
+//
+// Helpers for stl algorithms
+//
+
+bool isMaster(const Node& candidate) {
+    return candidate.isMaster;  // free-function form of the flag, for use as an STL predicate
+}
+
+bool opTimeGreater(const Node* a, const Node* b) {
+    // Orders nodes by descending opTime when used as a comparator.
+    return a->opTime > b->opTime;
+}
+
+bool compareLatencies(const Node* a, const Node* b) {
+    // Plain ascending order; an unknown latency is the largest value and therefore sorts last.
+    return a->latencyMicros < b->latencyMicros;
+}
+
+bool hostsEqual(const Node& node, const HostAndPort& address) {
+    return node.host == address;  // a Node matches when its own address equals the given one
+}
+
+// Allows comparing two Nodes, or a HostAndPort and a Node.
+// NOTE: the two HostAndPort overload is only needed to support extra checks in some STL
+// implementations. For simplicity, no comparator should be used with collections of just
+// HostAndPort.
+struct CompareHosts {
+    // Node against Node
+    bool operator()(const Node& a, const Node& b) {
+        return a.host < b.host;
+    }
+    // Node against bare address
+    bool operator()(const Node& a, const HostAndPort& b) {
+        return a.host < b;
+    }
+    // bare address against Node
+    bool operator()(const HostAndPort& a, const Node& b) {
+        return a < b.host;
+    }
+    // bare address against bare address
+    bool operator()(const HostAndPort& a, const HostAndPort& b) {
+        return a < b;
+    }
+} compareHosts;  // like an overloaded function, but able to pass to stl algorithms
+
+// The following structs should be treated as functions returning a UnaryPredicate.
+// Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost));
+// They all hold their constructor argument by reference.
+
+struct HostIs {
+    explicit HostIs(const HostAndPort& host) : _host(host) {}
+
+    // True when the given address is the one this predicate was built with.
+    bool operator()(const HostAndPort& candidate) {
+        return candidate == _host;
+    }
+
+    // True when the given node's address is the one this predicate was built with.
+    bool operator()(const Node& candidate) {
+        return candidate.host == _host;
+    }
+
+    const HostAndPort& _host;  // held by reference, not copied
+};
+
+struct HostNotIn {
+    explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {}
+
+    // True when the given address is absent from the set.
+    bool operator()(const HostAndPort& candidate) {
+        return _hosts.count(candidate) == 0;
+    }
+
+    // True when the given node's address is absent from the set.
+    bool operator()(const Node& candidate) {
+        return _hosts.count(candidate.host) == 0;
+    }
+
+    const std::set<HostAndPort>& _hosts;  // held by reference, not copied
+};
+
+int32_t pingTimeMillis(const Node& node) {
+    // Values that do not fit in an int32 (in particular Node::unknownLatency) saturate at the
+    // int32 maximum.
+    constexpr auto kCeiling = numeric_limits<int32_t>::max();
+    const auto millis = node.latencyMicros / 1000;
+    return millis > kCeiling ? kCeiling : static_cast<int32_t>(millis);
+}
+
+/**
+ * Replica set refresh period on the task executor.
+ */
+const Seconds kDefaultRefreshPeriod(30);
+} // namespace
+
+ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const SetStatePtr& initialState)
+ : _state(initialState) {}  // only stores the given state; nothing else happens here
+
+ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const MongoURI& uri)
+ : ScanningReplicaSetMonitor(  // delegates to the SetStatePtr constructor
+ std::make_shared<SetState>(uri,
+ &ReplicaSetMonitorManager::get()->getNotifier(),
+ ReplicaSetMonitorManager::get()->getExecutor())) {}  // notifier and executor come from the manager
+
+void ScanningReplicaSetMonitor::init() {
+    if (!areRefreshRetriesDisabledForTest()) {
+        // Normal path: initialize the shared state under its mutex.
+        stdx::lock_guard lk(_state->mutex);
+        _state->init();
+        return;
+    }
+
+    // This is for MockReplicaSet. Those tests want to control when scanning happens.
+    warning() << "*** Not starting background refresh because refresh retries are disabled.";
+}
+
+void ScanningReplicaSetMonitor::drop() {
+    // Forward to the shared state while holding its mutex.
+    stdx::lock_guard lk(_state->mutex);
+    _state->drop();
+}
+
+ScanningReplicaSetMonitor::~ScanningReplicaSetMonitor() {
+ drop();  // locks _state->mutex and calls _state->drop()
+}
+
+template <typename Callback>
+auto ScanningReplicaSetMonitor::SetState::scheduleWorkAt(Date_t when, Callback&& cb) const {
+ auto wrappedCallback = [cb = std::forward<Callback>(cb),
+ anchor = shared_from_this()](const CallbackArgs& cbArgs) mutable {  // anchor: owning ref to this SetState
+ if (ErrorCodes::isCancelationError(cbArgs.status)) {
+ // Do no more work if we're removed or canceled
+ return;
+ }
+ invariant(cbArgs.status);  // checked only after cancelation errors have been filtered out above
+
+ stdx::lock_guard lk(anchor->mutex);  // cb below runs with the SetState mutex held
+ if (anchor->isDropped) {
+ return;  // dropped while the work was pending: skip cb
+ }
+
+ cb(cbArgs);
+ };
+ return executor->scheduleWorkAt(std::move(when), std::move(wrappedCallback));  // result is passed through to the caller
+}
+
+void ScanningReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) {
+ // (Re)arm the periodic refresh; kKeepEarlyScan may leave an already-scheduled scan in place.
+
+ if (!executor || isMocked) {
+ // Without an executor, we can't do refreshes -- we're in a test
+ return;
+ }
+
+ if (isDropped) { // already removed so no need to refresh
+ LOG(1) << "Stopping refresh for replica set " << name << " because it's removed";
+ return;
+ }
+
+ Milliseconds period = refreshPeriod;
+ if (isExpedited) {
+ period = std::min<Milliseconds>(period, kExpeditedRefreshPeriod);  // expedited mode can only shorten the period
+ }
+
+ auto currentTime = now();
+ auto possibleNextScanTime = currentTime + period;
+ if (refresherHandle && // a refresh is already scheduled
+ (strategy == SchedulingStrategy::kKeepEarlyScan) && // the caller allows keeping it
+ (nextScanTime > currentTime) && // its scheduled time is still in the future
+ (possibleNextScanTime >= nextScanTime)) {
+ // The pending scan is due no later than the one we would schedule, so keep it.
+ return;
+ }
+
+ // Cancel the previously scheduled refresh, if there is one
+ if (auto currentHandle = std::exchange(refresherHandle, {})) {
+ executor->cancel(currentHandle);
+ }
+
+ nextScanTime = possibleNextScanTime;
+ LOG(1) << "Next replica set scan scheduled for " << nextScanTime;
+ auto swHandle = scheduleWorkAt(nextScanTime, [this](const CallbackArgs& cbArgs) {
+ if (cbArgs.myHandle != refresherHandle)
+ return; // A different refresh handle is current now, so this callback is stale
+
+ // It is possible that a waiter will have already expired by the point of this rescan.
+ // Thus we notify here to trigger that logic.
+ notify();
+
+ _ensureScanInProgress(shared_from_this());
+
+ // And now we set up the next one
+ rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan);
+ });
+
+ if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) {
+ LOG(1) << "Cant schedule refresh for " << name << ". Executor shutdown in progress";
+ return;
+ }
+
+ if (!swHandle.isOK()) {
+ severe() << "Can't continue refresh for replica set " << name << " due to "
+ << redact(swHandle.getStatus());
+ fassertFailed(40140);
+ }
+
+ refresherHandle = std::move(swHandle.getValue());  // remembered so the callback can detect replacement
+}
+
+SemiFuture<HostAndPort> ScanningReplicaSetMonitor::getHostOrRefresh(
+    const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+    // Resolve the full list of matching hosts, then hand back only the first entry.
+    auto matchingHosts = _getHostsOrRefresh(criteria, maxWait);
+    return std::move(matchingHosts)
+        .then([](const auto& hosts) {
+            invariant(hosts.size());
+            return hosts[0];
+        })
+        .semi();
+}
+
+SemiFuture<std::vector<HostAndPort>> ScanningReplicaSetMonitor::getHostsOrRefresh(
+    const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+    // Same result as the private helper, converted to a SemiFuture.
+    auto matchingHosts = _getHostsOrRefresh(criteria, maxWait);
+    return std::move(matchingHosts).semi();
+}
+
+Future<std::vector<HostAndPort>> ScanningReplicaSetMonitor::_getHostsOrRefresh(
+ const ReadPreferenceSetting& criteria, Milliseconds maxWait) {
+
+ stdx::lock_guard<Latch> lk(_state->mutex);  // held for the whole function
+ if (_state->isDropped) {
+ return Status(ErrorCodes::ReplicaSetMonitorRemoved,
+ str::stream()
+ << "ScanningReplicaSetMonitor for set " << getName() << " is removed");
+ }
+
+ auto out = _state->getMatchingHosts(criteria);
+ if (!out.empty())
+ return {std::move(out)};  // fast path: the current state already has matching hosts
+
+ // Fail early without doing any more work if the timeout has already expired.
+ if (maxWait <= Milliseconds(0))
+ return _state->makeUnsatisfedReadPrefError(criteria);
+
+ // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is
+ // dealing with maxWait.
+ auto pf = makePromiseFuture<decltype(out)>();  // promise is parked in waiters; future goes to the caller
+ _state->waiters.emplace_back(
+ SetState::Waiter{_state->now() + maxWait, criteria, std::move(pf.promise)});
+
+ // This must go after we set up the wait state to correctly handle unittests using
+ // MockReplicaSet.
+ _ensureScanInProgress(_state);
+
+ // Switch to expedited scanning.
+ _state->isExpedited = true;
+ _state->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan);
+
+ return std::move(pf.future);
+}
+HostAndPort ScanningReplicaSetMonitor::getMasterOrUassert() {
+    // Ask for a primary-only host with the default wait, then block on the result.
+    auto primary = getHostOrRefresh(kPrimaryOnlyReadPreference);
+    return std::move(primary).get();
+}
+
+void ScanningReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+
+    // Hosts that are not in the node list are ignored.
+    if (Node* failed = _state->findNode(host)) {
+        failed->markFailed(status);
+    }
+
+    if (kDebugBuild) {
+        _state->checkInvariants();
+    }
+}
+
+bool ScanningReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+
+    // A host that is not in the node list is reported as not primary.
+    Node* known = _state->findNode(host);
+    if (!known) {
+        return false;
+    }
+    return known->isMaster;
+}
+
+bool ScanningReplicaSetMonitor::isHostUp(const HostAndPort& host) const {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+
+    // A host that is not in the node list is reported as down.
+    Node* known = _state->findNode(host);
+    if (!known) {
+        return false;
+    }
+    return known->isUp;
+}
+
+int ScanningReplicaSetMonitor::getMinWireVersion() const {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+
+    // Largest per-node minimum among the nodes that are up; 0 when no node is up.
+    int highestMin = 0;
+    for (const auto& node : _state->nodes) {
+        if (!node.isUp) {
+            continue;
+        }
+        highestMin = std::max(highestMin, node.minWireVersion);
+    }
+    return highestMin;
+}
+
+int ScanningReplicaSetMonitor::getMaxWireVersion() const {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+
+    // Smallest per-node maximum among the nodes that are up; the int maximum when no node is up.
+    int lowestMax = std::numeric_limits<int>::max();
+    for (const auto& node : _state->nodes) {
+        if (!node.isUp) {
+            continue;
+        }
+        lowestMax = std::min(lowestMax, node.maxWireVersion);
+    }
+    return lowestMax;
+}
+
+std::string ScanningReplicaSetMonitor::getName() const {
+ // name is const, so it is read here without taking _state->mutex
+ return _state->name;
+}
+
+std::string ScanningReplicaSetMonitor::getServerAddress() const {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+
+    if (_state->seedConnStr.isValid()) {
+        return _state->seedConnStr.toString();
+    }
+    // We return our setUri until first confirmation
+    return _state->setUri.connectionString().toString();
+}
+
+const MongoURI& ScanningReplicaSetMonitor::getOriginalUri() const {
+ // setUri is const, so it is returned here without taking _state->mutex
+ return _state->setUri;
+}
+
+bool ScanningReplicaSetMonitor::contains(const HostAndPort& host) const {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+    // Membership is answered from seedNodes.
+    return _state->seedNodes.count(host) > 0;
+}
+
+// TODO move to correct order with non-statics before pushing
+void ScanningReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+
+    // Everything is written into a sub-object keyed by the set name.
+    BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName()));
+    if (forFTDC) {
+        // Trimmed form: one numeric field per node, host string -> ping time in millis.
+        for (const Node& node : _state->nodes) {
+            monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node));
+        }
+        return;
+    }
+
+    // NOTE: the format here must be consistent for backwards compatibility
+    BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts"));
+    for (const Node& node : _state->nodes) {
+        BSONObjBuilder entry;
+        entry.append("addr", node.host.toString());
+        entry.append("ok", node.isUp);
+        entry.append("ismaster", node.isMaster);  // intentionally not camelCase
+        entry.append("hidden", false);            // we don't keep hidden nodes in the set
+        entry.append("secondary", node.isUp && !node.isMaster);
+        entry.append("pingTimeMillis", pingTimeMillis(node));
+
+        if (!node.tags.isEmpty()) {
+            entry.append("tags", node.tags);
+        }
+
+        hosts.append(entry.obj());
+    }
+}
+bool ScanningReplicaSetMonitor::isKnownToHaveGoodPrimary() const {
+    stdx::lock_guard<Latch> lk(_state->mutex);
+
+    // True as soon as any node in the current view is flagged as master.
+    return std::any_of(_state->nodes.begin(), _state->nodes.end(), [](const Node& node) {
+        return node.isMaster;
+    });
+}
+
+Seconds ScanningReplicaSetMonitor::getDefaultRefreshPeriod() {
+    static constexpr auto kPeriodField = "period"_sd;
+
+    // Start from the built-in default; the fail point may replace it with its "period" field.
+    Seconds period = kDefaultRefreshPeriod;
+    auto failPoint = globalFailPointRegistry().find("modifyReplicaSetMonitorDefaultRefreshPeriod");
+    failPoint->executeIf(
+        [&period](const BSONObj& data) { period = Seconds{data.getIntField(kPeriodField)}; },
+        [](const BSONObj& data) { return data.hasField(kPeriodField); });
+    return period;
+}
+
+void ScanningReplicaSetMonitor::runScanForMockReplicaSet() {
+ stdx::lock_guard<Latch> lk(_state->mutex);
+ _ensureScanInProgress(_state);
+
+ // This function should only be called from tests using MockReplicaSet and they should use the
+ // synchronous path to complete before returning.
+ invariant(_state->currentScan == nullptr);  // no scan may still be pending at this point
+}
+
+void ScanningReplicaSetMonitor::_ensureScanInProgress(const SetStatePtr& state) {
+    // Building a Refresher joins the current scan or starts a new one; then schedule its requests.
+    Refresher refresher(state);
+    refresher.scheduleNetworkRequests();
+}
+
+Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) {
+    if (!_scan) {
+        // Nothing is running yet, so this Refresher starts the scan itself.
+        startNewScan();
+        return;
+    }
+
+    // participate in in-progress scan
+    _set->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan);
+    _scan->retryAllTriedHosts(_set->rand);
+}
+
+void Refresher::scheduleNetworkRequests() {
+ for (auto ns = getNextStep(); ns.step == NextStep::CONTACT_HOST; ns = getNextStep()) {  // stops on any other step kind
+ if (!_set->executor || _set->isMocked) {
+ // If we're mocked, just schedule an isMaster
+ scheduleIsMaster(ns.host);
+ continue;
+ }
+
+ // cancel any scheduled isMaster calls that haven't yet been called
+ Node* node = _set->findOrCreateNode(ns.host);
+ if (auto handle = std::exchange(node->scheduledIsMasterHandle, {})) {
+ _set->executor->cancel(handle);
+ }
+
+ // ensure that the call to isMaster is scheduled at most once every 500ms
+ auto swHandle =
+ _set->scheduleWorkAt(node->nextPossibleIsMasterCall,
+ [*this, host = ns.host](const CallbackArgs& cbArgs) mutable {  // captures a copy of this Refresher
+ scheduleIsMaster(host);
+ });
+
+ if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) {
+ _scan->markHostsToScanAsTried();
+ break;  // scheduling failed with a shutdown error: stop issuing requests
+ }
+
+ if (!swHandle.isOK()) {
+ severe() << "Can't continue scan for replica set " << _set->name << " due to "
+ << redact(swHandle.getStatus());
+ fassertFailed(31176);
+ }
+
+ node->scheduledIsMasterHandle = uassertStatusOK(std::move(swHandle));  // kept so a later pass can cancel it
+ }
+
+ if (kDebugBuild)
+ _set->checkInvariants();
+}
+
+void Refresher::scheduleIsMaster(const HostAndPort& host) {
+ if (_set->isMocked) {
+ // MockReplicaSet only works with DBClient-style access since it injects itself into the
+ // ScopedDbConnection pool connection creation.
+ try {
+ ScopedDbConnection conn(ConnectionString(host), kCheckTimeout.count());
+
+ auto timer = Timer();  // measures the duration of the isMaster call below
+ auto reply = BSONObj();
+ bool ignoredOutParam = false;
+ conn->isMaster(ignoredOutParam, &reply);
+ conn.done(); // return to pool on success.
+
+ receivedIsMaster(host, timer.micros(), reply);
+ } catch (DBException& ex) {
+ failedHost(host, ex.toStatus());
+ }
+
+ return;
+ }
+
+ auto request = executor::RemoteCommandRequest(
+ host, "admin", BSON("isMaster" << 1), nullptr, kCheckTimeout);
+ request.sslMode = _set->setUri.getSSLMode();
+ auto status =
+ _set->executor
+ ->scheduleRemoteCommand(
+ std::move(request),
+ [copy = *this, host, timer = Timer()](  // the callback works on its own copy of this Refresher
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
+ stdx::lock_guard lk(copy._set->mutex);
+ // Ignore the reply and return if we are no longer the current scan. This might
+ // happen if it was decided that the host we were contacting isn't part of the
+ // set.
+ if (copy._scan != copy._set->currentScan) {
+ return;
+ }
+
+ // ensure that the next isMaster call occurs no sooner than 500ms after this one ended
+ if (auto node = copy._set->findNode(host)) {
+ node->nextPossibleIsMasterCall =
+ copy._set->executor->now() + Milliseconds(500);
+ }
+
+ if (result.response.isOK()) {
+ // Not using result.response.elapsedMillis because higher precision is
+ // useful for computing the rolling average.
+ copy.receivedIsMaster(host, timer.micros(), result.response.data);
+ } else {
+ copy.failedHost(host, result.response.status);
+ }
+
+ // This reply may have discovered new hosts to contact so we need to schedule
+ // them.
+ copy.scheduleNetworkRequests();
+ })
+ .getStatus();
+
+ if (!status.isOK()) {
+ failedHost(host, status);  // the request could not even be scheduled
+ // This is only called from scheduleNetworkRequests() which will still be looping, so we
+ // don't need to call it here after updating the state.
+ }
+}
+
+/**
+ * Decides what this Refresher should do next.
+ *
+ * Returns DONE when this Refresher's scan is no longer the set's current scan or when there are no
+ * hosts left to contact, WAIT when nothing is queued but replies are still outstanding, and
+ * CONTACT_HOST (with the host popped from the front of hostsToScan) otherwise.
+ *
+ * Finishing a scan here has side effects: it may fold unconfirmed replies into the set's nodes,
+ * updates consecutiveFailedScans, resets _set->currentScan and calls _set->notify().
+ */
+Refresher::NextStep Refresher::getNextStep() {
+ // No longer the current scan
+ if (_scan != _set->currentScan) {
+ return NextStep(NextStep::DONE);
+ }
+
+ // Wait for all dispatched hosts to return before trying any fallback hosts.
+ if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) {
+ return NextStep(NextStep::WAIT);
+ }
+
+ // If we haven't yet found a master, try contacting unconfirmed hosts
+ if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) {
+ _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand);
+ _scan->possibleNodes.clear();
+ }
+
+ if (_scan->hostsToScan.empty()) {
+ // We've tried all hosts we can, so nothing more to do in this round.
+ if (!_scan->foundUpMaster) {
+ warning() << "Unable to reach primary for set " << _set->name;
+
+ // Since we've talked to everyone we could but still didn't find a primary, we
+ // do the best we can, and assume all unconfirmedReplies are actually from nodes
+ // in the set (we've already confirmed that they think they are). This is
+ // important since it allows us to bootstrap to a usable state even if we are
+ // unable to talk to a master while starting up. As soon as we are able to
+ // contact a master, we will remove any nodes that it doesn't think are part of
+ // the set, undoing the damage we cause here.
+
+ // NOTE: we don't modify seedNodes or notify about set membership change in this
+ // case since it hasn't been confirmed by a master.
+ for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
+ it != _scan->unconfirmedReplies.end();
+ ++it) {
+ _set->findOrCreateNode(it->first)->update(it->second);
+ }
+
+ // Only publish the possible set when it differs from the working connection string.
+ auto connStr = _set->possibleConnectionString();
+ if (connStr != _set->workingConnStr) {
+ _set->workingConnStr = std::move(connStr);
+ _set->notifier->onPossibleSet(_set->workingConnStr);
+ }
+
+ // If at some point we care about lacking a primary, act on it here
+ _set->lastSeenMaster = {};
+ }
+
+ if (_scan->foundAnyUpNodes) {
+ _set->consecutiveFailedScans = 0;
+ } else {
+ // nScans is the count before this failure; the message prints the incremented value.
+ // The log is emitted while nScans is at most 10 and then whenever it is a multiple of 10.
+ auto nScans = _set->consecutiveFailedScans++;
+ if (nScans <= 10 || nScans % 10 == 0) {
+ log() << "Cannot reach any nodes for set " << _set->name
+ << ". Please check network connectivity and the status of the set. "
+ << "This has happened for " << _set->consecutiveFailedScans
+ << " checks in a row.";
+ }
+ }
+
+ // Makes sure all other Refreshers in this round return DONE
+ _set->currentScan.reset();
+ _set->notify();
+
+ LOG(1) << "Refreshing replica set " << _set->name << " took " << _scan->timer.millis()
+ << "ms";
+
+ return NextStep(NextStep::DONE);
+ }
+
+ // Pop and return the next hostToScan.
+ // The host is recorded both as awaited and as tried before it is handed to the caller.
+ HostAndPort host = _scan->hostsToScan.front();
+ _scan->hostsToScan.pop_front();
+ _scan->waitingFor.insert(host);
+ _scan->triedHosts.insert(host);
+
+ return NextStep(NextStep::CONTACT_HOST, host);
+}
+
+/**
+ * Handles an isMaster reply that arrived from 'from' after 'latencyMicros' microseconds.
+ *
+ * The host is removed from the awaited set first. Replies that are not ok, that name a different
+ * replica set, or that claim mastership but are rejected by receivedIsMasterFromMaster() are
+ * routed to failedHost(). Accepted replies either update the matching node (once a master has been
+ * found in this scan) or are stashed as unconfirmed, after which waiters are notified.
+ */
+void Refresher::receivedIsMaster(const HostAndPort& from,
+                                 int64_t latencyMicros,
+                                 const BSONObj& replyObj) {
+    _scan->waitingFor.erase(from);
+
+    const IsMasterReply parsed(from, latencyMicros, replyObj);
+
+    // A reply that is not ok is treated as a failed command.
+    if (!parsed.ok) {
+        failedHost(from, Status(ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"));
+        return;
+    }
+
+    const bool wrongSet = parsed.setName != _set->name;
+    if (wrongSet) {
+        // A node answering with "isreplicaset" is what the SDAM spec calls an RSGhost (REMOVED or
+        // STARTUP member state). Its member list is still a useful hint about where the set
+        // lives, but only while this scan has not found a master yet.
+        const bool isGhost = parsed.raw["isreplicaset"].trueValue();
+        if (!isGhost) {
+            error() << "replset name mismatch: expected \"" << _set->name << "\", "
+                    << "but remote node " << from << " has replset name \"" << parsed.setName
+                    << "\""
+                    << ", ismaster: " << replyObj;
+        } else if (!_scan->foundUpMaster) {
+            _scan->possibleNodes.insert(parsed.members.begin(), parsed.members.end());
+        }
+
+        failedHost(from,
+                   Status(ErrorCodes::InconsistentReplicaSetNames,
+                          str::stream() << "Target replica set name " << parsed.setName
+                                        << " does not match the monitored set name "
+                                        << _set->name));
+        return;
+    }
+
+    if (parsed.isMaster) {
+        if (Status masterStatus = receivedIsMasterFromMaster(from, parsed); !masterStatus.isOK()) {
+            failedHost(from, masterStatus);
+            return;
+        }
+    }
+
+    if (!_scan->foundUpMaster) {
+        // No master yet: remember the hosts this node knows about and keep its reply around
+        // until a master confirms (or the scan ends without one).
+        _scan->possibleNodes.insert(parsed.members.begin(), parsed.members.end());
+        _scan->unconfirmedReplies[from] = parsed;
+    } else {
+        // Nodes are only updated once a master has confirmed they belong to the set.
+        _set->updateNodeIfInNodes(parsed);
+    }
+
+    // Even if no entry in _set->nodes is marked up yet, at least one reachable host claims
+    // membership in the set.
+    _scan->foundAnyUpNodes = true;
+
+    _set->notify();
+
+    if (kDebugBuild) {
+        _set->checkInvariants();
+    }
+}
+
+/**
+ * Records that contacting 'host' failed with 'status': the host stops being awaited and, if it is
+ * a known node, it is marked as failed.
+ */
+void Refresher::failedHost(const HostAndPort& host, const Status& status) {
+    _scan->waitingFor.erase(host);
+
+    if (Node* const failedNode = _set->findNode(host)) {
+        failedNode->markFailed(status);
+    }
+
+    if (!_scan->waitingFor.empty()) {
+        return;
+    }
+
+    // That was the last outstanding response, so let the SetState fail any waiters whose
+    // deadline has passed.
+    _set->notify();
+}
+
+/**
+ * Starts a fresh scan and makes it the set's current scan.
+ *
+ * Hosts are ordered to find a master as fast as possible, because nothing discovered during a scan
+ * is usable until a master supplies the member list or every host has been tried without finding
+ * one. The resulting queue is: last seen master (if any), then the remaining "up" nodes in random
+ * order, then the "down" nodes in random order.
+ */
+void Refresher::startNewScan() {
+    // TODO It might make sense to check down nodes first if the last seen master is still
+    // marked as up.
+
+    _scan = std::make_shared<ScanState>();
+    _set->currentScan = _scan;
+
+    auto& queue = _scan->hostsToScan;
+
+    // Up nodes go to the front of the queue, everything else to the back.
+    int numUp = 0;
+    for (const auto& node : _set->nodes) {
+        if (!node.isUp) {
+            queue.push_back(node.host);
+            continue;
+        }
+        queue.push_front(node.host);
+        ++numUp;
+    }
+
+    // Randomize each group independently so that up nodes stay ahead of down nodes.
+    std::shuffle(queue.begin(), queue.begin() + numUp, _set->rand.urbg());
+    std::shuffle(queue.begin() + numUp, queue.end(), _set->rand.urbg());
+
+    if (_set->lastSeenMaster.empty()) {
+        return;
+    }
+
+    // Pull lastSeenMaster to the very front while keeping the relative order of the rest.
+    std::stable_partition(queue.begin(), queue.end(), HostIs(_set->lastSeenMaster));
+}
+
+/**
+ * Applies an isMaster reply from a node that claims to be master.
+ *
+ * Returns NotMaster when the reply's config version or election id is older than what the set has
+ * already seen. Otherwise the set's node list, seed list and connection strings are brought in
+ * line with the reply, buffered unconfirmed replies are applied, and the scan is marked as having
+ * found a master.
+ */
+Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) {
+    invariant(reply.isMaster);
+
+    // An older config version is rejected outright. This keeps us compatible with pv0 nodes,
+    // whose election ids are not ordered the same way as pv1 election ids.
+    if (reply.configVersion < _set->configVersion) {
+        return Status(ErrorCodes::NotMaster,
+                      str::stream()
+                          << "Node " << from << " believes it is primary, but its config version "
+                          << reply.configVersion << " is older than the most recent config version "
+                          << _set->configVersion);
+    }
+
+    if (reply.electionId.isSet()) {
+        // Election ids can only be compared within one protocol version. isMaster carries no
+        // protocol version, but changing it requires bumping the config version, so an equal
+        // config version is used as the proxy.
+        const bool sameConfig = reply.configVersion == _set->configVersion;
+        if (sameConfig && _set->maxElectionId.isSet() &&
+            _set->maxElectionId.compare(reply.electionId) > 0) {
+            return Status(ErrorCodes::NotMaster,
+                          str::stream()
+                              << "Node " << from << " believes it is primary, but its election id "
+                              << reply.electionId << " is older than the most recent election id "
+                              << _set->maxElectionId);
+        }
+
+        _set->maxElectionId = reply.electionId;
+    }
+
+    _set->configVersion = reply.configVersion;
+
+    // Demote everybody; the sender gets marked as master before the lock is released.
+    // NOTE: when several hosts claim to be master, the last one wins.
+    for (auto& node : _set->nodes) {
+        node.isMaster = false;
+    }
+
+    // Does the master agree with our node list?
+    // REMINDER: both _set->nodes and reply.members are sorted.
+    const bool membershipMatches = _set->nodes.size() == reply.members.size() &&
+        std::equal(_set->nodes.begin(), _set->nodes.end(), reply.members.begin(), hostsEqual);
+    if (!membershipMatches) {
+        LOG(2) << "Adjusting nodes in our view of replica set " << _set->name
+               << " based on master reply: " << redact(reply.raw);
+
+        // Drop nodes the master does not list ...
+        const auto firstStale =
+            std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.members));
+        _set->nodes.erase(firstStale, _set->nodes.end());
+
+        // ... and add the ones we did not know about.
+        for (auto& member : reply.members) {
+            _set->findOrCreateNode(member);
+        }
+
+        // Rebuild the scan queue from the master's member list, minus hosts already tried. This
+        // can both add hosts to and remove hosts from the queue.
+        _scan->hostsToScan.clear();
+        _scan->enqueAllUntriedHosts(reply.members, _set->rand);
+
+        if (!_scan->waitingFor.empty()) {
+            // Stop waiting for hosts that turned out not to be members.
+            std::set<HostAndPort> stillAwaited;
+            std::set_intersection(reply.members.begin(),
+                                  reply.members.end(),
+                                  _scan->waitingFor.begin(),
+                                  _scan->waitingFor.end(),
+                                  std::inserter(stillAwaited, stillAwaited.end()));
+            _scan->waitingFor.swap(stillAwaited);
+        }
+    }
+
+    const bool hostsChanged = reply.members != _set->seedNodes;
+    const bool primaryChanged = reply.host != _set->lastSeenMaster;
+    if (hostsChanged || primaryChanged) {
+        ++_set->seedGen;
+        _set->seedNodes = reply.members;
+        _set->seedConnStr = _set->confirmedConnectionString();
+
+        // Reconfigurations should be rare, so logging at a low level is fine and leaves a record
+        // of the change.
+        log() << "Confirmed replica set for " << _set->name << " is " << _set->seedConnStr;
+
+        _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host, reply.passives);
+    }
+
+    // The working string always follows the confirmed one.
+    _set->workingConnStr = _set->seedConnStr;
+
+    // Apply the replies that were buffered before a master was found. Replies from hosts that
+    // are not in _set->nodes (as adjusted above) are ignored by updateNodeIfInNodes().
+    for (const auto& unconfirmed : _scan->unconfirmedReplies) {
+        _set->updateNodeIfInNodes(unconfirmed.second);
+    }
+    _scan->unconfirmedReplies.clear();
+
+    _scan->foundUpMaster = true;
+    _set->lastSeenMaster = reply.host;
+
+    return Status::OK();
+}
+
+/**
+ * Populates this reply from the raw isMaster response 'obj'.
+ *
+ * An owned copy of 'obj' is stored in 'raw' and every other field is read from that copy. If the
+ * response is not ok, only 'raw' and 'ok' are set. Any exception thrown while parsing is logged
+ * and leaves the reply with ok == false.
+ *
+ * Both 'members' and 'passives' are rebuilt from scratch, so parsing into an already populated
+ * reply does not keep hosts from the previous response.
+ */
+void IsMasterReply::parse(const BSONObj& obj) {
+    try {
+        raw = obj.getOwned();  // don't use obj again after this line
+
+        ok = raw["ok"].trueValue();
+        if (!ok)
+            return;
+
+        setName = raw["setName"].str();
+        hidden = raw["hidden"].trueValue();
+        secondary = raw["secondary"].trueValue();
+
+        minWireVersion = raw["minWireVersion"].numberInt();
+        maxWireVersion = raw["maxWireVersion"].numberInt();
+
+        // hidden nodes can't be master, even if they claim to be.
+        isMaster = !hidden && raw["ismaster"].trueValue();
+
+        if (isMaster && raw.hasField("electionId")) {
+            electionId = raw["electionId"].OID();
+        }
+
+        configVersion = raw["setVersion"].numberInt();
+
+        const string primaryString = raw["primary"].str();
+        primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString);
+
+        // both hosts and passives, but not arbiters, are considered "normal hosts"
+        members.clear();
+        passives.clear();  // keep in step with 'members' so stale passives never survive a re-parse
+        BSONForEach(host, raw.getObjectField("hosts")) {
+            members.insert(HostAndPort(host.String()));
+        }
+        BSONForEach(host, raw.getObjectField("passives")) {
+            // Parse the address once and reuse it for both sets.
+            const HostAndPort passive(host.String());
+            members.insert(passive);
+            passives.insert(passive);
+        }
+
+        tags = raw.getObjectField("tags");
+        BSONObj lastWriteField = raw.getObjectField("lastWrite");
+        if (!lastWriteField.isEmpty()) {
+            if (auto lastWrite = lastWriteField["lastWriteDate"]) {
+                lastWriteDate = lastWrite.date();
+            }
+
+            uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime));
+        }
+    } catch (const std::exception& e) {
+        ok = false;
+        // 'obj' (not 'raw') is logged on purpose: the failure may have happened before 'raw' was
+        // assigned.
+        log() << "exception while parsing isMaster reply: " << e.what() << " " << obj;
+    }
+}
+
+// Creates the entry for 'host'. Its latency starts out as unknownLatency.
+Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {}
+
+/**
+ * Marks this node as down and not master. The transition is logged (with the redacted 'status')
+ * only when the node was previously considered up.
+ */
+void Node::markFailed(const Status& status) {
+    isMaster = false;
+
+    if (!isUp) {
+        return;
+    }
+
+    log() << "Marking host " << host << " as failed" << causedBy(redact(status));
+    isUp = false;
+}
+
+/**
+ * Returns whether this node can serve the read preference mode 'pref'. A node that is not up
+ * never matches; PrimaryOnly requires a master, SecondaryOnly requires a non-master, and every
+ * other mode accepts any up node.
+ */
+bool Node::matches(const ReadPreference pref) const {
+    if (!isUp) {
+        LOG(3) << "Host " << host << " is not up";
+        return false;
+    }
+
+    LOG(3) << "Host " << host << " is " << (isMaster ? "primary" : "not primary");
+
+    switch (pref) {
+        case ReadPreference::PrimaryOnly:
+            return isMaster;
+        case ReadPreference::SecondaryOnly:
+            return !isMaster;
+        default:
+            return true;
+    }
+}
+
+/**
+ * Returns whether this node's tags satisfy 'tag': every element of 'tag' must compare equal to
+ * the element with the same field name in this node's tags. An empty 'tag' matches every node.
+ */
+bool Node::matches(const BSONObj& tag) const {
+    BSONForEach(wanted, tag) {
+        const auto ours = this->tags[wanted.fieldNameStringData()];
+        const bool differs = SimpleBSONElementComparator::kInstance.evaluate(ours != wanted);
+        if (differs) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+/**
+ * Refreshes this node from an ok isMaster 'reply' sent by the same host: up/master state, wire
+ * versions, tags, smoothed latency, last write date and opTime.
+ */
+void Node::update(const IsMasterReply& reply) {
+    invariant(host == reply.host);
+    invariant(reply.ok);
+
+    LOG(3) << "Updating host " << host << " based on ismaster reply: " << reply.raw;
+
+    // Only a visible master or secondary can accept operations; anything else counts as down.
+    const bool servesOperations = reply.isMaster || reply.secondary;
+    isUp = servesOperations && !reply.hidden;
+    isMaster = reply.isMaster;
+
+    minWireVersion = reply.minWireVersion;
+    maxWireVersion = reply.maxWireVersion;
+
+    // Skip the copy when the tags did not change.
+    if (!tags.binaryEqual(reply.tags)) {
+        tags = reply.tags.getOwned();
+    }
+
+    if (reply.latencyMicros >= 0) {  // TODO upper bound?
+        // The first sample is taken as is; later samples move the average by a quarter of the
+        // difference.
+        latencyMicros = (latencyMicros == unknownLatency)
+            ? reply.latencyMicros
+            : latencyMicros + (reply.latencyMicros - latencyMicros) / 4;
+    }
+
+    LOG(3) << "Updating " << host << " lastWriteDate to " << reply.lastWriteDate;
+    lastWriteDate = reply.lastWriteDate;
+
+    LOG(3) << "Updating " << host << " opTime to " << reply.opTime;
+    opTime = reply.opTime;
+    lastWriteDateUpdateTime = Date_t::now();
+}
+
+/**
+ * Builds the monitoring state for the replica set described by 'uri'.
+ *
+ * The URI's servers become both the seed list and the initial node list. Throws (uassert 13642)
+ * when the seed list is empty. A set is treated as mocked when its seed hosts start with '$';
+ * mixing mocked and regular hosts trips an invariant.
+ */
+SetState::SetState(const MongoURI& uri,
+                   ReplicaSetChangeNotifier* notifier,
+                   executor::TaskExecutor* executor)
+    : setUri(uri),  // 'uri' is a const reference, so this is a copy; std::move could not move it
+      name(setUri.getSetName()),
+      notifier(notifier),
+      executor(executor),
+      seedNodes(setUri.getServers().begin(), setUri.getServers().end()),
+      latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)),
+      rand(std::random_device()()),
+      refreshPeriod(getDefaultRefreshPeriod()) {
+    uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
+
+    if (name.empty())
+        warning() << "Replica set name empty, first node: " << *(seedNodes.begin());
+
+    // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a
+    // scan until we start a scan and either find a master or contact all hosts without finding
+    // one.
+    // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to
+    // sort nodes after this loop.
+    for (auto&& addr : seedNodes) {
+        nodes.push_back(Node(addr));
+
+        if (addr.host()[0] == '$') {
+            // Only the first seed may switch the set into mocked mode.
+            invariant(isMocked || &addr == &*seedNodes.begin());  // Can't mix and match.
+            isMocked = true;
+        } else {
+            invariant(!isMocked);  // Can't mix and match.
+        }
+    }
+
+    if (kDebugBuild)
+        checkInvariants();
+}
+
+/**
+ * Returns the first host chosen by getMatchingHosts() for 'criteria', or a default-constructed
+ * HostAndPort when nothing matches.
+ */
+HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const {
+    const auto candidates = getMatchingHosts(criteria);
+    return candidates.empty() ? HostAndPort() : candidates.front();
+}
+
+/**
+ * Returns the hosts that currently satisfy 'criteria', or an empty vector when none do.
+ *
+ * PrimaryPreferred and SecondaryPreferred are resolved by recursing with PrimaryOnly and
+ * SecondaryOnly. PrimaryOnly returns at most one host. SecondaryOnly and Nearest filter nodes by
+ * read preference mode, tag set, max staleness and minOpTime, then by latency window, and finally
+ * pick which of the remaining hosts is listed first.
+ *
+ * Throws via uassert when a tag set element is not an object (16358) or the read preference mode
+ * is not handled (16337).
+ */
+std::vector<HostAndPort> SetState::getMatchingHosts(const ReadPreferenceSetting& criteria) const {
+ switch (criteria.pref) {
+ // "Prefered" read preferences are defined in terms of other preferences
+ case ReadPreference::PrimaryPreferred: {
+ std::vector<HostAndPort> out =
+ getMatchingHosts(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
+ // NOTE: the spec says we should use the primary even if tags don't match
+ if (!out.empty())
+ return out;
+ return getMatchingHosts(ReadPreferenceSetting(
+ ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
+ }
+
+ case ReadPreference::SecondaryPreferred: {
+ std::vector<HostAndPort> out = getMatchingHosts(ReadPreferenceSetting(
+ ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds));
+ if (!out.empty())
+ return out;
+ // NOTE: the spec says we should use the primary even if tags don't match
+ return getMatchingHosts(
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags));
+ }
+
+ case ReadPreference::PrimaryOnly: {
+ // NOTE: isMaster implies isUp
+ // Tags are not consulted in this branch; the first node flagged as master is returned.
+ Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster);
+ if (it == nodes.end())
+ return {};
+ return {it->host};
+ }
+
+ // The difference between these is handled by Node::matches
+ case ReadPreference::SecondaryOnly:
+ case ReadPreference::Nearest: {
+ // Staleness filter; accepts every node unless maxStalenessSeconds is non-zero.
+ std::function<bool(const Node&)> matchNode = [](const Node& node) -> bool {
+ return true;
+ };
+ // build comparator
+ if (criteria.maxStalenessSeconds.count()) {
+ auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster);
+ if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) {
+ // No master (or a master without a last write date): measure staleness against the
+ // most recent lastWriteDate among the up nodes that have one.
+ auto writeDateCmp = [](const Node* a, const Node* b) -> bool {
+ return a->lastWriteDate < b->lastWriteDate;
+ };
+ // use only non failed nodes
+ std::vector<const Node*> upNodes;
+ for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) {
+ if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) {
+ upNodes.push_back(&(*nodeIt));
+ }
+ }
+ auto latestSecNode =
+ std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp);
+ if (latestSecNode == upNodes.end()) {
+ // upNodes is empty, so there is nothing to compare against: reject every node.
+ matchNode = [](const Node& node) -> bool { return false; };
+ } else {
+ Date_t maxWriteTime = (*latestSecNode)->lastWriteDate;
+ matchNode = [=](const Node& node) -> bool {
+ return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) +
+ refreshPeriod <=
+ criteria.maxStalenessSeconds;
+ };
+ }
+ } else {
+ // A master with a last write date exists: compare each node's lag (update time minus
+ // last write date) with the master's own lag.
+ Seconds primaryStaleness = duration_cast<Seconds>(
+ masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate);
+ matchNode = [=](const Node& node) -> bool {
+ return duration_cast<Seconds>(node.lastWriteDateUpdateTime -
+ node.lastWriteDate) -
+ primaryStaleness + refreshPeriod <=
+ criteria.maxStalenessSeconds;
+ };
+ }
+ }
+
+ // NOTE(review): matches are appended once per tag set, so a node that satisfies more than
+ // one tag set appears to be added (and returned) more than once — confirm this is intended.
+ std::vector<const Node*> allMatchingNodes;
+ BSONForEach(tagElem, criteria.tags.getTagBSON()) {
+ uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj());
+ BSONObj tag = tagElem.Obj();
+
+ std::vector<const Node*> matchingNodes;
+ for (size_t i = 0; i < nodes.size(); i++) {
+ if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) &&
+ matchNode(nodes[i])) {
+ matchingNodes.push_back(&nodes[i]);
+ }
+ }
+
+ // Only consider nodes that satisfy the minOpTime
+ if (!criteria.minOpTime.isNull()) {
+ // After sorting with opTimeGreater, everything from the first node whose opTime is
+ // below minOpTime onwards is dropped.
+ std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater);
+ for (size_t i = 0; i < matchingNodes.size(); i++) {
+ if (matchingNodes[i]->opTime < criteria.minOpTime) {
+ if (i == 0) {
+ // If no nodes satisfy the minOpTime criteria, we ignore the
+ // minOpTime requirement.
+ break;
+ }
+ matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
+ break;
+ }
+ }
+ }
+
+ allMatchingNodes.insert(
+ allMatchingNodes.end(), matchingNodes.begin(), matchingNodes.end());
+ }
+
+ // don't do more complicated selection if not needed
+ if (allMatchingNodes.empty()) {
+ return {};
+ }
+ if (allMatchingNodes.size() == 1) {
+ return {allMatchingNodes.front()->host};
+ }
+
+ // If there are multiple nodes satisfying the minOpTime, next order by latency
+ // and don't consider hosts further than a threshold from the closest.
+ std::sort(allMatchingNodes.begin(), allMatchingNodes.end(), compareLatencies);
+ for (size_t i = 1; i < allMatchingNodes.size(); i++) {
+ int64_t distance =
+ allMatchingNodes[i]->latencyMicros - allMatchingNodes[0]->latencyMicros;
+ if (distance >= latencyThresholdMicros) {
+ // this node and all remaining ones are too far away
+ allMatchingNodes.erase(allMatchingNodes.begin() + i, allMatchingNodes.end());
+ break;
+ }
+ }
+
+ std::vector<HostAndPort> hosts;
+ std::transform(allMatchingNodes.begin(),
+ allMatchingNodes.end(),
+ std::back_inserter(hosts),
+ [](const auto& node) { return node->host; });
+
+ // Note that the host list is only deterministic (or random) for the first node.
+ // The rest of the list is in matchingNodes order (latency) with one element swapped
+ // for the first element.
+ // An index of 0 makes the condition false, so no swap happens when the first host is chosen.
+ if (auto bestHostIdx = useDeterministicHostSelection ? roundRobin++ % hosts.size()
+ : rand.nextInt32(hosts.size())) {
+ using std::swap;
+ swap(hosts[0], hosts[bestHostIdx]);
+ }
+
+ return hosts;
+ }
+
+ default:
+ // The condition is the literal false, so this presumably always throws and the break
+ // below is not expected to be reached.
+ uassert(16337, "Unknown read preference", false);
+ break;
+ }
+}
+
+/**
+ * Looks up 'host' in the sorted node list. Returns the matching node, or nullptr when the host is
+ * not present.
+ */
+Node* SetState::findNode(const HostAndPort& host) {
+    const auto pos = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
+    const bool found = pos != nodes.end() && pos->host == host;
+    return found ? &*pos : nullptr;
+}
+
+/**
+ * Returns the node for 'host', inserting a new one at its sorted position when it is not known
+ * yet.
+ */
+Node* SetState::findOrCreateNode(const HostAndPort& host) {
+    // Effectively an insertion sort. N is currently guaranteed to be <= 12 (although this class
+    // must function correctly even with more nodes); if that restriction is lifted, alternate
+    // algorithms may need to be considered.
+    auto pos = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
+    const bool alreadyKnown = pos != nodes.end() && pos->host == host;
+    if (!alreadyKnown) {
+        LOG(2) << "Adding node " << host << " to our view of replica set " << name;
+        pos = nodes.insert(pos, Node(host));
+    }
+    return &*pos;
+}
+
+/**
+ * Applies 'reply' to the node it came from, provided that host is in the node list; otherwise the
+ * reply is skipped.
+ */
+void SetState::updateNodeIfInNodes(const IsMasterReply& reply) {
+    if (Node* const known = findNode(reply.host)) {
+        known->update(reply);
+        return;
+    }
+
+    LOG(2) << "Skipping application of ismaster reply from " << reply.host
+           << " since it isn't a confirmed member of set " << name;
+}
+
+/**
+ * Builds the replica set connection string from the seed nodes.
+ */
+ConnectionString SetState::confirmedConnectionString() const {
+    return ConnectionString::forReplicaSet(
+        name, std::vector<HostAndPort>(seedNodes.begin(), seedNodes.end()));
+}
+
+/**
+ * Builds the replica set connection string from every host currently in the node list.
+ */
+ConnectionString SetState::possibleConnectionString() const {
+    std::vector<HostAndPort> hosts;
+    hosts.reserve(nodes.size());
+    std::transform(nodes.begin(), nodes.end(), std::back_inserter(hosts), [](const Node& node) {
+        return node.host;
+    });
+
+    return ConnectionString::forReplicaSet(name, std::move(hosts));
+}
+
+/**
+ * Resolves every waiter that can be resolved right now.
+ *
+ * A waiter is completed with an error when the set has been dropped, with the matching hosts when
+ * its read preference can be satisfied, and with an unsatisfied-read-preference error when its
+ * deadline has passed or when tests have disabled refresh retries and no scan is running. Once no
+ * waiters remain, expedited scanning is turned off and the refresh is rescheduled.
+ */
+void SetState::notify() {
+    if (waiters.empty()) {
+        return;
+    }
+
+    const auto cachedNow = now();
+    const bool shouldQuickFail = areRefreshRetriesDisabledForTest() && !currentScan;
+
+    for (auto it = waiters.begin(); it != waiters.end();) {
+        auto& waiter = *it;
+        bool resolved = true;
+
+        if (isDropped) {
+            waiter.promise.setError(
+                Status(ErrorCodes::ShutdownInProgress,
+                       str::stream() << "ScanningReplicaSetMonitor is shutting down"));
+        } else if (auto match = getMatchingHosts(waiter.criteria); !match.empty()) {
+            waiter.promise.emplaceValue(std::move(match));
+        } else if (waiter.deadline <= cachedNow) {
+            LOG(1) << "Unable to statisfy read preference " << waiter.criteria << " by deadline "
+                   << waiter.deadline;
+            waiter.promise.setError(makeUnsatisfedReadPrefError(waiter.criteria));
+        } else if (shouldQuickFail) {
+            LOG(1) << "Unable to statisfy read preference because tests fail quickly";
+            waiter.promise.setError(makeUnsatisfedReadPrefError(waiter.criteria));
+        } else {
+            // Still waiting for a future scan to satisfy this one.
+            resolved = false;
+        }
+
+        if (resolved) {
+            it = waiters.erase(it);
+        } else {
+            ++it;
+        }
+    }
+
+    if (!waiters.empty()) {
+        return;
+    }
+
+    // Nobody is waiting any more, so expedited scanning can stop.
+    isExpedited = false;
+    rescheduleRefresh(SchedulingStrategy::kCancelPreviousScan);
+}
+
+/**
+ * Builds the FailedToSatisfyReadPreference status reported to waiters whose 'criteria' could not
+ * be met.
+ */
+Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const {
+    const std::string reason = str::stream() << "Could not find host matching read preference "
+                                             << criteria.toString() << " for set " << name;
+    return {ErrorCodes::FailedToSatisfyReadPreference, reason};
+}
+
+// Schedules the first refresh (keeping any earlier scheduled scan) and then announces the set to
+// the change notifier.
+void SetState::init() {
+ rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan);
+ notifier->onFoundSet(name);
+}
+
+/**
+ * Shuts this set down: abandons the current scan, fails outstanding waiters through notify(),
+ * cancels the scheduled refresh and any scheduled isMaster calls, and tells the notifier that the
+ * set was dropped. Calling it again is a no-op.
+ */
+void SetState::drop() {
+    // If a SetState calls drop() from destruction after the RSMM calls shutdown(), then the
+    // RSMM's executor may no longer exist. Thus, only drop once.
+    const bool alreadyDropped = std::exchange(isDropped, true);
+    if (alreadyDropped) {
+        return;
+    }
+
+    currentScan.reset();
+    notify();
+
+    // Clears the stored handle and cancels it on the executor if it referred to scheduled work.
+    const auto cancelIfScheduled = [this](auto& slot) {
+        if (auto handle = std::exchange(slot, {})) {
+            executor->cancel(handle);
+        }
+    };
+
+    // Cancel our refresh on the way out ...
+    cancelIfScheduled(refresherHandle);
+
+    // ... along with any isMasters we had scheduled.
+    for (auto& node : nodes) {
+        cancelIfScheduled(node.scheduledIsMasterHandle);
+    }
+
+    // No point in notifying if we never started
+    if (workingConnStr.isValid()) {
+        notifier->onDroppedSet(name);
+    }
+}
+
+/**
+ * Verifies the internal consistency of the node list and of the current scan (if any); every
+ * violation trips an invariant.
+ */
+void SetState::checkInvariants() const {
+    bool sawMaster = false;
+    const Node* previous = nullptr;
+    for (const auto& node : nodes) {
+        // no empty hosts
+        invariant(!node.host.empty());
+
+        if (node.isMaster) {
+            // masters must be up
+            invariant(node.isUp);
+
+            // at most one master
+            invariant(!sawMaster);
+            sawMaster = true;
+
+            // if we have a master it should be the same as lastSeenMaster
+            invariant(lastSeenMaster.empty() || node.host == lastSeenMaster);
+        }
+
+        // should never end up with negative latencies
+        invariant(node.latencyMicros >= 0);
+
+        // nodes must be sorted by host with no-dupes
+        invariant(previous == nullptr || (previous->host < node.host));
+        previous = &node;
+    }
+
+    // nodes should be a (non-strict) superset of the seedNodes
+    invariant(std::includes(
+        nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts));
+
+    if (!currentScan) {
+        return;
+    }
+
+    // hostsToScan can't have dups or hosts already in triedHosts.
+    std::set<HostAndPort> alreadySeen = currentScan->triedHosts;
+    for (const auto& queued : currentScan->hostsToScan) {
+        invariant(!alreadySeen.count(queued));
+        alreadySeen.insert(queued);  // make sure we don't see this again
+    }
+
+    // We should only be waitingFor hosts that are in triedHosts
+    invariant(std::includes(currentScan->triedHosts.begin(),
+                            currentScan->triedHosts.end(),
+                            currentScan->waitingFor.begin(),
+                            currentScan->waitingFor.end()));
+
+    // We should only have unconfirmedReplies if we haven't found a master yet
+    invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty());
+}
+
+/**
+ * Queues every host of 'container' that has not been tried in this scan, then shuffles the queue.
+ * The queue must be empty on entry because hosts already queued are not de-duplicated.
+ */
+template <typename Container>
+void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) {
+    invariant(hostsToScan.empty());  // because we don't try to dedup hosts already in the queue.
+
+    for (const auto& candidate : container) {
+        if (triedHosts.count(candidate) == 0) {
+            hostsToScan.push_back(candidate);
+        }
+    }
+
+    std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg());
+}
+
+/**
+ * Re-queues, in random order, every tried host that is not currently awaited. Afterwards only the
+ * awaited hosts still count as tried. The queue must be empty on entry.
+ */
+void ScanState::retryAllTriedHosts(PseudoRandom& rand) {
+    invariant(hostsToScan.empty());  // because we don't try to dedup hosts already in the queue.
+
+    // Hosts in triedHosts but not in waitingFor go back onto the scan queue.
+    std::set_difference(triedHosts.begin(),
+                        triedHosts.end(),
+                        waitingFor.begin(),
+                        waitingFor.end(),
+                        std::back_inserter(hostsToScan));
+    std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg());
+
+    triedHosts = waitingFor;
+}
+
+/**
+ * Empties the scan queue, recording every queued host as tried.
+ *
+ * Marking the hosts as tried (rather than just discarding them) avoids deleting hosts in multiple
+ * points and emulates the final effect of Refresher::getNextStep() on the set.
+ */
+void ScanState::markHostsToScanAsTried() noexcept {
+    triedHosts.insert(hostsToScan.begin(), hostsToScan.end());
+    hostsToScan.clear();
+}
+} // namespace mongo
diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h
new file mode 100644
index 00000000000..efd5f10be14
--- /dev/null
+++ b/src/mongo/client/scanning_replica_set_monitor.h
@@ -0,0 +1,213 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+
+#include "mongo/base/string_data.h"
+#include "mongo/client/mongo_uri.h"
+#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * ReplicaSetMonitor implementation whose view of the replica set is held in a shared SetState
+ * and refreshed by the nested Refresher class.
+ *
+ * Instances are non-copyable.
+ */
+class ScanningReplicaSetMonitor : public ReplicaSetMonitor {
+ ScanningReplicaSetMonitor(const ScanningReplicaSetMonitor&) = delete;
+ ScanningReplicaSetMonitor& operator=(const ScanningReplicaSetMonitor&) = delete;
+
+public:
+ class Refresher;
+
+ // NOTE(review): the concurrent test describes a recheck "every 500ms" after a successful
+ // isMaster and "every 5s" after an isMaster that timed out, both derived from these two
+ // constants; confirm the exact semantics in the .cpp.
+ static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500);
+ static constexpr auto kCheckTimeout = Seconds(5);
+
+ // Constructs a monitor from 'uri'.
+ ScanningReplicaSetMonitor(const MongoURI& uri);
+
+ //
+ // Overrides of the ReplicaSetMonitor interface.
+ //
+
+ void init() override;
+
+ void drop() override;
+
+ SemiFuture<HostAndPort> getHostOrRefresh(
+ const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait = kDefaultFindHostTimeout) override;
+
+ SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
+ const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait = kDefaultFindHostTimeout) override;
+
+ HostAndPort getMasterOrUassert() override;
+
+ void failedHost(const HostAndPort& host, const Status& status) override;
+
+ bool isPrimary(const HostAndPort& host) const override;
+
+ bool isHostUp(const HostAndPort& host) const override;
+
+ int getMinWireVersion() const override;
+
+ int getMaxWireVersion() const override;
+
+ std::string getName() const override;
+
+ std::string getServerAddress() const override;
+
+ const MongoURI& getOriginalUri() const override;
+
+ bool contains(const HostAndPort& server) const override;
+
+ void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override;
+
+ bool isKnownToHaveGoodPrimary() const override;
+
+ /**
+ * Returns the refresh period that is given to all new SetStates.
+ */
+ static Seconds getDefaultRefreshPeriod();
+
+ //
+ // internal types (defined in scanning_replica_set_monitor_internal.h)
+ //
+
+ struct IsMasterReply;
+ struct ScanState;
+ struct SetState;
+ typedef std::shared_ptr<ScanState> ScanStatePtr;
+ typedef std::shared_ptr<SetState> SetStatePtr;
+
+ /**
+ * Allows tests to set initial conditions and introspect the current state.
+ */
+ explicit ScanningReplicaSetMonitor(const SetStatePtr& initialState);
+ ~ScanningReplicaSetMonitor();
+
+ /**
+ * This is for use in tests using MockReplicaSet to ensure that a full scan completes before
+ * continuing.
+ */
+ void runScanForMockReplicaSet();
+
+private:
+ // NOTE(review): presumably the common implementation behind getHostOrRefresh() and
+ // getHostsOrRefresh(); confirm in the .cpp.
+ Future<std::vector<HostAndPort>> _getHostsOrRefresh(const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait);
+ /**
+ * If no scan is in-progress, this function is responsible for setting up a new scan. Otherwise,
+ * does nothing.
+ */
+ static void _ensureScanInProgress(const SetStatePtr&);
+
+ // Shared state of this monitor. The pointer itself is const, so it is never reseated after
+ // construction.
+ const SetStatePtr _state;
+};
+
+
+/**
+ * Refreshes the local view of a replica set.
+ *
+ * All logic related to choosing the hosts to contact and updating the SetState based on replies
+ * lives in this class. Use of this class should always be guarded by SetState::mutex unless in
+ * single-threaded use by ScanningReplicaSetMonitorTest.
+ */
+class ScanningReplicaSetMonitor::Refresher {
+public:
+ // Constructs a Refresher that operates on 'setState'.
+ explicit Refresher(const SetStatePtr& setState);
+
+ /**
+ * Result of getNextStep(): the kind of step to take and, for CONTACT_HOST, the host to
+ * contact. 'host' is a default-constructed HostAndPort unless one is supplied.
+ */
+ struct NextStep {
+ enum StepKind {
+ CONTACT_HOST, /// Contact the returned host
+ WAIT, /// Wait on condition variable and try again.
+ DONE, /// No more hosts to contact in this Refresh round
+ };
+
+ explicit NextStep(StepKind step, const HostAndPort& host = HostAndPort())
+ : step(step), host(host) {}
+
+ StepKind step;
+ HostAndPort host;
+ };
+
+ /**
+ * Returns the next step to take.
+ *
+ * By calling this, you promise to call receivedIsMaster or failedHost if the NextStep is
+ * CONTACT_HOST.
+ */
+ NextStep getNextStep();
+
+ /**
+ * Call this if a host returned from getNextStep successfully replied to an isMaster call.
+ * Negative latencyMicros are ignored.
+ */
+ void receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& reply);
+
+ /**
+ * Call this if a host returned from getNextStep failed to reply to an isMaster call.
+ */
+ void failedHost(const HostAndPort& host, const Status& status);
+
+ /**
+ * Starts a new scan over the hosts in set.
+ */
+ void startNewScan();
+
+ /**
+ * First, checks that the "reply" is not from a stale primary by comparing the electionId of
+ * "reply" to the maxElectionId recorded by the SetState and returns OK status if "reply"
+ * belongs to a non-stale primary. Otherwise returns a failed status.
+ *
+ * The 'from' parameter specifies the node from which the response is received.
+ *
+ * Updates _set and _scan based on set-membership information from a master.
+ * Applies _scan->unconfirmedReplies to confirmed nodes.
+ * Does not update this host's node in _set->nodes.
+ */
+ Status receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply);
+
+ /**
+ * Schedules isMaster requests to all hosts that currently need to be contacted.
+ * Does nothing if requests have already been sent to all known hosts.
+ */
+ void scheduleNetworkRequests();
+
+ /**
+ * NOTE(review): presumably schedules a single isMaster request to 'host', i.e. the per-host
+ * counterpart of scheduleNetworkRequests(); confirm in the .cpp.
+ */
+ void scheduleIsMaster(const HostAndPort& host);
+
+private:
+ // Both pointers are never NULL
+ SetStatePtr _set;
+ ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started.
+};
+
+} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/scanning_replica_set_monitor_internal.h
index 1c02db4f65d..a0ff4ac8e92 100644
--- a/src/mongo/client/replica_set_monitor_internal.h
+++ b/src/mongo/client/scanning_replica_set_monitor_internal.h
@@ -42,7 +42,7 @@
#include <vector>
#include "mongo/client/read_preference.h"
-#include "mongo/client/replica_set_monitor.h"
+#include "mongo/client/scanning_replica_set_monitor.h"
#include "mongo/db/jsobj.h"
#include "mongo/platform/mutex.h"
#include "mongo/platform/random.h"
@@ -51,7 +51,7 @@
namespace mongo {
-struct ReplicaSetMonitor::IsMasterReply {
+struct ScanningReplicaSetMonitor::IsMasterReply {
IsMasterReply() : ok(false) {}
IsMasterReply(const HostAndPort& host, int64_t latencyMicros, const BSONObj& reply)
: ok(false), host(host), latencyMicros(latencyMicros) {
@@ -86,14 +86,15 @@ struct ReplicaSetMonitor::IsMasterReply {
};
/**
- * The SetState is the underlying data object behind both the ReplicaSetMonitor and the Refresher
+ * The SetState is the underlying data object behind both the ScanningReplicaSetMonitor and the
+ * Refresher
*
* Note that the SetState only holds its own lock in init() and drop(). Even those uses can probably
* be offloaded to the RSM eventually. In all other cases, the RSM and RSM::Refresher use the
* SetState lock to synchronize.
*/
-struct ReplicaSetMonitor::SetState
- : public std::enable_shared_from_this<ReplicaSetMonitor::SetState> {
+struct ScanningReplicaSetMonitor::SetState
+ : public std::enable_shared_from_this<ScanningReplicaSetMonitor::SetState> {
SetState(const SetState&) = delete;
SetState& operator=(const SetState&) = delete;
@@ -271,7 +272,7 @@ public:
ConnectionString workingConnStr; // The connection string from our last scan
// For tracking replies
- OID maxElectionId; // largest election id observed by this ReplicaSetMonitor
+ OID maxElectionId; // largest election id observed by this ScanningReplicaSetMonitor
int configVersion = 0; // version number of the replica set config.
// For matching hosts
@@ -288,7 +289,7 @@ public:
Date_t nextScanTime; // The time at which the next scan is scheduled to start
};
-struct ReplicaSetMonitor::ScanState {
+struct ScanningReplicaSetMonitor::ScanState {
ScanState(const ScanState&) = delete;
ScanState& operator=(const ScanState&) = delete;
diff --git a/src/mongo/client/replica_set_monitor_internal_test.cpp b/src/mongo/client/scanning_replica_set_monitor_internal_test.cpp
index 1a90deed113..766558de2f5 100644
--- a/src/mongo/client/replica_set_monitor_internal_test.cpp
+++ b/src/mongo/client/scanning_replica_set_monitor_internal_test.cpp
@@ -29,7 +29,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/client/replica_set_monitor_test_fixture.h"
+#include "mongo/client/scanning_replica_set_monitor_test_fixture.h"
#include "mongo/client/mongo_uri.h"
@@ -37,7 +37,7 @@ namespace mongo {
namespace {
// -- SetState Construction --
-using InitialStateTest = ReplicaSetMonitorTest;
+using InitialStateTest = ScanningReplicaSetMonitorTest;
TEST_F(InitialStateTest, InitialStateMongoURI) {
auto uri = MongoURI::parse("mongodb://a,b,c/?replicaSet=name");
@@ -58,7 +58,7 @@ TEST_F(InitialStateTest, InitialStateMongoURI) {
}
// -- Node operations --
-class NodeTest : public ReplicaSetMonitorTest {
+class NodeTest : public ScanningReplicaSetMonitorTest {
public:
bool isCompatible(const Node& node, ReadPreference pref, const TagSet& tagSet) {
auto connStr = ConnectionString::forReplicaSet(kSetName, {node.host});
@@ -356,7 +356,7 @@ TEST_F(NodeTest, SecNodeNotCompatibleMultiTag) {
}
// -- IsMasterReply operations --
-using IsMasterReplyTest = ReplicaSetMonitorTest;
+using IsMasterReplyTest = ScanningReplicaSetMonitorTest;
TEST_F(IsMasterReplyTest, IsMasterBadParse) {
BSONObj ismaster = BSON("hosts" << BSON_ARRAY("mongo.example:badport"));
IsMasterReply imr(HostAndPort("mongo.example:27017"), -1, ismaster);
diff --git a/src/mongo/client/replica_set_monitor_read_preference_test.cpp b/src/mongo/client/scanning_replica_set_monitor_read_preference_test.cpp
index fa702580389..5fafe64071d 100644
--- a/src/mongo/client/replica_set_monitor_read_preference_test.cpp
+++ b/src/mongo/client/scanning_replica_set_monitor_read_preference_test.cpp
@@ -29,7 +29,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/client/replica_set_monitor_test_fixture.h"
+#include "mongo/client/scanning_replica_set_monitor_test_fixture.h"
#include <memory>
@@ -39,7 +39,7 @@
namespace mongo {
namespace {
-class ReadPrefTest : public ReplicaSetMonitorTest {
+class ReadPrefTest : public ScanningReplicaSetMonitorTest {
public:
ReadPrefTest() = default;
virtual ~ReadPrefTest() = default;
diff --git a/src/mongo/client/replica_set_monitor_scan_test.cpp b/src/mongo/client/scanning_replica_set_monitor_scan_test.cpp
index 2bc894f27ba..451b20af3e0 100644
--- a/src/mongo/client/replica_set_monitor_scan_test.cpp
+++ b/src/mongo/client/scanning_replica_set_monitor_scan_test.cpp
@@ -31,7 +31,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/client/replica_set_monitor_test_fixture.h"
+#include "mongo/client/scanning_replica_set_monitor_test_fixture.h"
#include "mongo/client/mongo_uri.h"
#include "mongo/logv2/log.h"
@@ -39,7 +39,7 @@
namespace mongo {
namespace {
-using CoreScanTest = ReplicaSetMonitorTest;
+using CoreScanTest = ScanningReplicaSetMonitorTest;
TEST_F(CoreScanTest, CheckAllSeedsSerial) {
auto state = makeState(basicUri);
@@ -568,7 +568,7 @@ TEST_F(CoreScanTest, GetMatchingDuringScan) {
// Ensure nothing breaks when out-of-band failedHost is called during scan
TEST_F(CoreScanTest, OutOfBandFailedHost) {
auto state = makeState(basicUri);
- ReplicaSetMonitorPtr rsm = std::make_shared<ReplicaSetMonitor>(state);
+ auto rsm = std::make_shared<ScanningReplicaSetMonitor>(state);
Refresher refresher(state);
for (size_t i = 0; i != basicSeeds.size(); ++i) {
@@ -802,14 +802,14 @@ TEST_F(CoreScanTest, StalePrimaryWithObsoleteElectionId) {
TEST_F(CoreScanTest, NoPrimaryUpCheck) {
auto state = makeState(basicUri);
- ReplicaSetMonitor rsm(state);
+ ScanningReplicaSetMonitor rsm(state);
ASSERT_FALSE(rsm.isKnownToHaveGoodPrimary());
}
TEST_F(CoreScanTest, PrimaryIsUpCheck) {
auto state = makeState(basicUri);
state->nodes.front().isMaster = true;
- ReplicaSetMonitor rsm(state);
+ ScanningReplicaSetMonitor rsm(state);
ASSERT_TRUE(rsm.isKnownToHaveGoodPrimary());
}
@@ -901,7 +901,7 @@ TEST_F(CoreScanTest, TwoPrimaries2ndHasOlderConfigVersion) {
ASSERT_EQUALS(state->configVersion, 2);
}
-using MaxStalenessMSTest = ReplicaSetMonitorTest;
+using MaxStalenessMSTest = ScanningReplicaSetMonitorTest;
/**
* Success finding node matching maxStalenessMS parameter
@@ -1289,7 +1289,7 @@ TEST_F(MaxStalenessMSTest, MaxStalenessMSZeroNoLastWrite) {
ASSERT(!state->getMatchingHost(secondary).empty());
}
-using MinOpTimeTest = ReplicaSetMonitorTest;
+using MinOpTimeTest = ScanningReplicaSetMonitorTest;
/**
* Success matching minOpTime
*/
@@ -1454,7 +1454,7 @@ public:
State lastState;
};
-class ChangeNotifierTest : public ReplicaSetMonitorTest {
+class ChangeNotifierTest : public ScanningReplicaSetMonitorTest {
public:
ChangeNotifierTest() = default;
virtual ~ChangeNotifierTest() = default;
diff --git a/src/mongo/client/replica_set_monitor_test_concurrent.cpp b/src/mongo/client/scanning_replica_set_monitor_test_concurrent.cpp
index 358dcc29cb7..1d10a4805ee 100644
--- a/src/mongo/client/replica_set_monitor_test_concurrent.cpp
+++ b/src/mongo/client/scanning_replica_set_monitor_test_concurrent.cpp
@@ -32,7 +32,7 @@
#include "mongo/platform/basic.h"
#include "mongo/client/replica_set_monitor.h"
-#include "mongo/client/replica_set_monitor_internal.h"
+#include "mongo/client/scanning_replica_set_monitor_internal.h"
#include "mongo/dbtests/mock/mock_replica_set.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/thread_pool_mock.h"
@@ -51,7 +51,7 @@ using executor::RemoteCommandResponse;
using executor::ThreadPoolExecutorTest;
using InNetworkGuard = NetworkInterfaceMock::InNetworkGuard;
using NetworkOperationIterator = NetworkInterfaceMock::NetworkOperationIterator;
-using StepKind = ReplicaSetMonitor::Refresher::NextStep::StepKind;
+using StepKind = ScanningReplicaSetMonitor::Refresher::NextStep::StepKind;
class ReplicaSetMonitorConcurrentTest : public ThreadPoolExecutorTest {
protected:
@@ -178,9 +178,9 @@ TEST_F(ReplicaSetMonitorConcurrentTest, RechecksAvailableNodesUntilExpiration) {
MockReplicaSet replSet("test", 2, false /* hasPrimary */, false /* dollarPrefixHosts */);
const auto node0 = HostAndPort(replSet.getSecondaries()[0]);
const auto node1 = HostAndPort(replSet.getSecondaries()[1]);
- auto state = std::make_shared<ReplicaSetMonitor::SetState>(
+ auto state = std::make_shared<ScanningReplicaSetMonitor::SetState>(
replSet.getURI(), &getNotifier(), &getExecutor());
- auto monitor = std::make_shared<ReplicaSetMonitor>(state);
+ auto monitor = std::make_shared<ScanningReplicaSetMonitor>(state);
// Node 1 is unresponsive.
replSet.kill(replSet.getSecondaries()[1]);
@@ -245,9 +245,9 @@ TEST_F(ReplicaSetMonitorConcurrentTest, StepdownAndElection) {
const auto node0 = HostAndPort(replSet.getSecondaries()[0]);
const auto node1 = HostAndPort(replSet.getSecondaries()[1]);
const auto node2 = HostAndPort(replSet.getSecondaries()[2]);
- auto state = std::make_shared<ReplicaSetMonitor::SetState>(
+ auto state = std::make_shared<ScanningReplicaSetMonitor::SetState>(
replSet.getURI(), &getNotifier(), &getExecutor());
- auto monitor = std::make_shared<ReplicaSetMonitor>(state);
+ auto monitor = std::make_shared<ScanningReplicaSetMonitor>(state);
// Node 2 is unresponsive.
replSet.kill(replSet.getSecondaries()[2]);
@@ -328,9 +328,9 @@ TEST_F(ReplicaSetMonitorConcurrentTest, IsMasterFrequency) {
const auto node0 = HostAndPort(replSet.getSecondaries()[0]);
const auto node1 = HostAndPort(replSet.getSecondaries()[1]);
- auto state = std::make_shared<ReplicaSetMonitor::SetState>(
+ auto state = std::make_shared<ScanningReplicaSetMonitor::SetState>(
replSet.getURI(), &getNotifier(), &getExecutor());
- auto monitor = std::make_shared<ReplicaSetMonitor>(state);
+ auto monitor = std::make_shared<ScanningReplicaSetMonitor>(state);
// Node 1 is unresponsive.
replSet.kill(replSet.getSecondaries()[1]);
@@ -389,9 +389,9 @@ TEST_F(ReplicaSetMonitorConcurrentTest, RecheckUntilTimeout) {
const auto node0 = HostAndPort(replSet.getSecondaries()[0]);
const auto node1 = HostAndPort(replSet.getSecondaries()[1]);
- auto state = std::make_shared<ReplicaSetMonitor::SetState>(
+ auto state = std::make_shared<ScanningReplicaSetMonitor::SetState>(
replSet.getURI(), &getNotifier(), &getExecutor());
- auto monitor = std::make_shared<ReplicaSetMonitor>(state);
+ auto monitor = std::make_shared<ScanningReplicaSetMonitor>(state);
// Node 1 is unresponsive.
replSet.kill(replSet.getSecondaries()[1]);
@@ -414,11 +414,12 @@ TEST_F(ReplicaSetMonitorConcurrentTest, RecheckUntilTimeout) {
// Every 500ms, the monitor rechecks node 0 after the previous successful isMaster.
// Every 5s, the monitor rechecks node 1 after the previous isMaster experiences timeout.
- constexpr auto kTimeoutPeriodMS = Milliseconds(ReplicaSetMonitor::kCheckTimeout).count() +
- ReplicaSetMonitor::kExpeditedRefreshPeriod.count();
+ constexpr auto kTimeoutPeriodMS =
+ Milliseconds(ScanningReplicaSetMonitor::kCheckTimeout).count() +
+ ScanningReplicaSetMonitor::kExpeditedRefreshPeriod.count();
checkUntil(Milliseconds(14500), [&]() {
ASSERT_EQ(getNumChecks(node0),
- elapsedMS() / ReplicaSetMonitor::kExpeditedRefreshPeriod.count() + 1);
+ elapsedMS() / ScanningReplicaSetMonitor::kExpeditedRefreshPeriod.count() + 1);
ASSERT_EQ(getNumChecks(node1), elapsedMS() / kTimeoutPeriodMS + 1);
ASSERT(!hostFuture.isReady());
});
diff --git a/src/mongo/client/replica_set_monitor_test_fixture.cpp b/src/mongo/client/scanning_replica_set_monitor_test_fixture.cpp
index 4f0e158fa99..fa7f13cccc8 100644
--- a/src/mongo/client/replica_set_monitor_test_fixture.cpp
+++ b/src/mongo/client/scanning_replica_set_monitor_test_fixture.cpp
@@ -29,13 +29,13 @@
#include "mongo/platform/basic.h"
-#include "mongo/client/replica_set_monitor_test_fixture.h"
+#include "mongo/client/scanning_replica_set_monitor_test_fixture.h"
namespace mongo {
-const std::vector<HostAndPort> ReplicaSetMonitorTest::basicSeeds = {
+const std::vector<HostAndPort> ScanningReplicaSetMonitorTest::basicSeeds = {
HostAndPort("a"), HostAndPort("b"), HostAndPort("c")};
-const std::set<HostAndPort> ReplicaSetMonitorTest::basicSeedsSet = {std::begin(basicSeeds),
- std::end(basicSeeds)};
-const MongoURI ReplicaSetMonitorTest::basicUri(ConnectionString::forReplicaSet(kSetName,
- basicSeeds));
+const std::set<HostAndPort> ScanningReplicaSetMonitorTest::basicSeedsSet = {std::begin(basicSeeds),
+ std::end(basicSeeds)};
+const MongoURI ScanningReplicaSetMonitorTest::basicUri(ConnectionString::forReplicaSet(kSetName,
+ basicSeeds));
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_test_fixture.h b/src/mongo/client/scanning_replica_set_monitor_test_fixture.h
index 7e842008453..a48584ad362 100644
--- a/src/mongo/client/replica_set_monitor_test_fixture.h
+++ b/src/mongo/client/scanning_replica_set_monitor_test_fixture.h
@@ -34,8 +34,8 @@
#include <vector>
#include "mongo/client/replica_set_change_notifier.h"
-#include "mongo/client/replica_set_monitor.h"
-#include "mongo/client/replica_set_monitor_internal.h"
+#include "mongo/client/scanning_replica_set_monitor.h"
+#include "mongo/client/scanning_replica_set_monitor_internal.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -44,21 +44,21 @@ namespace mongo {
// current (only) thread, so they do not lock SetState::mutex before examining state. This is
// NOT something that non-test code should do.
-class ReplicaSetMonitorTest : public unittest::Test {
+class ScanningReplicaSetMonitorTest : public unittest::Test {
public:
// Pull in nested types
- using SetState = ReplicaSetMonitor::SetState;
+ using SetState = ScanningReplicaSetMonitor::SetState;
using Node = SetState::Node;
- using IsMasterReply = ReplicaSetMonitor::IsMasterReply;
+ using IsMasterReply = ScanningReplicaSetMonitor::IsMasterReply;
- using Refresher = ReplicaSetMonitor::Refresher;
+ using Refresher = ScanningReplicaSetMonitor::Refresher;
using NextStep = Refresher::NextStep;
static constexpr StringData kSetName = "name"_sd;
- ReplicaSetMonitorTest() = default;
- virtual ~ReplicaSetMonitorTest() = default;
+ ScanningReplicaSetMonitorTest() = default;
+ virtual ~ScanningReplicaSetMonitorTest() = default;
template <typename... Args>
using StateIsConstructible =
@@ -69,7 +69,7 @@ public:
template <typename... Args, typename = StateIsConstructible<Args...>>
auto makeState(Args&&... args) {
- return std::make_shared<ReplicaSetMonitor::SetState>(
+ return std::make_shared<ScanningReplicaSetMonitor::SetState>(
std::forward<Args>(args)..., &_notifier, nullptr);
}
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
new file mode 100644
index 00000000000..0056a98c567
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -0,0 +1,108 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/streamable_replica_set_monitor.h"
+
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+
+#include "mongo/client/mongo_uri.h"
+#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+// Placeholder constructor: the body is empty, so 'uri' is currently ignored.
+StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri) {}
+// Placeholder: currently a no-op.
+void StreamableReplicaSetMonitor::init() {}
+
+// Placeholder: currently a no-op.
+void StreamableReplicaSetMonitor::drop() {}
+
+// Placeholder: no host selection is performed here; the body only marks this path as
+// unreachable, and both arguments are unused.
+SemiFuture<HostAndPort> StreamableReplicaSetMonitor::getHostOrRefresh(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: no host selection is performed here; the body only marks this path as
+// unreachable, and both arguments are unused.
+SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefresh(
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait) {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: no master lookup is performed; the body only marks this path as unreachable.
+HostAndPort StreamableReplicaSetMonitor::getMasterOrUassert() {
+ MONGO_UNREACHABLE;
+}
+// Placeholder: currently a no-op, so the reported 'host' and 'status' are ignored.
+void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {}
+// Placeholder: 'host' is not examined; the body only marks this path as unreachable.
+bool StreamableReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: 'host' is not examined; the body only marks this path as unreachable.
+bool StreamableReplicaSetMonitor::isHostUp(const HostAndPort& host) const {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: no wire version is computed; the body only marks this path as unreachable.
+int StreamableReplicaSetMonitor::getMinWireVersion() const {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: no wire version is computed; the body only marks this path as unreachable.
+int StreamableReplicaSetMonitor::getMaxWireVersion() const {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: no name is returned; the body only marks this path as unreachable.
+std::string StreamableReplicaSetMonitor::getName() const {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: no address is returned; the body only marks this path as unreachable.
+std::string StreamableReplicaSetMonitor::getServerAddress() const {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: no URI is stored by this class (the constructor ignores its argument), so the
+// body only marks this path as unreachable.
+const MongoURI& StreamableReplicaSetMonitor::getOriginalUri() const {
+    MONGO_UNREACHABLE;
+}
+// Placeholder: 'server' is not examined; the body only marks this path as unreachable.
+bool StreamableReplicaSetMonitor::contains(const HostAndPort& server) const {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: nothing is appended to 'b'; the body only marks this path as unreachable.
+void StreamableReplicaSetMonitor::appendInfo(BSONObjBuilder& b, bool forFTDC) const {
+ MONGO_UNREACHABLE;
+}
+
+// Placeholder: no primary state is consulted; the body only marks this path as unreachable.
+bool StreamableReplicaSetMonitor::isKnownToHaveGoodPrimary() const {
+ MONGO_UNREACHABLE;
+}
+
+} // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
new file mode 100644
index 00000000000..8c27301660e
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -0,0 +1,93 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <set>
+#include <string>
+
+#include "mongo/base/string_data.h"
+#include "mongo/client/mongo_uri.h"
+#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+/**
+ * Skeleton of a second ReplicaSetMonitor implementation. It declares an override for each of
+ * the ReplicaSetMonitor member functions listed below and is non-copyable.
+ *
+ * NOTE(review): in streamable_replica_set_monitor.cpp the constructor, init(), drop() and
+ * failedHost() are empty and every other override is a MONGO_UNREACHABLE placeholder, so this
+ * class is not usable as a working monitor yet.
+ */
+class StreamableReplicaSetMonitor : public ReplicaSetMonitor {
+ StreamableReplicaSetMonitor(const StreamableReplicaSetMonitor&) = delete;
+ StreamableReplicaSetMonitor& operator=(const StreamableReplicaSetMonitor&) = delete;
+
+public:
+ // Constructs a monitor from 'uri'.
+ StreamableReplicaSetMonitor(const MongoURI& uri);
+
+ //
+ // Overrides of the ReplicaSetMonitor interface.
+ //
+
+ void init() override;
+
+ void drop() override;
+
+ SemiFuture<HostAndPort> getHostOrRefresh(
+ const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait = kDefaultFindHostTimeout) override;
+
+ SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
+ const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait = kDefaultFindHostTimeout) override;
+
+ HostAndPort getMasterOrUassert() override;
+
+ void failedHost(const HostAndPort& host, const Status& status) override;
+
+ bool isPrimary(const HostAndPort& host) const override;
+
+ bool isHostUp(const HostAndPort& host) const override;
+
+ int getMinWireVersion() const override;
+
+ int getMaxWireVersion() const override;
+
+ std::string getName() const override;
+
+ std::string getServerAddress() const override;
+
+ const MongoURI& getOriginalUri() const override;
+
+ bool contains(const HostAndPort& server) const override;
+
+ void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override;
+
+ bool isKnownToHaveGoodPrimary() const override;
+};
+
+} // namespace mongo
diff --git a/src/mongo/dbtests/replica_set_monitor_test.cpp b/src/mongo/dbtests/replica_set_monitor_test.cpp
index 7e1d403834b..14571926693 100644
--- a/src/mongo/dbtests/replica_set_monitor_test.cpp
+++ b/src/mongo/dbtests/replica_set_monitor_test.cpp
@@ -36,7 +36,7 @@
#include "mongo/client/connpool.h"
#include "mongo/client/dbclient_rs.h"
#include "mongo/client/replica_set_monitor.h"
-#include "mongo/client/replica_set_monitor_internal.h"
+#include "mongo/client/scanning_replica_set_monitor_internal.h"
#include "mongo/dbtests/mock/mock_conn_registry.h"
#include "mongo/dbtests/mock/mock_replica_set.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp
index 83360c2f958..d02ef15adc4 100644
--- a/src/mongo/s/client/parallel.cpp
+++ b/src/mongo/s/client/parallel.cpp
@@ -362,7 +362,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
if (allowShardVersionFailure) {
const DBClientReplicaSet* replConn = dynamic_cast<const DBClientReplicaSet*>(rawConn);
invariant(replConn);
- ReplicaSetMonitorPtr rsMonitor = ReplicaSetMonitor::get(replConn->getSetName());
+ auto rsMonitor = ReplicaSetMonitor::get(replConn->getSetName());
uassert(16388,
str::stream() << "cannot access unknown replica set: " << replConn->getSetName(),
rsMonitor != nullptr);
diff --git a/src/mongo/shell/shell_utils.cpp b/src/mongo/shell/shell_utils.cpp
index 28ebda1afaa..3979d5b2010 100644
--- a/src/mongo/shell/shell_utils.cpp
+++ b/src/mongo/shell/shell_utils.cpp
@@ -413,7 +413,7 @@ BSONObj replMonitorStats(const BSONObj& a, void* data) {
a.nFields() == 1 && a.firstElement().type() == String);
auto name = a.firstElement().valuestrsafe();
- ReplicaSetMonitorPtr rsm = ReplicaSetMonitor::get(name);
+ auto rsm = ReplicaSetMonitor::get(name);
if (!rsm) {
return BSON(""
<< "no ReplSetMonitor exists by that name");