diff options
author | Davis Haupt <davis.haupt@mongodb.com> | 2022-01-19 19:41:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-01-19 20:17:16 +0000 |
commit | ac9d5b9ea14af74e9b4ba3bad952cf4442fa4b93 (patch) | |
tree | 560e6f5b2dd5662999b639bb02b8540ab9a14e7b /src/mongo/client | |
parent | d20d317c08fd48b71f790c1bb013aa61c6f0f4b7 (diff) | |
download | mongo-ac9d5b9ea14af74e9b4ba3bad952cf4442fa4b93.tar.gz |
SERVER-62079 removes rsm scanning
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.cpp | 64 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_server_parameters.cpp | 8 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_server_parameters.h | 2 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_server_parameters_test.cpp | 19 | ||||
-rw-r--r-- | src/mongo/client/scanning_replica_set_monitor.cpp | 1591 | ||||
-rw-r--r-- | src/mongo/client/scanning_replica_set_monitor.h | 257 | ||||
-rw-r--r-- | src/mongo/client/scanning_replica_set_monitor_internal.h | 337 |
8 files changed, 30 insertions, 2249 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 0f8a4903bfa..0b1f03cc2e1 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -220,7 +220,6 @@ clientDriverEnv.Library( 'replica_set_monitor.cpp', 'replica_set_monitor_manager.cpp', 'replica_set_monitor_stats.cpp', - 'scanning_replica_set_monitor.cpp', 'streamable_replica_set_monitor.cpp', 'streamable_replica_set_monitor_query_processor.cpp', 'streamable_replica_set_monitor_error_handler.cpp', diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index 3919437676a..f48832015cd 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -40,7 +40,6 @@ #include "mongo/client/mongo_uri.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/client/replica_set_monitor_server_parameters.h" -#include "mongo/client/scanning_replica_set_monitor.h" #include "mongo/client/streamable_replica_set_monitor.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface_factory.h" @@ -75,34 +74,31 @@ Status ReplicaSetMonitorManagerNetworkConnectionHook::validateHost( const HostAndPort& remoteHost, const BSONObj& isMasterRequest, const executor::RemoteCommandResponse& isMasterReply) { - if (gReplicaSetMonitorProtocol != ReplicaSetMonitorProtocol::kScanning) { - auto monitor = ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost); - if (!monitor) { - return Status::OK(); - } + auto monitor = ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost); + if (!monitor) { + return Status::OK(); + } - if (std::shared_ptr<StreamableReplicaSetMonitor> streamableMonitor = - std::dynamic_pointer_cast<StreamableReplicaSetMonitor>( - ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost))) { - - auto publisher = streamableMonitor->getEventsPublisher(); - if (publisher) { - try { - if (isMasterReply.status.isOK()) { - publisher->onServerHandshakeCompleteEvent( - *isMasterReply.elapsed, remoteHost, isMasterReply.data); - } else { - publisher->onServerHandshakeFailedEvent( - remoteHost, isMasterReply.status, isMasterReply.data); - } - } catch (const DBException& exception) { - LOGV2_ERROR(4712101, - "An error occurred publishing a ReplicaSetMonitor handshake event", - "error"_attr = exception.toStatus(), - "replicaSet"_attr = monitor->getName(), - "handshakeStatus"_attr = isMasterReply.status); - return exception.toStatus(); + if (auto streamableMonitor = std::dynamic_pointer_cast<StreamableReplicaSetMonitor>( + ReplicaSetMonitorManager::get()->getMonitorForHost(remoteHost))) { + + auto publisher = streamableMonitor->getEventsPublisher(); + if (publisher) { + try { + if (isMasterReply.status.isOK()) { + publisher->onServerHandshakeCompleteEvent( + *isMasterReply.elapsed, remoteHost, isMasterReply.data); + } else { + publisher->onServerHandshakeFailedEvent( + remoteHost, isMasterReply.status, isMasterReply.data); } + } catch (const DBException& exception) { + LOGV2_ERROR(4712101, + "An error occurred publishing a ReplicaSetMonitor handshake event", + "error"_attr = exception.toStatus(), + "replicaSet"_attr = monitor->getName(), + "handshakeStatus"_attr = isMasterReply.status); + return exception.toStatus(); } } } @@ -197,16 +193,10 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( "protocol"_attr = toString(gReplicaSetMonitorProtocol), "uri"_attr = uri.toString()); invariant(_taskExecutor); - if (gReplicaSetMonitorProtocol == ReplicaSetMonitorProtocol::kScanning) { - newMonitor = - std::make_shared<ScanningReplicaSetMonitor>(uri, _taskExecutor, cleanupCallback); - newMonitor->init(); - } else { - // Both ReplicaSetMonitorProtocol::kSdam and ReplicaSetMonitorProtocol::kStreamable use the - // StreamableReplicaSetMonitor. - newMonitor = StreamableReplicaSetMonitor::make( - uri, _taskExecutor, _getConnectionManager(), cleanupCallback, _stats); - } + + newMonitor = StreamableReplicaSetMonitor::make( + uri, _taskExecutor, _getConnectionManager(), cleanupCallback, _stats); + _monitors[setName] = newMonitor; _numMonitorsCreated++; return newMonitor; diff --git a/src/mongo/client/replica_set_monitor_server_parameters.cpp b/src/mongo/client/replica_set_monitor_server_parameters.cpp index 91ab6b5cc83..4d0c1dd4e0e 100644 --- a/src/mongo/client/replica_set_monitor_server_parameters.cpp +++ b/src/mongo/client/replica_set_monitor_server_parameters.cpp @@ -38,9 +38,7 @@ namespace mongo { ReplicaSetMonitorProtocol gReplicaSetMonitorProtocol{ReplicaSetMonitorProtocol::kStreamable}; std::string toString(ReplicaSetMonitorProtocol protocol) { - if (protocol == ReplicaSetMonitorProtocol::kScanning) { - return "scanning"; - } else if (protocol == ReplicaSetMonitorProtocol::kStreamable) { + if (protocol == ReplicaSetMonitorProtocol::kStreamable) { return "streamable"; } else { return "sdam"; @@ -54,9 +52,7 @@ void RSMProtocolServerParameter::append(OperationContext*, } Status RSMProtocolServerParameter::setFromString(const std::string& protocolStr) { - if (protocolStr == toString(ReplicaSetMonitorProtocol::kScanning)) { - gReplicaSetMonitorProtocol = ReplicaSetMonitorProtocol::kScanning; - } else if (protocolStr == toString(ReplicaSetMonitorProtocol::kStreamable)) { + if (protocolStr == toString(ReplicaSetMonitorProtocol::kStreamable)) { gReplicaSetMonitorProtocol = ReplicaSetMonitorProtocol::kStreamable; } else if (protocolStr == toString(ReplicaSetMonitorProtocol::kSdam)) { gReplicaSetMonitorProtocol = ReplicaSetMonitorProtocol::kSdam; diff --git a/src/mongo/client/replica_set_monitor_server_parameters.h b/src/mongo/client/replica_set_monitor_server_parameters.h index 0a900b7ddf9..40bcf3f7016 100644 --- a/src/mongo/client/replica_set_monitor_server_parameters.h +++ b/src/mongo/client/replica_set_monitor_server_parameters.h @@ -35,7 +35,7 @@ namespace mongo { -enum class ReplicaSetMonitorProtocol { kScanning, kSdam, kStreamable }; +enum class ReplicaSetMonitorProtocol { kSdam, kStreamable }; extern ReplicaSetMonitorProtocol gReplicaSetMonitorProtocol; std::string toString(ReplicaSetMonitorProtocol protocol); diff --git a/src/mongo/client/replica_set_monitor_server_parameters_test.cpp b/src/mongo/client/replica_set_monitor_server_parameters_test.cpp index d52fb788094..7a994cf3f95 100644 --- a/src/mongo/client/replica_set_monitor_server_parameters_test.cpp +++ b/src/mongo/client/replica_set_monitor_server_parameters_test.cpp @@ -32,7 +32,6 @@ #include "mongo/client/replica_set_monitor.h" #include "mongo/client/replica_set_monitor_protocol_test_util.h" #include "mongo/client/replica_set_monitor_server_parameters.h" -#include "mongo/client/scanning_replica_set_monitor.h" #include "mongo/client/streamable_replica_set_monitor.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -77,23 +76,5 @@ TEST_F(ReplicaSetMonitorProtocolTest, checkRSMProtocolParamSdam) { } #endif -/** - * Checks that a ScanningReplicaSetMonitor is created when the replicaSetMonitorProtocol server - * parameter is set to 'scanning'. - */ -TEST_F(ReplicaSetMonitorProtocolTest, checkRSMProtocolParamScanning) { - ReplicaSetMonitorProtocolTestUtil::setRSMProtocol(ReplicaSetMonitorProtocol::kScanning); - auto uri = MongoURI::parse("mongodb:a,b,c/?replicaSet=name"); - ASSERT_OK(uri.getStatus()); - auto createdMonitor = ReplicaSetMonitor::createIfNeeded(uri.getValue()); - - // If the created monitor does not point to a ScanningReplicaSetMonitor, the cast returns a - // nullptr. - auto scanningMonitorCast = dynamic_cast<ScanningReplicaSetMonitor*>(createdMonitor.get()); - ASSERT(scanningMonitorCast); - - auto streamableMonitorCast = dynamic_cast<StreamableReplicaSetMonitor*>(createdMonitor.get()); - ASSERT_FALSE(streamableMonitorCast); -} } // namespace } // namespace mongo diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp deleted file mode 100644 index b72cf67ab59..00000000000 --- a/src/mongo/client/scanning_replica_set_monitor.cpp +++ /dev/null @@ -1,1591 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include "mongo/client/scanning_replica_set_monitor.h" - -#include <algorithm> -#include <limits> -#include <random> - -#include "mongo/bson/simple_bsonelement_comparator.h" -#include "mongo/client/connpool.h" -#include "mongo/client/global_conn_pool.h" -#include "mongo/client/read_preference.h" -#include "mongo/client/scanning_replica_set_monitor_internal.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/bson_extract_optime.h" -#include "mongo/db/server_options.h" -#include "mongo/logv2/log.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/platform/mutex.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/util/background.h" -#include "mongo/util/debug_util.h" -#include "mongo/util/exit.h" -#include "mongo/util/fail_point.h" -#include "mongo/util/string_map.h" -#include "mongo/util/timer.h" - -namespace mongo { - -using std::numeric_limits; -using std::set; -using std::shared_ptr; -using std::string; -using std::vector; - -namespace { -MONGO_FAIL_POINT_DEFINE(scanningServerSelectorIgnoreLatencyWindow); - -// Pull nested types to top-level scope -typedef ScanningReplicaSetMonitor::IsMasterReply IsMasterReply; -typedef ScanningReplicaSetMonitor::ScanState ScanState; -typedef ScanningReplicaSetMonitor::ScanStatePtr ScanStatePtr; -typedef ScanningReplicaSetMonitor::SetState SetState; -typedef ScanningReplicaSetMonitor::SetStatePtr SetStatePtr; -typedef ScanningReplicaSetMonitor::Refresher Refresher; -typedef ScanState::UnconfirmedReplies UnconfirmedReplies; -typedef SetState::Node Node; -typedef SetState::Nodes Nodes; -using executor::TaskExecutor; -using CallbackArgs = TaskExecutor::CallbackArgs; -using CallbackHandle = TaskExecutor::CallbackHandle; - -// Intentionally chosen to compare worse than all known latencies. -const int64_t unknownLatency = numeric_limits<int64_t>::max(); - -const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet()); - -// -// Helpers for stl algorithms -// - -bool isMaster(const Node& node) { - return node.isMaster; -} - -bool opTimeGreater(const Node* lhs, const Node* rhs) { - return lhs->opTime > rhs->opTime; -} - -bool compareLatencies(const Node* lhs, const Node* rhs) { - // NOTE: this automatically compares Node::unknownLatency worse than all others. - return lhs->latencyMicros < rhs->latencyMicros; -} - -bool hostsEqual(const Node& lhs, const HostAndPort& rhs) { - return lhs.host == rhs; -} - -// Allows comparing two Nodes, or a HostAndPort and a Node. -// NOTE: the two HostAndPort overload is only needed to support extra checks in some STL -// implementations. For simplicity, no comparator should be used with collections of just -// HostAndPort. -struct CompareHosts { - bool operator()(const Node& lhs, const Node& rhs) { - return lhs.host < rhs.host; - } - bool operator()(const Node& lhs, const HostAndPort& rhs) { - return lhs.host < rhs; - } - bool operator()(const HostAndPort& lhs, const Node& rhs) { - return lhs < rhs.host; - } - bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) { - return lhs < rhs; - } -} compareHosts; // like an overloaded function, but able to pass to stl algorithms - -// The following structs should be treated as functions returning a UnaryPredicate. -// Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost)); -// They all hold their constructor argument by reference. - -struct HostIs { - explicit HostIs(const HostAndPort& host) : _host(host) {} - bool operator()(const HostAndPort& host) { - return host == _host; - } - bool operator()(const Node& node) { - return node.host == _host; - } - const HostAndPort& _host; -}; - -struct HostNotIn { - explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {} - bool operator()(const HostAndPort& host) { - return !_hosts.count(host); - } - bool operator()(const Node& node) { - return !_hosts.count(node.host); - } - const std::set<HostAndPort>& _hosts; -}; - -int32_t pingTimeMillis(const Node& node) { - auto latencyMillis = node.latencyMicros / 1000; - if (latencyMillis > numeric_limits<int32_t>::max()) { - // In particular, Node::unknownLatency does not fit in an int32. - return numeric_limits<int32_t>::max(); - } - return latencyMillis; -} - -/** - * Replica set refresh period on the task executor. - */ -const Seconds kDefaultRefreshPeriod(30); -} // namespace - -// Defaults to random selection as required by the spec -bool ScanningReplicaSetMonitor::useDeterministicHostSelection = false; - -ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const SetStatePtr& initialState, - std::function<void()> cleanupCallback) - : ReplicaSetMonitor(cleanupCallback), _state(initialState) {} - -ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const MongoURI& uri, - std::shared_ptr<TaskExecutor> executor, - std::function<void()> cleanupCallback) - : ScanningReplicaSetMonitor( - std::make_shared<SetState>( - uri, &ReplicaSetMonitorManager::get()->getNotifier(), executor.get()), - cleanupCallback) {} - -void ScanningReplicaSetMonitor::init() { - if (areRefreshRetriesDisabledForTest()) { - // This is for MockReplicaSet. Those tests want to control when scanning happens. - LOGV2_WARNING(24088, - "*** Not starting background refresh because refresh retries are disabled."); - return; - } - - { - stdx::lock_guard lk(_state->mutex); - _state->init(); - } -} - -void ScanningReplicaSetMonitor::drop() { - { - stdx::lock_guard lk(_state->mutex); - _state->drop(); - } -} - -ScanningReplicaSetMonitor::~ScanningReplicaSetMonitor() { - // `drop` is idempotent and a duplicate call from ReplicaSetMonitorManager::removeMonitor() is - // safe. - drop(); -} - -template <typename Callback> -auto ScanningReplicaSetMonitor::SetState::scheduleWorkAt(Date_t when, Callback&& cb) const { - auto wrappedCallback = [cb = std::forward<Callback>(cb), - anchor = shared_from_this()](const CallbackArgs& cbArgs) mutable { - if (ErrorCodes::isCancellationError(cbArgs.status)) { - // Do no more work if we're removed or canceled - return; - } - invariant(cbArgs.status); - - stdx::lock_guard lk(anchor->mutex); - if (anchor->isDropped) { - return; - } - - cb(cbArgs); - }; - return executor->scheduleWorkAt(std::move(when), std::move(wrappedCallback)); -} - -void ScanningReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) { - // Reschedule the refresh - - if (!executor || isMocked) { - // Without an executor, we can't do refreshes -- we're in a test - return; - } - - if (isDropped) { // already removed so no need to refresh - LOGV2_DEBUG(24070, - 1, - "Stopping refresh for replica set {replicaSet} because it's removed", - "Stopping refresh for replica set because it's removed", - "replicaSet"_attr = name); - return; - } - - Milliseconds period = _getRefreshPeriod(); // Supports fail injection. - if (isExpedited) { - period = std::min<Milliseconds>(period, kExpeditedRefreshPeriod); - } - - auto currentTime = now(); - auto possibleNextScanTime = currentTime + period; - if (refresherHandle && // - (strategy == SchedulingStrategy::kKeepEarlyScan) && // - (nextScanTime > currentTime) && // - (possibleNextScanTime >= nextScanTime)) { - // If the next scan would be sooner than our desired, why cancel? - return; - } - - // Cancel out the last refresh - if (auto currentHandle = std::exchange(refresherHandle, {})) { - executor->cancel(currentHandle); - } - - nextScanTime = possibleNextScanTime; - LOGV2_DEBUG(24071, - 1, - "Next replica set scan scheduled for {nextScanTime}", - "Next replica set scan scheduled", - "nextScanTime"_attr = nextScanTime); - auto swHandle = scheduleWorkAt(nextScanTime, [this](const CallbackArgs& cbArgs) { - if (cbArgs.myHandle != refresherHandle) - return; // We've been replaced! - - // It is possible that a waiter will have already expired by the point of this rescan. - // Thus we notify here to trigger that logic. - notify(); - - _ensureScanInProgress(shared_from_this()); - - // And now we set up the next one - rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan); - }); - - if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) { - LOGV2_DEBUG(24072, - 1, - "Can't schedule refresh for {replicaSet}. Executor shutdown in progress", - "Can't schedule refresh for replica set. Executor shutdown in progress", - "replicaSet"_attr = name); - return; - } - - if (!swHandle.isOK()) { - LOGV2_FATAL(40140, - "Can't continue refresh for replica set {replicaSet}: {error}", - "Can't continue refresh for replica set", - "error"_attr = redact(swHandle.getStatus()), - "replicaSet"_attr = name); - } - - refresherHandle = std::move(swHandle.getValue()); -} - -SemiFuture<HostAndPort> ScanningReplicaSetMonitor::getHostOrRefresh( - const ReadPreferenceSetting& criteria, - const std::vector<HostAndPort>& excludedHosts, - const CancellationToken&) { - return _getHostsOrRefresh( - criteria, ReplicaSetMonitorInterface::kDefaultFindHostTimeout, excludedHosts) - .then([](const auto& hosts) { - invariant(hosts.size()); - return hosts[0]; - }) - .semi(); -} - -SemiFuture<std::vector<HostAndPort>> ScanningReplicaSetMonitor::getHostsOrRefresh( - const ReadPreferenceSetting& criteria, - const std::vector<HostAndPort>& excludedHosts, - const CancellationToken&) { - return _getHostsOrRefresh( - criteria, ReplicaSetMonitorInterface::kDefaultFindHostTimeout, excludedHosts) - .semi(); -} - -Future<std::vector<HostAndPort>> ScanningReplicaSetMonitor::_getHostsOrRefresh( - const ReadPreferenceSetting& criteria, - Milliseconds maxWait, - const std::vector<HostAndPort>& excludedHosts) { - - stdx::lock_guard<Latch> lk(_state->mutex); - if (_state->isDropped) { - return Status(ErrorCodes::ShutdownInProgress, - str::stream() - << "ScanningReplicaSetMonitor for set " << getName() << " is removed"); - } - - auto out = _state->getMatchingHosts(criteria, excludedHosts); - if (!out.empty()) - return {std::move(out)}; - - // Fail early without doing any more work if the timeout has already expired. - if (maxWait <= Milliseconds(0)) - return _state->makeUnsatisfedReadPrefError(criteria); - - // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is - // dealing with maxWait. - auto pf = makePromiseFuture<decltype(out)>(); - _state->waiters.emplace_back( - SetState::Waiter{_state->now() + maxWait, criteria, excludedHosts, std::move(pf.promise)}); - - // This must go after we set up the wait state to correctly handle unittests using - // MockReplicaSet. - _ensureScanInProgress(_state); - - // Switch to expedited scanning. - _state->isExpedited = true; - _state->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan); - - return std::move(pf.future); -} - -HostAndPort ScanningReplicaSetMonitor::getPrimaryOrUassert() { - return ReplicaSetMonitorInterface::getHostOrRefresh(kPrimaryOnlyReadPreference, - CancellationToken::uncancelable()) - .get(); -} - -void ScanningReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - if (node) - node->markFailed(status); - if (kDebugBuild) - _state->checkInvariants(); -} - -void ScanningReplicaSetMonitor::failedHostPreHandshake(const HostAndPort& host, - const Status& status, - BSONObj bson) { - failedHost(host, status); -} - -void ScanningReplicaSetMonitor::failedHostPostHandshake(const HostAndPort& host, - const Status& status, - BSONObj bson) { - failedHost(host, status); -} - -bool ScanningReplicaSetMonitor::isPrimary(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - return node ? node->isMaster : false; -} - -bool ScanningReplicaSetMonitor::isHostUp(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - return node ? node->isUp : false; -} - -int ScanningReplicaSetMonitor::getMinWireVersion() const { - stdx::lock_guard<Latch> lk(_state->mutex); - int minVersion = 0; - for (const auto& host : _state->nodes) { - if (host.isUp) { - minVersion = std::max(minVersion, host.minWireVersion); - } - } - - return minVersion; -} - -int ScanningReplicaSetMonitor::getMaxWireVersion() const { - stdx::lock_guard<Latch> lk(_state->mutex); - int maxVersion = std::numeric_limits<int>::max(); - for (const auto& host : _state->nodes) { - if (host.isUp) { - maxVersion = std::min(maxVersion, host.maxWireVersion); - } - } - - return maxVersion; -} - -std::string ScanningReplicaSetMonitor::getName() const { - // name is const so don't need to lock - return _state->name; -} - -std::string ScanningReplicaSetMonitor::getServerAddress() const { - stdx::lock_guard<Latch> lk(_state->mutex); - // We return our setUri until first confirmation - return _state->seedConnStr.isValid() ? _state->seedConnStr.toString() - : _state->setUri.connectionString().toString(); -} - -const MongoURI& ScanningReplicaSetMonitor::getOriginalUri() const { - // setUri is const so no need to lock. - return _state->setUri; -} - -bool ScanningReplicaSetMonitor::contains(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - return _state->seedNodes.count(host); -} - -// TODO move to correct order with non-statics before pushing -void ScanningReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const { - stdx::lock_guard<Latch> lk(_state->mutex); - - BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName())); - if (forFTDC) { - for (size_t i = 0; i < _state->nodes.size(); i++) { - const Node& node = _state->nodes[i]; - monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node)); - } - return; - } - - // NOTE: the format here must be consistent for backwards compatibility - BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts")); - for (size_t i = 0; i < _state->nodes.size(); i++) { - const Node& node = _state->nodes[i]; - - BSONObjBuilder builder; - builder.append("addr", node.host.toString()); - builder.append("ok", node.isUp); - builder.append("ismaster", node.isMaster); // intentionally not camelCase - builder.append("hidden", false); // we don't keep hidden nodes in the set - builder.append("secondary", node.isUp && !node.isMaster); - builder.append("pingTimeMillis", pingTimeMillis(node)); - - if (!node.tags.isEmpty()) { - builder.append("tags", node.tags); - } - - hosts.append(builder.obj()); - } -} -bool ScanningReplicaSetMonitor::isKnownToHaveGoodPrimary() const { - stdx::lock_guard<Latch> lk(_state->mutex); - - for (const auto& node : _state->nodes) { - if (node.isMaster) { - return true; - } - } - - return false; -} - -Seconds ScanningReplicaSetMonitor::_getRefreshPeriod() { - boost::optional<Seconds> failInjected = _getFailInjectedRefreshPeriod(); - return failInjected ? *failInjected : kDefaultRefreshPeriod; -} - -boost::optional<Seconds> ScanningReplicaSetMonitor::_getFailInjectedRefreshPeriod() { - boost::optional<Seconds> result; - static constexpr auto kPeriodField = "period"_sd; - globalFailPointRegistry() - .find("modifyReplicaSetMonitorDefaultRefreshPeriod") - ->executeIf( - [&result](const BSONObj& data) { result = Seconds{data.getIntField(kPeriodField)}; }, - [](const BSONObj& data) { return data.hasField(kPeriodField); }); - return result; -} - -void ScanningReplicaSetMonitor::runScanForMockReplicaSet() { - stdx::lock_guard<Latch> lk(_state->mutex); - _ensureScanInProgress(_state); - - // This function should only be called from tests using MockReplicaSet and they should use the - // synchronous path to complete before returning. - invariant(_state->currentScan == nullptr); -} - -namespace { -AtomicWord<bool> refreshRetriesDisabledForTest{false}; -} // namespace - -void ScanningReplicaSetMonitor::disableRefreshRetries_forTest() { - refreshRetriesDisabledForTest.store(true); -} - -bool ScanningReplicaSetMonitor::areRefreshRetriesDisabledForTest() { - return refreshRetriesDisabledForTest.load(); -} - -void ScanningReplicaSetMonitor::_ensureScanInProgress(const SetStatePtr& state) { - Refresher(state).scheduleNetworkRequests(); -} - -Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) { - if (_scan) { - _set->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan); - _scan->retryAllTriedHosts(_set->rand); - return; // participate in in-progress scan - } - - startNewScan(); -} - -void Refresher::scheduleNetworkRequests() { - for (auto ns = getNextStep(); ns.step == NextStep::CONTACT_HOST; ns = getNextStep()) { - if (!_set->executor || _set->isMocked) { - // If we're mocked, just schedule an isMaster - scheduleIsMaster(ns.host); - continue; - } - - // cancel any scheduled isMaster calls that haven't yet been called - Node* node = _set->findOrCreateNode(ns.host); - if (auto handle = std::exchange(node->scheduledIsMasterHandle, {})) { - _set->executor->cancel(handle); - } - - // ensure that the call to isMaster is scheduled at most every 500ms - auto swHandle = - _set->scheduleWorkAt(node->nextPossibleIsMasterCall, - [*this, host = ns.host](const CallbackArgs& cbArgs) mutable { - scheduleIsMaster(host); - }); - - if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) { - _scan->markHostsToScanAsTried(); - break; - } - - if (!swHandle.isOK()) { - LOGV2_FATAL(31176, - "Can't continue scan for replica set {replicaSet}: {error}", - "Can't continue scan for replica set", - "error"_attr = redact(swHandle.getStatus()), - "replicaSet"_attr = _set->name); - } - - node->scheduledIsMasterHandle = uassertStatusOK(std::move(swHandle)); - } - - if (kDebugBuild) - _set->checkInvariants(); -} - -void Refresher::scheduleIsMaster(const HostAndPort& host) { - if (_set->isMocked) { - // MockReplicaSet only works with DBClient-style access since it injects itself into the - // ScopedDbConnection pool connection creation. - try { - ScopedDbConnection conn(ConnectionString(host), kCheckTimeout.count()); - - auto timer = Timer(); - auto reply = BSONObj(); - bool ignoredOutParam = false; - conn->isPrimary(ignoredOutParam, &reply); - conn.done(); // return to pool on success. - - receivedIsMaster(host, timer.micros(), reply); - } catch (DBException& ex) { - failedHost(host, ex.toStatus()); - } - - return; - } - - BSONObjBuilder bob; - bob.append("isMaster", 1); - if (auto wireSpec = WireSpec::instance().get(); wireSpec->isInternalClient) { - WireSpec::appendInternalClientWireVersion(wireSpec->outgoing, &bob); - } - auto request = executor::RemoteCommandRequest(host, "admin", bob.obj(), nullptr, kCheckTimeout); - request.sslMode = _set->setUri.getSSLMode(); - - auto status = - _set->executor - ->scheduleRemoteCommand( - std::move(request), - [copy = *this, host, timer = Timer()]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { - stdx::lock_guard lk(copy._set->mutex); - // Ignore the reply and return if we are no longer the current scan. This might - // happen if it was decided that the host we were contacting isn't part of the - // set. - if (copy._scan != copy._set->currentScan) { - return; - } - - // ensure that isMaster calls occur at most 500ms after the previous call ended - if (auto node = copy._set->findNode(host)) { - node->nextPossibleIsMasterCall = - copy._set->executor->now() + Milliseconds(500); - } - - if (result.response.isOK()) { - // Not using result.response.elapsedMillis because higher precision is - // useful for computing the rolling average. - copy.receivedIsMaster(host, timer.micros(), result.response.data); - } else { - copy.failedHost(host, result.response.status); - } - - // This reply may have discovered new hosts to contact so we need to schedule - // them. - copy.scheduleNetworkRequests(); - }) - .getStatus(); - - if (!status.isOK()) { - failedHost(host, status); - // This is only called from scheduleNetworkRequests() which will still be looping, so we - // don't need to call it here after updating the state. - } -} - -Refresher::NextStep Refresher::getNextStep() { - // No longer the current scan - if (_scan != _set->currentScan) { - return NextStep(NextStep::DONE); - } - - // Wait for all dispatched hosts to return before trying any fallback hosts. - if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) { - return NextStep(NextStep::WAIT); - } - - // If we haven't yet found a master, try contacting unconfirmed hosts - if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) { - _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand); - _scan->possibleNodes.clear(); - } - - if (_scan->hostsToScan.empty()) { - // We've tried all hosts we can, so nothing more to do in this round. - if (!_scan->foundUpMaster) { - LOGV2_WARNING(24089, - "Unable to reach primary for replica set {replicaSet}", - "Unable to reach primary for replica set", - "replicaSet"_attr = _set->name); - - // Since we've talked to everyone we could but still didn't find a primary, we - // do the best we can, and assume all unconfirmedReplies are actually from nodes - // in the set (we've already confirmed that they think they are). This is - // important since it allows us to bootstrap to a usable state even if we are - // unable to talk to a master while starting up. As soon as we are able to - // contact a master, we will remove any nodes that it doesn't think are part of - // the set, undoing the damage we cause here. - - // NOTE: we don't modify seedNodes or notify about set membership change in this - // case since it hasn't been confirmed by a master. - for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); - it != _scan->unconfirmedReplies.end(); - ++it) { - _set->findOrCreateNode(it->first)->update(it->second); - } - - auto connStr = _set->possibleConnectionString(); - if (connStr != _set->workingConnStr) { - _set->workingConnStr = std::move(connStr); - _set->notifier->onPossibleSet(_set->workingConnStr); - } - - // If at some point we care about lacking a primary, on it here - _set->lastSeenMaster = {}; - } - - if (_scan->foundAnyUpNodes) { - _set->consecutiveFailedScans = 0; - } else { - auto nScans = _set->consecutiveFailedScans++; - if (nScans <= 10 || nScans % 10 == 0) { - LOGV2(24073, - "Cannot reach any nodes for replica set {replicaSet}. Please check network " - "connectivity and the status of the set. This has happened for " - "{numConsecutiveFailedScans} checks in a row.", - "Cannot reach any nodes for replica set. Please check network connectivity " - "and the status of the set. A number of consecutive scan checks have failed", - "replicaSet"_attr = _set->name, - "numConsecutiveFailedScans"_attr = _set->consecutiveFailedScans); - } - } - - // Makes sure all other Refreshers in this round return DONE - _set->currentScan.reset(); - _set->notify(); - - LOGV2_DEBUG(24074, - 1, - "Refreshing replica set {replicaSet} took {duration}", - "Replica set refreshed", - "duration"_attr = Milliseconds(_scan->timer.millis()), - "replicaSet"_attr = _set->name); - - return NextStep(NextStep::DONE); - } - - // Pop and return the next hostToScan. - HostAndPort host = _scan->hostsToScan.front(); - _scan->hostsToScan.pop_front(); - _scan->waitingFor.insert(host); - _scan->triedHosts.insert(host); - - return NextStep(NextStep::CONTACT_HOST, host); -} - -void Refresher::receivedIsMaster(const HostAndPort& from, - int64_t latencyMicros, - const BSONObj& replyObj) { - _scan->waitingFor.erase(from); - - const IsMasterReply reply(from, latencyMicros, replyObj); - - // Handle various failure cases - if (!reply.ok) { - failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"}); - return; - } - - if (reply.setName != _set->name) { - if (reply.raw["isreplicaset"].trueValue()) { - // The reply came from a node in the state referred to as RSGhost in the SDAM - // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event, - // if a reply from a ghost offers a list of possible other members of the replica set, - // and if this refresher has yet to find the replica set master, we add hosts listed in - // the reply to the list of possible replica set members. - if (!_scan->foundUpMaster) { - _scan->possibleNodes.insert(reply.members.begin(), reply.members.end()); - } - } else { - LOGV2_ERROR( - 24091, - "replset name mismatch: expected {expectedSetName}, but remote node " - "{fromRemoteNode} has replset name {receivedSetName}, isMaster: {isMasterReply}", - "replset name mismatch. The expected set name does not match the set name " - "received from the remote node", - "expectedSetName"_attr = _set->name, - "fromRemoteNode"_attr = from, - "receivedSetName"_attr = reply.setName, - "isMasterReply"_attr = replyObj); - } - - failedHost(from, - {ErrorCodes::InconsistentReplicaSetNames, - str::stream() << "Target replica set name " << reply.setName - << " does not match the monitored set name " << _set->name}); - return; - } - - if (reply.isMaster) { - Status status = receivedIsMasterFromMaster(from, reply); - if (!status.isOK()) { - failedHost(from, status); - return; - } - } - - if (_scan->foundUpMaster) { - // We only update a Node if a master has confirmed it is in the set. - _set->updateNodeIfInNodes(reply); - } else { - // Populate possibleNodes. - _scan->possibleNodes.insert(reply.members.begin(), reply.members.end()); - _scan->unconfirmedReplies[from] = reply; - } - - // _set->nodes may still not have any nodes with isUp==true, but we have at least found a - // connectible host that is that claims to be in the set. - _scan->foundAnyUpNodes = true; - - _set->notify(); - - if (kDebugBuild) - _set->checkInvariants(); -} - -void Refresher::failedHost(const HostAndPort& host, const Status& status) { - _scan->waitingFor.erase(host); - - Node* node = _set->findNode(host); - if (node) - node->markFailed(status); - - if (_scan->waitingFor.empty()) { - // If this was the last host that needed a response, we should notify the SetState so that - // we can fail any waiters that have timed out. - _set->notify(); - } -} - -void Refresher::startNewScan() { - // The heuristics we use in deciding the order to contact hosts are designed to find a - // master as quickly as possible. This is because we can't use any hosts we find until - // we either get the latest set of members from a master or talk to all possible hosts - // without finding a master. - - // TODO It might make sense to check down nodes first if the last seen master is still - // marked as up. - - _scan = std::make_shared<ScanState>(); - _set->currentScan = _scan; - - int upNodes = 0; - for (Nodes::const_iterator it(_set->nodes.begin()), end(_set->nodes.end()); it != end; ++it) { - if (it->isUp) { - // _scan the nodes we think are up first - _scan->hostsToScan.push_front(it->host); - upNodes++; - } else { - _scan->hostsToScan.push_back(it->host); - } - } - - // shuffle the queue, but keep "up" nodes at the front - std::shuffle( - _scan->hostsToScan.begin(), _scan->hostsToScan.begin() + upNodes, _set->rand.urbg()); - std::shuffle(_scan->hostsToScan.begin() + upNodes, _scan->hostsToScan.end(), _set->rand.urbg()); - - if (!_set->lastSeenMaster.empty()) { - // move lastSeenMaster to front of queue - std::stable_partition( - _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(_set->lastSeenMaster)); - } -} - -Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) { - invariant(reply.isMaster); - - // Reject if config version is older. This is for backwards compatibility with nodes in pv0 - // since they don't have the same ordering with pv1 electionId. - if (reply.configVersion < _set->configVersion) { - return { - ErrorCodes::NotWritablePrimary, - str::stream() << "Node " << from << " believes it is primary, but its config version " - << reply.configVersion << " is older than the most recent config version " - << _set->configVersion}; - } - - if (reply.electionId.isSet()) { - // ElectionIds are only comparable if they are of the same protocol version. However, since - // isMaster has no protocol version field, we use the configVersion instead. This works - // because configVersion needs to be incremented whenever the protocol version is changed. - if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() && - _set->maxElectionId.compare(reply.electionId) > 0) { - return { - ErrorCodes::NotWritablePrimary, - str::stream() << "Node " << from << " believes it is primary, but its election id " - << reply.electionId << " is older than the most recent election id " - << _set->maxElectionId}; - } - - _set->maxElectionId = reply.electionId; - } - - _set->configVersion = reply.configVersion; - - // Mark all nodes as not master. We will mark ourself as master before releasing the lock. - // NOTE: we use a "last-wins" policy if multiple hosts claim to be master. - for (size_t i = 0; i < _set->nodes.size(); i++) { - _set->nodes[i].isMaster = false; - } - - // Check if the master agrees with our current list of nodes. - // REMINDER: both _set->nodes and reply.members are sorted. - if (_set->nodes.size() != reply.members.size() || - !std::equal(_set->nodes.begin(), _set->nodes.end(), reply.members.begin(), hostsEqual)) { - LOGV2_DEBUG(24075, - 2, - "Adjusting nodes in our view of replica set {replicaSet} based on isMaster " - "reply: {isMasterReply}", - "Adjusting nodes in our view of the replica set based on isMaster reply", - "replicaSet"_attr = _set->name, - "isMasterReply"_attr = redact(reply.raw)); - - // remove non-members from _set->nodes - _set->nodes.erase( - std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.members)), - _set->nodes.end()); - - // add new members to _set->nodes - for (auto& host : reply.members) { - _set->findOrCreateNode(host); - } - - // replace hostToScan queue with untried normal hosts. can both add and remove - // hosts from the queue. - _scan->hostsToScan.clear(); - _scan->enqueAllUntriedHosts(reply.members, _set->rand); - - if (!_scan->waitingFor.empty()) { - // make sure we don't wait for any hosts that aren't considered members - std::set<HostAndPort> newWaitingFor; - std::set_intersection(reply.members.begin(), - reply.members.end(), - _scan->waitingFor.begin(), - _scan->waitingFor.end(), - std::inserter(newWaitingFor, newWaitingFor.end())); - _scan->waitingFor.swap(newWaitingFor); - } - } - - bool changedHosts = reply.members != _set->seedNodes; - bool changedPrimary = reply.host != _set->lastSeenMaster; - if (changedHosts || changedPrimary) { - ++_set->seedGen; - _set->seedNodes = reply.members; - _set->seedConnStr = _set->confirmedConnectionString(); - - // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare - // and we want to record our changes - LOGV2(24076, - "Confirmed replica set for {replicaSet} is {connectionString}", - "Confirmed replica set", - "replicaSet"_attr = _set->name, - "connectionString"_attr = _set->seedConnStr); - - _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host, reply.passives); - } - - // Update our working string - _set->workingConnStr = _set->seedConnStr; - - // Update other nodes's information based on replies we've already seen - for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); - it != _scan->unconfirmedReplies.end(); - ++it) { - // this ignores replies from hosts not in _set->nodes (as modified above) - _set->updateNodeIfInNodes(it->second); - } - _scan->unconfirmedReplies.clear(); - - _scan->foundUpMaster = true; - _set->lastSeenMaster = reply.host; - - return Status::OK(); -} - -void IsMasterReply::parse(const BSONObj& obj) { - try { - raw = obj.getOwned(); // don't use obj again after this line - - ok = raw["ok"].trueValue(); - if (!ok) - return; - - setName = raw["setName"].str(); - hidden = raw["hidden"].trueValue(); - secondary = raw["secondary"].trueValue(); - - minWireVersion = raw["minWireVersion"].numberInt(); - maxWireVersion = raw["maxWireVersion"].numberInt(); - - // hidden nodes can't be master, even if they claim to be. - isMaster = !hidden && (raw["isWritablePrimary"].trueValue() || raw["ismaster"].trueValue()); - - if (isMaster && raw.hasField("electionId")) { - electionId = raw["electionId"].OID(); - } - - configVersion = raw["setVersion"].numberInt(); - - const string primaryString = raw["primary"].str(); - primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString); - - // both hosts and passives, but not arbiters, are considered "normal hosts" - members.clear(); - BSONForEach(host, raw.getObjectField("hosts")) { - members.insert(HostAndPort(host.String())); - } - BSONForEach(host, raw.getObjectField("passives")) { - members.insert(HostAndPort(host.String())); - passives.insert(HostAndPort(host.String())); - } - - tags = raw.getObjectField("tags"); - BSONObj lastWriteField = raw.getObjectField("lastWrite"); - if (!lastWriteField.isEmpty()) { - if (auto lastWrite = lastWriteField["lastWriteDate"]) { - lastWriteDate = lastWrite.date(); - } - - uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime)); - } - } catch (const std::exception& e) { - ok = false; - LOGV2(24077, - "Exception while parsing isMaster reply: {error} {isMasterReply}", - "Exception while parsing isMaster reply", - "error"_attr = e.what(), - "isMasterReply"_attr = obj); - } -} - -Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {} - -void Node::markFailed(const Status& status) { - if (isUp) { - LOGV2(24078, - "Marking host {host} as failed: {error}", - "Marking host as failed", - "host"_attr = host, - "error"_attr = redact(status)); - - isUp = false; - } - - isMaster = false; -} - -bool Node::matches(const ReadPreference pref) const { - if (!isUp) { - LOGV2_DEBUG(24079, 3, "Host {host} is not up", "Host is not up", "host"_attr = host); - return false; - } - - LOGV2_DEBUG(24080, - 3, - "Host {host} is primary? {isPrimary}", - "Host is primary?", - "host"_attr = host, - "isPrimary"_attr = isMaster); - if (pref == ReadPreference::PrimaryOnly) { - return isMaster; - } - - if (pref == ReadPreference::SecondaryOnly) { - return !isMaster; - } - - return true; -} - -bool Node::matches(const BSONObj& tag) const { - BSONForEach(tagCriteria, tag) { - if (SimpleBSONElementComparator::kInstance.evaluate( - this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) { - return false; - } - } - - return true; -} - -void Node::update(const IsMasterReply& reply) { - invariant(host == reply.host); - invariant(reply.ok); - - LOGV2_DEBUG(24081, - 3, - "Updating host {host} based on isMaster reply: {isMasterReply}", - "Updating host based on isMaster reply", - "host"_attr = host, - "isMasterReply"_attr = reply.raw); - - // Nodes that are hidden or neither master or secondary are considered down since we can't - // send any operations to them. - isUp = !reply.hidden && (reply.isMaster || reply.secondary); - isMaster = reply.isMaster; - - minWireVersion = reply.minWireVersion; - maxWireVersion = reply.maxWireVersion; - - // save a copy if unchanged - if (!tags.binaryEqual(reply.tags)) - tags = reply.tags.getOwned(); - - if (reply.latencyMicros >= 0) { // TODO upper bound? - if (latencyMicros == unknownLatency) { - latencyMicros = reply.latencyMicros; - } else { - // update latency with smoothed moving average (1/4th the delta) - latencyMicros += (reply.latencyMicros - latencyMicros) / 4; - } - } - - LOGV2_DEBUG(24082, - 3, - "Updating {host} lastWriteDate to {lastWriteDate}", - "Updating host lastWriteDate", - "host"_attr = host, - "lastWriteDate"_attr = reply.lastWriteDate); - lastWriteDate = reply.lastWriteDate; - - LOGV2_DEBUG(24083, - 3, - "Updating {host} opTime to {opTime}", - "Updating host opTime", - "host"_attr = host, - "opTime"_attr = reply.opTime); - opTime = reply.opTime; - lastWriteDateUpdateTime = Date_t::now(); -} - -SetState::SetState(const MongoURI& uri, - ReplicaSetChangeNotifier* notifier, - executor::TaskExecutor* executor) - : setUri(std::move(uri)), - name(setUri.getSetName()), - notifier(notifier), - executor(executor), - seedNodes(setUri.getServers().begin(), setUri.getServers().end()), - latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)), - rand(std::random_device()()), - refreshPeriod(_getRefreshPeriod()) { - uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty()); - - if (name.empty()) - LOGV2_WARNING(24090, - "Replica set name empty, first node: {firstNode}", - "Replica set name empty, adding first node", - "firstNode"_attr = *(seedNodes.begin())); - - // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a - // scan until we start a scan and either find a master or contact all hosts without finding - // one. - // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to - // sort nodes after this loop. - for (auto&& addr : seedNodes) { - nodes.push_back(Node(addr)); - - if (addr.host()[0] == '$') { - invariant(isMocked || &addr == &*seedNodes.begin()); // Can't mix and match. - isMocked = true; - } else { - invariant(!isMocked); // Can't mix and match. - } - } - - if (kDebugBuild) - checkInvariants(); -} - -HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria, - const std::vector<HostAndPort>& excludedHosts) const { - auto hosts = getMatchingHosts(criteria, excludedHosts); - - if (hosts.empty()) { - return HostAndPort(); - } - - return hosts[0]; -} - -std::vector<HostAndPort> SetState::getMatchingHosts( - const ReadPreferenceSetting& criteria, const std::vector<HostAndPort>& excludedHosts) const { - switch (criteria.pref) { - // "Prefered" read preferences are defined in terms of other preferences - case ReadPreference::PrimaryPreferred: { - std::vector<HostAndPort> out = getMatchingHosts( - ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags), excludedHosts); - // NOTE: the spec says we should use the primary even if tags don't match - if (!out.empty()) - return out; - return getMatchingHosts(ReadPreferenceSetting(ReadPreference::SecondaryOnly, - criteria.tags, - criteria.maxStalenessSeconds), - excludedHosts); - } - - case ReadPreference::SecondaryPreferred: { - std::vector<HostAndPort> out = getMatchingHosts( - ReadPreferenceSetting( - ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds), - excludedHosts); - if (!out.empty()) - return out; - // NOTE: the spec says we should use the primary even if tags don't match - return getMatchingHosts( - ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags), excludedHosts); - } - - case ReadPreference::PrimaryOnly: { - // NOTE: isMaster implies isUp - Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster); - if (it == nodes.end()) - return {}; - const auto& primaryHost = it->host; - - // If the primary is excluded and we have specified readPreference primaryOnly, return - // empty host. - if (std::find(excludedHosts.begin(), excludedHosts.end(), primaryHost) != - excludedHosts.end()) { - return {}; - } - return {primaryHost}; - } - - // The difference between these is handled by Node::matches - case ReadPreference::SecondaryOnly: - case ReadPreference::Nearest: { - std::function<bool(const Node&)> matchNode = [](const Node& node) -> bool { - return true; - }; - // build comparator - if (criteria.maxStalenessSeconds.count()) { - auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster); - if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) { - auto writeDateCmp = [](const Node* a, const Node* b) -> bool { - return a->lastWriteDate < b->lastWriteDate; - }; - // use only non failed nodes - std::vector<const Node*> upNodes; - for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) { - if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) { - upNodes.push_back(&(*nodeIt)); - } - } - auto latestSecNode = - std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp); - if (latestSecNode == upNodes.end()) { - matchNode = [](const Node& node) -> bool { return false; }; - } else { - Date_t maxWriteTime = (*latestSecNode)->lastWriteDate; - matchNode = [=](const Node& node) -> bool { - return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) + - refreshPeriod <= - criteria.maxStalenessSeconds; - }; - } - } else { - Seconds primaryStaleness = duration_cast<Seconds>( - masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate); - matchNode = [=](const Node& node) -> bool { - return duration_cast<Seconds>(node.lastWriteDateUpdateTime - - node.lastWriteDate) - - primaryStaleness + refreshPeriod <= - criteria.maxStalenessSeconds; - }; - } - } - - std::vector<const Node*> allMatchingNodes; - BSONForEach(tagElem, criteria.tags.getTagBSON()) { - uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj()); - BSONObj tag = tagElem.Obj(); - - std::vector<const Node*> matchingNodes; - for (size_t i = 0; i < nodes.size(); i++) { - auto isNotExcluded = - (std::find(excludedHosts.begin(), excludedHosts.end(), nodes[i].host) == - excludedHosts.end()); - if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) && - matchNode(nodes[i]) && isNotExcluded) { - matchingNodes.push_back(&nodes[i]); - } - } - - // Only consider nodes that satisfy the minClusterTime - if (!criteria.minClusterTime.isNull()) { - std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater); - for (size_t i = 0; i < matchingNodes.size(); i++) { - if (matchingNodes[i]->opTime.getTimestamp() < criteria.minClusterTime) { - if (i == 0) { - // If no nodes satisfy the minClusterTime criteria, we ignore the - // minClusterTime requirement. - break; - } - matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end()); - break; - } - } - } - - allMatchingNodes.insert( - allMatchingNodes.end(), matchingNodes.begin(), matchingNodes.end()); - } - - // don't do more complicated selection if not needed - if (allMatchingNodes.empty()) { - return {}; - } - if (allMatchingNodes.size() == 1) { - return {allMatchingNodes.front()->host}; - } - - // If there are multiple nodes satisfying the minClusterTime, next order by latency and - // don't consider hosts further than a threshold from the closest. - std::sort(allMatchingNodes.begin(), allMatchingNodes.end(), compareLatencies); - - if (!MONGO_unlikely(scanningServerSelectorIgnoreLatencyWindow.shouldFail())) { - for (size_t i = 1; i < allMatchingNodes.size(); i++) { - int64_t distance = - allMatchingNodes[i]->latencyMicros - allMatchingNodes[0]->latencyMicros; - if (distance >= latencyThresholdMicros) { - // this node and all remaining ones are too far away - allMatchingNodes.erase(allMatchingNodes.begin() + i, - allMatchingNodes.end()); - break; - } - } - } - - std::vector<HostAndPort> hosts; - std::transform(allMatchingNodes.begin(), - allMatchingNodes.end(), - std::back_inserter(hosts), - [](const auto& node) { return node->host; }); - - // Note that the host list is only deterministic (or random) for the first node. - // The rest of the list is in matchingNodes order (latency) with one element swapped - // for the first element. - if (auto bestHostIdx = useDeterministicHostSelection ? roundRobin++ % hosts.size() - : rand.nextInt32(hosts.size())) { - using std::swap; - swap(hosts[0], hosts[bestHostIdx]); - } - - return hosts; - } - - default: - uassert(16337, "Unknown read preference", false); - break; - } -} - -Node* SetState::findNode(const HostAndPort& host) { - const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); - if (it == nodes.end() || it->host != host) - return nullptr; - - return &(*it); -} - -Node* SetState::findOrCreateNode(const HostAndPort& host) { - // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class - // must function correctly even with more nodes). If we lift that restriction, we may need - // to consider alternate algorithms. - Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); - if (it == nodes.end() || it->host != host) { - LOGV2_DEBUG(24084, - 2, - "Adding node {host} to our view of replica set {replicaSet}", - "Adding node to our view of the replica set", - "host"_attr = host, - "replicaSet"_attr = name); - it = nodes.insert(it, Node(host)); - } - return &(*it); -} - -void SetState::updateNodeIfInNodes(const IsMasterReply& reply) { - Node* node = findNode(reply.host); - if (!node) { - LOGV2_DEBUG(24085, - 2, - "Skipping application of isMaster reply from {host} since it isn't a " - "confirmed member of set {replicaSet}", - "Skipping application of isMaster reply from host since it isn't a confirmed " - "member of the replica set", - "host"_attr = reply.host, - "replicaSet"_attr = name); - return; - } - - node->update(reply); -} - -ConnectionString SetState::confirmedConnectionString() const { - std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes)); - - return ConnectionString::forReplicaSet(name, std::move(hosts)); -} - -ConnectionString SetState::possibleConnectionString() const { - std::vector<HostAndPort> hosts; - hosts.reserve(nodes.size()); - - for (auto& node : nodes) { - hosts.push_back(node.host); - } - - return ConnectionString::forReplicaSet(name, std::move(hosts)); -} - -void SetState::notify() { - if (!waiters.size()) { - return; - } - - const auto cachedNow = now(); - auto shouldQuickFail = areRefreshRetriesDisabledForTest() && !currentScan; - - for (auto it = waiters.begin(); it != waiters.end();) { - if (isDropped) { - it->promise.setError({ErrorCodes::ShutdownInProgress, - str::stream() << "ScanningReplicaSetMonitor is shutting down"}); - waiters.erase(it++); - continue; - } - auto match = getMatchingHosts(it->criteria, it->excludedHosts); - if (!match.empty()) { - // match; - it->promise.emplaceValue(std::move(match)); - waiters.erase(it++); - } else if (it->deadline <= cachedNow) { - LOGV2_DEBUG(24086, - 1, - "Unable to statisfy read preference {criteria} by deadline {deadline}", - "Unable to statisfy read preference by deadline", - "criteria"_attr = it->criteria, - "deadline"_attr = it->deadline); - it->promise.setError(makeUnsatisfedReadPrefError(it->criteria)); - waiters.erase(it++); - } else if (shouldQuickFail) { - LOGV2_DEBUG(24087, 1, "Unable to statisfy read preference because tests fail quickly"); - it->promise.setError(makeUnsatisfedReadPrefError(it->criteria)); - waiters.erase(it++); - } else { - it++; - } - } - - if (waiters.empty()) { - // No current waiters so we can stop the expedited scanning. - isExpedited = false; - rescheduleRefresh(SchedulingStrategy::kCancelPreviousScan); - } -} - -Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const { - return Status(ErrorCodes::FailedToSatisfyReadPreference, - str::stream() << "Could not find host matching read preference " - << criteria.toString() << " for set " << name); -} - -void SetState::init() { - rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan); - notifier->onFoundSet(name); -} - -void SetState::drop() { - // This is invoked from ScanningReplicaSetMonitor::drop() under lock. - if (std::exchange(isDropped, true)) { - // If a SetState calls drop() from destruction after the RSMM calls shutdown(), then the - // RSMM's executor may no longer exist. Thus, only drop once. - return; - } - - currentScan.reset(); - notify(); - - if (auto handle = std::exchange(refresherHandle, {})) { - // Cancel our refresh on the way out - executor->cancel(handle); - } - - for (auto& node : nodes) { - if (auto handle = std::exchange(node.scheduledIsMasterHandle, {})) { - // Cancel any isMasters we had scheduled - executor->cancel(handle); - } - } - - // No point in notifying if we never started - if (workingConnStr.isValid()) { - notifier->onDroppedSet(name); - } -} - -void SetState::checkInvariants() const { - bool foundMaster = false; - for (size_t i = 0; i < nodes.size(); i++) { - // no empty hosts - invariant(!nodes[i].host.empty()); - - if (nodes[i].isMaster) { - // masters must be up - invariant(nodes[i].isUp); - - // at most one master - invariant(!foundMaster); - foundMaster = true; - - // if we have a master it should be the same as lastSeenMaster - invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster); - } - - // should never end up with negative latencies - invariant(nodes[i].latencyMicros >= 0); - - // nodes must be sorted by host with no-dupes - invariant(i == 0 || (nodes[i - 1].host < nodes[i].host)); - } - - // nodes should be a (non-strict) superset of the seedNodes - invariant(std::includes( - nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts)); - - if (currentScan) { - // hostsToScan can't have dups or hosts already in triedHosts. - std::set<HostAndPort> cantSee = currentScan->triedHosts; - for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin(); - it != currentScan->hostsToScan.end(); - ++it) { - invariant(!cantSee.count(*it)); - cantSee.insert(*it); // make sure we don't see this again - } - - // We should only be waitingFor hosts that are in triedHosts - invariant(std::includes(currentScan->triedHosts.begin(), - currentScan->triedHosts.end(), - currentScan->waitingFor.begin(), - currentScan->waitingFor.end())); - - // We should only have unconfirmedReplies if we haven't found a master yet - invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty()); - } -} - -template <typename Container> -void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) { - invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. - - // no std::copy_if before c++11 - for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end; - ++it) { - if (!triedHosts.count(*it)) { - hostsToScan.push_back(*it); - } - } - std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg()); -} - -void ScanState::retryAllTriedHosts(PseudoRandom& rand) { - invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. - // Move hosts that are in triedHosts but not in waitingFor from triedHosts to hostsToScan. - std::set_difference(triedHosts.begin(), - triedHosts.end(), - waitingFor.begin(), - waitingFor.end(), - std::inserter(hostsToScan, hostsToScan.end())); - std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg()); - triedHosts = waitingFor; -} - -void ScanState::markHostsToScanAsTried() noexcept { - while (!hostsToScan.empty()) { - auto host = hostsToScan.front(); - hostsToScan.pop_front(); - /** - * Mark the popped host as tried to avoid deleting hosts in multiple points. - * This emulates the final effect of Refresher::getNextStep() on the set. - */ - triedHosts.insert(host); - } -} -} // namespace mongo diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h deleted file mode 100644 index fca197a409a..00000000000 --- a/src/mongo/client/scanning_replica_set_monitor.h +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <functional> -#include <memory> -#include <set> -#include <string> - -#include "mongo/base/string_data.h" -#include "mongo/client/mongo_uri.h" -#include "mongo/client/replica_set_change_notifier.h" -#include "mongo/client/replica_set_monitor.h" -#include "mongo/executor/task_executor.h" -#include "mongo/util/concurrency/with_lock.h" -#include "mongo/util/duration.h" -#include "mongo/util/net/hostandport.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -class ScanningReplicaSetMonitor : public ReplicaSetMonitor { - ScanningReplicaSetMonitor(const ScanningReplicaSetMonitor&) = delete; - ScanningReplicaSetMonitor& operator=(const ScanningReplicaSetMonitor&) = delete; - -public: - class Refresher; - - /** - * Defaults to false, meaning that if multiple hosts meet a criteria we pick one at random. - * This is required by the replica set driver spec. Set this to true in tests that need host - * selection to be deterministic. - * - * NOTE: Used by unit-tests only. - */ - static bool useDeterministicHostSelection; - - static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500); - static constexpr auto kCheckTimeout = Seconds(5); - - ScanningReplicaSetMonitor(const MongoURI& uri, - std::shared_ptr<executor::TaskExecutor> executor, - std::function<void()> cleanupCallback); - - void init() override; - - void drop() override; - - /** - * NOTE: Cancellation via CancellationTokens is not implemented for the - * ScanningReplicaSetMonitor, so any token passed in will be ignored. - */ - SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref, - const std::vector<HostAndPort>& excludedHosts, - const CancellationToken&) override; - - /** - * NOTE: Cancellation via CancellationTokens is not implemented for the - * ScanningReplicaSetMonitor, so any token passed in will be ignored. - */ - SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh( - const ReadPreferenceSetting& readPref, - const std::vector<HostAndPort>& excludedHosts, - const CancellationToken&) override; - - HostAndPort getPrimaryOrUassert() override; - - /* - * For the ScanningReplicaSetMonitor, all the failedHost methods are equivalent. - */ - void failedHost(const HostAndPort& host, const Status& status) override; - - void failedHostPreHandshake(const HostAndPort& host, - const Status& status, - BSONObj bson) override; - - void failedHostPostHandshake(const HostAndPort& host, - const Status& status, - BSONObj bson) override; - - bool isPrimary(const HostAndPort& host) const override; - - bool isHostUp(const HostAndPort& host) const override; - - int getMinWireVersion() const override; - - int getMaxWireVersion() const override; - - std::string getName() const override; - - std::string getServerAddress() const override; - - const MongoURI& getOriginalUri() const override; - - bool contains(const HostAndPort& server) const override; - - void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override; - - bool isKnownToHaveGoodPrimary() const override; - - // - // internal types (defined in scanning_replica_set_monitor_internal.h) - // - - struct IsMasterReply; - struct ScanState; - struct SetState; - typedef std::shared_ptr<ScanState> ScanStatePtr; - typedef std::shared_ptr<SetState> SetStatePtr; - - /** - * Allows tests to set initial conditions and introspect the current state. - */ - ScanningReplicaSetMonitor(const SetStatePtr& initialState, - std::function<void()> cleanupCallback); - ~ScanningReplicaSetMonitor() override; - - /** - * This is for use in tests using MockReplicaSet to ensure that a full scan completes before - * continuing. - */ - void runScanForMockReplicaSet(); - - static void disableRefreshRetries_forTest(); - - static bool areRefreshRetriesDisabledForTest(); - -private: - Future<std::vector<HostAndPort>> _getHostsOrRefresh( - const ReadPreferenceSetting& readPref, - Milliseconds maxWait, - const std::vector<HostAndPort>& excludedHosts); - - /** - * If no scan is in-progress, this function is responsible for setting up a new scan. Otherwise, - * does nothing. - */ - static void _ensureScanInProgress(const SetStatePtr&); - - /** - * Returns the refresh period that is given to all new SetStates. - */ - static Seconds _getRefreshPeriod(); - - /** - * Returns fail injected refresh period, if fail point is set. - */ - static boost::optional<Seconds> _getFailInjectedRefreshPeriod(); - - const SetStatePtr _state; -}; - - -/** - * Refreshes the local view of a replica set. - * - * All logic related to choosing the hosts to contact and updating the SetState based on replies - * lives in this class. Use of this class should always be guarded by SetState::mutex unless in - * single-threaded use by ScanningReplicaSetMonitorTest. - */ -class ScanningReplicaSetMonitor::Refresher { -public: - explicit Refresher(const SetStatePtr& setState); - - struct NextStep { - enum StepKind { - CONTACT_HOST, /// Contact the returned host - WAIT, /// Wait on condition variable and try again. - DONE, /// No more hosts to contact in this Refresh round - }; - - explicit NextStep(StepKind step, const HostAndPort& host = HostAndPort()) - : step(step), host(host) {} - - StepKind step; - HostAndPort host; - }; - - /** - * Returns the next step to take. - * - * By calling this, you promise to call receivedIsMaster or failedHost if the NextStep is - * CONTACT_HOST. - */ - NextStep getNextStep(); - - /** - * Call this if a host returned from getNextStep successfully replied to an isMaster call. - * Negative latencyMicros are ignored. - */ - void receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& reply); - - /** - * Call this if a host returned from getNextStep failed to reply to an isMaster call. - */ - void failedHost(const HostAndPort& host, const Status& status); - - /** - * Starts a new scan over the hosts in set. - */ - void startNewScan(); - - /** - * First, checks that the "reply" is not from a stale primary by comparing the electionId of - * "reply" to the maxElectionId recorded by the SetState and returns OK status if "reply" - * belongs to a non-stale primary. Otherwise returns a failed status. - * - * The 'from' parameter specifies the node from which the response is received. - * - * Updates _set and _scan based on set-membership information from a master. - * Applies _scan->unconfirmedReplies to confirmed nodes. - * Does not update this host's node in _set->nodes. - */ - Status receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply); - - /** - * Schedules isMaster requests to all hosts that currently need to be contacted. - * Does nothing if requests have already been sent to all known hosts. - */ - void scheduleNetworkRequests(); - - void scheduleIsMaster(const HostAndPort& host); - -private: - // Both pointers are never NULL - SetStatePtr _set; - ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started. -}; - -} // namespace mongo diff --git a/src/mongo/client/scanning_replica_set_monitor_internal.h b/src/mongo/client/scanning_replica_set_monitor_internal.h deleted file mode 100644 index 9641143a3a3..00000000000 --- a/src/mongo/client/scanning_replica_set_monitor_internal.h +++ /dev/null @@ -1,337 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -/** - * This is an internal header. - * This should only be included by replica_set_monitor.cpp and unittests. - * This should never be included by any header. - */ - -#pragma once - -#include <cstdint> -#include <deque> -#include <set> -#include <string> -#include <vector> - -#include "mongo/client/read_preference.h" -#include "mongo/client/scanning_replica_set_monitor.h" -#include "mongo/db/jsobj.h" -#include "mongo/platform/mutex.h" -#include "mongo/platform/random.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/util/net/hostandport.h" - -namespace mongo { - -struct ScanningReplicaSetMonitor::IsMasterReply { - IsMasterReply() : ok(false) {} - IsMasterReply(const HostAndPort& host, int64_t latencyMicros, const BSONObj& reply) - : ok(false), host(host), latencyMicros(latencyMicros) { - parse(reply); - } - - /** - * Never throws. If parsing fails for any reason, sets ok to false. - */ - void parse(const BSONObj& obj); - - bool ok; // if false, ignore all other fields - BSONObj raw; // Always owned. Other fields are allowed to be a view into this. - std::string setName; - bool isMaster; - bool secondary; - bool hidden; - int configVersion{}; - OID electionId; // Set if this isMaster reply is from the primary - HostAndPort primary; // empty if not present - std::set<HostAndPort> members; // both "hosts" and "passives" - std::set<HostAndPort> passives; - BSONObj tags; - int minWireVersion{}; - int maxWireVersion{}; - - // remaining fields aren't in isMaster reply, but are known to caller. - HostAndPort host; - int64_t latencyMicros; // ignored if negative - Date_t lastWriteDate{}; - repl::OpTime opTime{}; -}; - -/** - * The SetState is the underlying data object behind both the ScanningReplicaSetMonitor and the - * Refresher - * - * Note that the SetState only holds its own lock in init() and drop(). Even those uses can probably - * be offloaded to the RSM eventually. In all other cases, the RSM and RSM::Refresher use the - * SetState lock to synchronize. - */ -struct ScanningReplicaSetMonitor::SetState - : public std::enable_shared_from_this<ScanningReplicaSetMonitor::SetState> { - SetState(const SetState&) = delete; - SetState& operator=(const SetState&) = delete; - -public: - /** - * Holds the state of a single node in the replicaSet - */ - struct Node { - explicit Node(const HostAndPort& host); - - void markFailed(const Status& status); - - bool matches(ReadPreference pref) const; - - /** - * Checks if the given tag matches the tag attached to this node. - * - * Example: - * - * Tag of this node: { "dc": "nyc", "region": "na", "rack": "4" } - * - * match: {} - * match: { "dc": "nyc", "rack": 4 } - * match: { "region": "na", "dc": "nyc" } - * not match: { "dc": "nyc", "rack": 2 } - * not match: { "dc": "sf" } - */ - bool matches(const BSONObj&) const; - - /** - * Returns true if all of the tags in the tag set match node's tags - */ - bool matches(const TagSet&) const; - - /** - * Updates this Node based on information in reply. The reply must be from this host. - */ - void update(const IsMasterReply& reply); - - HostAndPort host; - bool isUp{false}; - bool isMaster{false}; - int64_t latencyMicros{}; - BSONObj tags; // owned - int minWireVersion{}; - int maxWireVersion{}; - Date_t lastWriteDate{}; // from isMasterReply - Date_t lastWriteDateUpdateTime{}; // set to the local system's time at the time of updating - // lastWriteDate - Date_t nextPossibleIsMasterCall{}; // time that previous isMaster check ended - executor::TaskExecutor::CallbackHandle scheduledIsMasterHandle; // - repl::OpTime opTime{}; // from isMasterReply - }; - - using Nodes = std::vector<Node>; - - struct Waiter { - Date_t deadline; - ReadPreferenceSetting criteria; - std::vector<HostAndPort> excludedHosts; - Promise<std::vector<HostAndPort>> promise; - }; - - SetState(const MongoURI& uri, ReplicaSetChangeNotifier*, executor::TaskExecutor*); - - bool isUsable() const; - - /** - * Returns a host matching criteria or an empty host if no known host matches. - * - * Note: Uses only local data and does not go over the network. - */ - std::vector<HostAndPort> getMatchingHosts( - const ReadPreferenceSetting& criteria, - const std::vector<HostAndPort>& excludedHosts = std::vector<HostAndPort>()) const; - - HostAndPort getMatchingHost( - const ReadPreferenceSetting& criteria, - const std::vector<HostAndPort>& excludedHosts = std::vector<HostAndPort>()) const; - - /** - * Returns the Node with the given host, or NULL if no Node has that host. - */ - Node* findNode(const HostAndPort& host); - - /** - * Returns the Node with the given host, or creates one if no Node has that host. - * Maintains the sorted order of nodes. - */ - Node* findOrCreateNode(const HostAndPort& host); - - void updateNodeIfInNodes(const IsMasterReply& reply); - - /** - * Returns the connection string of the nodes that are known the be in the set because we've - * seen them in the isMaster reply of a PRIMARY. - */ - ConnectionString confirmedConnectionString() const; - - /** - * Returns the connection string of the nodes that are believed to be in the set because we've - * seen them in the isMaster reply of non-PRIMARY nodes in our seed list. - */ - ConnectionString possibleConnectionString() const; - - /** - * Call this to notify waiters after a scan processes a valid reply, rescans, or finishes. - */ - void notify(); - - Date_t now() const { - return executor ? executor->now() : Date_t::now(); - } - - Status makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const; - - // Tiny enum to convey semantics for rescheduleFefresh() - enum class SchedulingStrategy { - kKeepEarlyScan, - kCancelPreviousScan, - }; - - /** - * Schedules a refresh via the task executor and cancel any previous refresh. - * (Task is automatically canceled in the d-tor.) - */ - void rescheduleRefresh(SchedulingStrategy strategy); - - /** - * Notifies all listeners that the ReplicaSet is in use. - */ - void init(); - - /** - * Resets the current scan and notifies all listeners that the ReplicaSet isn't in use. - */ - void drop(); - - /** - * Before unlocking, do `if (kDebugBuild) checkInvariants();` - */ - void checkInvariants() const; - - /** - * Wrap the callback and schedule it to run at some time - * - * The callback wrapper does the following: - * * Return before running cb if isDropped is true - * * Return before running cb if the handle was canceled - * * Lock before running cb and unlock after - */ - template <typename Callback> - auto scheduleWorkAt(Date_t when, Callback&& cb) const; - - const MongoURI setUri; // URI passed to ctor -- THIS IS NOT UPDATED BY SCANS - const std::string name; - - ReplicaSetChangeNotifier* const notifier; - executor::TaskExecutor* const executor; - - bool isDropped = false; - - // You must hold this to access any member below. - mutable Mutex mutex = MONGO_MAKE_LATCH("SetState::mutex"); - - executor::TaskExecutor::CallbackHandle refresherHandle; - - // For starting scans - std::set<HostAndPort> seedNodes; // updated whenever a master reports set membership changes - ConnectionString seedConnStr; // The connection string from the last time we had valid seeds - int64_t seedGen = 0; - - bool isMocked = false; // True if this set is using nodes from MockReplicaSet - - // For tracking scans - // lastSeenMaster is empty if we have never seen a master or the last scan didn't have one - HostAndPort lastSeenMaster; - int consecutiveFailedScans = 0; - Nodes nodes; // maintained sorted and unique by host - ConnectionString workingConnStr; // The connection string from our last scan - - // For tracking replies - OID maxElectionId; // largest election id observed by this ScanningReplicaSetMonitor - int configVersion = 0; // version number of the replica set config. - - // For matching hosts - int64_t latencyThresholdMicros = 0; - mutable int roundRobin = 0; // used when useDeterministicHostSelection is true - mutable PseudoRandom rand; // only used for host selection to balance load - - // For scheduling scans - Seconds refreshPeriod; // Normal refresh period when not expedited - bool isExpedited = false; // True when we are doing more frequent refreshes due to waiters - std::list<Waiter> waiters; // Everyone waiting for some ReadPreference to be satisfied - uint64_t nextScanId = 0; // The id for the next scan - ScanStatePtr currentScan; // NULL if no scan in progress - Date_t nextScanTime; // The time at which the next scan is scheduled to start -}; - -struct ScanningReplicaSetMonitor::ScanState { - ScanState(const ScanState&) = delete; - ScanState& operator=(const ScanState&) = delete; - -public: - ScanState() = default; - - /** - * Adds all hosts in container that aren't in triedHosts to hostsToScan, then shuffles the - * queue. - */ - template <typename Container> - void enqueAllUntriedHosts(const Container& container, PseudoRandom& rand); - - /** - * Adds all completed hosts back to hostsToScan and shuffles the queue. - */ - void retryAllTriedHosts(PseudoRandom& rand); - - /** - * A safe way to clear interrupted scans - */ - void markHostsToScanAsTried() noexcept; - - // This is only for logging and should not affect behavior otherwise. - Timer timer; - - // Access to fields is guarded by associated SetState's mutex. - bool foundUpMaster = false; - bool foundAnyUpNodes = false; - std::deque<HostAndPort> hostsToScan; // Work queue. - std::set<HostAndPort> possibleNodes; // Nodes reported by non-primary hosts. - std::set<HostAndPort> waitingFor; // Hosts we have dispatched but haven't replied yet. - std::set<HostAndPort> triedHosts; // Hosts that have been returned from getNextStep. - - // All responses go here until we find a master. - typedef std::map<HostAndPort, IsMasterReply> UnconfirmedReplies; - UnconfirmedReplies unconfirmedReplies; -}; - -} // namespace mongo |