diff options
22 files changed, 2270 insertions, 1752 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 6707bfa3494..b06b7d6f20d 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -194,7 +194,10 @@ clientDriverEnv.Library( env.Idlc('global_conn_pool.idl')[0], 'replica_set_change_notifier.cpp', 'replica_set_monitor.cpp', + 'scanning_replica_set_monitor.cpp', + 'streamable_replica_set_monitor.cpp', 'replica_set_monitor_manager.cpp', + env.Idlc('replica_set_monitor_params.idl')[0], 'server_ping_monitor.cpp', ], LIBDEPS=[ @@ -304,16 +307,17 @@ env.CppUnitTest( 'authenticate_test.cpp', 'connection_string_test.cpp', 'dbclient_cursor_test.cpp', + 'disable_streamable_rsm_flag_test.cpp', 'fetcher_test.cpp', 'index_spec_test.cpp', 'mongo_uri_test.cpp', 'read_preference_test.cpp', 'remote_command_retry_scheduler_test.cpp', - 'replica_set_monitor_internal_test.cpp', - 'replica_set_monitor_read_preference_test.cpp', - 'replica_set_monitor_scan_test.cpp', - 'replica_set_monitor_test_concurrent.cpp', - 'replica_set_monitor_test_fixture.cpp', + 'scanning_replica_set_monitor_internal_test.cpp', + 'scanning_replica_set_monitor_read_preference_test.cpp', + 'scanning_replica_set_monitor_scan_test.cpp', + 'scanning_replica_set_monitor_test_concurrent.cpp', + 'scanning_replica_set_monitor_test_fixture.cpp', 'server_ping_monitor_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp index e39fcec61c7..c7170733b1b 100644 --- a/src/mongo/client/dbclient_connection.cpp +++ b/src/mongo/client/dbclient_connection.cpp @@ -801,7 +801,7 @@ void DBClientConnection::handleNotMasterResponse(const BSONObj& replyBody, return; } - ReplicaSetMonitorPtr monitor = ReplicaSetMonitor::get(_parentReplSetName); + auto monitor = ReplicaSetMonitor::get(_parentReplSetName); if (monitor) { monitor->failedHost(_serverAddress, {ErrorCodes::NotMaster, diff --git a/src/mongo/client/disable_streamable_rsm_flag_test.cpp b/src/mongo/client/disable_streamable_rsm_flag_test.cpp new file mode 100644 index 00000000000..0cad97280d5 --- /dev/null +++ b/src/mongo/client/disable_streamable_rsm_flag_test.cpp @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/replica_set_monitor.h" +#include "mongo/client/replica_set_monitor_params_gen.h" +#include "mongo/client/scanning_replica_set_monitor.h" +#include "mongo/client/streamable_replica_set_monitor.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace { + +class RSMDisableStreamableFlagTestFixture : public unittest::Test { +protected: + void setUp() { + setGlobalServiceContext(ServiceContext::make()); + ReplicaSetMonitor::cleanup(); + } + + void tearDown() { + unsetParameter(); + } + + /** + * Sets the data of the disableStreamableReplicaSetMonitor parameter to flagValue. + */ + void setParameter(bool flagValue) { + const BSONObj newFlagParameter = BSON(kDisableStreamableFlagName << flagValue); + BSONObjIterator parameterIterator(newFlagParameter); + BSONElement newParameter = parameterIterator.next(); + const auto foundParameter = findDisableStreamableServerParameter(); + + uassertStatusOK(foundParameter->second->set(newParameter)); + ASSERT_EQ(flagValue, disableStreamableReplicaSetMonitor.load()); + } + + /** + * Restores the disableStreamableReplicaSetMonitor parameter to its default value. + */ + void unsetParameter() { + const auto defaultParameter = kDefaultParameter[kDisableStreamableFlagName]; + const auto foundParameter = findDisableStreamableServerParameter(); + + uassertStatusOK(foundParameter->second->set(defaultParameter)); + } + + /** + * Finds the disableStreamableReplicaSetMonitor ServerParameter. + */ + ServerParameter::Map::const_iterator findDisableStreamableServerParameter() { + const ServerParameter::Map& parameterMap = ServerParameterSet::getGlobal()->getMap(); + return parameterMap.find(kDisableStreamableFlagName); + } + + static inline const std::string kDisableStreamableFlagName = + "disableStreamableReplicaSetMonitor"; + + /** + * A BSONObj containing the default for the disableStreamableReplicaSetMonitor flag. + */ + static inline const BSONObj kDefaultParameter = + BSON(kDisableStreamableFlagName << disableStreamableReplicaSetMonitor.load()); +}; + +/** + * Checks that a ScanningReplicaSetMonitor is created when the disableStreamableReplicaSetMonitor + * flag is set to true. + */ +TEST_F(RSMDisableStreamableFlagTestFixture, checkIsScanningIfDisableStreamableIsTrue) { + setParameter(true); + auto uri = MongoURI::parse("mongodb://a,b,c/?replicaSet=name"); + ASSERT_OK(uri.getStatus()); + auto createdMonitor = ReplicaSetMonitor::createIfNeeded(uri.getValue()); + + // If the created monitor does not point to a ScanningReplicaSetMonitor, the cast returns a + // nullptr. + auto scanningMonitorCast = dynamic_cast<ScanningReplicaSetMonitor*>(createdMonitor.get()); + ASSERT(scanningMonitorCast); + + auto streamableMonitorCast = dynamic_cast<StreamableReplicaSetMonitor*>(createdMonitor.get()); + ASSERT_FALSE(streamableMonitorCast); +} + +/** + * Checks that a StreamableReplicaSetMonitor is created when the the + * disableStreamableReplicaSetMonitor flag is set to false. + * + * TODO SERVER-43332: Once the StreamableReplicaSetMonitor is integrated into the codebase, this + * test should mirror the logic in checkIsScanningIfDisableStreamableIsTrue accordingly. + */ +TEST_F(RSMDisableStreamableFlagTestFixture, checkIsStreamableIfDisableStreamableIsFalse) { + setParameter(false); + auto uri = MongoURI::parse("mongodb://a,b,c/?replicaSet=name"); + ASSERT_OK(uri.getStatus()); + ASSERT_THROWS_CODE(ReplicaSetMonitor::createIfNeeded(uri.getValue()), DBException, 31451); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index 2110d43cd8a..57f43547cc8 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -41,11 +41,9 @@ #include "mongo/client/connpool.h" #include "mongo/client/global_conn_pool.h" #include "mongo/client/read_preference.h" -#include "mongo/client/replica_set_monitor_internal.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/bson_extract_optime.h" #include "mongo/db/server_options.h" -#include "mongo/logv2/log.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/mutex.h" #include "mongo/stdx/condition_variable.h" @@ -67,371 +65,9 @@ using std::vector; // Failpoint for changing the default refresh period MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod); -namespace { - -// Pull nested types to top-level scope -typedef ReplicaSetMonitor::IsMasterReply IsMasterReply; -typedef ReplicaSetMonitor::ScanState ScanState; -typedef ReplicaSetMonitor::ScanStatePtr ScanStatePtr; -typedef ReplicaSetMonitor::SetState SetState; -typedef ReplicaSetMonitor::SetStatePtr SetStatePtr; -typedef ReplicaSetMonitor::Refresher Refresher; -typedef ScanState::UnconfirmedReplies UnconfirmedReplies; -typedef SetState::Node Node; -typedef SetState::Nodes Nodes; -using executor::TaskExecutor; -using CallbackArgs = TaskExecutor::CallbackArgs; -using CallbackHandle = TaskExecutor::CallbackHandle; - -// Intentionally chosen to compare worse than all known latencies. -const int64_t unknownLatency = numeric_limits<int64_t>::max(); - -const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet()); -const Milliseconds kExpeditedRefreshPeriod(500); -AtomicWord<bool> areRefreshRetriesDisabledForTest{false}; // Only true in tests. - -// -// Helpers for stl algorithms -// - -bool isMaster(const Node& node) { - return node.isMaster; -} - -bool opTimeGreater(const Node* lhs, const Node* rhs) { - return lhs->opTime > rhs->opTime; -} - -bool compareLatencies(const Node* lhs, const Node* rhs) { - // NOTE: this automatically compares Node::unknownLatency worse than all others. - return lhs->latencyMicros < rhs->latencyMicros; -} - -bool hostsEqual(const Node& lhs, const HostAndPort& rhs) { - return lhs.host == rhs; -} - -// Allows comparing two Nodes, or a HostAndPort and a Node. -// NOTE: the two HostAndPort overload is only needed to support extra checks in some STL -// implementations. For simplicity, no comparator should be used with collections of just -// HostAndPort. -struct CompareHosts { - bool operator()(const Node& lhs, const Node& rhs) { - return lhs.host < rhs.host; - } - bool operator()(const Node& lhs, const HostAndPort& rhs) { - return lhs.host < rhs; - } - bool operator()(const HostAndPort& lhs, const Node& rhs) { - return lhs < rhs.host; - } - bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) { - return lhs < rhs; - } -} compareHosts; // like an overloaded function, but able to pass to stl algorithms - -// The following structs should be treated as functions returning a UnaryPredicate. -// Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost)); -// They all hold their constructor argument by reference. - -struct HostIs { - explicit HostIs(const HostAndPort& host) : _host(host) {} - bool operator()(const HostAndPort& host) { - return host == _host; - } - bool operator()(const Node& node) { - return node.host == _host; - } - const HostAndPort& _host; -}; - -struct HostNotIn { - explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {} - bool operator()(const HostAndPort& host) { - return !_hosts.count(host); - } - bool operator()(const Node& node) { - return !_hosts.count(node.host); - } - const std::set<HostAndPort>& _hosts; -}; - -int32_t pingTimeMillis(const Node& node) { - auto latencyMillis = node.latencyMicros / 1000; - if (latencyMillis > numeric_limits<int32_t>::max()) { - // In particular, Node::unknownLatency does not fit in an int32. - return numeric_limits<int32_t>::max(); - } - return latencyMillis; -} - -/** - * Replica set refresh period on the task executor. - */ -const Seconds kDefaultRefreshPeriod(30); -} // namespace - -// If we cannot find a host after 15 seconds of refreshing, give up -const Seconds ReplicaSetMonitor::kDefaultFindHostTimeout(15); - // Defaults to random selection as required by the spec bool ReplicaSetMonitor::useDeterministicHostSelection = false; -Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() { - Seconds r = kDefaultRefreshPeriod; - static constexpr auto kPeriodField = "period"_sd; - modifyReplicaSetMonitorDefaultRefreshPeriod.executeIf( - [&r](const BSONObj& data) { r = Seconds{data.getIntField(kPeriodField)}; }, - [](const BSONObj& data) { return data.hasField(kPeriodField); }); - return r; -} - -ReplicaSetMonitor::ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {} - -ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri) - : ReplicaSetMonitor( - std::make_shared<SetState>(uri, - &ReplicaSetMonitorManager::get()->getNotifier(), - ReplicaSetMonitorManager::get()->getExecutor())) {} - -void ReplicaSetMonitor::init() { - if (areRefreshRetriesDisabledForTest.load()) { - // This is for MockReplicaSet. Those tests want to control when scanning happens. - LOGV2_WARNING(20180, - "*** Not starting background refresh because refresh retries are disabled."); - return; - } - - { - stdx::lock_guard lk(_state->mutex); - _state->init(); - } -} - -void ReplicaSetMonitor::drop() { - { - stdx::lock_guard lk(_state->mutex); - _state->drop(); - } -} - -ReplicaSetMonitor::~ReplicaSetMonitor() { - drop(); -} - -template <typename Callback> -auto ReplicaSetMonitor::SetState::scheduleWorkAt(Date_t when, Callback&& cb) const { - auto wrappedCallback = [cb = std::forward<Callback>(cb), - anchor = shared_from_this()](const CallbackArgs& cbArgs) mutable { - if (ErrorCodes::isCancelationError(cbArgs.status)) { - // Do no more work if we're removed or canceled - return; - } - invariant(cbArgs.status); - - stdx::lock_guard lk(anchor->mutex); - if (anchor->isDropped) { - return; - } - - cb(cbArgs); - }; - return executor->scheduleWorkAt(std::move(when), std::move(wrappedCallback)); -} - -void ReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) { - // Reschedule the refresh - - if (!executor || isMocked) { - // Without an executor, we can't do refreshes -- we're in a test - return; - } - - if (isDropped) { // already removed so no need to refresh - LOGV2_DEBUG(20162, - 1, - "Stopping refresh for replica set {name} because it's removed", - "name"_attr = name); - return; - } - - Milliseconds period = refreshPeriod; - if (isExpedited) { - period = std::min<Milliseconds>(period, kExpeditedRefreshPeriod); - } - - auto currentTime = now(); - auto possibleNextScanTime = currentTime + period; - if (refresherHandle && // - (strategy == SchedulingStrategy::kKeepEarlyScan) && // - (nextScanTime > currentTime) && // - (possibleNextScanTime >= nextScanTime)) { - // If the next scan would be sooner than our desired, why cancel? - return; - } - - // Cancel out the last refresh - if (auto currentHandle = std::exchange(refresherHandle, {})) { - executor->cancel(currentHandle); - } - - nextScanTime = possibleNextScanTime; - LOGV2_DEBUG(20163, - 1, - "Next replica set scan scheduled for {nextScanTime}", - "nextScanTime"_attr = nextScanTime); - auto swHandle = scheduleWorkAt(nextScanTime, [this](const CallbackArgs& cbArgs) { - if (cbArgs.myHandle != refresherHandle) - return; // We've been replaced! - - // It is possible that a waiter will have already expired by the point of this rescan. - // Thus we notify here to trigger that logic. - notify(); - - _ensureScanInProgress(shared_from_this()); - - // And now we set up the next one - rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan); - }); - - if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) { - LOGV2_DEBUG(20164, - 1, - "Cant schedule refresh for {name}. Executor shutdown in progress", - "name"_attr = name); - return; - } - - if (!swHandle.isOK()) { - LOGV2_FATAL(20184, - "Can't continue refresh for replica set {name} due to {swHandle_getStatus}", - "name"_attr = name, - "swHandle_getStatus"_attr = redact(swHandle.getStatus())); - fassertFailed(40140); - } - - refresherHandle = std::move(swHandle.getValue()); -} - -SemiFuture<HostAndPort> ReplicaSetMonitor::getHostOrRefresh(const ReadPreferenceSetting& criteria, - Milliseconds maxWait) { - return _getHostsOrRefresh(criteria, maxWait) - .then([](const auto& hosts) { - invariant(hosts.size()); - return hosts[0]; - }) - .semi(); -} - -SemiFuture<std::vector<HostAndPort>> ReplicaSetMonitor::getHostsOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { - return _getHostsOrRefresh(criteria, maxWait).semi(); -} - -Future<std::vector<HostAndPort>> ReplicaSetMonitor::_getHostsOrRefresh( - const ReadPreferenceSetting& criteria, Milliseconds maxWait) { - - stdx::lock_guard<Latch> lk(_state->mutex); - if (_state->isDropped) { - return Status(ErrorCodes::ReplicaSetMonitorRemoved, - str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"); - } - - auto out = _state->getMatchingHosts(criteria); - if (!out.empty()) - return {std::move(out)}; - - // Fail early without doing any more work if the timeout has already expired. - if (maxWait <= Milliseconds(0)) - return _state->makeUnsatisfedReadPrefError(criteria); - - // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is - // dealing with maxWait. - auto pf = makePromiseFuture<decltype(out)>(); - _state->waiters.emplace_back( - SetState::Waiter{_state->now() + maxWait, criteria, std::move(pf.promise)}); - - // This must go after we set up the wait state to correctly handle unittests using - // MockReplicaSet. - _ensureScanInProgress(_state); - - // Switch to expedited scanning. - _state->isExpedited = true; - _state->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan); - - return std::move(pf.future); -} -HostAndPort ReplicaSetMonitor::getMasterOrUassert() { - return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); -} - -void ReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - if (node) - node->markFailed(status); - if (kDebugBuild) - _state->checkInvariants(); -} - -bool ReplicaSetMonitor::isPrimary(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - return node ? node->isMaster : false; -} - -bool ReplicaSetMonitor::isHostUp(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - Node* node = _state->findNode(host); - return node ? node->isUp : false; -} - -int ReplicaSetMonitor::getMinWireVersion() const { - stdx::lock_guard<Latch> lk(_state->mutex); - int minVersion = 0; - for (const auto& host : _state->nodes) { - if (host.isUp) { - minVersion = std::max(minVersion, host.minWireVersion); - } - } - - return minVersion; -} - -int ReplicaSetMonitor::getMaxWireVersion() const { - stdx::lock_guard<Latch> lk(_state->mutex); - int maxVersion = std::numeric_limits<int>::max(); - for (const auto& host : _state->nodes) { - if (host.isUp) { - maxVersion = std::min(maxVersion, host.maxWireVersion); - } - } - - return maxVersion; -} - -std::string ReplicaSetMonitor::getName() const { - // name is const so don't need to lock - return _state->name; -} - -std::string ReplicaSetMonitor::getServerAddress() const { - stdx::lock_guard<Latch> lk(_state->mutex); - // We return our setUri until first confirmation - return _state->seedConnStr.isValid() ? _state->seedConnStr.toString() - : _state->setUri.connectionString().toString(); -} - -const MongoURI& ReplicaSetMonitor::getOriginalUri() const { - // setUri is const so no need to lock. - return _state->setUri; -} - -bool ReplicaSetMonitor::contains(const HostAndPort& host) const { - stdx::lock_guard<Latch> lk(_state->mutex); - return _state->seedNodes.count(host); -} - shared_ptr<ReplicaSetMonitor> ReplicaSetMonitor::createIfNeeded(const string& name, const set<HostAndPort>& servers) { return ReplicaSetMonitorManager::get()->getOrCreateMonitor( @@ -458,40 +94,6 @@ ReplicaSetChangeNotifier& ReplicaSetMonitor::getNotifier() { return ReplicaSetMonitorManager::get()->getNotifier(); } -// TODO move to correct order with non-statics before pushing -void ReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const { - stdx::lock_guard<Latch> lk(_state->mutex); - - BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName())); - if (forFTDC) { - for (size_t i = 0; i < _state->nodes.size(); i++) { - const Node& node = _state->nodes[i]; - monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node)); - } - return; - } - - // NOTE: the format here must be consistent for backwards compatibility - BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts")); - for (size_t i = 0; i < _state->nodes.size(); i++) { - const Node& node = _state->nodes[i]; - - BSONObjBuilder builder; - builder.append("addr", node.host.toString()); - builder.append("ok", node.isUp); - builder.append("ismaster", node.isMaster); // intentionally not camelCase - builder.append("hidden", false); // we don't keep hidden nodes in the set - builder.append("secondary", node.isUp && !node.isMaster); - builder.append("pingTimeMillis", pingTimeMillis(node)); - - if (!node.tags.isEmpty()) { - builder.append("tags", node.tags); - } - - hosts.append(builder.obj()); - } -} - void ReplicaSetMonitor::shutdown() { ReplicaSetMonitorManager::get()->shutdown(); } @@ -500,1041 +102,15 @@ void ReplicaSetMonitor::cleanup() { ReplicaSetMonitorManager::get()->removeAllMonitors(); } -void ReplicaSetMonitor::disableRefreshRetries_forTest() { - areRefreshRetriesDisabledForTest.store(true); -} - -bool ReplicaSetMonitor::isKnownToHaveGoodPrimary() const { - stdx::lock_guard<Latch> lk(_state->mutex); - - for (const auto& node : _state->nodes) { - if (node.isMaster) { - return true; - } - } - - return false; -} - -void ReplicaSetMonitor::runScanForMockReplicaSet() { - stdx::lock_guard<Latch> lk(_state->mutex); - _ensureScanInProgress(_state); - - // This function should only be called from tests using MockReplicaSet and they should use the - // synchronous path to complete before returning. - invariant(_state->currentScan == nullptr); -} - -void ReplicaSetMonitor::_ensureScanInProgress(const SetStatePtr& state) { - Refresher(state).scheduleNetworkRequests(); -} - -Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) { - if (_scan) { - _set->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan); - _scan->retryAllTriedHosts(_set->rand); - return; // participate in in-progress scan - } - - startNewScan(); -} - -void Refresher::scheduleNetworkRequests() { - for (auto ns = getNextStep(); ns.step == NextStep::CONTACT_HOST; ns = getNextStep()) { - if (!_set->executor || _set->isMocked) { - // If we're mocked, just schedule an isMaster - scheduleIsMaster(ns.host); - continue; - } - - // cancel any scheduled isMaster calls that haven't yet been called - Node* node = _set->findOrCreateNode(ns.host); - if (auto handle = std::exchange(node->scheduledIsMasterHandle, {})) { - _set->executor->cancel(handle); - } - - // ensure that the call to isMaster is scheduled at most every 500ms - auto swHandle = - _set->scheduleWorkAt(node->nextPossibleIsMasterCall, - [*this, host = ns.host](const CallbackArgs& cbArgs) mutable { - scheduleIsMaster(host); - }); - - if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) { - _scan->markHostsToScanAsTried(); - break; - } - - if (!swHandle.isOK()) { - LOGV2_FATAL( - 20185, - "Can't continue scan for replica set {set_name} due to {swHandle_getStatus}", - "set_name"_attr = _set->name, - "swHandle_getStatus"_attr = redact(swHandle.getStatus())); - fassertFailed(31176); - } - - node->scheduledIsMasterHandle = uassertStatusOK(std::move(swHandle)); - } - - if (kDebugBuild) - _set->checkInvariants(); -} - -void Refresher::scheduleIsMaster(const HostAndPort& host) { - if (_set->isMocked) { - // MockReplicaSet only works with DBClient-style access since it injects itself into the - // ScopedDbConnection pool connection creation. - try { - ScopedDbConnection conn(ConnectionString(host), kCheckTimeout.count()); - - auto timer = Timer(); - auto reply = BSONObj(); - bool ignoredOutParam = false; - conn->isMaster(ignoredOutParam, &reply); - conn.done(); // return to pool on success. - - receivedIsMaster(host, timer.micros(), reply); - } catch (DBException& ex) { - failedHost(host, ex.toStatus()); - } - - return; - } - - auto request = executor::RemoteCommandRequest( - host, "admin", BSON("isMaster" << 1), nullptr, kCheckTimeout); - request.sslMode = _set->setUri.getSSLMode(); - auto status = - _set->executor - ->scheduleRemoteCommand( - std::move(request), - [copy = *this, host, timer = Timer()]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { - stdx::lock_guard lk(copy._set->mutex); - // Ignore the reply and return if we are no longer the current scan. This might - // happen if it was decided that the host we were contacting isn't part of the - // set. - if (copy._scan != copy._set->currentScan) { - return; - } - - // ensure that isMaster calls occur at most 500ms after the previous call ended - if (auto node = copy._set->findNode(host)) { - node->nextPossibleIsMasterCall = - copy._set->executor->now() + Milliseconds(500); - } - - if (result.response.isOK()) { - // Not using result.response.elapsedMillis because higher precision is - // useful for computing the rolling average. - copy.receivedIsMaster(host, timer.micros(), result.response.data); - } else { - copy.failedHost(host, result.response.status); - } - - // This reply may have discovered new hosts to contact so we need to schedule - // them. - copy.scheduleNetworkRequests(); - }) - .getStatus(); - - if (!status.isOK()) { - failedHost(host, status); - // This is only called from scheduleNetworkRequests() which will still be looping, so we - // don't need to call it here after updating the state. - } -} - -Refresher::NextStep Refresher::getNextStep() { - // No longer the current scan - if (_scan != _set->currentScan) { - return NextStep(NextStep::DONE); - } - - // Wait for all dispatched hosts to return before trying any fallback hosts. - if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) { - return NextStep(NextStep::WAIT); - } - - // If we haven't yet found a master, try contacting unconfirmed hosts - if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) { - _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand); - _scan->possibleNodes.clear(); - } - - if (_scan->hostsToScan.empty()) { - // We've tried all hosts we can, so nothing more to do in this round. - if (!_scan->foundUpMaster) { - LOGV2_WARNING( - 20181, "Unable to reach primary for set {set_name}", "set_name"_attr = _set->name); - - // Since we've talked to everyone we could but still didn't find a primary, we - // do the best we can, and assume all unconfirmedReplies are actually from nodes - // in the set (we've already confirmed that they think they are). This is - // important since it allows us to bootstrap to a usable state even if we are - // unable to talk to a master while starting up. As soon as we are able to - // contact a master, we will remove any nodes that it doesn't think are part of - // the set, undoing the damage we cause here. - - // NOTE: we don't modify seedNodes or notify about set membership change in this - // case since it hasn't been confirmed by a master. - for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); - it != _scan->unconfirmedReplies.end(); - ++it) { - _set->findOrCreateNode(it->first)->update(it->second); - } - - auto connStr = _set->possibleConnectionString(); - if (connStr != _set->workingConnStr) { - _set->workingConnStr = std::move(connStr); - _set->notifier->onPossibleSet(_set->workingConnStr); - } - - // If at some point we care about lacking a primary, on it here - _set->lastSeenMaster = {}; - } - - if (_scan->foundAnyUpNodes) { - _set->consecutiveFailedScans = 0; - } else { - auto nScans = _set->consecutiveFailedScans++; - if (nScans <= 10 || nScans % 10 == 0) { - LOGV2(20165, - "Cannot reach any nodes for set {set_name}. Please check network " - "connectivity and the status of the set. This has happened for " - "{set_consecutiveFailedScans} checks in a row.", - "set_name"_attr = _set->name, - "set_consecutiveFailedScans"_attr = _set->consecutiveFailedScans); - } - } - - // Makes sure all other Refreshers in this round return DONE - _set->currentScan.reset(); - _set->notify(); - - LOGV2_DEBUG(20166, - 1, - "Refreshing replica set {set_name} took {scan_timer_millis}ms", - "set_name"_attr = _set->name, - "scan_timer_millis"_attr = _scan->timer.millis()); - - return NextStep(NextStep::DONE); - } - - // Pop and return the next hostToScan. - HostAndPort host = _scan->hostsToScan.front(); - _scan->hostsToScan.pop_front(); - _scan->waitingFor.insert(host); - _scan->triedHosts.insert(host); - - return NextStep(NextStep::CONTACT_HOST, host); -} - -void Refresher::receivedIsMaster(const HostAndPort& from, - int64_t latencyMicros, - const BSONObj& replyObj) { - _scan->waitingFor.erase(from); - - const IsMasterReply reply(from, latencyMicros, replyObj); - - // Handle various failure cases - if (!reply.ok) { - failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"}); - return; - } - - if (reply.setName != _set->name) { - if (reply.raw["isreplicaset"].trueValue()) { - // The reply came from a node in the state referred to as RSGhost in the SDAM - // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event, - // if a reply from a ghost offers a list of possible other members of the replica set, - // and if this refresher has yet to find the replica set master, we add hosts listed in - // the reply to the list of possible replica set members. - if (!_scan->foundUpMaster) { - _scan->possibleNodes.insert(reply.members.begin(), reply.members.end()); - } - } else { - LOGV2_ERROR(20183, - "replset name mismatch: expected \"{set_name}\", but remote node {from} " - "has replset name \"{reply_setName}\", ismaster: {replyObj}", - "set_name"_attr = _set->name, - "from"_attr = from, - "reply_setName"_attr = reply.setName, - "replyObj"_attr = replyObj); - } - - failedHost(from, - {ErrorCodes::InconsistentReplicaSetNames, - str::stream() << "Target replica set name " << reply.setName - << " does not match the monitored set name " << _set->name}); - return; - } - - if (reply.isMaster) { - Status status = receivedIsMasterFromMaster(from, reply); - if (!status.isOK()) { - failedHost(from, status); - return; - } - } - - if (_scan->foundUpMaster) { - // We only update a Node if a master has confirmed it is in the set. - _set->updateNodeIfInNodes(reply); - } else { - // Populate possibleNodes. - _scan->possibleNodes.insert(reply.members.begin(), reply.members.end()); - _scan->unconfirmedReplies[from] = reply; - } - - // _set->nodes may still not have any nodes with isUp==true, but we have at least found a - // connectible host that is that claims to be in the set. - _scan->foundAnyUpNodes = true; - - _set->notify(); - - if (kDebugBuild) - _set->checkInvariants(); -} - -void Refresher::failedHost(const HostAndPort& host, const Status& status) { - _scan->waitingFor.erase(host); - - Node* node = _set->findNode(host); - if (node) - node->markFailed(status); - - if (_scan->waitingFor.empty()) { - // If this was the last host that needed a response, we should notify the SetState so that - // we can fail any waiters that have timed out. - _set->notify(); - } -} - -void Refresher::startNewScan() { - // The heuristics we use in deciding the order to contact hosts are designed to find a - // master as quickly as possible. This is because we can't use any hosts we find until - // we either get the latest set of members from a master or talk to all possible hosts - // without finding a master. - - // TODO It might make sense to check down nodes first if the last seen master is still - // marked as up. - - _scan = std::make_shared<ScanState>(); - _set->currentScan = _scan; - - int upNodes = 0; - for (Nodes::const_iterator it(_set->nodes.begin()), end(_set->nodes.end()); it != end; ++it) { - if (it->isUp) { - // _scan the nodes we think are up first - _scan->hostsToScan.push_front(it->host); - upNodes++; - } else { - _scan->hostsToScan.push_back(it->host); - } - } - - // shuffle the queue, but keep "up" nodes at the front - std::shuffle( - _scan->hostsToScan.begin(), _scan->hostsToScan.begin() + upNodes, _set->rand.urbg()); - std::shuffle(_scan->hostsToScan.begin() + upNodes, _scan->hostsToScan.end(), _set->rand.urbg()); - - if (!_set->lastSeenMaster.empty()) { - // move lastSeenMaster to front of queue - std::stable_partition( - _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(_set->lastSeenMaster)); - } -} - -Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) { - invariant(reply.isMaster); - - // Reject if config version is older. This is for backwards compatibility with nodes in pv0 - // since they don't have the same ordering with pv1 electionId. - if (reply.configVersion < _set->configVersion) { - return { - ErrorCodes::NotMaster, - str::stream() << "Node " << from << " believes it is primary, but its config version " - << reply.configVersion << " is older than the most recent config version " - << _set->configVersion}; - } - - if (reply.electionId.isSet()) { - // ElectionIds are only comparable if they are of the same protocol version. However, since - // isMaster has no protocol version field, we use the configVersion instead. This works - // because configVersion needs to be incremented whenever the protocol version is changed. - if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() && - _set->maxElectionId.compare(reply.electionId) > 0) { - return { - ErrorCodes::NotMaster, - str::stream() << "Node " << from << " believes it is primary, but its election id " - << reply.electionId << " is older than the most recent election id " - << _set->maxElectionId}; - } - - _set->maxElectionId = reply.electionId; - } - - _set->configVersion = reply.configVersion; - - // Mark all nodes as not master. We will mark ourself as master before releasing the lock. - // NOTE: we use a "last-wins" policy if multiple hosts claim to be master. - for (size_t i = 0; i < _set->nodes.size(); i++) { - _set->nodes[i].isMaster = false; - } - - // Check if the master agrees with our current list of nodes. - // REMINDER: both _set->nodes and reply.members are sorted. - if (_set->nodes.size() != reply.members.size() || - !std::equal(_set->nodes.begin(), _set->nodes.end(), reply.members.begin(), hostsEqual)) { - LOGV2_DEBUG(20167, - 2, - "Adjusting nodes in our view of replica set {set_name} based on master reply: " - "{reply_raw}", - "set_name"_attr = _set->name, - "reply_raw"_attr = redact(reply.raw)); - - // remove non-members from _set->nodes - _set->nodes.erase( - std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.members)), - _set->nodes.end()); - - // add new members to _set->nodes - for (auto& host : reply.members) { - _set->findOrCreateNode(host); - } - - // replace hostToScan queue with untried normal hosts. can both add and remove - // hosts from the queue. - _scan->hostsToScan.clear(); - _scan->enqueAllUntriedHosts(reply.members, _set->rand); - - if (!_scan->waitingFor.empty()) { - // make sure we don't wait for any hosts that aren't considered members - std::set<HostAndPort> newWaitingFor; - std::set_intersection(reply.members.begin(), - reply.members.end(), - _scan->waitingFor.begin(), - _scan->waitingFor.end(), - std::inserter(newWaitingFor, newWaitingFor.end())); - _scan->waitingFor.swap(newWaitingFor); - } - } - - bool changedHosts = reply.members != _set->seedNodes; - bool changedPrimary = reply.host != _set->lastSeenMaster; - if (changedHosts || changedPrimary) { - ++_set->seedGen; - _set->seedNodes = reply.members; - _set->seedConnStr = _set->confirmedConnectionString(); - - // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare - // and we want to record our changes - LOGV2(20168, - "Confirmed replica set for {set_name} is {set_seedConnStr}", - "set_name"_attr = _set->name, - "set_seedConnStr"_attr = _set->seedConnStr); - - _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host, reply.passives); - } - - // Update our working string - _set->workingConnStr = _set->seedConnStr; - - // Update other nodes's information based on replies we've already seen - for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); - it != _scan->unconfirmedReplies.end(); - ++it) { - // this ignores replies from hosts not in _set->nodes (as modified above) - _set->updateNodeIfInNodes(it->second); - } - _scan->unconfirmedReplies.clear(); - - _scan->foundUpMaster = true; - _set->lastSeenMaster = reply.host; - - return Status::OK(); -} - -void IsMasterReply::parse(const BSONObj& obj) { - try { - raw = obj.getOwned(); // don't use obj again after this line - - ok = raw["ok"].trueValue(); - if (!ok) - return; - - setName = raw["setName"].str(); - hidden = raw["hidden"].trueValue(); - secondary = raw["secondary"].trueValue(); - - minWireVersion = raw["minWireVersion"].numberInt(); - maxWireVersion = raw["maxWireVersion"].numberInt(); - - // hidden nodes can't be master, even if they claim to be. - isMaster = !hidden && raw["ismaster"].trueValue(); - - if (isMaster && raw.hasField("electionId")) { - electionId = raw["electionId"].OID(); - } - - configVersion = raw["setVersion"].numberInt(); - - const string primaryString = raw["primary"].str(); - primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString); - - // both hosts and passives, but not arbiters, are considered "normal hosts" - members.clear(); - BSONForEach(host, raw.getObjectField("hosts")) { - members.insert(HostAndPort(host.String())); - } - BSONForEach(host, raw.getObjectField("passives")) { - members.insert(HostAndPort(host.String())); - passives.insert(HostAndPort(host.String())); - } - - tags = raw.getObjectField("tags"); - BSONObj lastWriteField = raw.getObjectField("lastWrite"); - if (!lastWriteField.isEmpty()) { - if (auto lastWrite = lastWriteField["lastWriteDate"]) { - lastWriteDate = lastWrite.date(); - } - - uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime)); - } - } catch (const std::exception& e) { - ok = false; - LOGV2(20169, - "exception while parsing isMaster reply: {e_what} {obj}", - "e_what"_attr = e.what(), - "obj"_attr = obj); - } -} - -Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {} - -void Node::markFailed(const Status& status) { - if (isUp) { - LOGV2(20170, - "Marking host {host} as failed{causedBy_status}", - "host"_attr = host, - "causedBy_status"_attr = causedBy(redact(status))); - - isUp = false; - } - - isMaster = false; -} - -bool Node::matches(const ReadPreference pref) const { - if (!isUp) { - LOGV2_DEBUG(20171, 3, "Host {host} is not up", "host"_attr = host); - return false; - } - - LOGV2_DEBUG(20172, - 3, - "Host {host} is {isMaster_primary_not_primary}", - "host"_attr = host, - "isMaster_primary_not_primary"_attr = (isMaster ? "primary" : "not primary")); - if (pref == ReadPreference::PrimaryOnly) { - return isMaster; - } - - if (pref == ReadPreference::SecondaryOnly) { - return !isMaster; - } - - return true; -} - -bool Node::matches(const BSONObj& tag) const { - BSONForEach(tagCriteria, tag) { - if (SimpleBSONElementComparator::kInstance.evaluate( - this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) { - return false; - } - } - - return true; -} - -void Node::update(const IsMasterReply& reply) { - invariant(host == reply.host); - invariant(reply.ok); - - LOGV2_DEBUG(20173, - 3, - "Updating host {host} based on ismaster reply: {reply_raw}", - "host"_attr = host, - "reply_raw"_attr = reply.raw); - - // Nodes that are hidden or neither master or secondary are considered down since we can't - // send any operations to them. - isUp = !reply.hidden && (reply.isMaster || reply.secondary); - isMaster = reply.isMaster; - - minWireVersion = reply.minWireVersion; - maxWireVersion = reply.maxWireVersion; - - // save a copy if unchanged - if (!tags.binaryEqual(reply.tags)) - tags = reply.tags.getOwned(); - - if (reply.latencyMicros >= 0) { // TODO upper bound? - if (latencyMicros == unknownLatency) { - latencyMicros = reply.latencyMicros; - } else { - // update latency with smoothed moving average (1/4th the delta) - latencyMicros += (reply.latencyMicros - latencyMicros) / 4; - } - } - - LOGV2_DEBUG(20174, - 3, - "Updating {host} lastWriteDate to {reply_lastWriteDate}", - "host"_attr = host, - "reply_lastWriteDate"_attr = reply.lastWriteDate); - lastWriteDate = reply.lastWriteDate; - - LOGV2_DEBUG(20175, - 3, - "Updating {host} opTime to {reply_opTime}", - "host"_attr = host, - "reply_opTime"_attr = reply.opTime); - opTime = reply.opTime; - lastWriteDateUpdateTime = Date_t::now(); -} - -SetState::SetState(const MongoURI& uri, - ReplicaSetChangeNotifier* notifier, - executor::TaskExecutor* executor) - : setUri(std::move(uri)), - name(setUri.getSetName()), - notifier(notifier), - executor(executor), - seedNodes(setUri.getServers().begin(), setUri.getServers().end()), - latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)), - rand(std::random_device()()), - refreshPeriod(getDefaultRefreshPeriod()) { - uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty()); - - if (name.empty()) - LOGV2_WARNING(20182, - "Replica set name empty, first node: {seedNodes_begin}", - "seedNodes_begin"_attr = *(seedNodes.begin())); - - // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a - // scan until we start a scan and either find a master or contact all hosts without finding - // one. - // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to - // sort nodes after this loop. - for (auto&& addr : seedNodes) { - nodes.push_back(Node(addr)); - - if (addr.host()[0] == '$') { - invariant(isMocked || &addr == &*seedNodes.begin()); // Can't mix and match. - isMocked = true; - } else { - invariant(!isMocked); // Can't mix and match. - } - } - - if (kDebugBuild) - checkInvariants(); -} - -HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const { - auto hosts = getMatchingHosts(criteria); - - if (hosts.empty()) { - return HostAndPort(); - } - - return hosts[0]; -} - -std::vector<HostAndPort> SetState::getMatchingHosts(const ReadPreferenceSetting& criteria) const { - switch (criteria.pref) { - // "Prefered" read preferences are defined in terms of other preferences - case ReadPreference::PrimaryPreferred: { - std::vector<HostAndPort> out = - getMatchingHosts(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); - // NOTE: the spec says we should use the primary even if tags don't match - if (!out.empty()) - return out; - return getMatchingHosts(ReadPreferenceSetting( - ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds)); - } - - case ReadPreference::SecondaryPreferred: { - std::vector<HostAndPort> out = getMatchingHosts(ReadPreferenceSetting( - ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds)); - if (!out.empty()) - return out; - // NOTE: the spec says we should use the primary even if tags don't match - return getMatchingHosts( - ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); - } - - case ReadPreference::PrimaryOnly: { - // NOTE: isMaster implies isUp - Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster); - if (it == nodes.end()) - return {}; - return {it->host}; - } - - // The difference between these is handled by Node::matches - case ReadPreference::SecondaryOnly: - case ReadPreference::Nearest: { - std::function<bool(const Node&)> matchNode = [](const Node& node) -> bool { - return true; - }; - // build comparator - if (criteria.maxStalenessSeconds.count()) { - auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster); - if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) { - auto writeDateCmp = [](const Node* a, const Node* b) -> bool { - return a->lastWriteDate < b->lastWriteDate; - }; - // use only non failed nodes - std::vector<const Node*> upNodes; - for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) { - if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) { - upNodes.push_back(&(*nodeIt)); - } - } - auto latestSecNode = - std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp); - if (latestSecNode == upNodes.end()) { - matchNode = [](const Node& node) -> bool { return false; }; - } else { - Date_t maxWriteTime = (*latestSecNode)->lastWriteDate; - matchNode = [=](const Node& node) -> bool { - return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) + - refreshPeriod <= - criteria.maxStalenessSeconds; - }; - } - } else { - Seconds primaryStaleness = duration_cast<Seconds>( - masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate); - matchNode = [=](const Node& node) -> bool { - return duration_cast<Seconds>(node.lastWriteDateUpdateTime - - node.lastWriteDate) - - primaryStaleness + refreshPeriod <= - criteria.maxStalenessSeconds; - }; - } - } - - std::vector<const Node*> allMatchingNodes; - BSONForEach(tagElem, criteria.tags.getTagBSON()) { - uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj()); - BSONObj tag = tagElem.Obj(); - - std::vector<const Node*> matchingNodes; - for (size_t i = 0; i < nodes.size(); i++) { - if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) && - matchNode(nodes[i])) { - matchingNodes.push_back(&nodes[i]); - } - } - - // Only consider nodes that satisfy the minOpTime - if (!criteria.minOpTime.isNull()) { - std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater); - for (size_t i = 0; i < matchingNodes.size(); i++) { - if (matchingNodes[i]->opTime < criteria.minOpTime) { - if (i == 0) { - // If no nodes satisfy the minOpTime criteria, we ignore the - // minOpTime requirement. - break; - } - matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end()); - break; - } - } - } - - allMatchingNodes.insert( - allMatchingNodes.end(), matchingNodes.begin(), matchingNodes.end()); - } - - // don't do more complicated selection if not needed - if (allMatchingNodes.empty()) { - return {}; - } - if (allMatchingNodes.size() == 1) { - return {allMatchingNodes.front()->host}; - } - - // If there are multiple nodes satisfying the minOpTime, next order by latency - // and don't consider hosts further than a threshold from the closest. - std::sort(allMatchingNodes.begin(), allMatchingNodes.end(), compareLatencies); - for (size_t i = 1; i < allMatchingNodes.size(); i++) { - int64_t distance = - allMatchingNodes[i]->latencyMicros - allMatchingNodes[0]->latencyMicros; - if (distance >= latencyThresholdMicros) { - // this node and all remaining ones are too far away - allMatchingNodes.erase(allMatchingNodes.begin() + i, allMatchingNodes.end()); - break; - } - } - - std::vector<HostAndPort> hosts; - std::transform(allMatchingNodes.begin(), - allMatchingNodes.end(), - std::back_inserter(hosts), - [](const auto& node) { return node->host; }); - - // Note that the host list is only deterministic (or random) for the first node. - // The rest of the list is in matchingNodes order (latency) with one element swapped - // for the first element. - if (auto bestHostIdx = ReplicaSetMonitor::useDeterministicHostSelection - ? roundRobin++ % hosts.size() - : rand.nextInt32(hosts.size())) { - using std::swap; - swap(hosts[0], hosts[bestHostIdx]); - } - - return hosts; - } - - default: - uassert(16337, "Unknown read preference", false); - break; - } -} - -Node* SetState::findNode(const HostAndPort& host) { - const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); - if (it == nodes.end() || it->host != host) - return nullptr; - - return &(*it); -} - -Node* SetState::findOrCreateNode(const HostAndPort& host) { - // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class - // must function correctly even with more nodes). If we lift that restriction, we may need - // to consider alternate algorithms. - Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); - if (it == nodes.end() || it->host != host) { - LOGV2_DEBUG(20176, - 2, - "Adding node {host} to our view of replica set {name}", - "host"_attr = host, - "name"_attr = name); - it = nodes.insert(it, Node(host)); - } - return &(*it); -} - -void SetState::updateNodeIfInNodes(const IsMasterReply& reply) { - Node* node = findNode(reply.host); - if (!node) { - LOGV2_DEBUG(20177, - 2, - "Skipping application of ismaster reply from {reply_host} since it isn't a " - "confirmed member of set {name}", - "reply_host"_attr = reply.host, - "name"_attr = name); - return; - } - - node->update(reply); -} - -ConnectionString SetState::confirmedConnectionString() const { - std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes)); - - return ConnectionString::forReplicaSet(name, std::move(hosts)); -} - -ConnectionString SetState::possibleConnectionString() const { - std::vector<HostAndPort> hosts; - hosts.reserve(nodes.size()); - - for (auto& node : nodes) { - hosts.push_back(node.host); - } - - return ConnectionString::forReplicaSet(name, std::move(hosts)); -} - -void SetState::notify() { - if (!waiters.size()) { - return; - } - - const auto cachedNow = now(); - auto shouldQuickFail = areRefreshRetriesDisabledForTest.load() && !currentScan; - - for (auto it = waiters.begin(); it != waiters.end();) { - if (isDropped) { - it->promise.setError({ErrorCodes::ShutdownInProgress, - str::stream() << "ReplicaSetMonitor is shutting down"}); - waiters.erase(it++); - continue; - } - - auto match = getMatchingHosts(it->criteria); - if (!match.empty()) { - // match; - it->promise.emplaceValue(std::move(match)); - waiters.erase(it++); - } else if (it->deadline <= cachedNow) { - LOGV2_DEBUG( - 20178, - 1, - "Unable to statisfy read preference {it_criteria} by deadline {it_deadline}", - "it_criteria"_attr = it->criteria, - "it_deadline"_attr = it->deadline); - it->promise.setError(makeUnsatisfedReadPrefError(it->criteria)); - waiters.erase(it++); - } else if (shouldQuickFail) { - LOGV2_DEBUG(20179, 1, "Unable to statisfy read preference because tests fail quickly"); - it->promise.setError(makeUnsatisfedReadPrefError(it->criteria)); - waiters.erase(it++); - } else { - it++; - } - } - - if (waiters.empty()) { - // No current waiters so we can stop the expedited scanning. - isExpedited = false; - rescheduleRefresh(SchedulingStrategy::kCancelPreviousScan); - } -} - -Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const { - return Status(ErrorCodes::FailedToSatisfyReadPreference, - str::stream() << "Could not find host matching read preference " - << criteria.toString() << " for set " << name); -} - -void SetState::init() { - rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan); - notifier->onFoundSet(name); -} - -void SetState::drop() { - if (std::exchange(isDropped, true)) { - // If a SetState calls drop() from destruction after the RSMM calls shutdown(), then the - // RSMM's executor may no longer exist. Thus, only drop once. - return; - } - - currentScan.reset(); - notify(); - - if (auto handle = std::exchange(refresherHandle, {})) { - // Cancel our refresh on the way out - executor->cancel(handle); - } - - for (auto& node : nodes) { - if (auto handle = std::exchange(node.scheduledIsMasterHandle, {})) { - // Cancel any isMasters we had scheduled - executor->cancel(handle); - } - } - - // No point in notifying if we never started - if (workingConnStr.isValid()) { - notifier->onDroppedSet(name); - } -} - -void SetState::checkInvariants() const { - bool foundMaster = false; - for (size_t i = 0; i < nodes.size(); i++) { - // no empty hosts - invariant(!nodes[i].host.empty()); - - if (nodes[i].isMaster) { - // masters must be up - invariant(nodes[i].isUp); - - // at most one master - invariant(!foundMaster); - foundMaster = true; - - // if we have a master it should be the same as lastSeenMaster - invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster); - } - - // should never end up with negative latencies - invariant(nodes[i].latencyMicros >= 0); - - // nodes must be sorted by host with no-dupes - invariant(i == 0 || (nodes[i - 1].host < nodes[i].host)); - } - - // nodes should be a (non-strict) superset of the seedNodes - invariant(std::includes( - nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts)); - - if (currentScan) { - // hostsToScan can't have dups or hosts already in triedHosts. - std::set<HostAndPort> cantSee = currentScan->triedHosts; - for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin(); - it != currentScan->hostsToScan.end(); - ++it) { - invariant(!cantSee.count(*it)); - cantSee.insert(*it); // make sure we don't see this again - } - - // We should only be waitingFor hosts that are in triedHosts - invariant(std::includes(currentScan->triedHosts.begin(), - currentScan->triedHosts.end(), - currentScan->waitingFor.begin(), - currentScan->waitingFor.end())); - - // We should only have unconfirmedReplies if we haven't found a master yet - invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty()); - } -} - -template <typename Container> -void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) { - invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. - - // no std::copy_if before c++11 - for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end; - ++it) { - if (!triedHosts.count(*it)) { - hostsToScan.push_back(*it); - } - } - std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg()); -} +namespace { +AtomicWord<bool> refreshRetriesDisabledForTest{false}; +} // namespace -void ScanState::retryAllTriedHosts(PseudoRandom& rand) { - invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. - // Move hosts that are in triedHosts but not in waitingFor from triedHosts to hostsToScan. - std::set_difference(triedHosts.begin(), - triedHosts.end(), - waitingFor.begin(), - waitingFor.end(), - std::inserter(hostsToScan, hostsToScan.end())); - std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg()); - triedHosts = waitingFor; +void ReplicaSetMonitor::disableRefreshRetries_forTest() { + refreshRetriesDisabledForTest.store(true); } -void ScanState::markHostsToScanAsTried() noexcept { - while (!hostsToScan.empty()) { - auto host = hostsToScan.front(); - hostsToScan.pop_front(); - /** - * Mark the popped host as tried to avoid deleting hosts in multiple points. - * This emulates the final effect of Refresher::getNextStep() on the set. - */ - triedHosts.insert(host); - } +bool ReplicaSetMonitor::areRefreshRetriesDisabledForTest() { + return refreshRetriesDisabledForTest.load(); } } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h index 44b9a2504d1..1fa5f737ae9 100644 --- a/src/mongo/client/replica_set_monitor.h +++ b/src/mongo/client/replica_set_monitor.h @@ -34,9 +34,9 @@ #include <set> #include <string> -#include "mongo/base/string_data.h" #include "mongo/client/mongo_uri.h" #include "mongo/client/replica_set_change_notifier.h" +#include "mongo/client/replica_set_monitor_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/duration.h" @@ -44,136 +44,24 @@ #include "mongo/util/time_support.h" namespace mongo { - -class BSONObj; -class ReplicaSetMonitor; -class ReplicaSetMonitorTest; -struct ReadPreferenceSetting; -typedef std::shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr; - /** - * Holds state about a replica set and provides a means to refresh the local view. - * All methods perform the required synchronization to allow callers from multiple threads. + * An abstract class, defines the external interface for static ReplicaSetMonitor methods and + * provides a means to refresh the local view. + * A ReplicaSetMonitor holds a state about the replica set and provides a means to refresh the local + * view. All methods perform the required synchronization to allow callers from multiple threads. */ -class ReplicaSetMonitor { - ReplicaSetMonitor(const ReplicaSetMonitor&) = delete; - ReplicaSetMonitor& operator=(const ReplicaSetMonitor&) = delete; - +class ReplicaSetMonitor : public ReplicaSetMonitorInterface { public: - class Refresher; - - static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500); - static constexpr auto kCheckTimeout = Seconds(5); - - /** - * Initializes local state from a MongoURI. - */ - ReplicaSetMonitor(const MongoURI& uri); - - /** - * Schedules the initial refresh task into task executor. - */ - void init(); - - /** - * Ends any ongoing refreshes. - */ - void drop(); + virtual ~ReplicaSetMonitor() = default; /** - * Returns a host matching the given read preference or an error, if no host matches. - * - * @param readPref Read preference to match against - * @param maxWait If no host is readily available, which matches the specified read preference, - * wait for one to become available for up to the specified time and periodically refresh - * the view of the set. The call may return with an error earlier than the specified value, - * if none of the known hosts for the set are reachable within some number of attempts. - * Note that if a maxWait of 0ms is specified, this method may still attempt to contact - * every host in the replica set up to one time. - * - * Known errors are: - * FailedToSatisfyReadPreference, if node cannot be found, which matches the read preference. - */ - SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref, - Milliseconds maxWait = kDefaultFindHostTimeout); - - SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh( - const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout); - - /** - * Returns the host we think is the current master or uasserts. - * - * This is a thin wrapper around getHostOrRefresh so this will also refresh our view if we - * don't think there is a master at first. The main difference is that this will uassert - * rather than returning an empty HostAndPort. - */ - HostAndPort getMasterOrUassert(); - - /** - * Notifies this Monitor that a host has failed because of the specified error 'status' and - * should be considered down. + * Defaults to false, meaning that if multiple hosts meet a criteria we pick one at random. + * This is required by the replica set driver spec. Set this to true in tests that need host + * selection to be deterministic. * - * Call this when you get a connection error. If you get an error while trying to refresh our - * view of a host, call Refresher::failedHost instead because it bypasses taking the monitor's - * mutex. - */ - void failedHost(const HostAndPort& host, const Status& status); - - /** - * Returns true if this node is the master based ONLY on local data. Be careful, return may - * be stale. - */ - bool isPrimary(const HostAndPort& host) const; - - /** - * Returns true if host is part of this set and is considered up (meaning it can accept - * queries). - */ - bool isHostUp(const HostAndPort& host) const; - - /** - * Returns the minimum wire version supported across the replica set. - */ - int getMinWireVersion() const; - - /** - * Returns the maximum wire version supported across the replica set. - */ - int getMaxWireVersion() const; - - /** - * The name of the set. - */ - std::string getName() const; - - /** - * Returns a std::string with the format name/server1,server2. - * If name is empty, returns just comma-separated list of servers. - * It IS updated to reflect the current members of the set. - */ - std::string getServerAddress() const; - - /** - * Returns the URI that was used to construct this monitor. - * It IS NOT updated to reflect the current members of the set. - */ - const MongoURI& getOriginalUri() const; - - /** - * Is server part of this set? Uses only cached information. - */ - bool contains(const HostAndPort& server) const; - - /** - * Writes information about our cached view of the set to a BSONObjBuilder. If - * forFTDC, trim to minimize its size for full-time diagnostic data capture. - */ - void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const; - - /** - * Returns true if the monitor knows a usable primary from it's interal view. + * NOTE: Used by unit-tests only. */ - bool isKnownToHaveGoodPrimary() const; + static bool useDeterministicHostSelection; /** * Creates a new ReplicaSetMonitor, if it doesn't already exist. @@ -210,145 +98,13 @@ public: static void cleanup(); /** - * Use these to speed up tests by disabling the sleep-and-retry loops and cause errors to be - * reported immediately. - */ - static void disableRefreshRetries_forTest(); - - /** * Permanently stops all monitoring on replica sets. */ static void shutdown(); - /** - * Returns the refresh period that is given to all new SetStates. - */ - static Seconds getDefaultRefreshPeriod(); - - // - // internal types (defined in replica_set_monitor_internal.h) - // - - struct IsMasterReply; - struct ScanState; - struct SetState; - typedef std::shared_ptr<ScanState> ScanStatePtr; - typedef std::shared_ptr<SetState> SetStatePtr; - - /** - * Allows tests to set initial conditions and introspect the current state. - */ - explicit ReplicaSetMonitor(const SetStatePtr& initialState); - ~ReplicaSetMonitor(); - - /** - * The default timeout, which will be used for finding a replica set host if the caller does - * not explicitly specify it. - */ - static const Seconds kDefaultFindHostTimeout; - - /** - * Defaults to false, meaning that if multiple hosts meet a criteria we pick one at random. - * This is required by the replica set driver spec. Set this to true in tests that need host - * selection to be deterministic. - * - * NOTE: Used by unit-tests only. - */ - static bool useDeterministicHostSelection; - - /** - * This is for use in tests using MockReplicaSet to ensure that a full scan completes before - * continuing. - */ - void runScanForMockReplicaSet(); - -private: - Future<std::vector<HostAndPort>> _getHostsOrRefresh(const ReadPreferenceSetting& readPref, - Milliseconds maxWait); - /** - * If no scan is in-progress, this function is responsible for setting up a new scan. Otherwise, - * does nothing. - */ - static void _ensureScanInProgress(const SetStatePtr&); - - const SetStatePtr _state; -}; - - -/** - * Refreshes the local view of a replica set. - * - * All logic related to choosing the hosts to contact and updating the SetState based on replies - * lives in this class. Use of this class should always be guarded by SetState::mutex unless in - * single-threaded use by ReplicaSetMonitorTest. - */ -class ReplicaSetMonitor::Refresher { -public: - explicit Refresher(const SetStatePtr& setState); - - struct NextStep { - enum StepKind { - CONTACT_HOST, /// Contact the returned host - WAIT, /// Wait on condition variable and try again. - DONE, /// No more hosts to contact in this Refresh round - }; - - explicit NextStep(StepKind step, const HostAndPort& host = HostAndPort()) - : step(step), host(host) {} - - StepKind step; - HostAndPort host; - }; - - /** - * Returns the next step to take. - * - * By calling this, you promise to call receivedIsMaster or failedHost if the NextStep is - * CONTACT_HOST. - */ - NextStep getNextStep(); - - /** - * Call this if a host returned from getNextStep successfully replied to an isMaster call. - * Negative latencyMicros are ignored. - */ - void receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& reply); - - /** - * Call this if a host returned from getNextStep failed to reply to an isMaster call. - */ - void failedHost(const HostAndPort& host, const Status& status); - - /** - * Starts a new scan over the hosts in set. - */ - void startNewScan(); - - /** - * First, checks that the "reply" is not from a stale primary by comparing the electionId of - * "reply" to the maxElectionId recorded by the SetState and returns OK status if "reply" - * belongs to a non-stale primary. Otherwise returns a failed status. - * - * The 'from' parameter specifies the node from which the response is received. - * - * Updates _set and _scan based on set-membership information from a master. - * Applies _scan->unconfirmedReplies to confirmed nodes. - * Does not update this host's node in _set->nodes. - */ - Status receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply); - - /** - * Schedules isMaster requests to all hosts that currently need to be contacted. - * Does nothing if requests have already been sent to all known hosts. - */ - void scheduleNetworkRequests(); - - void scheduleIsMaster(const HostAndPort& host); + static void disableRefreshRetries_forTest(); -private: - // Both pointers are never NULL - SetStatePtr _set; - ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started. + static bool areRefreshRetriesDisabledForTest(); }; } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_interface.h b/src/mongo/client/replica_set_monitor_interface.h new file mode 100644 index 00000000000..3826154dfe2 --- /dev/null +++ b/src/mongo/client/replica_set_monitor_interface.h @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <set> +#include <string> + +#include "mongo/client/mongo_uri.h" +#include "mongo/client/replica_set_change_notifier.h" +#include "mongo/util/duration.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +struct ReadPreferenceSetting; + +class ReplicaSetMonitorInterface { +public: + virtual ~ReplicaSetMonitorInterface() = default; + + /** + * The default timeout, which will be used for finding a replica set host if the caller does + * not explicitly specify it. + */ + static constexpr Seconds kDefaultFindHostTimeout{15}; + + /** + * Schedules the initial refresh task into task executor. + */ + virtual void init() = 0; + + /** + * Ends any ongoing refreshes. + */ + virtual void drop() = 0; + + /** + * Returns a host matching the given read preference or an error, if no host matches. + * + * @param readPref Read preference to match against + * @param maxWait If no host is readily available that matches the specified read preference, + * wait for one to become available for up to the specified time and periodically refresh + * the view of the set. The call may return with an error earlier than the specified value, + * if none of the known hosts for the set are reachable within some number of attempts. + * Note that if a maxWait of 0ms is specified, this method may still attempt to contact + * every host in the replica set up to one time. + * + * Known errors are: + * FailedToSatisfyReadPreference, if node cannot be found, which matches the read preference. + */ + virtual SemiFuture<HostAndPort> getHostOrRefresh( + const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout) = 0; + + virtual SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh( + const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout) = 0; + + /** + * Returns the host the RSM thinks is the current master or uasserts. + * + * This is a thin wrapper around getHostOrRefresh and will also refresh the view if a primary + * does not exist. The main difference is that this will uassert rather than returning an empty + * HostAndPort. + */ + virtual HostAndPort getMasterOrUassert() = 0; + + /** + * Notifies this Monitor that a host has failed because of the specified error 'status' and + * should be considered down. + * + * Call this when you get a connection error. If you get an error while trying to refresh our + * view of a host, call Refresher::failedHost instead because it bypasses taking the monitor's + * mutex. + */ + virtual void failedHost(const HostAndPort& host, const Status& status) = 0; + + /** + * Returns true if this node is the master based ONLY on local data. Be careful, return may + * be stale. + */ + virtual bool isPrimary(const HostAndPort& host) const = 0; + + /** + * Returns true if host is part of this set and is considered up (meaning it can accept + * queries). + */ + virtual bool isHostUp(const HostAndPort& host) const = 0; + + /** + * Returns the minimum wire version supported across the replica set. + */ + virtual int getMinWireVersion() const = 0; + + /** + * Returns the maximum wire version supported across the replica set. + */ + virtual int getMaxWireVersion() const = 0; + + /** + * The name of the set. + */ + virtual std::string getName() const = 0; + + /** + * Returns a std::string with the format name/server1,server2. + * If name is empty, returns just comma-separated list of servers. + * It IS updated to reflect the current members of the set. + */ + virtual std::string getServerAddress() const = 0; + + /** + * Returns the URI that was used to construct this monitor. + * It IS NOT updated to reflect the current members of the set. + */ + virtual const MongoURI& getOriginalUri() const = 0; + + /** + * Is server part of this set? Uses only cached information. + */ + virtual bool contains(const HostAndPort& server) const = 0; + + /** + * Writes information about our cached view of the set to a BSONObjBuilder. If + * forFTDC, trim to minimize its size for full-time diagnostic data capture. + */ + virtual void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const = 0; + + /** + * Returns true if the monitor knows a usable primary from it's interal view. + */ + virtual bool isKnownToHaveGoodPrimary() const = 0; + + /** + * This is for use in tests using MockReplicaSet to ensure that a full scan completes before + * continuing. + */ + virtual void runScanForMockReplicaSet() = 0; +}; + +} // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index fad377e0368..1e81bbbb688 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -38,7 +38,9 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/connection_string.h" #include "mongo/client/mongo_uri.h" -#include "mongo/client/replica_set_monitor.h" +#include "mongo/client/replica_set_monitor_params_gen.h" +#include "mongo/client/scanning_replica_set_monitor.h" +#include "mongo/client/streamable_replica_set_monitor.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" @@ -130,10 +132,14 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const LOGV2(20186, "Starting new replica set monitor for {uri}", "uri"_attr = uri.toString()); - auto newMonitor = std::make_shared<ReplicaSetMonitor>(uri); - _monitors[setName] = newMonitor; - newMonitor->init(); - return newMonitor; + if (disableStreamableReplicaSetMonitor.load()) { + auto newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri); + _monitors[setName] = newMonitor; + newMonitor->init(); + return newMonitor; + } else { + uasserted(31451, "StreamableReplicaSetMonitor is not yet implemented"); + } } vector<string> ReplicaSetMonitorManager::getAllSetNames() { diff --git a/src/mongo/client/replica_set_monitor_params.idl b/src/mongo/client/replica_set_monitor_params.idl new file mode 100644 index 00000000000..f5b0e571305 --- /dev/null +++ b/src/mongo/client/replica_set_monitor_params.idl @@ -0,0 +1,39 @@ +# Copyright (C) 2020-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +global: + cpp_namespace: mongo + +server_parameters: + disableStreamableReplicaSetMonitor: + description: >- + Disable the StreamableReplicaSetMonitor and revert to the prior behavior with the + ScanningReplicaSetMonitor + set_at: startup + cpp_vartype: AtomicWord<bool> + cpp_varname: disableStreamableReplicaSetMonitor + default: true diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp new file mode 100644 index 00000000000..d29259d4bb7 --- /dev/null +++ b/src/mongo/client/scanning_replica_set_monitor.cpp @@ -0,0 +1,1424 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/client/scanning_replica_set_monitor.h" + +#include <algorithm> +#include <limits> +#include <random> + +#include "mongo/bson/simple_bsonelement_comparator.h" +#include "mongo/client/connpool.h" +#include "mongo/client/global_conn_pool.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/scanning_replica_set_monitor_internal.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/bson_extract_optime.h" +#include "mongo/db/server_options.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/util/background.h" +#include "mongo/util/debug_util.h" +#include "mongo/util/exit.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/log.h" +#include "mongo/util/string_map.h" +#include "mongo/util/timer.h" + +namespace mongo { + +using std::numeric_limits; +using std::set; +using std::shared_ptr; +using std::string; +using std::vector; + +namespace { + +// Pull nested types to top-level scope +typedef ScanningReplicaSetMonitor::IsMasterReply IsMasterReply; +typedef ScanningReplicaSetMonitor::ScanState ScanState; +typedef ScanningReplicaSetMonitor::ScanStatePtr ScanStatePtr; +typedef ScanningReplicaSetMonitor::SetState SetState; +typedef ScanningReplicaSetMonitor::SetStatePtr SetStatePtr; +typedef ScanningReplicaSetMonitor::Refresher Refresher; +typedef ScanState::UnconfirmedReplies UnconfirmedReplies; +typedef SetState::Node Node; +typedef SetState::Nodes Nodes; +using executor::TaskExecutor; +using CallbackArgs = TaskExecutor::CallbackArgs; +using CallbackHandle = TaskExecutor::CallbackHandle; + +// Intentionally chosen to compare worse than all known latencies. +const int64_t unknownLatency = numeric_limits<int64_t>::max(); + +const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet()); + +// +// Helpers for stl algorithms +// + +bool isMaster(const Node& node) { + return node.isMaster; +} + +bool opTimeGreater(const Node* lhs, const Node* rhs) { + return lhs->opTime > rhs->opTime; +} + +bool compareLatencies(const Node* lhs, const Node* rhs) { + // NOTE: this automatically compares Node::unknownLatency worse than all others. + return lhs->latencyMicros < rhs->latencyMicros; +} + +bool hostsEqual(const Node& lhs, const HostAndPort& rhs) { + return lhs.host == rhs; +} + +// Allows comparing two Nodes, or a HostAndPort and a Node. +// NOTE: the two HostAndPort overload is only needed to support extra checks in some STL +// implementations. For simplicity, no comparator should be used with collections of just +// HostAndPort. +struct CompareHosts { + bool operator()(const Node& lhs, const Node& rhs) { + return lhs.host < rhs.host; + } + bool operator()(const Node& lhs, const HostAndPort& rhs) { + return lhs.host < rhs; + } + bool operator()(const HostAndPort& lhs, const Node& rhs) { + return lhs < rhs.host; + } + bool operator()(const HostAndPort& lhs, const HostAndPort& rhs) { + return lhs < rhs; + } +} compareHosts; // like an overloaded function, but able to pass to stl algorithms + +// The following structs should be treated as functions returning a UnaryPredicate. +// Usage example: std::find_if(nodes.begin(), nodes.end(), HostIs(someHost)); +// They all hold their constructor argument by reference. + +struct HostIs { + explicit HostIs(const HostAndPort& host) : _host(host) {} + bool operator()(const HostAndPort& host) { + return host == _host; + } + bool operator()(const Node& node) { + return node.host == _host; + } + const HostAndPort& _host; +}; + +struct HostNotIn { + explicit HostNotIn(const std::set<HostAndPort>& hosts) : _hosts(hosts) {} + bool operator()(const HostAndPort& host) { + return !_hosts.count(host); + } + bool operator()(const Node& node) { + return !_hosts.count(node.host); + } + const std::set<HostAndPort>& _hosts; +}; + +int32_t pingTimeMillis(const Node& node) { + auto latencyMillis = node.latencyMicros / 1000; + if (latencyMillis > numeric_limits<int32_t>::max()) { + // In particular, Node::unknownLatency does not fit in an int32. + return numeric_limits<int32_t>::max(); + } + return latencyMillis; +} + +/** + * Replica set refresh period on the task executor. + */ +const Seconds kDefaultRefreshPeriod(30); +} // namespace + +ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const SetStatePtr& initialState) + : _state(initialState) {} + +ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const MongoURI& uri) + : ScanningReplicaSetMonitor( + std::make_shared<SetState>(uri, + &ReplicaSetMonitorManager::get()->getNotifier(), + ReplicaSetMonitorManager::get()->getExecutor())) {} + +void ScanningReplicaSetMonitor::init() { + if (areRefreshRetriesDisabledForTest()) { + // This is for MockReplicaSet. Those tests want to control when scanning happens. + warning() << "*** Not starting background refresh because refresh retries are disabled."; + return; + } + + { + stdx::lock_guard lk(_state->mutex); + _state->init(); + } +} + +void ScanningReplicaSetMonitor::drop() { + { + stdx::lock_guard lk(_state->mutex); + _state->drop(); + } +} + +ScanningReplicaSetMonitor::~ScanningReplicaSetMonitor() { + drop(); +} + +template <typename Callback> +auto ScanningReplicaSetMonitor::SetState::scheduleWorkAt(Date_t when, Callback&& cb) const { + auto wrappedCallback = [cb = std::forward<Callback>(cb), + anchor = shared_from_this()](const CallbackArgs& cbArgs) mutable { + if (ErrorCodes::isCancelationError(cbArgs.status)) { + // Do no more work if we're removed or canceled + return; + } + invariant(cbArgs.status); + + stdx::lock_guard lk(anchor->mutex); + if (anchor->isDropped) { + return; + } + + cb(cbArgs); + }; + return executor->scheduleWorkAt(std::move(when), std::move(wrappedCallback)); +} + +void ScanningReplicaSetMonitor::SetState::rescheduleRefresh(SchedulingStrategy strategy) { + // Reschedule the refresh + + if (!executor || isMocked) { + // Without an executor, we can't do refreshes -- we're in a test + return; + } + + if (isDropped) { // already removed so no need to refresh + LOG(1) << "Stopping refresh for replica set " << name << " because it's removed"; + return; + } + + Milliseconds period = refreshPeriod; + if (isExpedited) { + period = std::min<Milliseconds>(period, kExpeditedRefreshPeriod); + } + + auto currentTime = now(); + auto possibleNextScanTime = currentTime + period; + if (refresherHandle && // + (strategy == SchedulingStrategy::kKeepEarlyScan) && // + (nextScanTime > currentTime) && // + (possibleNextScanTime >= nextScanTime)) { + // If the next scan would be sooner than our desired, why cancel? + return; + } + + // Cancel out the last refresh + if (auto currentHandle = std::exchange(refresherHandle, {})) { + executor->cancel(currentHandle); + } + + nextScanTime = possibleNextScanTime; + LOG(1) << "Next replica set scan scheduled for " << nextScanTime; + auto swHandle = scheduleWorkAt(nextScanTime, [this](const CallbackArgs& cbArgs) { + if (cbArgs.myHandle != refresherHandle) + return; // We've been replaced! + + // It is possible that a waiter will have already expired by the point of this rescan. + // Thus we notify here to trigger that logic. + notify(); + + _ensureScanInProgress(shared_from_this()); + + // And now we set up the next one + rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan); + }); + + if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) { + LOG(1) << "Cant schedule refresh for " << name << ". Executor shutdown in progress"; + return; + } + + if (!swHandle.isOK()) { + severe() << "Can't continue refresh for replica set " << name << " due to " + << redact(swHandle.getStatus()); + fassertFailed(40140); + } + + refresherHandle = std::move(swHandle.getValue()); +} + +SemiFuture<HostAndPort> ScanningReplicaSetMonitor::getHostOrRefresh( + const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + return _getHostsOrRefresh(criteria, maxWait) + .then([](const auto& hosts) { + invariant(hosts.size()); + return hosts[0]; + }) + .semi(); +} + +SemiFuture<std::vector<HostAndPort>> ScanningReplicaSetMonitor::getHostsOrRefresh( + const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + return _getHostsOrRefresh(criteria, maxWait).semi(); +} + +Future<std::vector<HostAndPort>> ScanningReplicaSetMonitor::_getHostsOrRefresh( + const ReadPreferenceSetting& criteria, Milliseconds maxWait) { + + stdx::lock_guard<Latch> lk(_state->mutex); + if (_state->isDropped) { + return Status(ErrorCodes::ReplicaSetMonitorRemoved, + str::stream() + << "ScanningReplicaSetMonitor for set " << getName() << " is removed"); + } + + auto out = _state->getMatchingHosts(criteria); + if (!out.empty()) + return {std::move(out)}; + + // Fail early without doing any more work if the timeout has already expired. + if (maxWait <= Milliseconds(0)) + return _state->makeUnsatisfedReadPrefError(criteria); + + // TODO look into putting all PrimaryOnly waiters on a single SharedPromise. The tricky part is + // dealing with maxWait. + auto pf = makePromiseFuture<decltype(out)>(); + _state->waiters.emplace_back( + SetState::Waiter{_state->now() + maxWait, criteria, std::move(pf.promise)}); + + // This must go after we set up the wait state to correctly handle unittests using + // MockReplicaSet. + _ensureScanInProgress(_state); + + // Switch to expedited scanning. + _state->isExpedited = true; + _state->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan); + + return std::move(pf.future); +} +HostAndPort ScanningReplicaSetMonitor::getMasterOrUassert() { + return getHostOrRefresh(kPrimaryOnlyReadPreference).get(); +} + +void ScanningReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) { + stdx::lock_guard<Latch> lk(_state->mutex); + Node* node = _state->findNode(host); + if (node) + node->markFailed(status); + if (kDebugBuild) + _state->checkInvariants(); +} + +bool ScanningReplicaSetMonitor::isPrimary(const HostAndPort& host) const { + stdx::lock_guard<Latch> lk(_state->mutex); + Node* node = _state->findNode(host); + return node ? node->isMaster : false; +} + +bool ScanningReplicaSetMonitor::isHostUp(const HostAndPort& host) const { + stdx::lock_guard<Latch> lk(_state->mutex); + Node* node = _state->findNode(host); + return node ? node->isUp : false; +} + +int ScanningReplicaSetMonitor::getMinWireVersion() const { + stdx::lock_guard<Latch> lk(_state->mutex); + int minVersion = 0; + for (const auto& host : _state->nodes) { + if (host.isUp) { + minVersion = std::max(minVersion, host.minWireVersion); + } + } + + return minVersion; +} + +int ScanningReplicaSetMonitor::getMaxWireVersion() const { + stdx::lock_guard<Latch> lk(_state->mutex); + int maxVersion = std::numeric_limits<int>::max(); + for (const auto& host : _state->nodes) { + if (host.isUp) { + maxVersion = std::min(maxVersion, host.maxWireVersion); + } + } + + return maxVersion; +} + +std::string ScanningReplicaSetMonitor::getName() const { + // name is const so don't need to lock + return _state->name; +} + +std::string ScanningReplicaSetMonitor::getServerAddress() const { + stdx::lock_guard<Latch> lk(_state->mutex); + // We return our setUri until first confirmation + return _state->seedConnStr.isValid() ? _state->seedConnStr.toString() + : _state->setUri.connectionString().toString(); +} + +const MongoURI& ScanningReplicaSetMonitor::getOriginalUri() const { + // setUri is const so no need to lock. + return _state->setUri; +} + +bool ScanningReplicaSetMonitor::contains(const HostAndPort& host) const { + stdx::lock_guard<Latch> lk(_state->mutex); + return _state->seedNodes.count(host); +} + +// TODO move to correct order with non-statics before pushing +void ScanningReplicaSetMonitor::appendInfo(BSONObjBuilder& bsonObjBuilder, bool forFTDC) const { + stdx::lock_guard<Latch> lk(_state->mutex); + + BSONObjBuilder monitorInfo(bsonObjBuilder.subobjStart(getName())); + if (forFTDC) { + for (size_t i = 0; i < _state->nodes.size(); i++) { + const Node& node = _state->nodes[i]; + monitorInfo.appendNumber(node.host.toString(), pingTimeMillis(node)); + } + return; + } + + // NOTE: the format here must be consistent for backwards compatibility + BSONArrayBuilder hosts(monitorInfo.subarrayStart("hosts")); + for (size_t i = 0; i < _state->nodes.size(); i++) { + const Node& node = _state->nodes[i]; + + BSONObjBuilder builder; + builder.append("addr", node.host.toString()); + builder.append("ok", node.isUp); + builder.append("ismaster", node.isMaster); // intentionally not camelCase + builder.append("hidden", false); // we don't keep hidden nodes in the set + builder.append("secondary", node.isUp && !node.isMaster); + builder.append("pingTimeMillis", pingTimeMillis(node)); + + if (!node.tags.isEmpty()) { + builder.append("tags", node.tags); + } + + hosts.append(builder.obj()); + } +} +bool ScanningReplicaSetMonitor::isKnownToHaveGoodPrimary() const { + stdx::lock_guard<Latch> lk(_state->mutex); + + for (const auto& node : _state->nodes) { + if (node.isMaster) { + return true; + } + } + + return false; +} + +Seconds ScanningReplicaSetMonitor::getDefaultRefreshPeriod() { + Seconds r = kDefaultRefreshPeriod; + static constexpr auto kPeriodField = "period"_sd; + globalFailPointRegistry() + .find("modifyReplicaSetMonitorDefaultRefreshPeriod") + ->executeIf([&r](const BSONObj& data) { r = Seconds{data.getIntField(kPeriodField)}; }, + [](const BSONObj& data) { return data.hasField(kPeriodField); }); + return r; +} + +void ScanningReplicaSetMonitor::runScanForMockReplicaSet() { + stdx::lock_guard<Latch> lk(_state->mutex); + _ensureScanInProgress(_state); + + // This function should only be called from tests using MockReplicaSet and they should use the + // synchronous path to complete before returning. + invariant(_state->currentScan == nullptr); +} + +void ScanningReplicaSetMonitor::_ensureScanInProgress(const SetStatePtr& state) { + Refresher(state).scheduleNetworkRequests(); +} + +Refresher::Refresher(const SetStatePtr& setState) : _set(setState), _scan(setState->currentScan) { + if (_scan) { + _set->rescheduleRefresh(SetState::SchedulingStrategy::kKeepEarlyScan); + _scan->retryAllTriedHosts(_set->rand); + return; // participate in in-progress scan + } + + startNewScan(); +} + +void Refresher::scheduleNetworkRequests() { + for (auto ns = getNextStep(); ns.step == NextStep::CONTACT_HOST; ns = getNextStep()) { + if (!_set->executor || _set->isMocked) { + // If we're mocked, just schedule an isMaster + scheduleIsMaster(ns.host); + continue; + } + + // cancel any scheduled isMaster calls that haven't yet been called + Node* node = _set->findOrCreateNode(ns.host); + if (auto handle = std::exchange(node->scheduledIsMasterHandle, {})) { + _set->executor->cancel(handle); + } + + // ensure that the call to isMaster is scheduled at most every 500ms + auto swHandle = + _set->scheduleWorkAt(node->nextPossibleIsMasterCall, + [*this, host = ns.host](const CallbackArgs& cbArgs) mutable { + scheduleIsMaster(host); + }); + + if (ErrorCodes::isShutdownError(swHandle.getStatus().code())) { + _scan->markHostsToScanAsTried(); + break; + } + + if (!swHandle.isOK()) { + severe() << "Can't continue scan for replica set " << _set->name << " due to " + << redact(swHandle.getStatus()); + fassertFailed(31176); + } + + node->scheduledIsMasterHandle = uassertStatusOK(std::move(swHandle)); + } + + if (kDebugBuild) + _set->checkInvariants(); +} + +void Refresher::scheduleIsMaster(const HostAndPort& host) { + if (_set->isMocked) { + // MockReplicaSet only works with DBClient-style access since it injects itself into the + // ScopedDbConnection pool connection creation. + try { + ScopedDbConnection conn(ConnectionString(host), kCheckTimeout.count()); + + auto timer = Timer(); + auto reply = BSONObj(); + bool ignoredOutParam = false; + conn->isMaster(ignoredOutParam, &reply); + conn.done(); // return to pool on success. + + receivedIsMaster(host, timer.micros(), reply); + } catch (DBException& ex) { + failedHost(host, ex.toStatus()); + } + + return; + } + + auto request = executor::RemoteCommandRequest( + host, "admin", BSON("isMaster" << 1), nullptr, kCheckTimeout); + request.sslMode = _set->setUri.getSSLMode(); + auto status = + _set->executor + ->scheduleRemoteCommand( + std::move(request), + [copy = *this, host, timer = Timer()]( + const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + stdx::lock_guard lk(copy._set->mutex); + // Ignore the reply and return if we are no longer the current scan. This might + // happen if it was decided that the host we were contacting isn't part of the + // set. + if (copy._scan != copy._set->currentScan) { + return; + } + + // ensure that isMaster calls occur at most 500ms after the previous call ended + if (auto node = copy._set->findNode(host)) { + node->nextPossibleIsMasterCall = + copy._set->executor->now() + Milliseconds(500); + } + + if (result.response.isOK()) { + // Not using result.response.elapsedMillis because higher precision is + // useful for computing the rolling average. + copy.receivedIsMaster(host, timer.micros(), result.response.data); + } else { + copy.failedHost(host, result.response.status); + } + + // This reply may have discovered new hosts to contact so we need to schedule + // them. + copy.scheduleNetworkRequests(); + }) + .getStatus(); + + if (!status.isOK()) { + failedHost(host, status); + // This is only called from scheduleNetworkRequests() which will still be looping, so we + // don't need to call it here after updating the state. + } +} + +Refresher::NextStep Refresher::getNextStep() { + // No longer the current scan + if (_scan != _set->currentScan) { + return NextStep(NextStep::DONE); + } + + // Wait for all dispatched hosts to return before trying any fallback hosts. + if (_scan->hostsToScan.empty() && !_scan->waitingFor.empty()) { + return NextStep(NextStep::WAIT); + } + + // If we haven't yet found a master, try contacting unconfirmed hosts + if (_scan->hostsToScan.empty() && !_scan->foundUpMaster) { + _scan->enqueAllUntriedHosts(_scan->possibleNodes, _set->rand); + _scan->possibleNodes.clear(); + } + + if (_scan->hostsToScan.empty()) { + // We've tried all hosts we can, so nothing more to do in this round. + if (!_scan->foundUpMaster) { + warning() << "Unable to reach primary for set " << _set->name; + + // Since we've talked to everyone we could but still didn't find a primary, we + // do the best we can, and assume all unconfirmedReplies are actually from nodes + // in the set (we've already confirmed that they think they are). This is + // important since it allows us to bootstrap to a usable state even if we are + // unable to talk to a master while starting up. As soon as we are able to + // contact a master, we will remove any nodes that it doesn't think are part of + // the set, undoing the damage we cause here. + + // NOTE: we don't modify seedNodes or notify about set membership change in this + // case since it hasn't been confirmed by a master. + for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); + it != _scan->unconfirmedReplies.end(); + ++it) { + _set->findOrCreateNode(it->first)->update(it->second); + } + + auto connStr = _set->possibleConnectionString(); + if (connStr != _set->workingConnStr) { + _set->workingConnStr = std::move(connStr); + _set->notifier->onPossibleSet(_set->workingConnStr); + } + + // If at some point we care about lacking a primary, on it here + _set->lastSeenMaster = {}; + } + + if (_scan->foundAnyUpNodes) { + _set->consecutiveFailedScans = 0; + } else { + auto nScans = _set->consecutiveFailedScans++; + if (nScans <= 10 || nScans % 10 == 0) { + log() << "Cannot reach any nodes for set " << _set->name + << ". Please check network connectivity and the status of the set. " + << "This has happened for " << _set->consecutiveFailedScans + << " checks in a row."; + } + } + + // Makes sure all other Refreshers in this round return DONE + _set->currentScan.reset(); + _set->notify(); + + LOG(1) << "Refreshing replica set " << _set->name << " took " << _scan->timer.millis() + << "ms"; + + return NextStep(NextStep::DONE); + } + + // Pop and return the next hostToScan. + HostAndPort host = _scan->hostsToScan.front(); + _scan->hostsToScan.pop_front(); + _scan->waitingFor.insert(host); + _scan->triedHosts.insert(host); + + return NextStep(NextStep::CONTACT_HOST, host); +} + +void Refresher::receivedIsMaster(const HostAndPort& from, + int64_t latencyMicros, + const BSONObj& replyObj) { + _scan->waitingFor.erase(from); + + const IsMasterReply reply(from, latencyMicros, replyObj); + + // Handle various failure cases + if (!reply.ok) { + failedHost(from, {ErrorCodes::CommandFailed, "Failed to execute 'ismaster' command"}); + return; + } + + if (reply.setName != _set->name) { + if (reply.raw["isreplicaset"].trueValue()) { + // The reply came from a node in the state referred to as RSGhost in the SDAM + // spec. RSGhost corresponds to either REMOVED or STARTUP member states. In any event, + // if a reply from a ghost offers a list of possible other members of the replica set, + // and if this refresher has yet to find the replica set master, we add hosts listed in + // the reply to the list of possible replica set members. + if (!_scan->foundUpMaster) { + _scan->possibleNodes.insert(reply.members.begin(), reply.members.end()); + } + } else { + error() << "replset name mismatch: expected \"" << _set->name << "\", " + << "but remote node " << from << " has replset name \"" << reply.setName << "\"" + << ", ismaster: " << replyObj; + } + + failedHost(from, + {ErrorCodes::InconsistentReplicaSetNames, + str::stream() << "Target replica set name " << reply.setName + << " does not match the monitored set name " << _set->name}); + return; + } + + if (reply.isMaster) { + Status status = receivedIsMasterFromMaster(from, reply); + if (!status.isOK()) { + failedHost(from, status); + return; + } + } + + if (_scan->foundUpMaster) { + // We only update a Node if a master has confirmed it is in the set. + _set->updateNodeIfInNodes(reply); + } else { + // Populate possibleNodes. + _scan->possibleNodes.insert(reply.members.begin(), reply.members.end()); + _scan->unconfirmedReplies[from] = reply; + } + + // _set->nodes may still not have any nodes with isUp==true, but we have at least found a + // connectible host that is that claims to be in the set. + _scan->foundAnyUpNodes = true; + + _set->notify(); + + if (kDebugBuild) + _set->checkInvariants(); +} + +void Refresher::failedHost(const HostAndPort& host, const Status& status) { + _scan->waitingFor.erase(host); + + Node* node = _set->findNode(host); + if (node) + node->markFailed(status); + + if (_scan->waitingFor.empty()) { + // If this was the last host that needed a response, we should notify the SetState so that + // we can fail any waiters that have timed out. + _set->notify(); + } +} + +void Refresher::startNewScan() { + // The heuristics we use in deciding the order to contact hosts are designed to find a + // master as quickly as possible. This is because we can't use any hosts we find until + // we either get the latest set of members from a master or talk to all possible hosts + // without finding a master. + + // TODO It might make sense to check down nodes first if the last seen master is still + // marked as up. + + _scan = std::make_shared<ScanState>(); + _set->currentScan = _scan; + + int upNodes = 0; + for (Nodes::const_iterator it(_set->nodes.begin()), end(_set->nodes.end()); it != end; ++it) { + if (it->isUp) { + // _scan the nodes we think are up first + _scan->hostsToScan.push_front(it->host); + upNodes++; + } else { + _scan->hostsToScan.push_back(it->host); + } + } + + // shuffle the queue, but keep "up" nodes at the front + std::shuffle( + _scan->hostsToScan.begin(), _scan->hostsToScan.begin() + upNodes, _set->rand.urbg()); + std::shuffle(_scan->hostsToScan.begin() + upNodes, _scan->hostsToScan.end(), _set->rand.urbg()); + + if (!_set->lastSeenMaster.empty()) { + // move lastSeenMaster to front of queue + std::stable_partition( + _scan->hostsToScan.begin(), _scan->hostsToScan.end(), HostIs(_set->lastSeenMaster)); + } +} + +Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply) { + invariant(reply.isMaster); + + // Reject if config version is older. This is for backwards compatibility with nodes in pv0 + // since they don't have the same ordering with pv1 electionId. + if (reply.configVersion < _set->configVersion) { + return { + ErrorCodes::NotMaster, + str::stream() << "Node " << from << " believes it is primary, but its config version " + << reply.configVersion << " is older than the most recent config version " + << _set->configVersion}; + } + + if (reply.electionId.isSet()) { + // ElectionIds are only comparable if they are of the same protocol version. However, since + // isMaster has no protocol version field, we use the configVersion instead. This works + // because configVersion needs to be incremented whenever the protocol version is changed. + if (reply.configVersion == _set->configVersion && _set->maxElectionId.isSet() && + _set->maxElectionId.compare(reply.electionId) > 0) { + return { + ErrorCodes::NotMaster, + str::stream() << "Node " << from << " believes it is primary, but its election id " + << reply.electionId << " is older than the most recent election id " + << _set->maxElectionId}; + } + + _set->maxElectionId = reply.electionId; + } + + _set->configVersion = reply.configVersion; + + // Mark all nodes as not master. We will mark ourself as master before releasing the lock. + // NOTE: we use a "last-wins" policy if multiple hosts claim to be master. + for (size_t i = 0; i < _set->nodes.size(); i++) { + _set->nodes[i].isMaster = false; + } + + // Check if the master agrees with our current list of nodes. + // REMINDER: both _set->nodes and reply.members are sorted. + if (_set->nodes.size() != reply.members.size() || + !std::equal(_set->nodes.begin(), _set->nodes.end(), reply.members.begin(), hostsEqual)) { + LOG(2) << "Adjusting nodes in our view of replica set " << _set->name + << " based on master reply: " << redact(reply.raw); + + // remove non-members from _set->nodes + _set->nodes.erase( + std::remove_if(_set->nodes.begin(), _set->nodes.end(), HostNotIn(reply.members)), + _set->nodes.end()); + + // add new members to _set->nodes + for (auto& host : reply.members) { + _set->findOrCreateNode(host); + } + + // replace hostToScan queue with untried normal hosts. can both add and remove + // hosts from the queue. + _scan->hostsToScan.clear(); + _scan->enqueAllUntriedHosts(reply.members, _set->rand); + + if (!_scan->waitingFor.empty()) { + // make sure we don't wait for any hosts that aren't considered members + std::set<HostAndPort> newWaitingFor; + std::set_intersection(reply.members.begin(), + reply.members.end(), + _scan->waitingFor.begin(), + _scan->waitingFor.end(), + std::inserter(newWaitingFor, newWaitingFor.end())); + _scan->waitingFor.swap(newWaitingFor); + } + } + + bool changedHosts = reply.members != _set->seedNodes; + bool changedPrimary = reply.host != _set->lastSeenMaster; + if (changedHosts || changedPrimary) { + ++_set->seedGen; + _set->seedNodes = reply.members; + _set->seedConnStr = _set->confirmedConnectionString(); + + // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare + // and we want to record our changes + log() << "Confirmed replica set for " << _set->name << " is " << _set->seedConnStr; + + _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host, reply.passives); + } + + // Update our working string + _set->workingConnStr = _set->seedConnStr; + + // Update other nodes's information based on replies we've already seen + for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); + it != _scan->unconfirmedReplies.end(); + ++it) { + // this ignores replies from hosts not in _set->nodes (as modified above) + _set->updateNodeIfInNodes(it->second); + } + _scan->unconfirmedReplies.clear(); + + _scan->foundUpMaster = true; + _set->lastSeenMaster = reply.host; + + return Status::OK(); +} + +void IsMasterReply::parse(const BSONObj& obj) { + try { + raw = obj.getOwned(); // don't use obj again after this line + + ok = raw["ok"].trueValue(); + if (!ok) + return; + + setName = raw["setName"].str(); + hidden = raw["hidden"].trueValue(); + secondary = raw["secondary"].trueValue(); + + minWireVersion = raw["minWireVersion"].numberInt(); + maxWireVersion = raw["maxWireVersion"].numberInt(); + + // hidden nodes can't be master, even if they claim to be. + isMaster = !hidden && raw["ismaster"].trueValue(); + + if (isMaster && raw.hasField("electionId")) { + electionId = raw["electionId"].OID(); + } + + configVersion = raw["setVersion"].numberInt(); + + const string primaryString = raw["primary"].str(); + primary = primaryString.empty() ? HostAndPort() : HostAndPort(primaryString); + + // both hosts and passives, but not arbiters, are considered "normal hosts" + members.clear(); + BSONForEach(host, raw.getObjectField("hosts")) { + members.insert(HostAndPort(host.String())); + } + BSONForEach(host, raw.getObjectField("passives")) { + members.insert(HostAndPort(host.String())); + passives.insert(HostAndPort(host.String())); + } + + tags = raw.getObjectField("tags"); + BSONObj lastWriteField = raw.getObjectField("lastWrite"); + if (!lastWriteField.isEmpty()) { + if (auto lastWrite = lastWriteField["lastWriteDate"]) { + lastWriteDate = lastWrite.date(); + } + + uassertStatusOK(bsonExtractOpTimeField(lastWriteField, "opTime", &opTime)); + } + } catch (const std::exception& e) { + ok = false; + log() << "exception while parsing isMaster reply: " << e.what() << " " << obj; + } +} + +Node::Node(const HostAndPort& host) : host(host), latencyMicros(unknownLatency) {} + +void Node::markFailed(const Status& status) { + if (isUp) { + log() << "Marking host " << host << " as failed" << causedBy(redact(status)); + + isUp = false; + } + + isMaster = false; +} + +bool Node::matches(const ReadPreference pref) const { + if (!isUp) { + LOG(3) << "Host " << host << " is not up"; + return false; + } + + LOG(3) << "Host " << host << " is " << (isMaster ? "primary" : "not primary"); + if (pref == ReadPreference::PrimaryOnly) { + return isMaster; + } + + if (pref == ReadPreference::SecondaryOnly) { + return !isMaster; + } + + return true; +} + +bool Node::matches(const BSONObj& tag) const { + BSONForEach(tagCriteria, tag) { + if (SimpleBSONElementComparator::kInstance.evaluate( + this->tags[tagCriteria.fieldNameStringData()] != tagCriteria)) { + return false; + } + } + + return true; +} + +void Node::update(const IsMasterReply& reply) { + invariant(host == reply.host); + invariant(reply.ok); + + LOG(3) << "Updating host " << host << " based on ismaster reply: " << reply.raw; + + // Nodes that are hidden or neither master or secondary are considered down since we can't + // send any operations to them. + isUp = !reply.hidden && (reply.isMaster || reply.secondary); + isMaster = reply.isMaster; + + minWireVersion = reply.minWireVersion; + maxWireVersion = reply.maxWireVersion; + + // save a copy if unchanged + if (!tags.binaryEqual(reply.tags)) + tags = reply.tags.getOwned(); + + if (reply.latencyMicros >= 0) { // TODO upper bound? + if (latencyMicros == unknownLatency) { + latencyMicros = reply.latencyMicros; + } else { + // update latency with smoothed moving average (1/4th the delta) + latencyMicros += (reply.latencyMicros - latencyMicros) / 4; + } + } + + LOG(3) << "Updating " << host << " lastWriteDate to " << reply.lastWriteDate; + lastWriteDate = reply.lastWriteDate; + + LOG(3) << "Updating " << host << " opTime to " << reply.opTime; + opTime = reply.opTime; + lastWriteDateUpdateTime = Date_t::now(); +} + +SetState::SetState(const MongoURI& uri, + ReplicaSetChangeNotifier* notifier, + executor::TaskExecutor* executor) + : setUri(std::move(uri)), + name(setUri.getSetName()), + notifier(notifier), + executor(executor), + seedNodes(setUri.getServers().begin(), setUri.getServers().end()), + latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)), + rand(std::random_device()()), + refreshPeriod(getDefaultRefreshPeriod()) { + uassert(13642, "Replica set seed list can't be empty", !seedNodes.empty()); + + if (name.empty()) + warning() << "Replica set name empty, first node: " << *(seedNodes.begin()); + + // This adds the seed hosts to nodes, but they aren't usable for anything except seeding a + // scan until we start a scan and either find a master or contact all hosts without finding + // one. + // WARNING: if seedNodes is ever changed to not imply sorted iteration, you will need to + // sort nodes after this loop. + for (auto&& addr : seedNodes) { + nodes.push_back(Node(addr)); + + if (addr.host()[0] == '$') { + invariant(isMocked || &addr == &*seedNodes.begin()); // Can't mix and match. + isMocked = true; + } else { + invariant(!isMocked); // Can't mix and match. + } + } + + if (kDebugBuild) + checkInvariants(); +} + +HostAndPort SetState::getMatchingHost(const ReadPreferenceSetting& criteria) const { + auto hosts = getMatchingHosts(criteria); + + if (hosts.empty()) { + return HostAndPort(); + } + + return hosts[0]; +} + +std::vector<HostAndPort> SetState::getMatchingHosts(const ReadPreferenceSetting& criteria) const { + switch (criteria.pref) { + // "Prefered" read preferences are defined in terms of other preferences + case ReadPreference::PrimaryPreferred: { + std::vector<HostAndPort> out = + getMatchingHosts(ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); + // NOTE: the spec says we should use the primary even if tags don't match + if (!out.empty()) + return out; + return getMatchingHosts(ReadPreferenceSetting( + ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds)); + } + + case ReadPreference::SecondaryPreferred: { + std::vector<HostAndPort> out = getMatchingHosts(ReadPreferenceSetting( + ReadPreference::SecondaryOnly, criteria.tags, criteria.maxStalenessSeconds)); + if (!out.empty()) + return out; + // NOTE: the spec says we should use the primary even if tags don't match + return getMatchingHosts( + ReadPreferenceSetting(ReadPreference::PrimaryOnly, criteria.tags)); + } + + case ReadPreference::PrimaryOnly: { + // NOTE: isMaster implies isUp + Nodes::const_iterator it = std::find_if(nodes.begin(), nodes.end(), isMaster); + if (it == nodes.end()) + return {}; + return {it->host}; + } + + // The difference between these is handled by Node::matches + case ReadPreference::SecondaryOnly: + case ReadPreference::Nearest: { + std::function<bool(const Node&)> matchNode = [](const Node& node) -> bool { + return true; + }; + // build comparator + if (criteria.maxStalenessSeconds.count()) { + auto masterIt = std::find_if(nodes.begin(), nodes.end(), isMaster); + if (masterIt == nodes.end() || !masterIt->lastWriteDate.toMillisSinceEpoch()) { + auto writeDateCmp = [](const Node* a, const Node* b) -> bool { + return a->lastWriteDate < b->lastWriteDate; + }; + // use only non failed nodes + std::vector<const Node*> upNodes; + for (auto nodeIt = nodes.begin(); nodeIt != nodes.end(); ++nodeIt) { + if (nodeIt->isUp && nodeIt->lastWriteDate.toMillisSinceEpoch()) { + upNodes.push_back(&(*nodeIt)); + } + } + auto latestSecNode = + std::max_element(upNodes.begin(), upNodes.end(), writeDateCmp); + if (latestSecNode == upNodes.end()) { + matchNode = [](const Node& node) -> bool { return false; }; + } else { + Date_t maxWriteTime = (*latestSecNode)->lastWriteDate; + matchNode = [=](const Node& node) -> bool { + return duration_cast<Seconds>(maxWriteTime - node.lastWriteDate) + + refreshPeriod <= + criteria.maxStalenessSeconds; + }; + } + } else { + Seconds primaryStaleness = duration_cast<Seconds>( + masterIt->lastWriteDateUpdateTime - masterIt->lastWriteDate); + matchNode = [=](const Node& node) -> bool { + return duration_cast<Seconds>(node.lastWriteDateUpdateTime - + node.lastWriteDate) - + primaryStaleness + refreshPeriod <= + criteria.maxStalenessSeconds; + }; + } + } + + std::vector<const Node*> allMatchingNodes; + BSONForEach(tagElem, criteria.tags.getTagBSON()) { + uassert(16358, "Tags should be a BSON object", tagElem.isABSONObj()); + BSONObj tag = tagElem.Obj(); + + std::vector<const Node*> matchingNodes; + for (size_t i = 0; i < nodes.size(); i++) { + if (nodes[i].matches(criteria.pref) && nodes[i].matches(tag) && + matchNode(nodes[i])) { + matchingNodes.push_back(&nodes[i]); + } + } + + // Only consider nodes that satisfy the minOpTime + if (!criteria.minOpTime.isNull()) { + std::sort(matchingNodes.begin(), matchingNodes.end(), opTimeGreater); + for (size_t i = 0; i < matchingNodes.size(); i++) { + if (matchingNodes[i]->opTime < criteria.minOpTime) { + if (i == 0) { + // If no nodes satisfy the minOpTime criteria, we ignore the + // minOpTime requirement. + break; + } + matchingNodes.erase(matchingNodes.begin() + i, matchingNodes.end()); + break; + } + } + } + + allMatchingNodes.insert( + allMatchingNodes.end(), matchingNodes.begin(), matchingNodes.end()); + } + + // don't do more complicated selection if not needed + if (allMatchingNodes.empty()) { + return {}; + } + if (allMatchingNodes.size() == 1) { + return {allMatchingNodes.front()->host}; + } + + // If there are multiple nodes satisfying the minOpTime, next order by latency + // and don't consider hosts further than a threshold from the closest. + std::sort(allMatchingNodes.begin(), allMatchingNodes.end(), compareLatencies); + for (size_t i = 1; i < allMatchingNodes.size(); i++) { + int64_t distance = + allMatchingNodes[i]->latencyMicros - allMatchingNodes[0]->latencyMicros; + if (distance >= latencyThresholdMicros) { + // this node and all remaining ones are too far away + allMatchingNodes.erase(allMatchingNodes.begin() + i, allMatchingNodes.end()); + break; + } + } + + std::vector<HostAndPort> hosts; + std::transform(allMatchingNodes.begin(), + allMatchingNodes.end(), + std::back_inserter(hosts), + [](const auto& node) { return node->host; }); + + // Note that the host list is only deterministic (or random) for the first node. + // The rest of the list is in matchingNodes order (latency) with one element swapped + // for the first element. + if (auto bestHostIdx = useDeterministicHostSelection ? roundRobin++ % hosts.size() + : rand.nextInt32(hosts.size())) { + using std::swap; + swap(hosts[0], hosts[bestHostIdx]); + } + + return hosts; + } + + default: + uassert(16337, "Unknown read preference", false); + break; + } +} + +Node* SetState::findNode(const HostAndPort& host) { + const Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); + if (it == nodes.end() || it->host != host) + return nullptr; + + return &(*it); +} + +Node* SetState::findOrCreateNode(const HostAndPort& host) { + // This is insertion sort, but N is currently guaranteed to be <= 12 (although this class + // must function correctly even with more nodes). If we lift that restriction, we may need + // to consider alternate algorithms. + Nodes::iterator it = std::lower_bound(nodes.begin(), nodes.end(), host, compareHosts); + if (it == nodes.end() || it->host != host) { + LOG(2) << "Adding node " << host << " to our view of replica set " << name; + it = nodes.insert(it, Node(host)); + } + return &(*it); +} + +void SetState::updateNodeIfInNodes(const IsMasterReply& reply) { + Node* node = findNode(reply.host); + if (!node) { + LOG(2) << "Skipping application of ismaster reply from " << reply.host + << " since it isn't a confirmed member of set " << name; + return; + } + + node->update(reply); +} + +ConnectionString SetState::confirmedConnectionString() const { + std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes)); + + return ConnectionString::forReplicaSet(name, std::move(hosts)); +} + +ConnectionString SetState::possibleConnectionString() const { + std::vector<HostAndPort> hosts; + hosts.reserve(nodes.size()); + + for (auto& node : nodes) { + hosts.push_back(node.host); + } + + return ConnectionString::forReplicaSet(name, std::move(hosts)); +} + +void SetState::notify() { + if (!waiters.size()) { + return; + } + + const auto cachedNow = now(); + auto shouldQuickFail = areRefreshRetriesDisabledForTest() && !currentScan; + + for (auto it = waiters.begin(); it != waiters.end();) { + if (isDropped) { + it->promise.setError({ErrorCodes::ShutdownInProgress, + str::stream() << "ScanningReplicaSetMonitor is shutting down"}); + waiters.erase(it++); + continue; + } + + auto match = getMatchingHosts(it->criteria); + if (!match.empty()) { + // match; + it->promise.emplaceValue(std::move(match)); + waiters.erase(it++); + } else if (it->deadline <= cachedNow) { + LOG(1) << "Unable to statisfy read preference " << it->criteria << " by deadline " + << it->deadline; + it->promise.setError(makeUnsatisfedReadPrefError(it->criteria)); + waiters.erase(it++); + } else if (shouldQuickFail) { + LOG(1) << "Unable to statisfy read preference because tests fail quickly"; + it->promise.setError(makeUnsatisfedReadPrefError(it->criteria)); + waiters.erase(it++); + } else { + it++; + } + } + + if (waiters.empty()) { + // No current waiters so we can stop the expedited scanning. + isExpedited = false; + rescheduleRefresh(SchedulingStrategy::kCancelPreviousScan); + } +} + +Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const { + return Status(ErrorCodes::FailedToSatisfyReadPreference, + str::stream() << "Could not find host matching read preference " + << criteria.toString() << " for set " << name); +} + +void SetState::init() { + rescheduleRefresh(SchedulingStrategy::kKeepEarlyScan); + notifier->onFoundSet(name); +} + +void SetState::drop() { + if (std::exchange(isDropped, true)) { + // If a SetState calls drop() from destruction after the RSMM calls shutdown(), then the + // RSMM's executor may no longer exist. Thus, only drop once. + return; + } + + currentScan.reset(); + notify(); + + if (auto handle = std::exchange(refresherHandle, {})) { + // Cancel our refresh on the way out + executor->cancel(handle); + } + + for (auto& node : nodes) { + if (auto handle = std::exchange(node.scheduledIsMasterHandle, {})) { + // Cancel any isMasters we had scheduled + executor->cancel(handle); + } + } + + // No point in notifying if we never started + if (workingConnStr.isValid()) { + notifier->onDroppedSet(name); + } +} + +void SetState::checkInvariants() const { + bool foundMaster = false; + for (size_t i = 0; i < nodes.size(); i++) { + // no empty hosts + invariant(!nodes[i].host.empty()); + + if (nodes[i].isMaster) { + // masters must be up + invariant(nodes[i].isUp); + + // at most one master + invariant(!foundMaster); + foundMaster = true; + + // if we have a master it should be the same as lastSeenMaster + invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster); + } + + // should never end up with negative latencies + invariant(nodes[i].latencyMicros >= 0); + + // nodes must be sorted by host with no-dupes + invariant(i == 0 || (nodes[i - 1].host < nodes[i].host)); + } + + // nodes should be a (non-strict) superset of the seedNodes + invariant(std::includes( + nodes.begin(), nodes.end(), seedNodes.begin(), seedNodes.end(), compareHosts)); + + if (currentScan) { + // hostsToScan can't have dups or hosts already in triedHosts. + std::set<HostAndPort> cantSee = currentScan->triedHosts; + for (std::deque<HostAndPort>::const_iterator it = currentScan->hostsToScan.begin(); + it != currentScan->hostsToScan.end(); + ++it) { + invariant(!cantSee.count(*it)); + cantSee.insert(*it); // make sure we don't see this again + } + + // We should only be waitingFor hosts that are in triedHosts + invariant(std::includes(currentScan->triedHosts.begin(), + currentScan->triedHosts.end(), + currentScan->waitingFor.begin(), + currentScan->waitingFor.end())); + + // We should only have unconfirmedReplies if we haven't found a master yet + invariant(!currentScan->foundUpMaster || currentScan->unconfirmedReplies.empty()); + } +} + +template <typename Container> +void ScanState::enqueAllUntriedHosts(const Container& container, PseudoRandom& rand) { + invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. + + // no std::copy_if before c++11 + for (typename Container::const_iterator it(container.begin()), end(container.end()); it != end; + ++it) { + if (!triedHosts.count(*it)) { + hostsToScan.push_back(*it); + } + } + std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg()); +} + +void ScanState::retryAllTriedHosts(PseudoRandom& rand) { + invariant(hostsToScan.empty()); // because we don't try to dedup hosts already in the queue. + // Move hosts that are in triedHosts but not in waitingFor from triedHosts to hostsToScan. + std::set_difference(triedHosts.begin(), + triedHosts.end(), + waitingFor.begin(), + waitingFor.end(), + std::inserter(hostsToScan, hostsToScan.end())); + std::shuffle(hostsToScan.begin(), hostsToScan.end(), rand.urbg()); + triedHosts = waitingFor; +} + +void ScanState::markHostsToScanAsTried() noexcept { + while (!hostsToScan.empty()) { + auto host = hostsToScan.front(); + hostsToScan.pop_front(); + /** + * Mark the popped host as tried to avoid deleting hosts in multiple points. + * This emulates the final effect of Refresher::getNextStep() on the set. + */ + triedHosts.insert(host); + } +} +} // namespace mongo diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h new file mode 100644 index 00000000000..efd5f10be14 --- /dev/null +++ b/src/mongo/client/scanning_replica_set_monitor.h @@ -0,0 +1,213 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <set> +#include <string> + +#include "mongo/base/string_data.h" +#include "mongo/client/mongo_uri.h" +#include "mongo/client/replica_set_change_notifier.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/executor/task_executor.h" +#include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/duration.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class ScanningReplicaSetMonitor : public ReplicaSetMonitor { + ScanningReplicaSetMonitor(const ScanningReplicaSetMonitor&) = delete; + ScanningReplicaSetMonitor& operator=(const ScanningReplicaSetMonitor&) = delete; + +public: + class Refresher; + + static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500); + static constexpr auto kCheckTimeout = Seconds(5); + + ScanningReplicaSetMonitor(const MongoURI& uri); + + void init() override; + + void drop() override; + + SemiFuture<HostAndPort> getHostOrRefresh( + const ReadPreferenceSetting& readPref, + Milliseconds maxWait = kDefaultFindHostTimeout) override; + + SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh( + const ReadPreferenceSetting& readPref, + Milliseconds maxWait = kDefaultFindHostTimeout) override; + + HostAndPort getMasterOrUassert() override; + + void failedHost(const HostAndPort& host, const Status& status) override; + + bool isPrimary(const HostAndPort& host) const override; + + bool isHostUp(const HostAndPort& host) const override; + + int getMinWireVersion() const override; + + int getMaxWireVersion() const override; + + std::string getName() const override; + + std::string getServerAddress() const override; + + const MongoURI& getOriginalUri() const override; + + bool contains(const HostAndPort& server) const override; + + void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override; + + bool isKnownToHaveGoodPrimary() const override; + + /** + * Returns the refresh period that is given to all new SetStates. + */ + static Seconds getDefaultRefreshPeriod(); + + // + // internal types (defined in scanning_replica_set_monitor_internal.h) + // + + struct IsMasterReply; + struct ScanState; + struct SetState; + typedef std::shared_ptr<ScanState> ScanStatePtr; + typedef std::shared_ptr<SetState> SetStatePtr; + + /** + * Allows tests to set initial conditions and introspect the current state. + */ + explicit ScanningReplicaSetMonitor(const SetStatePtr& initialState); + ~ScanningReplicaSetMonitor(); + + /** + * This is for use in tests using MockReplicaSet to ensure that a full scan completes before + * continuing. + */ + void runScanForMockReplicaSet(); + +private: + Future<std::vector<HostAndPort>> _getHostsOrRefresh(const ReadPreferenceSetting& readPref, + Milliseconds maxWait); + /** + * If no scan is in-progress, this function is responsible for setting up a new scan. Otherwise, + * does nothing. + */ + static void _ensureScanInProgress(const SetStatePtr&); + + const SetStatePtr _state; +}; + + +/** + * Refreshes the local view of a replica set. + * + * All logic related to choosing the hosts to contact and updating the SetState based on replies + * lives in this class. Use of this class should always be guarded by SetState::mutex unless in + * single-threaded use by ScanningReplicaSetMonitorTest. + */ +class ScanningReplicaSetMonitor::Refresher { +public: + explicit Refresher(const SetStatePtr& setState); + + struct NextStep { + enum StepKind { + CONTACT_HOST, /// Contact the returned host + WAIT, /// Wait on condition variable and try again. + DONE, /// No more hosts to contact in this Refresh round + }; + + explicit NextStep(StepKind step, const HostAndPort& host = HostAndPort()) + : step(step), host(host) {} + + StepKind step; + HostAndPort host; + }; + + /** + * Returns the next step to take. + * + * By calling this, you promise to call receivedIsMaster or failedHost if the NextStep is + * CONTACT_HOST. + */ + NextStep getNextStep(); + + /** + * Call this if a host returned from getNextStep successfully replied to an isMaster call. + * Negative latencyMicros are ignored. + */ + void receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& reply); + + /** + * Call this if a host returned from getNextStep failed to reply to an isMaster call. + */ + void failedHost(const HostAndPort& host, const Status& status); + + /** + * Starts a new scan over the hosts in set. + */ + void startNewScan(); + + /** + * First, checks that the "reply" is not from a stale primary by comparing the electionId of + * "reply" to the maxElectionId recorded by the SetState and returns OK status if "reply" + * belongs to a non-stale primary. Otherwise returns a failed status. + * + * The 'from' parameter specifies the node from which the response is received. + * + * Updates _set and _scan based on set-membership information from a master. + * Applies _scan->unconfirmedReplies to confirmed nodes. + * Does not update this host's node in _set->nodes. + */ + Status receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply); + + /** + * Schedules isMaster requests to all hosts that currently need to be contacted. + * Does nothing if requests have already been sent to all known hosts. + */ + void scheduleNetworkRequests(); + + void scheduleIsMaster(const HostAndPort& host); + +private: + // Both pointers are never NULL + SetStatePtr _set; + ScanStatePtr _scan; // May differ from _set->currentScan if a new scan has started. +}; + +} // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/scanning_replica_set_monitor_internal.h index 1c02db4f65d..a0ff4ac8e92 100644 --- a/src/mongo/client/replica_set_monitor_internal.h +++ b/src/mongo/client/scanning_replica_set_monitor_internal.h @@ -42,7 +42,7 @@ #include <vector> #include "mongo/client/read_preference.h" -#include "mongo/client/replica_set_monitor.h" +#include "mongo/client/scanning_replica_set_monitor.h" #include "mongo/db/jsobj.h" #include "mongo/platform/mutex.h" #include "mongo/platform/random.h" @@ -51,7 +51,7 @@ namespace mongo { -struct ReplicaSetMonitor::IsMasterReply { +struct ScanningReplicaSetMonitor::IsMasterReply { IsMasterReply() : ok(false) {} IsMasterReply(const HostAndPort& host, int64_t latencyMicros, const BSONObj& reply) : ok(false), host(host), latencyMicros(latencyMicros) { @@ -86,14 +86,15 @@ struct ReplicaSetMonitor::IsMasterReply { }; /** - * The SetState is the underlying data object behind both the ReplicaSetMonitor and the Refresher + * The SetState is the underlying data object behind both the ScanningReplicaSetMonitor and the + * Refresher * * Note that the SetState only holds its own lock in init() and drop(). Even those uses can probably * be offloaded to the RSM eventually. In all other cases, the RSM and RSM::Refresher use the * SetState lock to synchronize. */ -struct ReplicaSetMonitor::SetState - : public std::enable_shared_from_this<ReplicaSetMonitor::SetState> { +struct ScanningReplicaSetMonitor::SetState + : public std::enable_shared_from_this<ScanningReplicaSetMonitor::SetState> { SetState(const SetState&) = delete; SetState& operator=(const SetState&) = delete; @@ -271,7 +272,7 @@ public: ConnectionString workingConnStr; // The connection string from our last scan // For tracking replies - OID maxElectionId; // largest election id observed by this ReplicaSetMonitor + OID maxElectionId; // largest election id observed by this ScanningReplicaSetMonitor int configVersion = 0; // version number of the replica set config. // For matching hosts @@ -288,7 +289,7 @@ public: Date_t nextScanTime; // The time at which the next scan is scheduled to start }; -struct ReplicaSetMonitor::ScanState { +struct ScanningReplicaSetMonitor::ScanState { ScanState(const ScanState&) = delete; ScanState& operator=(const ScanState&) = delete; diff --git a/src/mongo/client/replica_set_monitor_internal_test.cpp b/src/mongo/client/scanning_replica_set_monitor_internal_test.cpp index 1a90deed113..766558de2f5 100644 --- a/src/mongo/client/replica_set_monitor_internal_test.cpp +++ b/src/mongo/client/scanning_replica_set_monitor_internal_test.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo/client/replica_set_monitor_test_fixture.h" +#include "mongo/client/scanning_replica_set_monitor_test_fixture.h" #include "mongo/client/mongo_uri.h" @@ -37,7 +37,7 @@ namespace mongo { namespace { // -- SetState Construction -- -using InitialStateTest = ReplicaSetMonitorTest; +using InitialStateTest = ScanningReplicaSetMonitorTest; TEST_F(InitialStateTest, InitialStateMongoURI) { auto uri = MongoURI::parse("mongodb://a,b,c/?replicaSet=name"); @@ -58,7 +58,7 @@ TEST_F(InitialStateTest, InitialStateMongoURI) { } // -- Node operations -- -class NodeTest : public ReplicaSetMonitorTest { +class NodeTest : public ScanningReplicaSetMonitorTest { public: bool isCompatible(const Node& node, ReadPreference pref, const TagSet& tagSet) { auto connStr = ConnectionString::forReplicaSet(kSetName, {node.host}); @@ -356,7 +356,7 @@ TEST_F(NodeTest, SecNodeNotCompatibleMultiTag) { } // -- IsMasterReply operations -- -using IsMasterReplyTest = ReplicaSetMonitorTest; +using IsMasterReplyTest = ScanningReplicaSetMonitorTest; TEST_F(IsMasterReplyTest, IsMasterBadParse) { BSONObj ismaster = BSON("hosts" << BSON_ARRAY("mongo.example:badport")); IsMasterReply imr(HostAndPort("mongo.example:27017"), -1, ismaster); diff --git a/src/mongo/client/replica_set_monitor_read_preference_test.cpp b/src/mongo/client/scanning_replica_set_monitor_read_preference_test.cpp index fa702580389..5fafe64071d 100644 --- a/src/mongo/client/replica_set_monitor_read_preference_test.cpp +++ b/src/mongo/client/scanning_replica_set_monitor_read_preference_test.cpp @@ -29,7 +29,7 @@ #include "mongo/platform/basic.h" -#include "mongo/client/replica_set_monitor_test_fixture.h" +#include "mongo/client/scanning_replica_set_monitor_test_fixture.h" #include <memory> @@ -39,7 +39,7 @@ namespace mongo { namespace { -class ReadPrefTest : public ReplicaSetMonitorTest { +class ReadPrefTest : public ScanningReplicaSetMonitorTest { public: ReadPrefTest() = default; virtual ~ReadPrefTest() = default; diff --git a/src/mongo/client/replica_set_monitor_scan_test.cpp b/src/mongo/client/scanning_replica_set_monitor_scan_test.cpp index 2bc894f27ba..451b20af3e0 100644 --- a/src/mongo/client/replica_set_monitor_scan_test.cpp +++ b/src/mongo/client/scanning_replica_set_monitor_scan_test.cpp @@ -31,7 +31,7 @@ #include "mongo/platform/basic.h" -#include "mongo/client/replica_set_monitor_test_fixture.h" +#include "mongo/client/scanning_replica_set_monitor_test_fixture.h" #include "mongo/client/mongo_uri.h" #include "mongo/logv2/log.h" @@ -39,7 +39,7 @@ namespace mongo { namespace { -using CoreScanTest = ReplicaSetMonitorTest; +using CoreScanTest = ScanningReplicaSetMonitorTest; TEST_F(CoreScanTest, CheckAllSeedsSerial) { auto state = makeState(basicUri); @@ -568,7 +568,7 @@ TEST_F(CoreScanTest, GetMatchingDuringScan) { // Ensure nothing breaks when out-of-band failedHost is called during scan TEST_F(CoreScanTest, OutOfBandFailedHost) { auto state = makeState(basicUri); - ReplicaSetMonitorPtr rsm = std::make_shared<ReplicaSetMonitor>(state); + auto rsm = std::make_shared<ScanningReplicaSetMonitor>(state); Refresher refresher(state); for (size_t i = 0; i != basicSeeds.size(); ++i) { @@ -802,14 +802,14 @@ TEST_F(CoreScanTest, StalePrimaryWithObsoleteElectionId) { TEST_F(CoreScanTest, NoPrimaryUpCheck) { auto state = makeState(basicUri); - ReplicaSetMonitor rsm(state); + ScanningReplicaSetMonitor rsm(state); ASSERT_FALSE(rsm.isKnownToHaveGoodPrimary()); } TEST_F(CoreScanTest, PrimaryIsUpCheck) { auto state = makeState(basicUri); state->nodes.front().isMaster = true; - ReplicaSetMonitor rsm(state); + ScanningReplicaSetMonitor rsm(state); ASSERT_TRUE(rsm.isKnownToHaveGoodPrimary()); } @@ -901,7 +901,7 @@ TEST_F(CoreScanTest, TwoPrimaries2ndHasOlderConfigVersion) { ASSERT_EQUALS(state->configVersion, 2); } -using MaxStalenessMSTest = ReplicaSetMonitorTest; +using MaxStalenessMSTest = ScanningReplicaSetMonitorTest; /** * Success finding node matching maxStalenessMS parameter @@ -1289,7 +1289,7 @@ TEST_F(MaxStalenessMSTest, MaxStalenessMSZeroNoLastWrite) { ASSERT(!state->getMatchingHost(secondary).empty()); } -using MinOpTimeTest = ReplicaSetMonitorTest; +using MinOpTimeTest = ScanningReplicaSetMonitorTest; /** * Success matching minOpTime */ @@ -1454,7 +1454,7 @@ public: State lastState; }; -class ChangeNotifierTest : public ReplicaSetMonitorTest { +class ChangeNotifierTest : public ScanningReplicaSetMonitorTest { public: ChangeNotifierTest() = default; virtual ~ChangeNotifierTest() = default; diff --git a/src/mongo/client/replica_set_monitor_test_concurrent.cpp b/src/mongo/client/scanning_replica_set_monitor_test_concurrent.cpp index 358dcc29cb7..1d10a4805ee 100644 --- a/src/mongo/client/replica_set_monitor_test_concurrent.cpp +++ b/src/mongo/client/scanning_replica_set_monitor_test_concurrent.cpp @@ -32,7 +32,7 @@ #include "mongo/platform/basic.h" #include "mongo/client/replica_set_monitor.h" -#include "mongo/client/replica_set_monitor_internal.h" +#include "mongo/client/scanning_replica_set_monitor_internal.h" #include "mongo/dbtests/mock/mock_replica_set.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/thread_pool_mock.h" @@ -51,7 +51,7 @@ using executor::RemoteCommandResponse; using executor::ThreadPoolExecutorTest; using InNetworkGuard = NetworkInterfaceMock::InNetworkGuard; using NetworkOperationIterator = NetworkInterfaceMock::NetworkOperationIterator; -using StepKind = ReplicaSetMonitor::Refresher::NextStep::StepKind; +using StepKind = ScanningReplicaSetMonitor::Refresher::NextStep::StepKind; class ReplicaSetMonitorConcurrentTest : public ThreadPoolExecutorTest { protected: @@ -178,9 +178,9 @@ TEST_F(ReplicaSetMonitorConcurrentTest, RechecksAvailableNodesUntilExpiration) { MockReplicaSet replSet("test", 2, false /* hasPrimary */, false /* dollarPrefixHosts */); const auto node0 = HostAndPort(replSet.getSecondaries()[0]); const auto node1 = HostAndPort(replSet.getSecondaries()[1]); - auto state = std::make_shared<ReplicaSetMonitor::SetState>( + auto state = std::make_shared<ScanningReplicaSetMonitor::SetState>( replSet.getURI(), &getNotifier(), &getExecutor()); - auto monitor = std::make_shared<ReplicaSetMonitor>(state); + auto monitor = std::make_shared<ScanningReplicaSetMonitor>(state); // Node 1 is unresponsive. replSet.kill(replSet.getSecondaries()[1]); @@ -245,9 +245,9 @@ TEST_F(ReplicaSetMonitorConcurrentTest, StepdownAndElection) { const auto node0 = HostAndPort(replSet.getSecondaries()[0]); const auto node1 = HostAndPort(replSet.getSecondaries()[1]); const auto node2 = HostAndPort(replSet.getSecondaries()[2]); - auto state = std::make_shared<ReplicaSetMonitor::SetState>( + auto state = std::make_shared<ScanningReplicaSetMonitor::SetState>( replSet.getURI(), &getNotifier(), &getExecutor()); - auto monitor = std::make_shared<ReplicaSetMonitor>(state); + auto monitor = std::make_shared<ScanningReplicaSetMonitor>(state); // Node 2 is unresponsive. replSet.kill(replSet.getSecondaries()[2]); @@ -328,9 +328,9 @@ TEST_F(ReplicaSetMonitorConcurrentTest, IsMasterFrequency) { const auto node0 = HostAndPort(replSet.getSecondaries()[0]); const auto node1 = HostAndPort(replSet.getSecondaries()[1]); - auto state = std::make_shared<ReplicaSetMonitor::SetState>( + auto state = std::make_shared<ScanningReplicaSetMonitor::SetState>( replSet.getURI(), &getNotifier(), &getExecutor()); - auto monitor = std::make_shared<ReplicaSetMonitor>(state); + auto monitor = std::make_shared<ScanningReplicaSetMonitor>(state); // Node 1 is unresponsive. replSet.kill(replSet.getSecondaries()[1]); @@ -389,9 +389,9 @@ TEST_F(ReplicaSetMonitorConcurrentTest, RecheckUntilTimeout) { const auto node0 = HostAndPort(replSet.getSecondaries()[0]); const auto node1 = HostAndPort(replSet.getSecondaries()[1]); - auto state = std::make_shared<ReplicaSetMonitor::SetState>( + auto state = std::make_shared<ScanningReplicaSetMonitor::SetState>( replSet.getURI(), &getNotifier(), &getExecutor()); - auto monitor = std::make_shared<ReplicaSetMonitor>(state); + auto monitor = std::make_shared<ScanningReplicaSetMonitor>(state); // Node 1 is unresponsive. replSet.kill(replSet.getSecondaries()[1]); @@ -414,11 +414,12 @@ TEST_F(ReplicaSetMonitorConcurrentTest, RecheckUntilTimeout) { // Every 500ms, the monitor rechecks node 0 after the previous successful isMaster. // Every 5s, the monitor rechecks node 1 after the previous isMaster experiences timeout. - constexpr auto kTimeoutPeriodMS = Milliseconds(ReplicaSetMonitor::kCheckTimeout).count() + - ReplicaSetMonitor::kExpeditedRefreshPeriod.count(); + constexpr auto kTimeoutPeriodMS = + Milliseconds(ScanningReplicaSetMonitor::kCheckTimeout).count() + + ScanningReplicaSetMonitor::kExpeditedRefreshPeriod.count(); checkUntil(Milliseconds(14500), [&]() { ASSERT_EQ(getNumChecks(node0), - elapsedMS() / ReplicaSetMonitor::kExpeditedRefreshPeriod.count() + 1); + elapsedMS() / ScanningReplicaSetMonitor::kExpeditedRefreshPeriod.count() + 1); ASSERT_EQ(getNumChecks(node1), elapsedMS() / kTimeoutPeriodMS + 1); ASSERT(!hostFuture.isReady()); }); diff --git a/src/mongo/client/replica_set_monitor_test_fixture.cpp b/src/mongo/client/scanning_replica_set_monitor_test_fixture.cpp index 4f0e158fa99..fa7f13cccc8 100644 --- a/src/mongo/client/replica_set_monitor_test_fixture.cpp +++ b/src/mongo/client/scanning_replica_set_monitor_test_fixture.cpp @@ -29,13 +29,13 @@ #include "mongo/platform/basic.h" -#include "mongo/client/replica_set_monitor_test_fixture.h" +#include "mongo/client/scanning_replica_set_monitor_test_fixture.h" namespace mongo { -const std::vector<HostAndPort> ReplicaSetMonitorTest::basicSeeds = { +const std::vector<HostAndPort> ScanningReplicaSetMonitorTest::basicSeeds = { HostAndPort("a"), HostAndPort("b"), HostAndPort("c")}; -const std::set<HostAndPort> ReplicaSetMonitorTest::basicSeedsSet = {std::begin(basicSeeds), - std::end(basicSeeds)}; -const MongoURI ReplicaSetMonitorTest::basicUri(ConnectionString::forReplicaSet(kSetName, - basicSeeds)); +const std::set<HostAndPort> ScanningReplicaSetMonitorTest::basicSeedsSet = {std::begin(basicSeeds), + std::end(basicSeeds)}; +const MongoURI ScanningReplicaSetMonitorTest::basicUri(ConnectionString::forReplicaSet(kSetName, + basicSeeds)); } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_test_fixture.h b/src/mongo/client/scanning_replica_set_monitor_test_fixture.h index 7e842008453..a48584ad362 100644 --- a/src/mongo/client/replica_set_monitor_test_fixture.h +++ b/src/mongo/client/scanning_replica_set_monitor_test_fixture.h @@ -34,8 +34,8 @@ #include <vector> #include "mongo/client/replica_set_change_notifier.h" -#include "mongo/client/replica_set_monitor.h" -#include "mongo/client/replica_set_monitor_internal.h" +#include "mongo/client/scanning_replica_set_monitor.h" +#include "mongo/client/scanning_replica_set_monitor_internal.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -44,21 +44,21 @@ namespace mongo { // current (only) thread, so they do not lock SetState::mutex before examining state. This is // NOT something that non-test code should do. -class ReplicaSetMonitorTest : public unittest::Test { +class ScanningReplicaSetMonitorTest : public unittest::Test { public: // Pull in nested types - using SetState = ReplicaSetMonitor::SetState; + using SetState = ScanningReplicaSetMonitor::SetState; using Node = SetState::Node; - using IsMasterReply = ReplicaSetMonitor::IsMasterReply; + using IsMasterReply = ScanningReplicaSetMonitor::IsMasterReply; - using Refresher = ReplicaSetMonitor::Refresher; + using Refresher = ScanningReplicaSetMonitor::Refresher; using NextStep = Refresher::NextStep; static constexpr StringData kSetName = "name"_sd; - ReplicaSetMonitorTest() = default; - virtual ~ReplicaSetMonitorTest() = default; + ScanningReplicaSetMonitorTest() = default; + virtual ~ScanningReplicaSetMonitorTest() = default; template <typename... Args> using StateIsConstructible = @@ -69,7 +69,7 @@ public: template <typename... Args, typename = StateIsConstructible<Args...>> auto makeState(Args&&... args) { - return std::make_shared<ReplicaSetMonitor::SetState>( + return std::make_shared<ScanningReplicaSetMonitor::SetState>( std::forward<Args>(args)..., &_notifier, nullptr); } diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp new file mode 100644 index 00000000000..0056a98c567 --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/streamable_replica_set_monitor.h" + +#include <functional> +#include <memory> +#include <set> +#include <string> + +#include "mongo/client/mongo_uri.h" +#include "mongo/client/replica_set_change_notifier.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/executor/task_executor.h" +#include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/duration.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri) {} +void StreamableReplicaSetMonitor::init() {} + +void StreamableReplicaSetMonitor::drop() {} + +SemiFuture<HostAndPort> StreamableReplicaSetMonitor::getHostOrRefresh( + const ReadPreferenceSetting& readPref, Milliseconds maxWait) { + MONGO_UNREACHABLE; +} + +SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefresh( + const ReadPreferenceSetting& readPref, Milliseconds maxWait) { + MONGO_UNREACHABLE; +} + +HostAndPort StreamableReplicaSetMonitor::getMasterOrUassert() { + MONGO_UNREACHABLE; +} +void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {} +bool StreamableReplicaSetMonitor::isPrimary(const HostAndPort& host) const { + MONGO_UNREACHABLE; +} + +bool StreamableReplicaSetMonitor::isHostUp(const HostAndPort& host) const { + MONGO_UNREACHABLE; +} + +int StreamableReplicaSetMonitor::getMinWireVersion() const { + MONGO_UNREACHABLE; +} + +int StreamableReplicaSetMonitor::getMaxWireVersion() const { + MONGO_UNREACHABLE; +} + +std::string StreamableReplicaSetMonitor::getName() const { + MONGO_UNREACHABLE; +} + +std::string StreamableReplicaSetMonitor::getServerAddress() const { + MONGO_UNREACHABLE; +} + +const MongoURI& StreamableReplicaSetMonitor::getOriginalUri() const { + MONGO_UNREACHABLE; +}; +bool StreamableReplicaSetMonitor::contains(const HostAndPort& server) const { + MONGO_UNREACHABLE; +} + +void StreamableReplicaSetMonitor::appendInfo(BSONObjBuilder& b, bool forFTDC) const { + MONGO_UNREACHABLE; +} + +bool StreamableReplicaSetMonitor::isKnownToHaveGoodPrimary() const { + MONGO_UNREACHABLE; +} + +} // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h new file mode 100644 index 00000000000..8c27301660e --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <set> +#include <string> + +#include "mongo/base/string_data.h" +#include "mongo/client/mongo_uri.h" +#include "mongo/client/replica_set_change_notifier.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/executor/task_executor.h" +#include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/duration.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class StreamableReplicaSetMonitor : public ReplicaSetMonitor { + StreamableReplicaSetMonitor(const StreamableReplicaSetMonitor&) = delete; + StreamableReplicaSetMonitor& operator=(const StreamableReplicaSetMonitor&) = delete; + +public: + StreamableReplicaSetMonitor(const MongoURI& uri); + + void init() override; + + void drop() override; + + SemiFuture<HostAndPort> getHostOrRefresh( + const ReadPreferenceSetting& readPref, + Milliseconds maxWait = kDefaultFindHostTimeout) override; + + SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh( + const ReadPreferenceSetting& readPref, + Milliseconds maxWait = kDefaultFindHostTimeout) override; + + HostAndPort getMasterOrUassert() override; + + void failedHost(const HostAndPort& host, const Status& status) override; + + bool isPrimary(const HostAndPort& host) const override; + + bool isHostUp(const HostAndPort& host) const override; + + int getMinWireVersion() const override; + + int getMaxWireVersion() const override; + + std::string getName() const override; + + std::string getServerAddress() const override; + + const MongoURI& getOriginalUri() const override; + + bool contains(const HostAndPort& server) const override; + + void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override; + + bool isKnownToHaveGoodPrimary() const override; +}; + +} // namespace mongo diff --git a/src/mongo/dbtests/replica_set_monitor_test.cpp b/src/mongo/dbtests/replica_set_monitor_test.cpp index 7e1d403834b..14571926693 100644 --- a/src/mongo/dbtests/replica_set_monitor_test.cpp +++ b/src/mongo/dbtests/replica_set_monitor_test.cpp @@ -36,7 +36,7 @@ #include "mongo/client/connpool.h" #include "mongo/client/dbclient_rs.h" #include "mongo/client/replica_set_monitor.h" -#include "mongo/client/replica_set_monitor_internal.h" +#include "mongo/client/scanning_replica_set_monitor_internal.h" #include "mongo/dbtests/mock/mock_conn_registry.h" #include "mongo/dbtests/mock/mock_replica_set.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp index 83360c2f958..d02ef15adc4 100644 --- a/src/mongo/s/client/parallel.cpp +++ b/src/mongo/s/client/parallel.cpp @@ -362,7 +362,7 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( if (allowShardVersionFailure) { const DBClientReplicaSet* replConn = dynamic_cast<const DBClientReplicaSet*>(rawConn); invariant(replConn); - ReplicaSetMonitorPtr rsMonitor = ReplicaSetMonitor::get(replConn->getSetName()); + auto rsMonitor = ReplicaSetMonitor::get(replConn->getSetName()); uassert(16388, str::stream() << "cannot access unknown replica set: " << replConn->getSetName(), rsMonitor != nullptr); diff --git a/src/mongo/shell/shell_utils.cpp b/src/mongo/shell/shell_utils.cpp index 28ebda1afaa..3979d5b2010 100644 --- a/src/mongo/shell/shell_utils.cpp +++ b/src/mongo/shell/shell_utils.cpp @@ -413,7 +413,7 @@ BSONObj replMonitorStats(const BSONObj& a, void* data) { a.nFields() == 1 && a.firstElement().type() == String); auto name = a.firstElement().valuestrsafe(); - ReplicaSetMonitorPtr rsm = ReplicaSetMonitor::get(name); + auto rsm = ReplicaSetMonitor::get(name); if (!rsm) { return BSON("" << "no ReplSetMonitor exists by that name"); |