summaryrefslogtreecommitdiff
path: root/src/mongo/client
diff options
context:
space:
mode:
authorDavis Haupt <davis.haupt@mongodb.com>2022-01-19 19:41:56 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-01-19 20:17:16 +0000
commitac9d5b9ea14af74e9b4ba3bad952cf4442fa4b93 (patch)
tree560e6f5b2dd5662999b639bb02b8540ab9a14e7b /src/mongo/client
parentd20d317c08fd48b71f790c1bb013aa61c6f0f4b7 (diff)
downloadmongo-ac9d5b9ea14af74e9b4ba3bad952cf4442fa4b93.tar.gz
SERVER-62079 removes rsm scanning
Diffstat (limited to 'src/mongo/client')
-rw-r--r--src/mongo/client/SConscript1
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp64
-rw-r--r--src/mongo/client/replica_set_monitor_server_parameters.cpp8
-rw-r--r--src/mongo/client/replica_set_monitor_server_parameters.h2
-rw-r--r--src/mongo/client/replica_set_monitor_server_parameters_test.cpp19
-rw-r--r--src/mongo/client/scanning_replica_set_monitor.cpp1591
-rw-r--r--src/mongo/client/scanning_replica_set_monitor.h257
-rw-r--r--src/mongo/client/scanning_replica_set_monitor_internal.h337
8 files changed, 30 insertions, 2249 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 0f8a4903bfa..0b1f03cc2e1 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -220,7 +220,6 @@ clientDriverEnv.Library(
'replica_set_monitor.cpp',
'replica_set_monitor_manager.cpp',
'replica_set_monitor_stats.cpp',
- 'scanning_replica_set_monitor.cpp',
'streamable_replica_set_monitor.cpp',
'streamable_replica_set_monitor_query_processor.cpp',
'streamable_replica_set_monitor_error_handler.cpp',
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index 3919437676a..f48832015cd 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -40,7 +40,6 @@
#include "mongo/client/mongo_uri.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/replica_set_monitor_server_parameters.h"
-#include "mongo/client/scanning_replica_set_monitor.h"
#include "mongo/client/streamable_replica_set_monitor.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_factory.h"
@@ -75,34 +74,31 @@ Status ReplicaSetMonitorManagerNetworkConnectionHook::validateHost(
const HostAndPort& remoteHost,
const BSONObj& isMasterRequest,
const executor::RemoteCommandResponse& isMasterReply) {
- if (gReplicaSetMonitorProtocol != ReplicaSetMonitorProtocol::kScanning) {
- auto monitor = ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost);
- if (!monitor) {
- return Status::OK();
- }
+ auto monitor = ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost);
+ if (!monitor) {
+ return Status::OK();
+ }
- if (std::shared_ptr<StreamableReplicaSetMonitor> streamableMonitor =
- std::dynamic_pointer_cast<StreamableReplicaSetMonitor>(
- ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost))) {
-
- auto publisher = streamableMonitor->getEventsPublisher();
- if (publisher) {
- try {
- if (isMasterReply.status.isOK()) {
- publisher->onServerHandshakeCompleteEvent(
- *isMasterReply.elapsed, remoteHost, isMasterReply.data);
- } else {
- publisher->onServerHandshakeFailedEvent(
- remoteHost, isMasterReply.status, isMasterReply.data);
- }
- } catch (const DBException& exception) {
- LOGV2_ERROR(4712101,
- "An error occurred publishing a ReplicaSetMonitor handshake event",
- "error"_attr = exception.toStatus(),
- "replicaSet"_attr = monitor->getName(),
- "handshakeStatus"_attr = isMasterReply.status);
- return exception.toStatus();
+ if (auto streamableMonitor = std::dynamic_pointer_cast<StreamableReplicaSetMonitor>(
+ ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost))) {
+
+ auto publisher = streamableMonitor->getEventsPublisher();
+ if (publisher) {
+ try {
+ if (isMasterReply.status.isOK()) {
+ publisher->onServerHandshakeCompleteEvent(
+ *isMasterReply.elapsed, remoteHost, isMasterReply.data);
+ } else {
+ publisher->onServerHandshakeFailedEvent(
+ remoteHost, isMasterReply.status, isMasterReply.data);
}
+ } catch (const DBException& exception) {
+ LOGV2_ERROR(4712101,
+ "An error occurred publishing a ReplicaSetMonitor handshake event",
+ "error"_attr = exception.toStatus(),
+ "replicaSet"_attr = monitor->getName(),
+ "handshakeStatus"_attr = isMasterReply.status);
+ return exception.toStatus();
}
}
}
@@ -197,16 +193,10 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
"protocol"_attr = toString(gReplicaSetMonitorProtocol),
"uri"_attr = uri.toString());
invariant(_taskExecutor);
- if (gReplicaSetMonitorProtocol == ReplicaSetMonitorProtocol::kScanning) {
- newMonitor =
- std::make_shared<ScanningReplicaSetMonitor>(uri, _taskExecutor, cleanupCallback);
- newMonitor->init();
- } else {
- // Both ReplicaSetMonitorProtocol::kSdam and ReplicaSetMonitorProtocol::kStreamable use the
- // StreamableReplicaSetMonitor.
- newMonitor = StreamableReplicaSetMonitor::make(
- uri, _taskExecutor, _getConnectionManager(), cleanupCallback, _stats);
- }
+
+ newMonitor = StreamableReplicaSetMonitor::make(
+ uri, _taskExecutor, _getConnectionManager(), cleanupCallback, _stats);
+
_monitors[setName] = newMonitor;
_numMonitorsCreated++;
return newMonitor;
diff --git a/src/mongo/client/replica_set_monitor_server_parameters.cpp b/src/mongo/client/replica_set_monitor_server_parameters.cpp
index 91ab6b5cc83..4d0c1dd4e0e 100644
--- a/src/mongo/client/replica_set_monitor_server_parameters.cpp
+++ b/src/mongo/client/replica_set_monitor_server_parameters.cpp
@@ -38,9 +38,7 @@ namespace mongo {
ReplicaSetMonitorProtocol gReplicaSetMonitorProtocol{ReplicaSetMonitorProtocol::kStreamable};
std::string toString(ReplicaSetMonitorProtocol protocol) {
- if (protocol == ReplicaSetMonitorProtocol::kScanning) {
- return "scanning";
- } else if (protocol == ReplicaSetMonitorProtocol::kStreamable) {
+ if (protocol == ReplicaSetMonitorProtocol::kStreamable) {
return "streamable";
} else {
return "sdam";
@@ -54,9 +52,7 @@ void RSMProtocolServerParameter::append(OperationContext*,
}
Status RSMProtocolServerParameter::setFromString(const std::string& protocolStr) {
- if (protocolStr == toString(ReplicaSetMonitorProtocol::kScanning)) {
- gReplicaSetMonitorProtocol = ReplicaSetMonitorProtocol::kScanning;
- } else if (protocolStr == toString(ReplicaSetMonitorProtocol::kStreamable)) {
+ if (protocolStr == toString(ReplicaSetMonitorProtocol::kStreamable)) {
gReplicaSetMonitorProtocol = ReplicaSetMonitorProtocol::kStreamable;
} else if (protocolStr == toString(ReplicaSetMonitorProtocol::kSdam)) {
gReplicaSetMonitorProtocol = ReplicaSetMonitorProtocol::kSdam;
diff --git a/src/mongo/client/replica_set_monitor_server_parameters.h b/src/mongo/client/replica_set_monitor_server_parameters.h
index 0a900b7ddf9..40bcf3f7016 100644
--- a/src/mongo/client/replica_set_monitor_server_parameters.h
+++ b/src/mongo/client/replica_set_monitor_server_parameters.h
@@ -35,7 +35,7 @@
namespace mongo {
-enum class ReplicaSetMonitorProtocol { kScanning, kSdam, kStreamable };
+enum class ReplicaSetMonitorProtocol { kSdam, kStreamable };
extern ReplicaSetMonitorProtocol gReplicaSetMonitorProtocol;
std::string toString(ReplicaSetMonitorProtocol protocol);
diff --git a/src/mongo/client/replica_set_monitor_server_parameters_test.cpp b/src/mongo/client/replica_set_monitor_server_parameters_test.cpp
index d52fb788094..7a994cf3f95 100644
--- a/src/mongo/client/replica_set_monitor_server_parameters_test.cpp
+++ b/src/mongo/client/replica_set_monitor_server_parameters_test.cpp
@@ -32,7 +32,6 @@
#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/replica_set_monitor_protocol_test_util.h"
#include "mongo/client/replica_set_monitor_server_parameters.h"
-#include "mongo/client/scanning_replica_set_monitor.h"
#include "mongo/client/streamable_replica_set_monitor.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -77,23 +76,5 @@ TEST_F(ReplicaSetMonitorProtocolTest, checkRSMProtocolParamSdam) {
}
#endif
-/**
- * Checks that a ScanningReplicaSetMonitor is created when the replicaSetMonitorProtocol server
- * parameter is set to 'scanning'.
- */
-TEST_F(ReplicaSetMonitorProtocolTest, checkRSMProtocolParamScanning) {
- ReplicaSetMonitorProtocolTestUtil::setRSMProtocol(ReplicaSetMonitorProtocol::kScanning);
- auto uri = MongoURI::parse("mongodb:a,b,c/?replicaSet=name");
- ASSERT_OK(uri.getStatus());
- auto createdMonitor = ReplicaSetMonitor::createIfNeeded(uri.getValue());
-
- // If the created monitor does not point to a ScanningReplicaSetMonitor, the cast returns a
- // nullptr.
- auto scanningMonitorCast = dynamic_cast<ScanningReplicaSetMonitor*>(createdMonitor.get());
- ASSERT(scanningMonitorCast);
-
- auto streamableMonitorCast = dynamic_cast<StreamableReplicaSetMonitor*>(createdMonitor.get());
- ASSERT_FALSE(streamableMonitorCast);
-}
} // namespace
} // namespace mongo
diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp
deleted file mode 100644
index b72cf67ab59..00000000000
--- a/src/mongo/client/scanning_replica_set_monitor.cpp
+++ /dev/null
@@ -1,1591 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/client/scanning_replica_set_monitor.h"
-
-#include <algorithm>
-#include <limits>
-#include <random>
-
-#include "mongo/bson/simple_bsonelement_comparator.h"
-#include "mongo/client/connpool.h"
-#include "mongo/client/global_conn_pool.h"
-#include "mongo/client/read_preference.h"
-#include "mongo/client/scanning_replica_set_monitor_internal.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/bson_extract_optime.h"
-#include "mongo/db/server_options.h"
-#include "mongo/logv2/log.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/util/background.h"
-#include "mongo/util/debug_util.h"
-#include "mongo/util/exit.h"
-#include "mongo/util/fail_point.h"
-#include "mongo/util/string_map.h"
-#include "mongo/util/timer.h"
-
-namespace mongo {
-
-using std::numeric_limits;
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::vector;
-
-namespace {
-MONGO_FAIL_POINT_DEFINE(scanningServerSelectorIgnoreLatencyWindow);
-
-// Pull nested types to top-level scope
-typedef ScanningReplicaSetMonitor::IsMasterReply IsMasterReply;
-typedef ScanningReplicaSetMonitor::ScanState ScanState;
-typedef ScanningReplicaSetMonitor::ScanStatePtr ScanStatePtr;
-typedef ScanningReplicaSetMonitor::SetState SetState;
-typedef ScanningReplicaSetMonitor::SetStatePtr SetStatePtr;
-typedef ScanningReplicaSetMonitor::Refresher Refresher;
-typedef ScanState::UnconfirmedReplies UnconfirmedReplies;
-typedef SetState::Node Node;
-typedef SetState::Nodes Nodes;
-using executor::TaskExecutor;
-using CallbackArgs = TaskExecutor::CallbackArgs;
-using CallbackHandle = TaskExecutor::CallbackHandle;
-
-// Intentionally chosen to compare worse than all known latencies.
-const int64_t unknownLatency = numeric_limits<int64_t>::max();
-
-const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
-
-//
-// Helpers for stl algorithms
-//
-
-bool isMaster(const Node& node) {
- return node.isMaster;
-}
-
-bool opTimeGreater(const Node* lhs, const Node* rhs) {
- return lhs->opTime > rhs->opTime;
-}
-
-bool compareLatencies(const Node* lhs, const Node* rhs) {
- // NOTE: this automatically compares Node::unknownLatency worse than all others.
- return lhs->latencyMicros < rhs->latencyMicros;
-}
-
-bool hostsEqual(const Node& lhs, const HostAndPort& rhs) {
- return lhs.host == rhs;
-}
-
-// Allows comparing two Nodes, or a HostAndPort and a Node.
-// NOTE: the two HostAndPort overload is only needed to support extra checks in some STL
-// implementations. For simplicity, no comparator should be used with collections of just
-// HostAndPort.
-struct CompareHosts {
- bool operator()(const Node& lhs, const Node& rhs) {
- return lhs.host < rhs.host;
- }
- bool operator()(const Node& lhs, const HostAndPort& rhs) {
- return lhs.host < rhs;
- }
- bool operator()(const HostAndPort& lhs, const Node& rhs) {
- return lhs < rhs.host;
- }
- bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) {
- return lhs < rhs;
- }
-} compareHosts; // like an overloaded function, but able to pass to stl algorithms
-
-// The following structs should be treated as functions returning a UnaryPredicate.
-// Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost));
-// They all hold their constructor argument by reference.
-
-struct HostIs {
- explicit HostIs(const HostAndPort& host) : _host(host) {}
- bool operator()(const HostAndPort& host) {
- return host == _host;
- }
- bool operator()(const Node& node) {
- return node.host == _host;
- }
- const HostAndPort& _host;
-};
-
-struct HostNotIn {
- explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {}
- bool operator()(const HostAndPort& host) {
- return !_hosts.count(host);
- }
- bool operator()(const Node& node) {
- return !_hosts.count(node.host);
- }
- const std::set<HostAndPort>& _hosts;
-};
-
-int32_t pingTimeMillis(const Node& node) {
- auto latencyMillis = node.latencyMicros / 1000;
- if (latencyMillis > numeric_limits<int32_t>::max()) {
- // In particular, Node::unknownLatency does not fit in an int32.
- return numeric_limits<int32_t>::max();
- }
- return latencyMillis;
-}
-
-/**
- * Replica set refresh period on the task executor.
- */
-const Seconds kDefaultRefreshPeriod(30);
-} // namespace
-
-// Defaults to random selection as required by the spec
-bool ScanningReplicaSetMonitor::useDeterministicHostSelection = false;
-
-ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const SetStatePtr& initialState,
- std::function<void()> cleanupCallback)
- : ReplicaSetMonitor(cleanupCallback), _state(initialState) {}
-
-ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const MongoURI& uri,
- std::shared_ptr<TaskExecutor> executor,
- std::function<void()> cleanupCallback)
- : ScanningReplicaSetMonitor(
- std::make_shared<SetState>(
- uri, &ReplicaSetMonitorManager::get()->getNotifier(), executor.get()),
- cleanupCallback) {}
-
-void ScanningReplicaSetMonitor::init() {
- if (areRefreshRetriesDisabledForTest()) {
- // This is for MockReplicaSet. Those tests want to control when scanning happens.
- LOGV2_WARNING(24088,
- "*** Not starting background refresh because refresh retries are disabled.");
- return;
- }
-
- {
- stdx::lock_guard lk(_state->mutex);
- _state->init();
- }
-}
-
-void ScanningReplicaSetMonitor::drop() {
- {
- stdx::lock_guard lk(_state->mutex);
- _state->drop();
- }
-}
-
-ScanningReplicaSetMonitor::~ScanningReplicaSetMonitor() {
- // `drop` is idempotent and a duplicate call from ReplicaSetMonitorManager::removeMonitor() is
- // safe.
- drop();
-}
-
-template <typename Callback>
-auto ScanningReplicaSetMonitor::SetState::scheduleWorkAt(Date_t when, Callback&& cb) const {
- auto wrappedCallback = [cb = std::forward<Callback>(cb),
- anchor = shared_from_this()](const CallbackArgs& cbArgs) mutable {
- if (ErrorCodes::isCancellationError(cbArgs.status)) {
- // Do no more work if we're removed or canceled
- return;
- }
- invariant(cbArgs.status);
-
- stdx::lock_guard lk(anchor->mutex);
- if (anchor->isDropped) {
- return;
- }
-
- cb(cbArgs);
- };
- return executor->scheduleWorkAt(std::move(when), std::move(wrappedCallback));
-}
-
-void ScanningReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) {
- // Reschedule the refresh
-
- if (!executor || isMocked) {
- // Without an executor, we can't do refreshes -- we're in a test
- return;
- }
-
- if (isDropped) { // already removed so no need to refresh
- LOGV2_DEBUG(24070,
- 1,
- "Stopping refresh for replica set {replicaSet} because it's removed",
- "Stopping refresh for replica set because it's removed",
- "replicaSet"_attr = name);
- return;
- }
-
- Milliseconds period = _getRefreshPeriod(); // Supports fail injection.
- if (isExpedited) {
- period = std::min<Milliseconds>(period, kExpeditedRefreshPeriod);
- }
-
- auto currentTime = now();
- auto possibleNextScanTime = currentTime + period;
- if (refresherHandle && //
- (strategy == SchedulingStrategy::kKeepEarlyScan) && //
- (nextScanTime > currentTime) && //
- (possibleNextScanTime >= nextScanTime)) {
- // If the next scan would be sooner than our desired, why cancel?
- return;
- }
-
- // Cancel out the last refresh
- if (auto currentHandle = std::exchange(refresherHandle, {})) {
- executor->cancel(currentHandle);
- }
-
- nextScanTime = possibleNextScanTime;
- LOGV2_DEBUG(24071,
- 1,
- "Next replica set scan scheduled for {nextScanTime}",
- "Next replica set scan scheduled",
- "nextScanTime"_attr = nextScanTime);
- auto swHandle = scheduleWorkAt(nextScanTime, [this](const CallbackArgs& cbArgs) {
- if (cbArgs.myHandle != refresherHandle)
- return; // We've been replaced!
-
- // It is possible that a waiter will have already expired by the point of this rescan.
- // Thus we notify here to trigger that logic.
- notify();
-
- _ensureScanInProgress(shared_from_this());
-
- // And now we set up the next one
- rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan);
- });
-
- if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) {
- LOGV2_DEBUG(24072,
- 1,
- "Can't schedule refresh for {replicaSet}. Executor shutdown in progress",
- "Can't schedule refresh for replica set. Executor shutdown in progress",
- "replicaSet"_attr = name);
- return;
- }
-
- if (!swHandle.isOK()) {
- LOGV2_FATAL(40140,
- "Can't continue refresh for replica set {replicaSet}: {error}",
- "Can't continue refresh for replica set",
- "error"_attr = redact(swHandle.getStatus()),
- "replicaSet"_attr = name);
- }
-
- refresherHandle = std::move(swHandle.getValue());
-}
-
-SemiFuture<HostAndPort> ScanningReplicaSetMonitor::getHostOrRefresh(
- const ReadPreferenceSetting& criteria,
- const std::vector<HostAndPort>& excludedHosts,
- const CancellationToken&) {
- return _getHostsOrRefresh(
- criteria, ReplicaSetMonitorInterface::kDefaultFindHostTimeout, excludedHosts)
- .then([](const auto& hosts) {
- invariant(hosts.size());
- return hosts[0];
- })
- .semi();
-}
-
-SemiFuture<std::vector<HostAndPort>> ScanningReplicaSetMonitor::getHostsOrRefresh(
- const ReadPreferenceSetting& criteria,
- const std::vector<HostAndPort>& excludedHosts,
- const CancellationToken&) {
- return _getHostsOrRefresh(
- criteria, ReplicaSetMonitorInterface::kDefaultFindHostTimeout, excludedHosts)
- .semi();
-}
-
-Future<std::vector<HostAndPort>> ScanningReplicaSetMonitor::_getHostsOrRefresh(
- const ReadPreferenceSetting& criteria,
- Milliseconds maxWait,
- const std::vector<HostAndPort>& excludedHosts) {
-
- stdx::lock_guard<Latch> lk(_state->mutex);
- if (_state->isDropped) {
- return Status(ErrorCodes::ShutdownInProgress,
- str::stream()
- << "ScanningReplicaSetMonitor for set " << getName() << " is removed");
- }
-
- auto out = _state->getMatchingHosts(criteria, excludedHosts);
- if (!out.empty())
- return {std::move(out)};
-
- // Fail early without doing any more work if the timeout has already expired.
- if (maxWait <= Milliseconds(0))
- return _state->makeUnsatisfedReadPrefError(criteria);
-
- // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is
- // dealing with maxWait.
- auto pf = makePromiseFuture<decltype(out)>();
- _state->waiters.emplace_back(
- SetState::Waiter{_state->now() + maxWait, criteria, excludedHosts, std::move(pf.promise)});
-
- // This must go after we set up the wait state to correctly handle unittests using
- // MockReplicaSet.
- _ensureScanInProgress(_state);
-
- // Switch to expedited scanning.
- _state->isExpedited = true;
- _state->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan);
-
- return std::move(pf.future);
-}
-
-HostAndPort ScanningReplicaSetMonitor::getPrimaryOrUassert() {
- return ReplicaSetMonitorInterface::getHostOrRefresh(kPrimaryOnlyReadPreference,
- CancellationToken::uncancelable())
- .get();
-}
-
-void ScanningReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- if (node)
- node->markFailed(status);
- if (kDebugBuild)
- _state->checkInvariants();
-}
-
-void ScanningReplicaSetMonitor::failedHostPreHandshake(const HostAndPort& host,
- const Status& status,
- BSONObj bson) {
- failedHost(host, status);
-}
-
-void ScanningReplicaSetMonitor::failedHostPostHandshake(const HostAndPort& host,
- const Status& status,
- BSONObj bson) {
- failedHost(host, status);
-}
-
-bool ScanningReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- return node ? node->isMaster : false;
-}
-
-bool ScanningReplicaSetMonitor::isHostUp(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- Node* node = _state->findNode(host);
- return node ? node->isUp : false;
-}
-
-int ScanningReplicaSetMonitor::getMinWireVersion() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- int minVersion = 0;
- for (const auto& host : _state->nodes) {
- if (host.isUp) {
- minVersion = std::max(minVersion, host.minWireVersion);
- }
- }
-
- return minVersion;
-}
-
-int ScanningReplicaSetMonitor::getMaxWireVersion() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- int maxVersion = std::numeric_limits<int>::max();
- for (const auto& host : _state->nodes) {
- if (host.isUp) {
- maxVersion = std::min(maxVersion, host.maxWireVersion);
- }
- }
-
- return maxVersion;
-}
-
-std::string ScanningReplicaSetMonitor::getName() const {
- // name is const so don't need to lock
- return _state->name;
-}
-
-std::string ScanningReplicaSetMonitor::getServerAddress() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- // We return our setUri until first confirmation
- return _state->seedConnStr.isValid() ? _state->seedConnStr.toString()
- : _state->setUri.connectionString().toString();
-}
-
-const MongoURI& ScanningReplicaSetMonitor::getOriginalUri() const {
- // setUri is const so no need to lock.
- return _state->setUri;
-}
-
-bool ScanningReplicaSetMonitor::contains(const HostAndPort& host) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
- return _state->seedNodes.count(host);
-}
-
-// TODO move to correct order with non-statics before pushing
-void ScanningReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const {
- stdx::lock_guard<Latch> lk(_state->mutex);
-
- BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName()));
- if (forFTDC) {
- for (size_t i = 0; i < _state->nodes.size(); i++) {
- const Node& node = _state->nodes[i];
- monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node));
- }
- return;
- }
-
- // NOTE: the format here must be consistent for backwards compatibility
- BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts"));
- for (size_t i = 0; i < _state->nodes.size(); i++) {
- const Node& node = _state->nodes[i];
-
- BSONObjBuilder builder;
- builder.append("addr", node.host.toString());
- builder.append("ok", node.isUp);
- builder.append("ismaster", node.isMaster); // intentionally not camelCase
- builder.append("hidden", false); // we don't keep hidden nodes in the set
- builder.append("secondary", node.isUp && !node.isMaster);
- builder.append("pingTimeMillis", pingTimeMillis(node));
-
- if (!node.tags.isEmpty()) {
- builder.append("tags", node.tags);
- }
-
- hosts.append(builder.obj());
- }
-}
-bool ScanningReplicaSetMonitor::isKnownToHaveGoodPrimary() const {
- stdx::lock_guard<Latch> lk(_state->mutex);
-
- for (const auto& node : _state->nodes) {
- if (node.isMaster) {
- return true;
- }
- }
-
- return false;
-}
-
-Seconds ScanningReplicaSetMonitor::_getRefreshPeriod() {
- boost::optional<Seconds> failInjected = _getFailInjectedRefreshPeriod();
- return failInjected ? *failInjected : kDefaultRefreshPeriod;
-}
-
-boost::optional<Seconds> ScanningReplicaSetMonitor::_getFailInjectedRefreshPeriod() {
- boost::optional<Seconds> result;
- static constexpr auto kPeriodField = "period"_sd;
- globalFailPointRegistry()
- .find("modifyReplicaSetMonitorDefaultRefreshPeriod")
- ->executeIf(
- [&result](const BSONObj& data) { result = Seconds{data.getIntField(kPeriodField)}; },
- [](const BSONObj& data) { return data.hasField(kPeriodField); });
- return result;
-}
-
-void ScanningReplicaSetMonitor::runScanForMockReplicaSet() {
- stdx::lock_guard<Latch> lk(_state->mutex);
- _ensureScanInProgress(_state);
-
- // This function should only be called from tests using MockReplicaSet and they should use the
- // synchronous path to complete before returning.
- invariant(_state->currentScan == nullptr);
-}
-
-namespace {
-AtomicWord<bool> refreshRetriesDisabledForTest{false};
-} // namespace
-
-void ScanningReplicaSetMonitor::disableRefreshRetries_forTest() {
- refreshRetriesDisabledForTest.store(true);
-}
-
-bool ScanningReplicaSetMonitor::areRefreshRetriesDisabledForTest() {
- return refreshRetriesDisabledForTest.load();
-}
-
-void ScanningReplicaSetMonitor::_ensureScanInProgress(const SetStatePtr& state) {
- Refresher(state).scheduleNetworkRequests();
-}
-
-Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) {
- if (_scan) {
- _set->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan);
- _scan->retryAllTriedHosts(_set->rand);
- return; // participate in in-progress scan
- }
-
- startNewScan();
-}
-
-void Refresher::scheduleNetworkRequests() {
- for (auto ns = getNextStep(); ns.step == NextStep::CONTACT_HOST; ns = getNextStep()) {
- if (!_set->executor || _set->isMocked) {
- // If we're mocked, just schedule an isMaster
- scheduleIsMaster(ns.host);
- continue;
- }
-
- // cancel any scheduled isMaster calls that haven't yet been called
- Node* node = _set->findOrCreateNode(ns.host);
- if (auto handle = std::exchange(node->scheduledIsMasterHandle, {})) {
- _set->executor->cancel(handle);
- }
-
- // ensure that the call to isMaster is scheduled at most every 500ms
- auto swHandle =
- _set->scheduleWorkAt(node->nextPossibleIsMasterCall,
- [*this, host = ns.host](const CallbackArgs& cbArgs) mutable {
- scheduleIsMaster(host);
- });
-
- if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) {
- _scan->markHostsToScanAsTried();
- break;
- }
-
- if (!swHandle.isOK()) {
- LOGV2_FATAL(31176,
- "Can't continue scan for replica set {replicaSet}: {error}",
- "Can't continue scan for replica set",
- "error"_attr = redact(swHandle.getStatus()),
- "replicaSet"_attr = _set->name);
- }
-
- node->scheduledIsMasterHandle = uassertStatusOK(std::move(swHandle));
- }
-
- if (kDebugBuild)
- _set->checkInvariants();
-}
-
-void Refresher::scheduleIsMaster(const HostAndPort& host) {
- if (_set->isMocked) {
- // MockReplicaSet only works with DBClient-style access since it injects itself into the
- // ScopedDbConnection pool connection creation.
- try {
- ScopedDbConnection conn(ConnectionString(host), kCheckTimeout.count());
-
- auto timer = Timer();
- auto reply = BSONObj();
- bool ignoredOutParam = false;
- conn->isPrimary(ignoredOutParam, &reply);
- conn.done(); // return to pool on success.
-
- receivedIsMaster(host, timer.micros(), reply);
- } catch (DBException& ex) {
- failedHost(host, ex.toStatus());
- }
-
- return;
- }
-
- BSONObjBuilder bob;
- bob.append("isMaster", 1);
- if (auto wireSpec = WireSpec::instance().get(); wireSpec->isInternalClient) {
- WireSpec::appendInternalClientWireVersion(wireSpec->outgoing, &bob);
- }
- auto request = executor::RemoteCommandRequest(host, "admin", bob.obj(), nullptr, kCheckTimeout);
- request.sslMode = _set->setUri.getSSLMode();
-
- auto status =
- _set->executor
- ->scheduleRemoteCommand(
- std::move(request),
- [copy = *this, host, timer = Timer()](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
- stdx::lock_guard lk(copy._set->mutex);
- // Ignore the reply and return if we are no longer the current scan. This might
- // happen if it was decided that the host we were contacting isn't part of the
- // set.
- if (copy._scan != copy._set->currentScan) {
- return;
- }
-
- // ensure that isMaster calls occur at most 500ms after the previous call ended
- if (auto node = copy._set->findNode(host)) {
- node->nextPossibleIsMasterCall =
- copy._set->executor->now() + Milliseconds(500);
- }
-
- if (result.response.isOK()) {
- // Not using result.response.elapsedMillis because higher precision is
- // useful for computing the rolling average.
- copy.receivedIsMaster(host, timer.micros(), result.response.data);
- } else {
- copy.failedHost(host, result.response.status);
- }
-
- // This reply may have discovered new hosts to contact so we need to schedule
- // them.
- copy.scheduleNetworkRequests();
- })
- .getStatus();
-
- if (!status.isOK()) {
- failedHost(host, status);
- // This is only called from scheduleNetworkRequests() which will still be looping, so we
- // don't need to call it here after updating the state.
- }
-}
-
-Refresher::NextStep Refresher::getNextStep() {
- // No longer the current scan
- if (_scan != _set->currentScan) {
- return NextStep(NextStep::DONE);
- }
-
- // Wait for all dispatched hosts to return before trying any fallback hosts.
- if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) {
- return NextStep(NextStep::WAIT);
- }
-
- // If we haven't yet found a master, try contacting unconfirmed hosts
- if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) {
- _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand);
- _scan->possibleNodes.clear();
- }
-
- if (_scan->hostsToScan.empty()) {
- // We've tried all hosts we can, so nothing more to do in this round.
- if (!_scan->foundUpMaster) {
- LOGV2_WARNING(24089,
- "Unable to reach primary for replica set {replicaSet}",
- "Unable to reach primary for replica set",
- "replicaSet"_attr = _set->name);
-
- // Since we've talked to everyone we could but still didn't find a primary, we
- // do the best we can, and assume all unconfirmedReplies are actually from nodes
- // in the set (we've already confirmed that they think they are). This is
- // important since it allows us to bootstrap to a usable state even if we are
- // unable to talk to a master while starting up. As soon as we are able to
- // contact a master, we will remove any nodes that it doesn't think are part of
- // the set, undoing the damage we cause here.
-
- // NOTE: we don't modify seedNodes or notify about set membership change in this
- // case since it hasn't been confirmed by a master.
- for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
- it != _scan->unconfirmedReplies.end();
- ++it) {
- _set->findOrCreateNode(it->first)->update(it->second);
- }
-
- auto connStr = _set->possibleConnectionString();
- if (connStr != _set->workingConnStr) {
- _set->workingConnStr = std::move(connStr);
- _set->notifier->onPossibleSet(_set->workingConnStr);
- }
-
- // If at some point we care about lacking a primary, on it here
- _set->lastSeenMaster = {};
- }
-
- if (_scan->foundAnyUpNodes) {
- _set->consecutiveFailedScans = 0;
- } else {
- auto nScans = _set->consecutiveFailedScans++;
- if (nScans <= 10 || nScans % 10 == 0) {
- LOGV2(24073,
- "Cannot reach any nodes for replica set {replicaSet}. Please check network "
- "connectivity and the status of the set. This has happened for "
- "{numConsecutiveFailedScans} checks in a row.",
- "Cannot reach any nodes for replica set. Please check network connectivity "
- "and the status of the set. A number of consecutive scan checks have failed",
- "replicaSet"_attr = _set->name,
- "numConsecutiveFailedScans"_attr = _set->consecutiveFailedScans);
- }
- }
-
- // Makes sure all other Refreshers in this round return DONE
- _set->currentScan.reset();
- _set->notify();
-
- LOGV2_DEBUG(24074,
- 1,
- "Refreshing replica set {replicaSet} took {duration}",
- "Replica set refreshed",
- "duration"_attr = Milliseconds(_scan->timer.millis()),
- "replicaSet"_attr = _set->name);
-
- return NextStep(NextStep::DONE);
- }
-
- // Pop and return the next hostToScan.
- HostAndPort host = _scan->hostsToScan.front();
- _scan->hostsToScan.pop_front();
- _scan->waitingFor.insert(host);
- _scan->triedHosts.insert(host);
-
- return NextStep(NextStep::CONTACT_HOST, host);
-}
-
-void Refresher::receivedIsMaster(const HostAndPort& from,
- int64_t latencyMicros,
- const BSONObj& replyObj) {
- _scan->waitingFor.erase(from);
-
- const IsMasterReply reply(from, latencyMicros, replyObj);
-
- // Handle various failure cases
- if (!reply.ok) {
- failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"});
- return;
- }
-
- if (reply.setName != _set->name) {
- if (reply.raw["isreplicaset"].trueValue()) {
- // The reply came from a node in the state referred to as RSGhost in the SDAM
- // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event,
- // if a reply from a ghost offers a list of possible other members of the replica set,
- // and if this refresher has yet to find the replica set master, we add hosts listed in
- // the reply to the list of possible replica set members.
- if (!_scan->foundUpMaster) {
- _scan->possibleNodes.insert(reply.members.begin(), reply.members.end());
- }
- } else {
- LOGV2_ERROR(
- 24091,
- "replset name mismatch: expected {expectedSetName}, but remote node "
- "{fromRemoteNode} has replset name {receivedSetName}, isMaster: {isMasterReply}",
- "replset name mismatch. The expected set name does not match the set name "
- "received from the remote node",
- "expectedSetName"_attr = _set->name,
- "fromRemoteNode"_attr = from,
- "receivedSetName"_attr = reply.setName,
- "isMasterReply"_attr = replyObj);
- }
-
- failedHost(from,
- {ErrorCodes::InconsistentReplicaSetNames,
- str::stream() << "Target replica set name " << reply.setName
- << " does not match the monitored set name " << _set->name});
- return;
- }
-
- if (reply.isMaster) {
- Status status = receivedIsMasterFromMaster(from, reply);
- if (!status.isOK()) {
- failedHost(from, status);
- return;
- }
- }
-
- if (_scan->foundUpMaster) {
- // We only update a Node if a master has confirmed it is in the set.
- _set->updateNodeIfInNodes(reply);
- } else {
- // Populate possibleNodes.
- _scan->possibleNodes.insert(reply.members.begin(), reply.members.end());
- _scan->unconfirmedReplies[from] = reply;
- }
-
- // _set->nodes may still not have any nodes with isUp==true, but we have at least found a
- // connectible host that is that claims to be in the set.
- _scan->foundAnyUpNodes = true;
-
- _set->notify();
-
- if (kDebugBuild)
- _set->checkInvariants();
-}
-
-void Refresher::failedHost(const HostAndPort& host, const Status& status) {
- _scan->waitingFor.erase(host);
-
- Node* node = _set->findNode(host);
- if (node)
- node->markFailed(status);
-
- if (_scan->waitingFor.empty()) {
- // If this was the last host that needed a response, we should notify the SetState so that
- // we can fail any waiters that have timed out.
- _set->notify();
- }
-}
-
-void Refresher::startNewScan() {
- // The heuristics we use in deciding the order to contact hosts are designed to find a
- // master as quickly as possible. This is because we can't use any hosts we find until
- // we either get the latest set of members from a master or talk to all possible hosts
- // without finding a master.
-
- // TODO It might make sense to check down nodes first if the last seen master is still
- // marked as up.
-
- _scan = std::make_shared<ScanState>();
- _set->currentScan = _scan;
-
- int upNodes = 0;
- for (Nodes::const_iterator it(_set->nodes.begin()), end(_set->nodes.end()); it != end; ++it) {
- if (it->isUp) {
- // _scan the nodes we think are up first
- _scan->hostsToScan.push_front(it->host);
- upNodes++;
- } else {
- _scan->hostsToScan.push_back(it->host);
- }
- }
-
- // shuffle the queue, but keep "up" nodes at the front
- std::shuffle(
- _scan->hostsToScan.begin(), _scan->hostsToScan.begin() + upNodes, _set->rand.urbg());
- std::shuffle(_scan->hostsToScan.begin() + upNodes, _scan->hostsToScan.end(), _set->rand.urbg());
-
- if (!_set->lastSeenMaster.empty()) {
- // move lastSeenMaster to front of queue
- std::stable_partition(
- _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(_set->lastSeenMaster));
- }
-}
-
-Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) {
- invariant(reply.isMaster);
-
- // Reject if config version is older. This is for backwards compatibility with nodes in pv0
- // since they don't have the same ordering with pv1 electionId.
- if (reply.configVersion < _set->configVersion) {
- return {
- ErrorCodes::NotWritablePrimary,
- str::stream() << "Node " << from << " believes it is primary, but its config version "
- << reply.configVersion << " is older than the most recent config version "
- << _set->configVersion};
- }
-
- if (reply.electionId.isSet()) {
- // ElectionIds are only comparable if they are of the same protocol version. However, since
- // isMaster has no protocol version field, we use the configVersion instead. This works
- // because configVersion needs to be incremented whenever the protocol version is changed.
- if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() &&
- _set->maxElectionId.compare(reply.electionId) > 0) {
- return {
- ErrorCodes::NotWritablePrimary,
- str::stream() << "Node " << from << " believes it is primary, but its election id "
- << reply.electionId << " is older than the most recent election id "
- << _set->maxElectionId};
- }
-
- _set->maxElectionId = reply.electionId;
- }
-
- _set->configVersion = reply.configVersion;
-
- // Mark all nodes as not master. We will mark ourself as master before releasing the lock.
- // NOTE: we use a "last-wins" policy if multiple hosts claim to be master.
- for (size_t i = 0; i < _set->nodes.size(); i++) {
- _set->nodes[i].isMaster = false;
- }
-
- // Check if the master agrees with our current list of nodes.
- // REMINDER: both _set->nodes and reply.members are sorted.
- if (_set->nodes.size() != reply.members.size() ||
- !std::equal(_set->nodes.begin(), _set->nodes.end(), reply.members.begin(), hostsEqual)) {
- LOGV2_DEBUG(24075,
- 2,
- "Adjusting nodes in our view of replica set {replicaSet} based on isMaster "
- "reply: {isMasterReply}",
- "Adjusting nodes in our view of the replica set based on isMaster reply",
- "replicaSet"_attr = _set->name,
- "isMasterReply"_attr = redact(reply.raw));
-
- // remove non-members from _set->nodes
- _set->nodes.erase(
- std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.members)),
- _set->nodes.end());
-
- // add new members to _set->nodes
- for (auto& host : reply.members) {
- _set->findOrCreateNode(host);
- }
-
- // replace hostToScan queue with untried normal hosts. can both add and remove
- // hosts from the queue.
- _scan->hostsToScan.clear();
- _scan->enqueAllUntriedHosts(reply.members, _set->rand);
-
- if (!_scan->waitingFor.empty()) {
- // make sure we don't wait for any hosts that aren't considered members
- std::set<HostAndPort> newWaitingFor;
- std::set_intersection(reply.members.begin(),
- reply.members.end(),
- _scan->waitingFor.begin(),
- _scan->waitingFor.end(),
- std::inserter(newWaitingFor, newWaitingFor.end()));
- _scan->waitingFor.swap(newWaitingFor);
- }
- }
-
- bool changedHosts = reply.members != _set->seedNodes;
- bool changedPrimary = reply.host != _set->lastSeenMaster;
- if (changedHosts || changedPrimary) {
- ++_set->seedGen;
- _set->seedNodes = reply.members;
- _set->seedConnStr = _set->confirmedConnectionString();
-
- // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare
- // and we want to record our changes
- LOGV2(24076,
- "Confirmed replica set for {replicaSet} is {connectionString}",
- "Confirmed replica set",
- "replicaSet"_attr = _set->name,
- "connectionString"_attr = _set->seedConnStr);
-
- _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host, reply.passives);
- }
-
- // Update our working string
- _set->workingConnStr = _set->seedConnStr;
-
- // Update other nodes's information based on replies we've already seen
- for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
- it != _scan->unconfirmedReplies.end();
- ++it) {
- // this ignores replies from hosts not in _set->nodes (as modified above)
- _set->updateNodeIfInNodes(it->second);
- }
- _scan->unconfirmedReplies.clear();
-
- _scan->foundUpMaster = true;
- _set->lastSeenMaster = reply.host;
-
- return Status::OK();
-}
-
-void IsMasterReply::parse(const BSONObj& obj) {
- try {
- raw = obj.getOwned(); // don't use obj again after this line
-
- ok = raw["ok"].trueValue();
- if (!ok)
- return;
-
- setName = raw["setName"].str();
- hidden = raw["hidden"].trueValue();
- secondary = raw["secondary"].trueValue();
-
- minWireVersion = raw["minWireVersion"].numberInt();
- maxWireVersion = raw["maxWireVersion"].numberInt();
-
- // hidden nodes can't be master, even if they claim to be.
- isMaster = !hidden && (raw["isWritablePrimary"].trueValue() || raw["ismaster"].trueValue());
-
- if (isMaster && raw.hasField("electionId")) {
- electionId = raw["electionId"].OID();
- }
-
- configVersion = raw["setVersion"].numberInt();
-
- const string primaryString = raw["primary"].str();
- primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString);
-
- // both hosts and passives, but not arbiters, are considered "normal hosts"
- members.clear();
- BSONForEach(host, raw.getObjectField("hosts")) {
- members.insert(HostAndPort(host.String()));
- }
- BSONForEach(host, raw.getObjectField("passives")) {
- members.insert(HostAndPort(host.String()));
- passives.insert(HostAndPort(host.String()));
- }
-
- tags = raw.getObjectField("tags");
- BSONObj lastWriteField = raw.getObjectField("lastWrite");
- if (!lastWriteField.isEmpty()) {
- if (auto lastWrite = lastWriteField["lastWriteDate"]) {
- lastWriteDate = lastWrite.date();
- }
-
- uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime));
- }
- } catch (const std::exception& e) {
- ok = false;
- LOGV2(24077,
- "Exception while parsing isMaster reply: {error} {isMasterReply}",
- "Exception while parsing isMaster reply",
- "error"_attr = e.what(),
- "isMasterReply"_attr = obj);
- }
-}
-
-Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {}
-
-void Node::markFailed(const Status& status) {
- if (isUp) {
- LOGV2(24078,
- "Marking host {host} as failed: {error}",
- "Marking host as failed",
- "host"_attr = host,
- "error"_attr = redact(status));
-
- isUp = false;
- }
-
- isMaster = false;
-}
-
-bool Node::matches(const ReadPreference pref) const {
- if (!isUp) {
- LOGV2_DEBUG(24079, 3, "Host {host} is not up", "Host is not up", "host"_attr = host);
- return false;
- }
-
- LOGV2_DEBUG(24080,
- 3,
- "Host {host} is primary? {isPrimary}",
- "Host is primary?",
- "host"_attr = host,
- "isPrimary"_attr = isMaster);
- if (pref == ReadPreference::PrimaryOnly) {
- return isMaster;
- }
-
- if (pref == ReadPreference::SecondaryOnly) {
- return !isMaster;
- }
-
- return true;
-}
-
-bool Node::matches(const BSONObj& tag) const {
- BSONForEach(tagCriteria, tag) {
- if (SimpleBSONElementComparator::kInstance.evaluate(
- this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) {
- return false;
- }
- }
-
- return true;
-}
-
-void Node::update(const IsMasterReply& reply) {
- invariant(host == reply.host);
- invariant(reply.ok);
-
- LOGV2_DEBUG(24081,
- 3,
- "Updating host {host} based on isMaster reply: {isMasterReply}",
- "Updating host based on isMaster reply",
- "host"_attr = host,
- "isMasterReply"_attr = reply.raw);
-
- // Nodes that are hidden or neither master or secondary are considered down since we can't
- // send any operations to them.
- isUp = !reply.hidden && (reply.isMaster || reply.secondary);
- isMaster = reply.isMaster;
-
- minWireVersion = reply.minWireVersion;
- maxWireVersion = reply.maxWireVersion;
-
- // save a copy if unchanged
- if (!tags.binaryEqual(reply.tags))
- tags = reply.tags.getOwned();
-
- if (reply.latencyMicros >= 0) { // TODO upper bound?
- if (latencyMicros == unknownLatency) {
- latencyMicros = reply.latencyMicros;
- } else {
- // update latency with smoothed moving average (1/4th the delta)
- latencyMicros += (reply.latencyMicros - latencyMicros) / 4;
- }
- }
-
- LOGV2_DEBUG(24082,
- 3,
- "Updating {host} lastWriteDate to {lastWriteDate}",
- "Updating host lastWriteDate",
- "host"_attr = host,
- "lastWriteDate"_attr = reply.lastWriteDate);
- lastWriteDate = reply.lastWriteDate;
-
- LOGV2_DEBUG(24083,
- 3,
- "Updating {host} opTime to {opTime}",
- "Updating host opTime",
- "host"_attr = host,
- "opTime"_attr = reply.opTime);
- opTime = reply.opTime;
- lastWriteDateUpdateTime = Date_t::now();
-}
-
-SetState::SetState(const MongoURI& uri,
- ReplicaSetChangeNotifier* notifier,
- executor::TaskExecutor* executor)
- : setUri(std::move(uri)),
- name(setUri.getSetName()),
- notifier(notifier),
- executor(executor),
- seedNodes(setUri.getServers().begin(), setUri.getServers().end()),
- latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)),
- rand(std::random_device()()),
- refreshPeriod(_getRefreshPeriod()) {
- uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty());
-
- if (name.empty())
- LOGV2_WARNING(24090,
- "Replica set name empty, first node: {firstNode}",
- "Replica set name empty, adding first node",
- "firstNode"_attr = *(seedNodes.begin()));
-
- // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a
- // scan until we start a scan and either find a master or contact all hosts without finding
- // one.
- // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to
- // sort nodes after this loop.
- for (auto&& addr : seedNodes) {
- nodes.push_back(Node(addr));
-
- if (addr.host()[0] == '$') {
- invariant(isMocked || &addr == &*seedNodes.begin()); // Can't mix and match.
- isMocked = true;
- } else {
- invariant(!isMocked); // Can't mix and match.
- }
- }
-
- if (kDebugBuild)
- checkInvariants();
-}
-
-HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria,
- const std::vector<HostAndPort>& excludedHosts) const {
- auto hosts = getMatchingHosts(criteria, excludedHosts);
-
- if (hosts.empty()) {
- return HostAndPort();
- }
-
- return hosts[0];
-}
-
-std::vector<HostAndPort> SetState::getMatchingHosts(
- const ReadPreferenceSetting& criteria, const std::vector<HostAndPort>& excludedHosts) const {
- switch (criteria.pref) {
- // "Prefered" read preferences are defined in terms of other preferences
- case ReadPreference::PrimaryPreferred: {
- std::vector<HostAndPort> out = getMatchingHosts(
- ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags), excludedHosts);
- // NOTE: the spec says we should use the primary even if tags don't match
- if (!out.empty())
- return out;
- return getMatchingHosts(ReadPreferenceSetting(ReadPreference::SecondaryOnly,
- criteria.tags,
- criteria.maxStalenessSeconds),
- excludedHosts);
- }
-
- case ReadPreference::SecondaryPreferred: {
- std::vector<HostAndPort> out = getMatchingHosts(
- ReadPreferenceSetting(
- ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds),
- excludedHosts);
- if (!out.empty())
- return out;
- // NOTE: the spec says we should use the primary even if tags don't match
- return getMatchingHosts(
- ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags), excludedHosts);
- }
-
- case ReadPreference::PrimaryOnly: {
- // NOTE: isMaster implies isUp
- Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster);
- if (it == nodes.end())
- return {};
- const auto& primaryHost = it->host;
-
- // If the primary is excluded and we have specified readPreference primaryOnly, return
- // empty host.
- if (std::find(excludedHosts.begin(), excludedHosts.end(), primaryHost) !=
- excludedHosts.end()) {
- return {};
- }
- return {primaryHost};
- }
-
- // The difference between these is handled by Node::matches
- case ReadPreference::SecondaryOnly:
- case ReadPreference::Nearest: {
- std::function<bool(const Node&)> matchNode = [](const Node& node) -> bool {
- return true;
- };
- // build comparator
- if (criteria.maxStalenessSeconds.count()) {
- auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster);
- if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) {
- auto writeDateCmp = [](const Node* a, const Node* b) -> bool {
- return a->lastWriteDate < b->lastWriteDate;
- };
- // use only non failed nodes
- std::vector<const Node*> upNodes;
- for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) {
- if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) {
- upNodes.push_back(&(*nodeIt));
- }
- }
- auto latestSecNode =
- std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp);
- if (latestSecNode == upNodes.end()) {
- matchNode = [](const Node& node) -> bool { return false; };
- } else {
- Date_t maxWriteTime = (*latestSecNode)->lastWriteDate;
- matchNode = [=](const Node& node) -> bool {
- return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) +
- refreshPeriod <=
- criteria.maxStalenessSeconds;
- };
- }
- } else {
- Seconds primaryStaleness = duration_cast<Seconds>(
- masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate);
- matchNode = [=](const Node& node) -> bool {
- return duration_cast<Seconds>(node.lastWriteDateUpdateTime -
- node.lastWriteDate) -
- primaryStaleness + refreshPeriod <=
- criteria.maxStalenessSeconds;
- };
- }
- }
-
- std::vector<const Node*> allMatchingNodes;
- BSONForEach(tagElem, criteria.tags.getTagBSON()) {
- uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj());
- BSONObj tag = tagElem.Obj();
-
- std::vector<const Node*> matchingNodes;
- for (size_t i = 0; i < nodes.size(); i++) {
- auto isNotExcluded =
- (std::find(excludedHosts.begin(), excludedHosts.end(), nodes[i].host) ==
- excludedHosts.end());
- if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) &&
- matchNode(nodes[i]) && isNotExcluded) {
- matchingNodes.push_back(&nodes[i]);
- }
- }
-
- // Only consider nodes that satisfy the minClusterTime
- if (!criteria.minClusterTime.isNull()) {
- std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater);
- for (size_t i = 0; i < matchingNodes.size(); i++) {
- if (matchingNodes[i]->opTime.getTimestamp() < criteria.minClusterTime) {
- if (i == 0) {
- // If no nodes satisfy the minClusterTime criteria, we ignore the
- // minClusterTime requirement.
- break;
- }
- matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end());
- break;
- }
- }
- }
-
- allMatchingNodes.insert(
- allMatchingNodes.end(), matchingNodes.begin(), matchingNodes.end());
- }
-
- // don't do more complicated selection if not needed
- if (allMatchingNodes.empty()) {
- return {};
- }
- if (allMatchingNodes.size() == 1) {
- return {allMatchingNodes.front()->host};
- }
-
- // If there are multiple nodes satisfying the minClusterTime, next order by latency and
- // don't consider hosts further than a threshold from the closest.
- std::sort(allMatchingNodes.begin(), allMatchingNodes.end(), compareLatencies);
-
- if (!MONGO_unlikely(scanningServerSelectorIgnoreLatencyWindow.shouldFail())) {
- for (size_t i = 1; i < allMatchingNodes.size(); i++) {
- int64_t distance =
- allMatchingNodes[i]->latencyMicros - allMatchingNodes[0]->latencyMicros;
- if (distance >= latencyThresholdMicros) {
- // this node and all remaining ones are too far away
- allMatchingNodes.erase(allMatchingNodes.begin() + i,
- allMatchingNodes.end());
- break;
- }
- }
- }
-
- std::vector<HostAndPort> hosts;
- std::transform(allMatchingNodes.begin(),
- allMatchingNodes.end(),
- std::back_inserter(hosts),
- [](const auto& node) { return node->host; });
-
- // Note that the host list is only deterministic (or random) for the first node.
- // The rest of the list is in matchingNodes order (latency) with one element swapped
- // for the first element.
- if (auto bestHostIdx = useDeterministicHostSelection ? roundRobin++ % hosts.size()
- : rand.nextInt32(hosts.size())) {
- using std::swap;
- swap(hosts[0], hosts[bestHostIdx]);
- }
-
- return hosts;
- }
-
- default:
- uassert(16337, "Unknown read preference", false);
- break;
- }
-}
-
-Node* SetState::findNode(const HostAndPort& host) {
- const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
- if (it == nodes.end() || it->host != host)
- return nullptr;
-
- return &(*it);
-}
-
-Node* SetState::findOrCreateNode(const HostAndPort& host) {
- // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class
- // must function correctly even with more nodes). If we lift that restriction, we may need
- // to consider alternate algorithms.
- Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts);
- if (it == nodes.end() || it->host != host) {
- LOGV2_DEBUG(24084,
- 2,
- "Adding node {host} to our view of replica set {replicaSet}",
- "Adding node to our view of the replica set",
- "host"_attr = host,
- "replicaSet"_attr = name);
- it = nodes.insert(it, Node(host));
- }
- return &(*it);
-}
-
-void SetState::updateNodeIfInNodes(const IsMasterReply& reply) {
- Node* node = findNode(reply.host);
- if (!node) {
- LOGV2_DEBUG(24085,
- 2,
- "Skipping application of isMaster reply from {host} since it isn't a "
- "confirmed member of set {replicaSet}",
- "Skipping application of isMaster reply from host since it isn't a confirmed "
- "member of the replica set",
- "host"_attr = reply.host,
- "replicaSet"_attr = name);
- return;
- }
-
- node->update(reply);
-}
-
-ConnectionString SetState::confirmedConnectionString() const {
- std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes));
-
- return ConnectionString::forReplicaSet(name, std::move(hosts));
-}
-
-ConnectionString SetState::possibleConnectionString() const {
- std::vector<HostAndPort> hosts;
- hosts.reserve(nodes.size());
-
- for (auto& node : nodes) {
- hosts.push_back(node.host);
- }
-
- return ConnectionString::forReplicaSet(name, std::move(hosts));
-}
-
-void SetState::notify() {
- if (!waiters.size()) {
- return;
- }
-
- const auto cachedNow = now();
- auto shouldQuickFail = areRefreshRetriesDisabledForTest() && !currentScan;
-
- for (auto it = waiters.begin(); it != waiters.end();) {
- if (isDropped) {
- it->promise.setError({ErrorCodes::ShutdownInProgress,
- str::stream() << "ScanningReplicaSetMonitor is shutting down"});
- waiters.erase(it++);
- continue;
- }
- auto match = getMatchingHosts(it->criteria, it->excludedHosts);
- if (!match.empty()) {
- // match;
- it->promise.emplaceValue(std::move(match));
- waiters.erase(it++);
- } else if (it->deadline <= cachedNow) {
- LOGV2_DEBUG(24086,
- 1,
- "Unable to statisfy read preference {criteria} by deadline {deadline}",
- "Unable to statisfy read preference by deadline",
- "criteria"_attr = it->criteria,
- "deadline"_attr = it->deadline);
- it->promise.setError(makeUnsatisfedReadPrefError(it->criteria));
- waiters.erase(it++);
- } else if (shouldQuickFail) {
- LOGV2_DEBUG(24087, 1, "Unable to statisfy read preference because tests fail quickly");
- it->promise.setError(makeUnsatisfedReadPrefError(it->criteria));
- waiters.erase(it++);
- } else {
- it++;
- }
- }
-
- if (waiters.empty()) {
- // No current waiters so we can stop the expedited scanning.
- isExpedited = false;
- rescheduleRefresh(SchedulingStrategy::kCancelPreviousScan);
- }
-}
-
-Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const {
- return Status(ErrorCodes::FailedToSatisfyReadPreference,
- str::stream() << "Could not find host matching read preference "
- << criteria.toString() << " for set " << name);
-}
-
-void SetState::init() {
- rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan);
- notifier->onFoundSet(name);
-}
-
-void SetState::drop() {
- // This is invoked from ScanningReplicaSetMonitor::drop() under lock.
- if (std::exchange(isDropped, true)) {
- // If a SetState calls drop() from destruction after the RSMM calls shutdown(), then the
- // RSMM's executor may no longer exist. Thus, only drop once.
- return;
- }
-
- currentScan.reset();
- notify();
-
- if (auto handle = std::exchange(refresherHandle, {})) {
- // Cancel our refresh on the way out
- executor->cancel(handle);
- }
-
- for (auto& node : nodes) {
- if (auto handle = std::exchange(node.scheduledIsMasterHandle, {})) {
- // Cancel any isMasters we had scheduled
- executor->cancel(handle);
- }
- }
-
- // No point in notifying if we never started
- if (workingConnStr.isValid()) {
- notifier->onDroppedSet(name);
- }
-}
-
-void SetState::checkInvariants() const {
- bool foundMaster = false;
- for (size_t i = 0; i < nodes.size(); i++) {
- // no empty hosts
- invariant(!nodes[i].host.empty());
-
- if (nodes[i].isMaster) {
- // masters must be up
- invariant(nodes[i].isUp);
-
- // at most one master
- invariant(!foundMaster);
- foundMaster = true;
-
- // if we have a master it should be the same as lastSeenMaster
- invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster);
- }
-
- // should never end up with negative latencies
- invariant(nodes[i].latencyMicros >= 0);
-
- // nodes must be sorted by host with no-dupes
- invariant(i == 0 || (nodes[i - 1].host < nodes[i].host));
- }
-
- // nodes should be a (non-strict) superset of the seedNodes
- invariant(std::includes(
- nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts));
-
- if (currentScan) {
- // hostsToScan can't have dups or hosts already in triedHosts.
- std::set<HostAndPort> cantSee = currentScan->triedHosts;
- for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin();
- it != currentScan->hostsToScan.end();
- ++it) {
- invariant(!cantSee.count(*it));
- cantSee.insert(*it); // make sure we don't see this again
- }
-
- // We should only be waitingFor hosts that are in triedHosts
- invariant(std::includes(currentScan->triedHosts.begin(),
- currentScan->triedHosts.end(),
- currentScan->waitingFor.begin(),
- currentScan->waitingFor.end()));
-
- // We should only have unconfirmedReplies if we haven't found a master yet
- invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty());
- }
-}
-
-template <typename Container>
-void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) {
- invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue.
-
- // no std::copy_if before c++11
- for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end;
- ++it) {
- if (!triedHosts.count(*it)) {
- hostsToScan.push_back(*it);
- }
- }
- std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg());
-}
-
-void ScanState::retryAllTriedHosts(PseudoRandom& rand) {
- invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue.
- // Move hosts that are in triedHosts but not in waitingFor from triedHosts to hostsToScan.
- std::set_difference(triedHosts.begin(),
- triedHosts.end(),
- waitingFor.begin(),
- waitingFor.end(),
- std::inserter(hostsToScan, hostsToScan.end()));
- std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg());
- triedHosts = waitingFor;
-}
-
-void ScanState::markHostsToScanAsTried() noexcept {
- while (!hostsToScan.empty()) {
- auto host = hostsToScan.front();
- hostsToScan.pop_front();
- /**
- * Mark the popped host as tried to avoid deleting hosts in multiple points.
- * This emulates the final effect of Refresher::getNextStep() on the set.
- */
- triedHosts.insert(host);
- }
-}
-} // namespace mongo
diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h
deleted file mode 100644
index fca197a409a..00000000000
--- a/src/mongo/client/scanning_replica_set_monitor.h
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <functional>
-#include <memory>
-#include <set>
-#include <string>
-
-#include "mongo/base/string_data.h"
-#include "mongo/client/mongo_uri.h"
-#include "mongo/client/replica_set_change_notifier.h"
-#include "mongo/client/replica_set_monitor.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/util/concurrency/with_lock.h"
-#include "mongo/util/duration.h"
-#include "mongo/util/net/hostandport.h"
-#include "mongo/util/time_support.h"
-
-namespace mongo {
-
-class ScanningReplicaSetMonitor : public ReplicaSetMonitor {
- ScanningReplicaSetMonitor(const ScanningReplicaSetMonitor&) = delete;
- ScanningReplicaSetMonitor& operator=(const ScanningReplicaSetMonitor&) = delete;
-
-public:
- class Refresher;
-
- /**
- * Defaults to false, meaning that if multiple hosts meet a criteria we pick one at random.
- * This is required by the replica set driver spec. Set this to true in tests that need host
- * selection to be deterministic.
- *
- * NOTE: Used by unit-tests only.
- */
- static bool useDeterministicHostSelection;
-
- static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500);
- static constexpr auto kCheckTimeout = Seconds(5);
-
- ScanningReplicaSetMonitor(const MongoURI& uri,
- std::shared_ptr<executor::TaskExecutor> executor,
- std::function<void()> cleanupCallback);
-
- void init() override;
-
- void drop() override;
-
- /**
- * NOTE: Cancellation via CancellationTokens is not implemented for the
- * ScanningReplicaSetMonitor, so any token passed in will be ignored.
- */
- SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
- const std::vector<HostAndPort>& excludedHosts,
- const CancellationToken&) override;
-
- /**
- * NOTE: Cancellation via CancellationTokens is not implemented for the
- * ScanningReplicaSetMonitor, so any token passed in will be ignored.
- */
- SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
- const ReadPreferenceSetting& readPref,
- const std::vector<HostAndPort>& excludedHosts,
- const CancellationToken&) override;
-
- HostAndPort getPrimaryOrUassert() override;
-
- /*
- * For the ScanningReplicaSetMonitor, all the failedHost methods are equivalent.
- */
- void failedHost(const HostAndPort& host, const Status& status) override;
-
- void failedHostPreHandshake(const HostAndPort& host,
- const Status& status,
- BSONObj bson) override;
-
- void failedHostPostHandshake(const HostAndPort& host,
- const Status& status,
- BSONObj bson) override;
-
- bool isPrimary(const HostAndPort& host) const override;
-
- bool isHostUp(const HostAndPort& host) const override;
-
- int getMinWireVersion() const override;
-
- int getMaxWireVersion() const override;
-
- std::string getName() const override;
-
- std::string getServerAddress() const override;
-
- const MongoURI& getOriginalUri() const override;
-
- bool contains(const HostAndPort& server) const override;
-
- void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override;
-
- bool isKnownToHaveGoodPrimary() const override;
-
- //
- // internal types (defined in scanning_replica_set_monitor_internal.h)
- //
-
- struct IsMasterReply;
- struct ScanState;
- struct SetState;
- typedef std::shared_ptr<ScanState> ScanStatePtr;
- typedef std::shared_ptr<SetState> SetStatePtr;
-
- /**
- * Allows tests to set initial conditions and introspect the current state.
- */
- ScanningReplicaSetMonitor(const SetStatePtr& initialState,
- std::function<void()> cleanupCallback);
- ~ScanningReplicaSetMonitor() override;
-
- /**
- * This is for use in tests using MockReplicaSet to ensure that a full scan completes before
- * continuing.
- */
- void runScanForMockReplicaSet();
-
- static void disableRefreshRetries_forTest();
-
- static bool areRefreshRetriesDisabledForTest();
-
-private:
- Future<std::vector<HostAndPort>> _getHostsOrRefresh(
- const ReadPreferenceSetting& readPref,
- Milliseconds maxWait,
- const std::vector<HostAndPort>& excludedHosts);
-
- /**
- * If no scan is in-progress, this function is responsible for setting up a new scan. Otherwise,
- * does nothing.
- */
- static void _ensureScanInProgress(const SetStatePtr&);
-
- /**
- * Returns the refresh period that is given to all new SetStates.
- */
- static Seconds _getRefreshPeriod();
-
- /**
- * Returns fail injected refresh period, if fail point is set.
- */
- static boost::optional<Seconds> _getFailInjectedRefreshPeriod();
-
- const SetStatePtr _state;
-};
-
-
-/**
- * Refreshes the local view of a replica set.
- *
- * All logic related to choosing the hosts to contact and updating the SetState based on replies
- * lives in this class. Use of this class should always be guarded by SetState::mutex unless in
- * single-threaded use by ScanningReplicaSetMonitorTest.
- */
-class ScanningReplicaSetMonitor::Refresher {
-public:
- explicit Refresher(const SetStatePtr& setState);
-
- struct NextStep {
- enum StepKind {
- CONTACT_HOST, /// Contact the returned host
- WAIT, /// Wait on condition variable and try again.
- DONE, /// No more hosts to contact in this Refresh round
- };
-
- explicit NextStep(StepKind step, const HostAndPort& host = HostAndPort())
- : step(step), host(host) {}
-
- StepKind step;
- HostAndPort host;
- };
-
- /**
- * Returns the next step to take.
- *
- * By calling this, you promise to call receivedIsMaster or failedHost if the NextStep is
- * CONTACT_HOST.
- */
- NextStep getNextStep();
-
- /**
- * Call this if a host returned from getNextStep successfully replied to an isMaster call.
- * Negative latencyMicros are ignored.
- */
- void receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& reply);
-
- /**
- * Call this if a host returned from getNextStep failed to reply to an isMaster call.
- */
- void failedHost(const HostAndPort& host, const Status& status);
-
- /**
- * Starts a new scan over the hosts in set.
- */
- void startNewScan();
-
- /**
- * First, checks that the "reply" is not from a stale primary by comparing the electionId of
- * "reply" to the maxElectionId recorded by the SetState and returns OK status if "reply"
- * belongs to a non-stale primary. Otherwise returns a failed status.
- *
- * The 'from' parameter specifies the node from which the response is received.
- *
- * Updates _set and _scan based on set-membership information from a master.
- * Applies _scan->unconfirmedReplies to confirmed nodes.
- * Does not update this host's node in _set->nodes.
- */
- Status receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply);
-
- /**
- * Schedules isMaster requests to all hosts that currently need to be contacted.
- * Does nothing if requests have already been sent to all known hosts.
- */
- void scheduleNetworkRequests();
-
- void scheduleIsMaster(const HostAndPort& host);
-
-private:
- // Both pointers are never NULL
- SetStatePtr _set;
- ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started.
-};
-
-} // namespace mongo
diff --git a/src/mongo/client/scanning_replica_set_monitor_internal.h b/src/mongo/client/scanning_replica_set_monitor_internal.h
deleted file mode 100644
index 9641143a3a3..00000000000
--- a/src/mongo/client/scanning_replica_set_monitor_internal.h
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-/**
- * This is an internal header.
- * This should only be included by replica_set_monitor.cpp and unittests.
- * This should never be included by any header.
- */
-
-#pragma once
-
-#include <cstdint>
-#include <deque>
-#include <set>
-#include <string>
-#include <vector>
-
-#include "mongo/client/read_preference.h"
-#include "mongo/client/scanning_replica_set_monitor.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/platform/random.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/util/net/hostandport.h"
-
-namespace mongo {
-
-struct ScanningReplicaSetMonitor::IsMasterReply {
- IsMasterReply() : ok(false) {}
- IsMasterReply(const HostAndPort& host, int64_t latencyMicros, const BSONObj& reply)
- : ok(false), host(host), latencyMicros(latencyMicros) {
- parse(reply);
- }
-
- /**
- * Never throws. If parsing fails for any reason, sets ok to false.
- */
- void parse(const BSONObj& obj);
-
- bool ok; // if false, ignore all other fields
- BSONObj raw; // Always owned. Other fields are allowed to be a view into this.
- std::string setName;
- bool isMaster;
- bool secondary;
- bool hidden;
- int configVersion{};
- OID electionId; // Set if this isMaster reply is from the primary
- HostAndPort primary; // empty if not present
- std::set<HostAndPort> members; // both "hosts" and "passives"
- std::set<HostAndPort> passives;
- BSONObj tags;
- int minWireVersion{};
- int maxWireVersion{};
-
- // remaining fields aren't in isMaster reply, but are known to caller.
- HostAndPort host;
- int64_t latencyMicros; // ignored if negative
- Date_t lastWriteDate{};
- repl::OpTime opTime{};
-};
-
-/**
- * The SetState is the underlying data object behind both the ScanningReplicaSetMonitor and the
- * Refresher
- *
- * Note that the SetState only holds its own lock in init() and drop(). Even those uses can probably
- * be offloaded to the RSM eventually. In all other cases, the RSM and RSM::Refresher use the
- * SetState lock to synchronize.
- */
-struct ScanningReplicaSetMonitor::SetState
- : public std::enable_shared_from_this<ScanningReplicaSetMonitor::SetState> {
- SetState(const SetState&) = delete;
- SetState& operator=(const SetState&) = delete;
-
-public:
- /**
- * Holds the state of a single node in the replicaSet
- */
- struct Node {
- explicit Node(const HostAndPort& host);
-
- void markFailed(const Status& status);
-
- bool matches(ReadPreference pref) const;
-
- /**
- * Checks if the given tag matches the tag attached to this node.
- *
- * Example:
- *
- * Tag of this node: { "dc": "nyc", "region": "na", "rack": "4" }
- *
- * match: {}
- * match: { "dc": "nyc", "rack": 4 }
- * match: { "region": "na", "dc": "nyc" }
- * not match: { "dc": "nyc", "rack": 2 }
- * not match: { "dc": "sf" }
- */
- bool matches(const BSONObj&) const;
-
- /**
- * Returns true if all of the tags in the tag set match node's tags
- */
- bool matches(const TagSet&) const;
-
- /**
- * Updates this Node based on information in reply. The reply must be from this host.
- */
- void update(const IsMasterReply& reply);
-
- HostAndPort host;
- bool isUp{false};
- bool isMaster{false};
- int64_t latencyMicros{};
- BSONObj tags; // owned
- int minWireVersion{};
- int maxWireVersion{};
- Date_t lastWriteDate{}; // from isMasterReply
- Date_t lastWriteDateUpdateTime{}; // set to the local system's time at the time of updating
- // lastWriteDate
- Date_t nextPossibleIsMasterCall{}; // time that previous isMaster check ended
- executor::TaskExecutor::CallbackHandle scheduledIsMasterHandle; //
- repl::OpTime opTime{}; // from isMasterReply
- };
-
- using Nodes = std::vector<Node>;
-
- struct Waiter {
- Date_t deadline;
- ReadPreferenceSetting criteria;
- std::vector<HostAndPort> excludedHosts;
- Promise<std::vector<HostAndPort>> promise;
- };
-
- SetState(const MongoURI& uri, ReplicaSetChangeNotifier*, executor::TaskExecutor*);
-
- bool isUsable() const;
-
- /**
- * Returns a host matching criteria or an empty host if no known host matches.
- *
- * Note: Uses only local data and does not go over the network.
- */
- std::vector<HostAndPort> getMatchingHosts(
- const ReadPreferenceSetting& criteria,
- const std::vector<HostAndPort>& excludedHosts = std::vector<HostAndPort>()) const;
-
- HostAndPort getMatchingHost(
- const ReadPreferenceSetting& criteria,
- const std::vector<HostAndPort>& excludedHosts = std::vector<HostAndPort>()) const;
-
- /**
- * Returns the Node with the given host, or NULL if no Node has that host.
- */
- Node* findNode(const HostAndPort& host);
-
- /**
- * Returns the Node with the given host, or creates one if no Node has that host.
- * Maintains the sorted order of nodes.
- */
- Node* findOrCreateNode(const HostAndPort& host);
-
- void updateNodeIfInNodes(const IsMasterReply& reply);
-
- /**
- * Returns the connection string of the nodes that are known the be in the set because we've
- * seen them in the isMaster reply of a PRIMARY.
- */
- ConnectionString confirmedConnectionString() const;
-
- /**
- * Returns the connection string of the nodes that are believed to be in the set because we've
- * seen them in the isMaster reply of non-PRIMARY nodes in our seed list.
- */
- ConnectionString possibleConnectionString() const;
-
- /**
- * Call this to notify waiters after a scan processes a valid reply, rescans, or finishes.
- */
- void notify();
-
- Date_t now() const {
- return executor ? executor->now() : Date_t::now();
- }
-
- Status makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const;
-
- // Tiny enum to convey semantics for rescheduleFefresh()
- enum class SchedulingStrategy {
- kKeepEarlyScan,
- kCancelPreviousScan,
- };
-
- /**
- * Schedules a refresh via the task executor and cancel any previous refresh.
- * (Task is automatically canceled in the d-tor.)
- */
- void rescheduleRefresh(SchedulingStrategy strategy);
-
- /**
- * Notifies all listeners that the ReplicaSet is in use.
- */
- void init();
-
- /**
- * Resets the current scan and notifies all listeners that the ReplicaSet isn't in use.
- */
- void drop();
-
- /**
- * Before unlocking, do `if (kDebugBuild) checkInvariants();`
- */
- void checkInvariants() const;
-
- /**
- * Wrap the callback and schedule it to run at some time
- *
- * The callback wrapper does the following:
- * * Return before running cb if isDropped is true
- * * Return before running cb if the handle was canceled
- * * Lock before running cb and unlock after
- */
- template <typename Callback>
- auto scheduleWorkAt(Date_t when, Callback&& cb) const;
-
- const MongoURI setUri; // URI passed to ctor -- THIS IS NOT UPDATED BY SCANS
- const std::string name;
-
- ReplicaSetChangeNotifier* const notifier;
- executor::TaskExecutor* const executor;
-
- bool isDropped = false;
-
- // You must hold this to access any member below.
- mutable Mutex mutex = MONGO_MAKE_LATCH("SetState::mutex");
-
- executor::TaskExecutor::CallbackHandle refresherHandle;
-
- // For starting scans
- std::set<HostAndPort> seedNodes; // updated whenever a master reports set membership changes
- ConnectionString seedConnStr; // The connection string from the last time we had valid seeds
- int64_t seedGen = 0;
-
- bool isMocked = false; // True if this set is using nodes from MockReplicaSet
-
- // For tracking scans
- // lastSeenMaster is empty if we have never seen a master or the last scan didn't have one
- HostAndPort lastSeenMaster;
- int consecutiveFailedScans = 0;
- Nodes nodes; // maintained sorted and unique by host
- ConnectionString workingConnStr; // The connection string from our last scan
-
- // For tracking replies
- OID maxElectionId; // largest election id observed by this ScanningReplicaSetMonitor
- int configVersion = 0; // version number of the replica set config.
-
- // For matching hosts
- int64_t latencyThresholdMicros = 0;
- mutable int roundRobin = 0; // used when useDeterministicHostSelection is true
- mutable PseudoRandom rand; // only used for host selection to balance load
-
- // For scheduling scans
- Seconds refreshPeriod; // Normal refresh period when not expedited
- bool isExpedited = false; // True when we are doing more frequent refreshes due to waiters
- std::list<Waiter> waiters; // Everyone waiting for some ReadPreference to be satisfied
- uint64_t nextScanId = 0; // The id for the next scan
- ScanStatePtr currentScan; // NULL if no scan in progress
- Date_t nextScanTime; // The time at which the next scan is scheduled to start
-};
-
-struct ScanningReplicaSetMonitor::ScanState {
- ScanState(const ScanState&) = delete;
- ScanState& operator=(const ScanState&) = delete;
-
-public:
- ScanState() = default;
-
- /**
- * Adds all hosts in container that aren't in triedHosts to hostsToScan, then shuffles the
- * queue.
- */
- template <typename Container>
- void enqueAllUntriedHosts(const Container& container, PseudoRandom& rand);
-
- /**
- * Adds all completed hosts back to hostsToScan and shuffles the queue.
- */
- void retryAllTriedHosts(PseudoRandom& rand);
-
- /**
- * A safe way to clear interrupted scans
- */
- void markHostsToScanAsTried() noexcept;
-
- // This is only for logging and should not affect behavior otherwise.
- Timer timer;
-
- // Access to fields is guarded by associated SetState's mutex.
- bool foundUpMaster = false;
- bool foundAnyUpNodes = false;
- std::deque<HostAndPort> hostsToScan; // Work queue.
- std::set<HostAndPort> possibleNodes; // Nodes reported by non-primary hosts.
- std::set<HostAndPort> waitingFor; // Hosts we have dispatched but haven't replied yet.
- std::set<HostAndPort> triedHosts; // Hosts that have been returned from getNextStep.
-
- // All responses go here until we find a master.
- typedef std::map<HostAndPort, IsMasterReply> UnconfirmedReplies;
- UnconfirmedReplies unconfirmedReplies;
-};
-
-} // namespace mongo