diff options
Diffstat (limited to 'src/mongo/client/sdam')
-rw-r--r-- | src/mongo/client/sdam/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/client/sdam/sdam.h | 37 | ||||
-rw-r--r-- | src/mongo/client/sdam/sdam_configuration.cpp | 95 | ||||
-rw-r--r-- | src/mongo/client/sdam/sdam_configuration.h | 100 | ||||
-rw-r--r-- | src/mongo/client/sdam/sdam_datatypes.h | 6 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_description.cpp | 27 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_description.h | 7 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_selector.cpp | 267 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_selector.h | 193 | ||||
-rw-r--r-- | src/mongo/client/sdam/server_selector_test.cpp | 463 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_description.cpp | 65 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_description.h | 72 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_description_test.cpp | 90 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener.cpp | 163 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener.h | 77 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_manager.cpp | 68 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_manager.h | 30 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_state_machine.h | 1 |
18 files changed, 1611 insertions, 164 deletions
diff --git a/src/mongo/client/sdam/SConscript b/src/mongo/client/sdam/SConscript index 0a10d22332f..6cc5c642ad6 100644 --- a/src/mongo/client/sdam/SConscript +++ b/src/mongo/client/sdam/SConscript @@ -7,16 +7,21 @@ env = env.Clone() env.Library( target='sdam', source=[ + 'sdam_configuration.cpp', 'sdam_datatypes.cpp', 'server_description.cpp', 'topology_description.cpp', + 'topology_listener.cpp', 'topology_state_machine.cpp', 'topology_manager.cpp', + 'server_selector.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/repl/optime', '$BUILD_DIR/mongo/util/clock_sources', + '$BUILD_DIR/mongo/client/read_preference', + '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/db/wire_version', '$BUILD_DIR/mongo/rpc/metadata', ], @@ -68,6 +73,15 @@ env.CppUnitTest( ) env.CppUnitTest( + target='server_selector_test', + source=['server_selector_test.cpp'], + LIBDEPS=[ + 'sdam', + 'sdam_test', + ], +) + +env.CppUnitTest( target='topology_state_machine_test', source=['topology_state_machine_test.cpp'], LIBDEPS=['sdam', 'sdam_test'], diff --git a/src/mongo/client/sdam/sdam.h b/src/mongo/client/sdam/sdam.h new file mode 100644 index 00000000000..16645e2418a --- /dev/null +++ b/src/mongo/client/sdam/sdam.h @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/client/sdam/sdam.h" +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/client/sdam/server_description.h" +#include "mongo/client/sdam/server_selector.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_listener.h" +#include "mongo/client/sdam/topology_manager.h" diff --git a/src/mongo/client/sdam/sdam_configuration.cpp b/src/mongo/client/sdam/sdam_configuration.cpp new file mode 100644 index 00000000000..d9852ddae94 --- /dev/null +++ b/src/mongo/client/sdam/sdam_configuration.cpp @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "sdam_configuration.h" + +namespace mongo::sdam { +SdamConfiguration::SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList, + TopologyType initialType, + mongo::Milliseconds heartBeatFrequencyMs, + boost::optional<std::string> setName) + : _seedList(seedList), + _initialType(initialType), + _heartBeatFrequencyMs(heartBeatFrequencyMs), + _setName(setName) { + uassert(ErrorCodes::InvalidSeedList, + "seed list size must be >= 1", + !seedList || (*seedList).size() >= 1); + + uassert(ErrorCodes::InvalidSeedList, + "TopologyType Single must have exactly one entry in the seed list.", + _initialType != TopologyType::kSingle || (*seedList).size() == 1); + + uassert( + ErrorCodes::InvalidTopologyType, + "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.", + !_setName || + (_initialType == TopologyType::kReplicaSetNoPrimary || + _initialType == TopologyType::kSingle)); + + uassert(ErrorCodes::TopologySetNameRequired, + "setName is required for ReplicaSetNoPrimary", + _initialType != TopologyType::kReplicaSetNoPrimary || _setName); + + 
uassert(ErrorCodes::InvalidHeartBeatFrequency, + "topology heartbeat must be >= 500ms", + _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS); +} + +const boost::optional<std::vector<ServerAddress>>& SdamConfiguration::getSeedList() const { + return _seedList; +} + +TopologyType SdamConfiguration::getInitialType() const { + return _initialType; +} + +Milliseconds SdamConfiguration::getHeartBeatFrequency() const { + return _heartBeatFrequencyMs; +} + +const boost::optional<std::string>& SdamConfiguration::getSetName() const { + return _setName; +} + + +ServerSelectionConfiguration::ServerSelectionConfiguration( + const Milliseconds localThresholdMs, const Milliseconds serverSelectionTimeoutMs) + : _localThresholdMs(localThresholdMs), _serverSelectionTimeoutMs(serverSelectionTimeoutMs) {} + +Milliseconds ServerSelectionConfiguration::getLocalThresholdMs() const { + return _localThresholdMs; +} + +Milliseconds ServerSelectionConfiguration::getServerSelectionTimeoutMs() const { + return _serverSelectionTimeoutMs; +} +Milliseconds ServerSelectionConfiguration::getHeartBeatFrequencyMs() const { + return _heartBeatFrequencyMs; +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/sdam_configuration.h b/src/mongo/client/sdam/sdam_configuration.h new file mode 100644 index 00000000000..895e52f4d87 --- /dev/null +++ b/src/mongo/client/sdam/sdam_configuration.h @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. 
If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/client/sdam/sdam_datatypes.h"
+
+namespace mongo::sdam {
+class SdamConfiguration {
+public:
+    SdamConfiguration() : SdamConfiguration(boost::none){};  // no seed list; other arguments take their defaults
+
+    /**
+     * Initialize the configuration. This constructor may uassert if the provided
+     * configuration options are not valid according to the Server Discovery & Monitoring Spec.
+     *
+     * Initial Servers
+     * initial servers may be set to a seed list of one or more server addresses.
+     *
+     * Initial TopologyType
+     * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary.
+     *
+     * Initial setName
+     * The client's initial replica set name is required in order to initially configure the
+     * topology type as ReplicaSetNoPrimary.
+     *
+     * Heartbeat frequency
+     * The heartbeat frequency must be at least kMinHeartbeatFrequencyMS.
+     *
+     * Allowed configuration combinations
+     * TopologyType Single cannot be used with multiple seeds.
+     * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single are
+     * allowed.
+     */
+    explicit SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
+                               TopologyType initialType = TopologyType::kUnknown,
+                               Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs,
+                               boost::optional<std::string> setName = boost::none);
+
+    const boost::optional<std::vector<ServerAddress>>& getSeedList() const;
+    TopologyType getInitialType() const;
+    Milliseconds getHeartBeatFrequency() const;
+    const boost::optional<std::string>& getSetName() const;
+
+    static constexpr Milliseconds kDefaultHeartbeatFrequencyMs = Seconds(10);  // default constructor argument
+    static constexpr Milliseconds kMinHeartbeatFrequencyMS = Milliseconds(500);  // lower bound enforced by the constructor
+    static constexpr Milliseconds kDefaultConnectTimeoutMS = Milliseconds(100);
+
+private:
+    boost::optional<std::vector<ServerAddress>> _seedList;
+    TopologyType _initialType;
+    Milliseconds _heartBeatFrequencyMs;
+    boost::optional<std::string> _setName;
+};
+
+/**
+ * Settings read by the server selector: the local threshold, the server selection timeout and
+ * a heartbeat frequency. The constructor takes only the first two; the heartbeat frequency
+ * keeps the default member initializer below.
+ */
+class ServerSelectionConfiguration {
+public:
+    explicit ServerSelectionConfiguration(const Milliseconds localThresholdMs,
+                                          const Milliseconds serverSelectionTimeoutMs);
+
+    Milliseconds getLocalThresholdMs() const;
+    Milliseconds getServerSelectionTimeoutMs() const;
+    Milliseconds getHeartBeatFrequencyMs() const;
+
+    static constexpr Milliseconds kDefaultLocalThresholdMS = Milliseconds(15);
+    static constexpr Milliseconds kDefaultServerSelectionTimeoutMs = Milliseconds(30000);
+
+    // Builds a configuration from the two default constants above.
+    static ServerSelectionConfiguration defaultConfiguration() {
+        return ServerSelectionConfiguration{kDefaultLocalThresholdMS,
+                                            kDefaultServerSelectionTimeoutMs};
+    }
+
+private:
+    Milliseconds _localThresholdMs = kDefaultLocalThresholdMS;
+    Milliseconds _serverSelectionTimeoutMs = kDefaultServerSelectionTimeoutMs;
+    Milliseconds _heartBeatFrequencyMs = SdamConfiguration::kDefaultHeartbeatFrequencyMs;
+};
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h
index d3f7e7f3b50..520d517da87 100644
---
a/src/mongo/client/sdam/sdam_datatypes.h +++ b/src/mongo/client/sdam/sdam_datatypes.h @@ -126,4 +126,10 @@ using ServerDescriptionPtr = std::shared_ptr<ServerDescription>; class TopologyDescription; using TopologyDescriptionPtr = std::shared_ptr<TopologyDescription>; + +class TopologyManager; +using TopologyManagerPtr = std::unique_ptr<TopologyManager>; + +class TopologyListener; +using TopologyListenerPtr = std::shared_ptr<TopologyListener>; }; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_description.cpp b/src/mongo/client/sdam/server_description.cpp index 1674840a5a6..42dd37d0cdc 100644 --- a/src/mongo/client/sdam/server_description.cpp +++ b/src/mongo/client/sdam/server_description.cpp @@ -26,9 +26,9 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/client/sdam/server_description.h" +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include <algorithm> #include <boost/algorithm/string.hpp> @@ -137,6 +137,14 @@ void ServerDescription::saveTags(BSONObj tagsObj) { } } +void ServerDescription::appendBsonTags(BSONObjBuilder& builder) const { + for (const auto& pair : _tags) { + const auto& key = pair.first; + const auto& value = pair.second; + builder.append(key, value); + } +} + void ServerDescription::saveElectionId(BSONElement electionId) { if (electionId.type() == jstOID) { _electionId = electionId.OID(); @@ -399,6 +407,11 @@ BSONObj ServerDescription::toBson() const { bson.append("arbiters", _arbiters); bson.append("passives", _passives); + if (getTags().size()) { + BSONObjBuilder tagsBuilder(bson.subobjStart("tags")); + appendBsonTags(tagsBuilder); + } + return bson.obj(); } @@ -414,6 +427,18 @@ std::string ServerDescription::toString() const { return toBson().toString(); } +ServerDescriptionPtr ServerDescription::cloneWithRTT(IsMasterRTT rtt) { + auto 
newServerDescription = std::make_shared<ServerDescription>(*this); + newServerDescription->_rtt = rtt; + return newServerDescription; +} + +const boost::optional<TopologyDescriptionPtr> ServerDescription::getTopologyDescription() { + return (_topologyDescription) + ? boost::optional<TopologyDescriptionPtr>(_topologyDescription->lock()) + : boost::none; +} + bool operator==(const mongo::sdam::ServerDescription& a, const mongo::sdam::ServerDescription& b) { return a.isEquivalent(b); diff --git a/src/mongo/client/sdam/server_description.h b/src/mongo/client/sdam/server_description.h index 6fe02a5a9d8..7d95331fa0e 100644 --- a/src/mongo/client/sdam/server_description.h +++ b/src/mongo/client/sdam/server_description.h @@ -78,6 +78,7 @@ public: const boost::optional<ServerAddress>& getMe() const; const boost::optional<std::string>& getSetName() const; const std::map<std::string, std::string>& getTags() const; + void appendBsonTags(BSONObjBuilder& builder) const; // network attributes const boost::optional<std::string>& getError() const; @@ -104,9 +105,11 @@ public: const boost::optional<int>& getSetVersion() const; const boost::optional<OID>& getElectionId() const; const boost::optional<TopologyVersion>& getTopologyVersion() const; + const boost::optional<TopologyDescriptionPtr> getTopologyDescription(); BSONObj toBson() const; std::string toString() const; + ServerDescriptionPtr cloneWithRTT(IsMasterRTT rtt); private: /** @@ -202,6 +205,10 @@ private: // pool for server. Incremented on network error or timeout. 
int _poolResetCounter = 0; + // The topology description of that we are a part of + boost::optional<std::weak_ptr<TopologyDescription>> _topologyDescription; + + friend class TopologyDescription; friend class ServerDescriptionBuilder; }; diff --git a/src/mongo/client/sdam/server_selector.cpp b/src/mongo/client/sdam/server_selector.cpp new file mode 100644 index 00000000000..31bacb1b43e --- /dev/null +++ b/src/mongo/client/sdam/server_selector.cpp @@ -0,0 +1,267 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ +#include "server_selector.h" + +#include <algorithm> + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/client/sdam/topology_description.h" +#include "mongo/platform/random.h" +#include "mongo/util/log.h" + +namespace mongo::sdam { +ServerSelector::~ServerSelector() {} + +SdamServerSelector::SdamServerSelector(const ServerSelectionConfiguration& config) + : _config(config), _random(PseudoRandom(SecureRandom().nextInt64())) {} + +void SdamServerSelector::_getCandidateServers(std::vector<ServerDescriptionPtr>* result, + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria) { + // when querying the primary we don't need to consider tags + bool shouldTagFilter = true; + + // TODO: check to see if we want to enforce minOpTime at all since + // it was effectively optional in the original implementation. + // TODO: the old version of the RSM does this, and many of + // the tests seem to rely on this behavior for correctness. 
+ if (!criteria.minOpTime.isNull()) { + auto eligibleServers = topologyDescription->findServers([](const ServerDescriptionPtr& s) { + return (s->getType() == ServerType::kRSPrimary || + s->getType() == ServerType::kRSSecondary); + }); + + auto beginIt = eligibleServers.begin(); + auto endIt = eligibleServers.end(); + auto maxIt = std::max_element(beginIt, + endIt, + [topologyDescription](const ServerDescriptionPtr& left, + const ServerDescriptionPtr& right) { + return left->getOpTime() < right->getOpTime(); + }); + if (maxIt != endIt) { + auto maxOpTime = (*maxIt)->getOpTime(); + if (maxOpTime && maxOpTime < criteria.minOpTime) { + // ignore minOpTime + const_cast<ReadPreferenceSetting&>(criteria) = ReadPreferenceSetting(criteria.pref); + log() << "ignoring minOpTime for " << criteria.toString(); + } + } + } + + switch (criteria.pref) { + case ReadPreference::Nearest: + *result = topologyDescription->findServers(nearestFilter(criteria)); + break; + + case ReadPreference::SecondaryOnly: + *result = topologyDescription->findServers(secondaryFilter(criteria)); + break; + + case ReadPreference::PrimaryOnly: { + const auto primaryCriteria = ReadPreferenceSetting(criteria.pref); + *result = topologyDescription->findServers(primaryFilter(primaryCriteria)); + shouldTagFilter = false; + break; + } + + case ReadPreference::PrimaryPreferred: { + // ignore tags and max staleness for primary query + auto primaryCriteria = ReadPreferenceSetting(ReadPreference::PrimaryOnly); + _getCandidateServers(result, topologyDescription, primaryCriteria); + if (result->size()) { + shouldTagFilter = false; + break; + } + + // keep tags and maxStaleness for secondary query + auto secondaryCriteria = criteria; + secondaryCriteria.pref = ReadPreference::SecondaryOnly; + _getCandidateServers(result, topologyDescription, secondaryCriteria); + break; + } + + case ReadPreference::SecondaryPreferred: { + // keep tags and maxStaleness for secondary query + auto secondaryCriteria = criteria; + 
secondaryCriteria.pref = ReadPreference::SecondaryOnly; + _getCandidateServers(result, topologyDescription, secondaryCriteria); + if (result->size()) { + break; + } + + // ignore tags and maxStaleness for primary query + shouldTagFilter = false; + auto primaryCriteria = ReadPreferenceSetting(ReadPreference::PrimaryOnly); + _getCandidateServers(result, topologyDescription, primaryCriteria); + break; + } + + default: + MONGO_UNREACHABLE + } + + if (shouldTagFilter) { + filterTags(result, criteria.tags); + } +} + +boost::optional<std::vector<ServerDescriptionPtr>> SdamServerSelector::selectServers( + const TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) { + + // If the topology wire version is invalid, raise an error + if (!topologyDescription->isWireVersionCompatible()) { + uasserted(ErrorCodes::IncompatibleServerVersion, + *topologyDescription->getWireVersionCompatibleError()); + } + + if (topologyDescription->getType() == TopologyType::kUnknown) { + return boost::none; + } + + if (topologyDescription->getType() == TopologyType::kSingle) { + auto servers = topologyDescription->getServers(); + return (servers.size() && servers[0]->getType() != ServerType::kUnknown) + ? 
boost::optional<std::vector<ServerDescriptionPtr>>{{servers[0]}} + : boost::none; + } + + std::vector<ServerDescriptionPtr> results; + _getCandidateServers(&results, topologyDescription, criteria); + + if (results.size()) { + ServerDescriptionPtr minServer = + *std::min_element(results.begin(), results.end(), LatencyWindow::rttCompareFn); + + invariant(minServer->getRtt()); + auto latencyWindow = LatencyWindow(*minServer->getRtt(), _config.getLocalThresholdMs()); + latencyWindow.filterServers(&results); + + // latency window should always leave at least one result + invariant(results.size()); + + return results; + } + + return boost::none; +} + +ServerDescriptionPtr SdamServerSelector::_randomSelect( + const std::vector<ServerDescriptionPtr>& servers) const { + return servers[_random.nextInt64(servers.size())]; +} + +boost::optional<ServerDescriptionPtr> SdamServerSelector::selectServer( + const TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) { + auto servers = selectServers(topologyDescription, criteria); + return servers ? 
boost::optional<ServerDescriptionPtr>(_randomSelect(*servers)) : boost::none; +} + +bool SdamServerSelector::_containsAllTags(ServerDescriptionPtr server, const BSONObj& tags) { + auto serverTags = server->getTags(); + for (auto& checkTag : tags) { + auto checkKey = checkTag.fieldName(); + auto checkValue = checkTag.String(); + auto pos = serverTags.find(checkKey); + if (pos == serverTags.end() || pos->second != checkValue) { + return false; + } + } + return true; +} + +void SdamServerSelector::filterTags(std::vector<ServerDescriptionPtr>* servers, + const TagSet& tagSet) { + const auto& checkTags = tagSet.getTagBSON(); + + if (checkTags.nFields() == 0) + return; + + const auto predicate = [&](const ServerDescriptionPtr& s) { + auto it = checkTags.begin(); + while (it != checkTags.end()) { + if (it->isABSONObj()) { + const BSONObj& tags = it->Obj(); + if (_containsAllTags(s, tags)) { + // found a match -- don't remove the server + return false; + } + } else { + log() << "invalid tags specified for server selection; tags should be specified as " + "a bson Obj: " + << it->toString(); + } + ++it; + } + + // remove the server + return true; + }; + + servers->erase(std::remove_if(servers->begin(), servers->end(), predicate), servers->end()); +} + +bool SdamServerSelector::recencyFilter(const ReadPreferenceSetting& readPref, + const ServerDescriptionPtr& s) { + bool result = true; + + // TODO: check to see if we want to enforce minOpTime at all since + // it was effectively optional in the original implementation. 
+ if (!readPref.minOpTime.isNull()) { + result = result && (s->getOpTime() >= readPref.minOpTime); + } + + if (readPref.maxStalenessSeconds.count()) { + auto topologyDescription = s->getTopologyDescription(); + invariant(topologyDescription); + auto staleness = _calculateStaleness(*topologyDescription, s); + result = result && (staleness <= readPref.maxStalenessSeconds); + } + + return result; +} + + +void LatencyWindow::filterServers(std::vector<ServerDescriptionPtr>* servers) { + servers->erase(std::remove_if(servers->begin(), + servers->end(), + [&](const ServerDescriptionPtr& s) { + // Servers that have made it to this stage are not ServerType + // == kUnknown, so they must have an associated latency. + invariant(s->getType() != ServerType::kUnknown); + invariant(s->getRtt()); + return !this->isWithinWindow(*s->getRtt()); + }), + servers->end()); +} + +bool LatencyWindow::isWithinWindow(IsMasterRTT latency) { + return lower <= latency && latency <= upper; +} +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_selector.h b/src/mongo/client/sdam/server_selector.h new file mode 100644 index 00000000000..9cb676bd3b6 --- /dev/null +++ b/src/mongo/client/sdam/server_selector.h @@ -0,0 +1,193 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once +#include <functional> +#include <vector> + +#include "mongo/client/read_preference.h" +#include "mongo/client/sdam/sdam_configuration.h" +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/client/sdam/server_description.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/platform/random.h" + +namespace mongo::sdam { +/** + * This is the interface that allows one to select a server to satisfy a DB operation given a + * TopologyDescription and a ReadPreferenceSetting. + */ +class ServerSelector { +public: + /** + * Finds a list of candidate servers according to the ReadPreferenceSetting. + */ + virtual boost::optional<std::vector<ServerDescriptionPtr>> selectServers( + TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) = 0; + + /** + * Select a single server according to the ReadPreference and latency of the + * ServerDescription(s). The server is selected randomly from those that match the criteria. 
+ */ + virtual boost::optional<ServerDescriptionPtr> selectServer( + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria) = 0; + + virtual ~ServerSelector(); +}; +using ServerSelectorPtr = std::unique_ptr<ServerSelector>; + +class SdamServerSelector : public ServerSelector { +public: + explicit SdamServerSelector(const ServerSelectionConfiguration& config); + + boost::optional<std::vector<ServerDescriptionPtr>> selectServers( + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria) override; + + boost::optional<ServerDescriptionPtr> selectServer( + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria) override; + + // remove servers that do not match the TagSet + void filterTags(std::vector<ServerDescriptionPtr>* servers, const TagSet& tagSet); + +private: + void _getCandidateServers(std::vector<ServerDescriptionPtr>* result, + const TopologyDescriptionPtr topologyDescription, + const ReadPreferenceSetting& criteria); + + bool _containsAllTags(ServerDescriptionPtr server, const BSONObj& tags); + + ServerDescriptionPtr _randomSelect(const std::vector<ServerDescriptionPtr>& servers) const; + + // staleness for a ServerDescription is defined here: + // https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#maxstalenessseconds + Milliseconds _calculateStaleness(const TopologyDescriptionPtr& topologyDescription, + const ServerDescriptionPtr& serverDescription) { + if (serverDescription->getType() != ServerType::kRSSecondary) + return Milliseconds(0); + + const Date_t& lastWriteDate = serverDescription->getLastWriteDate() + ? 
*serverDescription->getLastWriteDate() + : Date_t::min(); + + if (topologyDescription->getType() == TopologyType::kReplicaSetWithPrimary) { + // (S.lastUpdateTime - S.lastWriteDate) - (P.lastUpdateTime - P.lastWriteDate) + + // heartbeatFrequencyMS + + // topologyType == kReplicaSetWithPrimary implies the validity of the primary server + // description. + invariant(topologyDescription->getPrimary()); + const auto& primaryDescription = *topologyDescription->getPrimary(); + + const auto& primaryLastWriteDate = primaryDescription->getLastWriteDate() + ? *primaryDescription->getLastWriteDate() + : Date_t::min(); + + auto result = (serverDescription->getLastUpdateTime() - lastWriteDate) - + (primaryDescription->getLastUpdateTime() - primaryLastWriteDate) + + _config.getHeartBeatFrequencyMs(); + return duration_cast<Milliseconds>(result); + } else if (topologyDescription->getType() == TopologyType::kReplicaSetNoPrimary) { + // SMax.lastWriteDate - S.lastWriteDate + heartbeatFrequencyMS + Date_t maxLastWriteDate = Date_t::min(); + + // identify secondary with max last write date. + for (const auto& s : topologyDescription->getServers()) { + if (s->getType() != ServerType::kRSSecondary) + continue; + + const auto& sLastWriteDate = + s->getLastWriteDate() ? *s->getLastWriteDate() : Date_t::min(); + + if (sLastWriteDate > maxLastWriteDate) { + maxLastWriteDate = sLastWriteDate; + } + } + + auto result = (maxLastWriteDate - lastWriteDate) + _config.getHeartBeatFrequencyMs(); + return duration_cast<Milliseconds>(result); + } else { + // Not a replica set + return Milliseconds(0); + } + } + + bool recencyFilter(const ReadPreferenceSetting& readPref, const ServerDescriptionPtr& s); + + // A SelectionFilter is a higher order function used to filter out servers from the current + // Topology. 
Its return value is used as input to the TopologyDescription::findServers + // function, and is a function that takes a ServerDescriptionPtr and returns a bool indicating + // whether to keep this server or not based on the ReadPreference, server type, and recency + // metrics of the server. + using SelectionFilter = unique_function<std::function<bool(const ServerDescriptionPtr&)>( + const ReadPreferenceSetting&)>; + + const SelectionFilter secondaryFilter = [this](const ReadPreferenceSetting& readPref) { + return [&](const ServerDescriptionPtr& s) { + return (s->getType() == ServerType::kRSSecondary) && recencyFilter(readPref, s); + }; + }; + + const SelectionFilter primaryFilter = [this](const ReadPreferenceSetting& readPref) { + return [&](const ServerDescriptionPtr& s) { + return (s->getType() == ServerType::kRSPrimary) && recencyFilter(readPref, s); + }; + }; + + const SelectionFilter nearestFilter = [this](const ReadPreferenceSetting& readPref) { + return [&](const ServerDescriptionPtr& s) { + return (s->getType() == ServerType::kRSPrimary || + s->getType() == ServerType::kRSSecondary) && + recencyFilter(readPref, s); + }; + }; + + ServerSelectionConfiguration _config; + mutable PseudoRandom _random; +}; + +// This is used to filter out servers based on their current latency measurements. +struct LatencyWindow { + const IsMasterRTT lower; + const IsMasterRTT upper; + + explicit LatencyWindow(const IsMasterRTT lowerBound, const IsMasterRTT windowWidth) + : lower(lowerBound), upper(lowerBound + windowWidth) {} + + bool isWithinWindow(IsMasterRTT latency); + + // remove servers not in the latency window in-place.
+ void filterServers(std::vector<ServerDescriptionPtr>* servers); + + static bool rttCompareFn(const ServerDescriptionPtr& a, const ServerDescriptionPtr& b) { + return a->getRtt() < b->getRtt(); + } +}; +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_selector_test.cpp b/src/mongo/client/sdam/server_selector_test.cpp new file mode 100644 index 00000000000..233c00b32a5 --- /dev/null +++ b/src/mongo/client/sdam/server_selector_test.cpp @@ -0,0 +1,463 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ +#include "mongo/client/sdam/server_selector.h" + +#include <boost/optional/optional_io.hpp> + +#include "mongo/client/sdam/sdam_test_base.h" +#include "mongo/client/sdam/server_description_builder.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_manager.h" +#include "mongo/db/wire_version.h" +#include "mongo/util/system_clock_source.h" + +namespace mongo::sdam { + +class ServerSelectorTestFixture : public SdamTestFixture { +public: + static inline const auto clockSource = SystemClockSource::get(); + static inline const auto sdamConfiguration = SdamConfiguration({{"s0"}}); + static inline const auto selectionConfig = + ServerSelectionConfiguration(Milliseconds(10), Milliseconds(10)); + + static constexpr auto SET_NAME = "set"; + static constexpr int NUM_ITERATIONS = 1000; + + struct TagSets { + static inline const auto eastProduction = BSON("dc" + << "east" + << "usage" + << "production"); + static inline const auto westProduction = BSON("dc" + << "west" + << "usage" + << "production"); + static inline const auto northTest = BSON("dc" + << "north" + << "usage" + << "test"); + static inline const auto northProduction = BSON("dc" + << "north" + << "usage" + << "production"); + static inline const auto production = BSON("usage" + << "production"); + + static inline const auto test = BSON("usage" + << "test"); + + static inline const auto integration = BSON("usage" + << "integration"); + + static inline const auto primary = BSON("tag" + << "primary"); + static inline const auto secondary = BSON("tag" + << "secondary"); + + static inline const auto emptySet = TagSet{BSONArray(BSONObj())}; + static inline const auto eastOrWestProductionSet = + TagSet(BSON_ARRAY(eastProduction << westProduction)); + static inline const auto westProductionSet = TagSet(BSON_ARRAY(westProduction)); + static inline const auto productionSet = TagSet(BSON_ARRAY(production)); + static inline const auto testSet = TagSet(BSON_ARRAY(test)); + static 
inline const auto integrationOrTestSet = TagSet(BSON_ARRAY(integration << test)); + static inline const auto integrationSet = TagSet(BSON_ARRAY(integration)); + + static inline const auto primarySet = TagSet(BSON_ARRAY(primary)); + static inline const auto secondarySet = TagSet(BSON_ARRAY(secondary)); + }; + + static ServerDescriptionPtr make_with_latency(IsMasterRTT latency, + ServerAddress address, + ServerType serverType = ServerType::kRSPrimary, + std::map<std::string, std::string> tags = {}) { + auto builder = ServerDescriptionBuilder() + .withType(serverType) + .withAddress(address) + .withSetName(SET_NAME) + .withRtt(latency) + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastUpdateTime(Date_t::now()); + + for (auto it = tags.begin(); it != tags.end(); ++it) { + builder.withTag(it->first, it->second); + } + + return builder.instance(); + } + + static auto makeServerDescriptionList() { + return std::vector<ServerDescriptionPtr>{ + make_with_latency(Milliseconds(1), + "s1", + ServerType::kRSSecondary, + {{"dc", "east"}, {"usage", "production"}}), + make_with_latency(Milliseconds(1), + "s1-test", + ServerType::kRSSecondary, + {{"dc", "east"}, {"usage", "test"}}), + make_with_latency(Milliseconds(1), + "s2", + ServerType::kRSSecondary, + {{"dc", "west"}, {"usage", "production"}}), + make_with_latency(Milliseconds(1), + "s2-test", + ServerType::kRSSecondary, + {{"dc", "west"}, {"usage", "test"}}), + make_with_latency(Milliseconds(1), + "s3", + ServerType::kRSSecondary, + {{"dc", "north"}, {"usage", "production"}})}; + }; + + SdamServerSelector selector = SdamServerSelector(selectionConfig); +}; + +TEST_F(ServerSelectorTestFixture, ShouldFilterCorrectlyByLatencyWindow) { + const auto delta = Milliseconds(10); + const auto windowWidth = Milliseconds(100); + const auto lowerBound = Milliseconds(100); + + auto window = LatencyWindow(lowerBound, windowWidth); + + std::vector<ServerDescriptionPtr> 
servers = { + make_with_latency(window.lower - delta, "less"), + make_with_latency(window.lower, "boundary-lower"), + make_with_latency(window.lower + delta, "within"), + make_with_latency(window.upper, "boundary-upper"), + make_with_latency(window.upper + delta, "greater")}; + + window.filterServers(&servers); + + ASSERT_EQ(3, servers.size()); + ASSERT_EQ("boundary-lower", servers[0]->getAddress()); + ASSERT_EQ("within", servers[1]->getAddress()); + ASSERT_EQ("boundary-upper", servers[2]->getAddress()); +} + +TEST_F(ServerSelectorTestFixture, ShouldThrowOnWireError) { + auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration); + auto oldServer = ServerDescriptionBuilder() + .withAddress(topologyDescription->getServers().back()->getAddress()) + .withType(ServerType::kRSPrimary) + .withMaxWireVersion(WireVersion::RELEASE_2_4_AND_BEFORE) + .withMinWireVersion(WireVersion::RELEASE_2_4_AND_BEFORE) + .instance(); + topologyDescription->installServerDescription(oldServer); + + ASSERT(!topologyDescription->isWireVersionCompatible()); + ASSERT_THROWS_CODE(selector.selectServers(topologyDescription, ReadPreferenceSetting()), + DBException, + ErrorCodes::IncompatibleServerVersion); +} + +TEST_F(ServerSelectorTestFixture, ShouldReturnNoneIfTopologyUnknown) { + auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration); + ASSERT_EQ(TopologyType::kUnknown, topologyDescription->getType()); + ASSERT_EQ(boost::none, selector.selectServers(topologyDescription, ReadPreferenceSetting())); +} + +TEST_F(ServerSelectorTestFixture, ShouldSelectRandomlyWhenMultipleOptionsAreAvailable) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration); + + const auto s0Latency = Milliseconds(1); + auto primary = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kRSPrimary) + .withRtt(s0Latency) + .withSetName("set") + .withHost("s0") + 
.withHost("s1") + .withHost("s2") + .withHost("s3") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .instance(); + stateMachine.onServerDescription(*topologyDescription, primary); + + const auto s1Latency = Milliseconds((s0Latency + selectionConfig.getLocalThresholdMs()) / 2); + auto secondaryInLatencyWindow = make_with_latency(s1Latency, "s1", ServerType::kRSSecondary); + stateMachine.onServerDescription(*topologyDescription, secondaryInLatencyWindow); + + // s2 is on the boundary of the latency window + const auto s2Latency = s0Latency + selectionConfig.getLocalThresholdMs(); + auto secondaryOnBoundaryOfLatencyWindow = + make_with_latency(s2Latency, "s2", ServerType::kRSSecondary); + stateMachine.onServerDescription(*topologyDescription, secondaryOnBoundaryOfLatencyWindow); + + // s3 should not be selected + const auto s3Latency = s2Latency + Milliseconds(10); + auto secondaryTooFar = make_with_latency(s3Latency, "s3", ServerType::kRSSecondary); + stateMachine.onServerDescription(*topologyDescription, secondaryTooFar); + + std::map<ServerAddress, int> frequencyInfo{{"s0", 0}, {"s1", 0}, {"s2", 0}, {"s3", 0}}; + for (int i = 0; i < NUM_ITERATIONS; i++) { + auto server = selector.selectServer(topologyDescription, + ReadPreferenceSetting(ReadPreference::Nearest)); + if (server) { + frequencyInfo[(*server)->getAddress()]++; + } + } + + ASSERT(frequencyInfo["s0"]); + ASSERT(frequencyInfo["s1"]); + ASSERT(frequencyInfo["s2"]); + ASSERT_FALSE(frequencyInfo["s3"]); +} + +TEST_F(ServerSelectorTestFixture, ShouldFilterByLastWriteTime) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration); + + const int MAX_STALENESS = 60; + const auto sixtySeconds = Seconds(MAX_STALENESS); + const auto now = Date_t::now(); + + + const auto d0 = now - Milliseconds(1000); + const auto s0 = ServerDescriptionBuilder() + .withAddress("s0") + 
.withType(ServerType::kRSPrimary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0); + + const auto d1 = now - Milliseconds(1000 * 5); + const auto s1 = ServerDescriptionBuilder() + .withAddress("s1") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d1) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s1); + + // d2 is stale, so s2 should not be selected. + const auto d2 = now - sixtySeconds - sixtySeconds; + const auto s2 = ServerDescriptionBuilder() + .withAddress("s2") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d2) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s2); + + const auto readPref = + ReadPreferenceSetting(ReadPreference::Nearest, TagSets::emptySet, sixtySeconds); + + std::map<ServerAddress, int> frequencyInfo{{"s0", 0}, {"s1", 0}, {"s2", 0}}; + for (int i = 0; i < NUM_ITERATIONS; i++) { + auto server = selector.selectServer(topologyDescription, readPref); + + if (server) { + frequencyInfo[(*server)->getAddress()]++; + } + } + + ASSERT(frequencyInfo["s0"]); + ASSERT(frequencyInfo["s1"]); + ASSERT_FALSE(frequencyInfo["s2"]); +} + +TEST_F(ServerSelectorTestFixture, ShouldSelectPreferredIfAvailable) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration); + + const 
int MAX_STALENESS = 60; + const auto sixtySeconds = Seconds(MAX_STALENESS); + const auto now = Date_t::now(); + + + const auto d0 = now - Milliseconds(1000); + const auto s0 = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kRSPrimary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .withTag("tag", "primary") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0); + + const auto s1 = ServerDescriptionBuilder() + .withAddress("s1") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .withTag("tag", "secondary") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s1); + + // PrimaryPreferred: the primary (s0) is expected even though the tag set only matches s1. + const auto primaryPreferredTagSecondary = + ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSets::secondarySet); + auto result1 = selector.selectServer(topologyDescription, primaryPreferredTagSecondary); + ASSERT(result1 != boost::none); + ASSERT_EQ("s0", (*result1)->getAddress()); + + // SecondaryPreferred with a matching tag set: the tagged secondary (s1) is expected. + const auto secondaryPreferredWithTag = + ReadPreferenceSetting(ReadPreference::SecondaryPreferred, TagSets::secondarySet); + auto result2 = selector.selectServer(topologyDescription, secondaryPreferredWithTag); + ASSERT(result2 != boost::none); + ASSERT_EQ("s1", (*result2)->getAddress()); + + // SecondaryPreferred without tags: the only secondary (s1) is expected; check result3 itself. + const auto secondaryPreferredNoTag = ReadPreferenceSetting(ReadPreference::SecondaryPreferred); + auto result3 = selector.selectServer(topologyDescription, secondaryPreferredNoTag); + ASSERT(result3 != boost::none); + ASSERT_EQ("s1", (*result3)->getAddress()); +} + +TEST_F(ServerSelectorTestFixture,
ShouldSelectTaggedSecondaryIfPreferredPrimaryNotAvailable) { + TopologyStateMachine stateMachine(sdamConfiguration); + auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration); + + const int MAX_STALENESS = 60; + const auto sixtySeconds = Seconds(MAX_STALENESS); + const auto now = Date_t::now(); + + const auto d0 = now - Milliseconds(1000); + + const auto s0 = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kRSPrimary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .withTag("tag", "primary") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0); + + // old primary unavailable + const auto s0_failed = ServerDescriptionBuilder() + .withAddress("s0") + .withType(ServerType::kUnknown) + .withSetName("set") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s0_failed); + + const auto s1 = ServerDescriptionBuilder() + .withAddress("s1") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .withTag("tag", "secondary") + .instance(); + stateMachine.onServerDescription(*topologyDescription, s1); + + const auto s2 = ServerDescriptionBuilder() + .withAddress("s2") + .withType(ServerType::kRSSecondary) + .withRtt(selectionConfig.getLocalThresholdMs()) + .withSetName("set") + .withHost("s0") + .withHost("s1") + .withHost("s2") + .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG) + .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION) + .withLastWriteDate(d0) + .instance(); + stateMachine.onServerDescription(*topologyDescription, s2); + 
+ const auto primaryPreferredTagSecondary = + ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSets::secondarySet); + auto result1 = selector.selectServer(topologyDescription, primaryPreferredTagSecondary); + ASSERT(result1 != boost::none); + ASSERT_EQ("s1", (*result1)->getAddress()); +} + +TEST_F(ServerSelectorTestFixture, ShouldFilterByTags) { + auto tags = TagSets::productionSet; + auto servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(3, servers.size()); + + tags = TagSets::eastOrWestProductionSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(2, servers.size()); + + tags = TagSets::testSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(2, servers.size()); + + tags = TagSets::integrationOrTestSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(2, servers.size()); + + tags = TagSets::westProductionSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(1, servers.size()); + + tags = TagSets::integrationSet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(0, servers.size()); + + tags = TagSets::emptySet; + servers = makeServerDescriptionList(); + selector.filterTags(&servers, tags); + ASSERT_EQ(makeServerDescriptionList().size(), servers.size()); +} +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_description.cpp b/src/mongo/client/sdam/topology_description.cpp index d3783719864..f236f388e90 100644 --- a/src/mongo/client/sdam/topology_description.cpp +++ b/src/mongo/client/sdam/topology_description.cpp @@ -107,6 +107,9 @@ const boost::optional<ServerDescriptionPtr> TopologyDescription::findServerByAdd boost::optional<ServerDescriptionPtr> TopologyDescription::installServerDescription( const ServerDescriptionPtr& newServerDescription) { + LOG(2) << "(" << getSetName() << ") 
install ServerDescription " + << newServerDescription->toString(); + boost::optional<ServerDescriptionPtr> previousDescription; if (getType() == TopologyType::kSingle) { // For Single, there is always one ServerDescription in TopologyDescription.servers; @@ -131,6 +134,8 @@ boost::optional<ServerDescriptionPtr> TopologyDescription::installServerDescript } } + newServerDescription->_topologyDescription = shared_from_this(); + checkWireCompatibilityVersions(); calculateLogicalSessionTimeout(); return previousDescription; @@ -174,13 +179,12 @@ void TopologyDescription::checkWireCompatibilityVersions() { break; } } - _compatibleError = (_compatible) ? boost::none : boost::make_optional(errorOss.str()); } const std::string TopologyDescription::minimumRequiredMongoVersionString(int version) { switch (version) { - case RESUMABLE_INITIAL_SYNC: + case RESUMABLE_INITIAL_SYNC: return "4.4"; case SHARDED_TRANSACTIONS: return "4.2"; @@ -270,54 +274,15 @@ std::string TopologyDescription::toString() { return toBSON().toString(); } -//////////////////////// -// SdamConfiguration -//////////////////////// -SdamConfiguration::SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList, - TopologyType initialType, - mongo::Milliseconds heartBeatFrequencyMs, - boost::optional<std::string> setName) - : _seedList(seedList), - _initialType(initialType), - _heartBeatFrequencyMs(heartBeatFrequencyMs), - _setName(setName) { - uassert(ErrorCodes::InvalidSeedList, - "seed list size must be >= 1", - !seedList || (*seedList).size() >= 1); - - uassert(ErrorCodes::InvalidSeedList, - "TopologyType Single must have exactly one entry in the seed list.", - _initialType != TopologyType::kSingle || (*seedList).size() == 1); - - uassert( - ErrorCodes::InvalidTopologyType, - "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.", - !_setName || - (_initialType == TopologyType::kReplicaSetNoPrimary || - _initialType == TopologyType::kSingle)); - - 
uassert(ErrorCodes::TopologySetNameRequired, - "setName is required for ReplicaSetNoPrimary", - _initialType != TopologyType::kReplicaSetNoPrimary || _setName); - - uassert(ErrorCodes::InvalidHeartBeatFrequency, - "topology heartbeat must be >= 500ms", - _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS); -} -const boost::optional<std::vector<ServerAddress>>& SdamConfiguration::getSeedList() const { - return _seedList; -} - -TopologyType SdamConfiguration::getInitialType() const { - return _initialType; -} - -Milliseconds SdamConfiguration::getHeartBeatFrequency() const { - return _heartBeatFrequencyMs; -} +boost::optional<ServerDescriptionPtr> TopologyDescription::getPrimary() { + if (getType() != TopologyType::kReplicaSetWithPrimary) { + return boost::none; + } -const boost::optional<std::string>& SdamConfiguration::getSetName() const { - return _setName; + auto foundPrimaries = findServers( + [](const ServerDescriptionPtr& s) { return s->getType() == ServerType::kRSPrimary; }); + invariant(foundPrimaries.size() == 1); + return foundPrimaries[0]; } -}; // namespace mongo::sdam +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_description.h b/src/mongo/client/sdam/topology_description.h index 5894ad0d20f..a0469eb118b 100644 --- a/src/mongo/client/sdam/topology_description.h +++ b/src/mongo/client/sdam/topology_description.h @@ -36,55 +36,13 @@ #include "mongo/bson/oid.h" #include "mongo/client/read_preference.h" +#include "mongo/client/sdam/sdam_configuration.h" #include "mongo/client/sdam/sdam_datatypes.h" #include "mongo/client/sdam/server_description.h" #include "mongo/platform/basic.h" namespace mongo::sdam { -class SdamConfiguration { -public: - SdamConfiguration() : SdamConfiguration(boost::none){}; - - /** - * Initialize the TopologyDescription. This constructor may uassert if the provided - * configuration options are not valid according to the Server Discovery & Monitoring Spec. 
- * - * Initial Servers - * initial servers may be set to a seed list of one or more server addresses. - * - * Initial TopologyType - * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary. - * - * Initial setName - * The client's initial replica set name is required in order to initially configure the - * topology type as ReplicaSetNoPrimary. - * - * Allowed configuration combinations - * TopologyType Single cannot be used with multiple seeds. - * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single, are - * allowed. - */ - SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList, - TopologyType initialType = TopologyType::kUnknown, - mongo::Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs, - boost::optional<std::string> setName = boost::none); - - const boost::optional<std::vector<ServerAddress>>& getSeedList() const; - TopologyType getInitialType() const; - Milliseconds getHeartBeatFrequency() const; - const boost::optional<std::string>& getSetName() const; - - static inline const mongo::Milliseconds kDefaultHeartbeatFrequencyMs = mongo::Seconds(10); - static inline const mongo::Milliseconds kMinHeartbeatFrequencyMS = mongo::Milliseconds(500); - -private: - boost::optional<std::vector<ServerAddress>> _seedList; - TopologyType _initialType; - mongo::Milliseconds _heartBeatFrequencyMs; - boost::optional<std::string> _setName; -}; - -class TopologyDescription { +class TopologyDescription : public std::enable_shared_from_this<TopologyDescription> { public: TopologyDescription() : TopologyDescription(SdamConfiguration()) {} TopologyDescription(const TopologyDescription& source) = default; @@ -113,6 +71,7 @@ public: bool containsServerAddress(const ServerAddress& address) const; std::vector<ServerDescriptionPtr> findServers( std::function<bool(const ServerDescriptionPtr&)> predicate) const; + boost::optional<ServerDescriptionPtr> getPrimary(); /** * Adds the given ServerDescription or swaps it 
with an existing one @@ -129,12 +88,29 @@ public: std::string toString(); private: + friend bool operator==(const TopologyDescription& lhs, const TopologyDescription& rhs) { + return std::tie(lhs._setName, + lhs._type, + lhs._maxSetVersion, + lhs._maxElectionId, + lhs._servers, + lhs._compatible, + lhs._logicalSessionTimeoutMinutes) == + std::tie(rhs._setName, + rhs._type, + rhs._maxSetVersion, + rhs._maxElectionId, + rhs._servers, + rhs._compatible, + rhs._logicalSessionTimeoutMinutes); + } + /** * Checks if all server descriptions are compatible with this server's WireVersion. If an - * incompatible description is found, we set the topologyDescription's _compatible flag to false - * and store an error message in _compatibleError. A ServerDescription which is not Unknown is - * incompatible if: - * minWireVersion > serverMaxWireVersion, or maxWireVersion < serverMinWireVersion + * incompatible description is found, we set the topologyDescription's _compatible flag to + * false and store an error message in _compatibleError. A ServerDescription which is not + * Unknown is incompatible if: minWireVersion > serverMaxWireVersion, or maxWireVersion < + * serverMinWireVersion */ void checkWireCompatibilityVersions(); diff --git a/src/mongo/client/sdam/topology_description_test.cpp b/src/mongo/client/sdam/topology_description_test.cpp index 9f7a6a2dbee..d43be9642b8 100644 --- a/src/mongo/client/sdam/topology_description_test.cpp +++ b/src/mongo/client/sdam/topology_description_test.cpp @@ -26,8 +26,6 @@ * exception statement from all source files in the program, then also delete * it in the license file. 
*/ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - #include "mongo/client/sdam/sdam_test_base.h" #include "mongo/client/sdam/topology_description.h" @@ -36,7 +34,6 @@ #include "mongo/client/sdam/server_description.h" #include "mongo/client/sdam/server_description_builder.h" #include "mongo/db/wire_version.h" -#include "mongo/logv2/log.h" #include "mongo/unittest/death_test.h" namespace mongo { @@ -107,9 +104,10 @@ TEST_F(TopologyDescriptionTestFixture, ShouldAllowTypeSingleWithASingleSeed) { } TEST_F(TopologyDescriptionTestFixture, DoesNotAllowMultipleSeedsWithSingle) { - ASSERT_THROWS_CODE(TopologyDescription({kTwoServersNormalCase, TopologyType::kSingle}), - DBException, - ErrorCodes::InvalidSeedList); + ASSERT_THROWS_CODE( + TopologyDescription(SdamConfiguration(kTwoServersNormalCase, TopologyType::kSingle)), + DBException, + ErrorCodes::InvalidSeedList); } TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) { @@ -121,10 +119,10 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) { } TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowSettingTheReplicaSetNameWithWrongType) { - ASSERT_THROWS_CODE( - TopologyDescription({kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName}), - DBException, - ErrorCodes::InvalidTopologyType); + ASSERT_THROWS_CODE(TopologyDescription(SdamConfiguration( + kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName)), + DBException, + ErrorCodes::InvalidTopologyType); } TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowTopologyTypeRSNoPrimaryWithoutSetName) { @@ -146,9 +144,8 @@ TEST_F(TopologyDescriptionTestFixture, ShouldOnlyAllowSingleAndRsNoPrimaryWithSe topologyTypes.end()); for (const auto topologyType : topologyTypes) { - LOGV2(20217, - "Check TopologyType {topologyType} with setName value.", - "topologyType"_attr = toString(topologyType)); + unittest::log() << "Check TopologyType " << toString(topologyType) + << " with setName value."; 
ASSERT_THROWS_CODE( SdamConfiguration(kOneServer, topologyType, mongo::Seconds(10), kSetName), DBException, @@ -180,7 +177,7 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetWireCompatibilityErrorForMinWireVersionWhenMinWireVersionIsGreater) { const auto outgoingMaxWireVersion = WireSpec::instance().outgoing.maxWireVersion; const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared<TopologyDescription>(config); const auto serverDescriptionMinVersion = ServerDescriptionBuilder() .withAddress(kOneServer[0]) .withMe(kOneServer[0]) @@ -188,16 +185,16 @@ TEST_F(TopologyDescriptionTestFixture, .withMinWireVersion(outgoingMaxWireVersion + 1) .instance(); - ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); - topologyDescription.installServerDescription(serverDescriptionMinVersion); - ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); + topologyDescription->installServerDescription(serverDescriptionMinVersion); + ASSERT_NOT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); } TEST_F(TopologyDescriptionTestFixture, ShouldSetWireCompatibilityErrorForMinWireVersionWhenMaxWireVersionIsLess) { const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion; const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared<TopologyDescription>(config); const auto serverDescriptionMaxVersion = ServerDescriptionBuilder() .withAddress(kOneServer[0]) .withMe(kOneServer[0]) @@ -205,31 +202,31 @@ TEST_F(TopologyDescriptionTestFixture, .withMaxWireVersion(outgoingMinWireVersion - 1) .instance(); - ASSERT_EQUALS(boost::none, 
topologyDescription.getWireVersionCompatibleError()); - topologyDescription.installServerDescription(serverDescriptionMaxVersion); - ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); + topologyDescription->installServerDescription(serverDescriptionMaxVersion); + ASSERT_NOT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); } TEST_F(TopologyDescriptionTestFixture, ShouldNotSetWireCompatibilityErrorWhenServerTypeIsUnknown) { const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion; const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared<TopologyDescription>(config); const auto serverDescriptionMaxVersion = ServerDescriptionBuilder().withMaxWireVersion(outgoingMinWireVersion - 1).instance(); - ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); - topologyDescription.installServerDescription(serverDescriptionMaxVersion); - ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); + topologyDescription->installServerDescription(serverDescriptionMaxVersion); + ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError()); } TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllServerDescriptions) { const auto config = SdamConfiguration(kThreeServers); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared<TopologyDescription>(config); const auto logicalSessionTimeouts = std::vector{300, 100, 200}; auto timeoutIt = logicalSessionTimeouts.begin(); const auto serverDescriptionsWithTimeouts = map<ServerDescriptionPtr, ServerDescriptionPtr>( - 
topologyDescription.getServers(), [&timeoutIt](const ServerDescriptionPtr& description) { + topologyDescription->getServers(), [&timeoutIt](const ServerDescriptionPtr& description) { auto newInstanceBuilder = ServerDescriptionBuilder() .withType(ServerType::kRSSecondary) .withAddress(description->getAddress()) @@ -240,26 +237,26 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllS }); for (auto description : serverDescriptionsWithTimeouts) { - topologyDescription.installServerDescription(description); + topologyDescription->installServerDescription(description); } int expectedLogicalSessionTimeout = *std::min_element(logicalSessionTimeouts.begin(), logicalSessionTimeouts.end()); ASSERT_EQUALS(expectedLogicalSessionTimeout, - topologyDescription.getLogicalSessionTimeoutMinutes()); + topologyDescription->getLogicalSessionTimeoutMinutes()); } TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToNoneIfAnyServerDescriptionHasNone) { const auto config = SdamConfiguration(kThreeServers); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared<TopologyDescription>(config); const auto logicalSessionTimeouts = std::vector{300, 100, 200}; auto timeoutIt = logicalSessionTimeouts.begin(); const auto serverDescriptionsWithTimeouts = map<ServerDescriptionPtr, ServerDescriptionPtr>( - topologyDescription.getServers(), [&](const ServerDescriptionPtr& description) { + topologyDescription->getServers(), [&](const ServerDescriptionPtr& description) { auto timeoutValue = (timeoutIt == logicalSessionTimeouts.begin()) ? 
boost::none : boost::make_optional(*timeoutIt); @@ -275,19 +272,19 @@ TEST_F(TopologyDescriptionTestFixture, }); for (auto description : serverDescriptionsWithTimeouts) { - topologyDescription.installServerDescription(description); + topologyDescription->installServerDescription(description); } - ASSERT_EQUALS(boost::none, topologyDescription.getLogicalSessionTimeoutMinutes()); + ASSERT_EQUALS(boost::none, topologyDescription->getLogicalSessionTimeoutMinutes()); } TEST_F(TopologyDescriptionTestFixture, ShouldUpdateTopologyVersionOnSuccess) { const auto config = SdamConfiguration(kThreeServers); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared<TopologyDescription>(config); // Default topologyVersion is null - ASSERT_EQUALS(topologyDescription.getServers().size(), 3); - auto serverDescription = topologyDescription.getServers()[1]; + ASSERT_EQUALS(topologyDescription->getServers().size(), 3); + auto serverDescription = topologyDescription->getServers()[1]; ASSERT(serverDescription->getTopologyVersion() == boost::none); // Create new serverDescription with topologyVersion, topologyDescription should have the new @@ -300,20 +297,19 @@ TEST_F(TopologyDescriptionTestFixture, ShouldUpdateTopologyVersionOnSuccess) { .withTopologyVersion(TopologyVersion(processId, 1)) .instance(); - topologyDescription.installServerDescription(newDescription); - ASSERT_EQUALS(topologyDescription.getServers().size(), 3); - auto topologyVersion = topologyDescription.getServers()[1]->getTopologyVersion(); - ASSERT(topologyVersion->getProcessId() == processId); - ASSERT(topologyVersion->getCounter() == 1); + topologyDescription->installServerDescription(newDescription); + ASSERT_EQUALS(topologyDescription->getServers().size(), 3); + auto topologyVersion = topologyDescription->getServers()[1]->getTopologyVersion(); + ASSERT(topologyVersion == TopologyVersion(processId, 1)); } TEST_F(TopologyDescriptionTestFixture, 
ShouldNotUpdateTopologyVersionOnError) { const auto config = SdamConfiguration(kThreeServers); - TopologyDescription topologyDescription(config); + const auto topologyDescription = std::make_shared<TopologyDescription>(config); // Default topologyVersion is null - ASSERT_EQUALS(topologyDescription.getServers().size(), 3); - auto serverDescription = topologyDescription.getServers()[1]; + ASSERT_EQUALS(topologyDescription->getServers().size(), 3); + auto serverDescription = topologyDescription->getServers()[1]; ASSERT(serverDescription->getTopologyVersion() == boost::none); auto newDescription = ServerDescriptionBuilder() @@ -321,9 +317,9 @@ TEST_F(TopologyDescriptionTestFixture, ShouldNotUpdateTopologyVersionOnError) { .withError("error") .instance(); - topologyDescription.installServerDescription(newDescription); - ASSERT_EQUALS(topologyDescription.getServers().size(), 3); - auto topologyVersion = topologyDescription.getServers()[1]->getTopologyVersion(); + topologyDescription->installServerDescription(newDescription); + ASSERT_EQUALS(topologyDescription->getServers().size(), 3); + auto topologyVersion = topologyDescription->getServers()[1]->getTopologyVersion(); ASSERT(topologyVersion == boost::none); } }; // namespace sdam diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp new file mode 100644 index 00000000000..995685b71b9 --- /dev/null +++ b/src/mongo/client/sdam/topology_listener.cpp @@ -0,0 +1,163 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/client/sdam/topology_listener.h" + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/util/log.h" + +namespace mongo::sdam { +/** Appends listener to _listeners while holding _mutex. */ +void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) { + stdx::lock_guard lock(_mutex); + _listeners.push_back(listener); +} + +/** Erases every entry equal to listener from _listeners while holding _mutex. */ +void TopologyEventsPublisher::removeListener(TopologyListenerPtr listener) { + stdx::lock_guard lock(_mutex); + _listeners.erase(std::remove(_listeners.begin(), _listeners.end(), listener), _listeners.end()); +} + +/** Clears _listeners and sets _isClosed while holding _mutex. */ +void TopologyEventsPublisher::close() { + stdx::lock_guard lock(_mutex); + _listeners.clear(); + _isClosed = true; +} + +/** + * Queues a TOPOLOGY_DESCRIPTION_CHANGED event carrying topologyId and both descriptions while + * holding _eventQueueMutex, then calls _scheduleNextDelivery() once that lock is released. + */ +void TopologyEventsPublisher::onTopologyDescriptionChangedEvent( + UUID topologyId, + TopologyDescriptionPtr previousDescription, + TopologyDescriptionPtr newDescription) { + { + stdx::lock_guard lock(_eventQueueMutex); + EventPtr event = std::make_unique<Event>(); + event->type = EventType::TOPOLOGY_DESCRIPTION_CHANGED; + 
event->topologyId = std::move(topologyId); + event->previousDescription = previousDescription; + event->newDescription = newDescription; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} + +/** + * Queues a HEARTBEAT_SUCCESS event holding the duration, address and reply while holding + * _eventQueueMutex, then calls _scheduleNextDelivery() once that lock is released. + */ +void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, + const ServerAddress& hostAndPort, + const BSONObj reply) { + { + stdx::lock_guard lock(_eventQueueMutex); + EventPtr event = std::make_unique<Event>(); + event->type = EventType::HEARTBEAT_SUCCESS; + event->duration = duration_cast<IsMasterRTT>(durationMs); + event->hostAndPort = hostAndPort; + event->reply = reply; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} + +/** + * Queues a HEARTBEAT_FAILURE event holding the duration, address, reply and errorStatus while + * holding _eventQueueMutex, then calls _scheduleNextDelivery() once that lock is released. + */ +void TopologyEventsPublisher::onServerHeartbeatFailureEvent(IsMasterRTT durationMs, + Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply) { + { + stdx::lock_guard lock(_eventQueueMutex); + EventPtr event = std::make_unique<Event>(); + event->type = EventType::HEARTBEAT_FAILURE; + event->duration = duration_cast<IsMasterRTT>(durationMs); + event->hostAndPort = hostAndPort; + event->reply = reply; + event->status = errorStatus; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} + +/** + * Hands _executor a callback that runs _nextDelivery(). The callback captures a shared_ptr to this + * publisher and does not inspect the Status it receives. + */ +void TopologyEventsPublisher::_scheduleNextDelivery() { + // run nextDelivery async + _executor->schedule( + [self = shared_from_this()](const Status& status) { self->_nextDelivery(); }); +} + +void TopologyEventsPublisher::onServerPingFailedEvent(const ServerAddress& hostAndPort, + const Status& status) {} + +void TopologyEventsPublisher::onServerPingSucceededEvent(IsMasterRTT durationMS, + const ServerAddress& hostAndPort) {} + +// TODO: this could be done in batches if this is a bottleneck. 
+/** + * Pops the oldest queued event and forwards it to a snapshot of the registered listeners. Returns + * without sending when the queue is empty or the publisher has been closed. + */ +void TopologyEventsPublisher::_nextDelivery() { + // get the next event to send + EventPtr nextEvent; + { + stdx::lock_guard lock(_eventQueueMutex); + if (_eventQueue.empty()) { + return; + } + nextEvent = std::move(_eventQueue.front()); + _eventQueue.pop_front(); + } + + // release the lock before sending to avoid deadlock in the case there + // are events generated by sending the current one. + std::vector<TopologyListenerPtr> listeners; + { + stdx::lock_guard lock(_mutex); + if (_isClosed) { + return; + } + listeners = _listeners; + } + + // send to the listeners outside of the lock. + for (const auto& listener : listeners) { + _sendEvent(listener, *nextEvent); + } +} + +/** + * Invokes the listener callback matching event.type, passing the fields stored on the event. + */ +void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Event& event) { + switch (event.type) { + case EventType::HEARTBEAT_SUCCESS: + listener->onServerHeartbeatSucceededEvent( + duration_cast<IsMasterRTT>(event.duration), event.hostAndPort, event.reply); + break; + case EventType::HEARTBEAT_FAILURE: + listener->onServerHeartbeatFailureEvent(duration_cast<IsMasterRTT>(event.duration), + event.status, + event.hostAndPort, + event.reply); + break; + case EventType::TOPOLOGY_DESCRIPTION_CHANGED: + // Forward the topology id stored on the event when it was queued instead of a freshly + // generated one; UUID::gen() remains only as a fallback for an event queued without an id. + listener->onTopologyDescriptionChangedEvent( + event.topologyId ? *event.topologyId : UUID::gen(), + event.previousDescription, + event.newDescription); + break; + default: + MONGO_UNREACHABLE; + } +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h index 0cf38ddf68d..dde1ae3b683 100644 --- a/src/mongo/client/sdam/topology_listener.h +++ b/src/mongo/client/sdam/topology_listener.h @@ -26,10 +26,13 @@ * exception statement from all source files in the program, then also delete * it in the license file. 
*/ - #pragma once +#include <deque> +#include <memory> +#include <vector> #include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/executor/task_executor.h" #include "mongo/util/uuid.h" namespace mongo::sdam { @@ -49,13 +52,18 @@ public: TopologyDescriptionPtr previousDescription, TopologyDescriptionPtr newDescription){}; + virtual void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, + Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply){}; /** * Called when a ServerHeartBeatSucceededEvent is published - A heartbeat sent to the server at * hostAndPort succeeded. durationMs is the execution time of the event, including the time it * took to send the message and receive the reply from the server. */ virtual void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, - const ServerAddress& hostAndPort){}; + const ServerAddress& hostAndPort, + const BSONObj reply){}; /* * Called when a ServerPingFailedEvent is published - A monitoring ping to the server at @@ -70,4 +78,69 @@ public: virtual void onServerPingSucceededEvent(IsMasterRTT durationMS, const ServerAddress& hostAndPort){}; }; + +/** + * This class publishes TopologyListener events to a group of registered listeners. + * + * To publish an event to all registered listeners call the corresponding event function on the + * TopologyEventsPublisher instance. 
+ */ +class TopologyEventsPublisher final : public TopologyListener, + public std::enable_shared_from_this<TopologyEventsPublisher> { +public: + /** Stores the executor that deliveries are scheduled on. */ + TopologyEventsPublisher(std::shared_ptr<executor::TaskExecutor> executor) + : _executor(executor){}; + /** Listener management; the implementations modify _listeners and _isClosed under _mutex. */ + void registerListener(TopologyListenerPtr listener); + void removeListener(TopologyListenerPtr listener); + void close(); + + /** + * TopologyListener overrides. The topology-changed and heartbeat handlers queue an Event and + * schedule a delivery; the two ping handlers have empty bodies in the implementation. + */ + void onTopologyDescriptionChangedEvent(UUID topologyId, + TopologyDescriptionPtr previousDescription, + TopologyDescriptionPtr newDescription) override; + void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, + const ServerAddress& hostAndPort, + const BSONObj reply) override; + void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, + Status errorStatus, + const ServerAddress& hostAndPort, + const BSONObj reply) override; + void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override; + void onServerPingSucceededEvent(IsMasterRTT durationMS, + const ServerAddress& hostAndPort) override; + + +private: + /** Tag stored on each queued Event; _sendEvent switches on it to pick the listener callback. */ + enum class EventType { + HEARTBEAT_SUCCESS, + HEARTBEAT_FAILURE, + PING_SUCCESS, + PING_FAILURE, + TOPOLOGY_DESCRIPTION_CHANGED + }; + /** One queued notification; each producer fills only the fields its event type uses. */ + struct Event { + EventType type; + ServerAddress hostAndPort; + IsMasterRTT duration; + BSONObj reply; + TopologyDescriptionPtr previousDescription; + TopologyDescriptionPtr newDescription; + boost::optional<UUID> topologyId; + Status status = Status::OK(); + }; + using EventPtr = std::unique_ptr<Event>; + + /** + * Delivery helpers: _scheduleNextDelivery posts _nextDelivery to _executor, and _nextDelivery + * dequeues one Event and passes it to _sendEvent for every registered listener. + */ + void _sendEvent(TopologyListenerPtr listener, const TopologyEventsPublisher::Event& event); + void _nextDelivery(); + void _scheduleNextDelivery(); + + // Lock acquisition order to avoid deadlock is _eventQueueMutex -> _mutex + Mutex _eventQueueMutex; + std::deque<EventPtr> _eventQueue; + + Mutex _mutex; + bool _isClosed = false; + std::shared_ptr<executor::TaskExecutor> _executor; + std::vector<TopologyListenerPtr> _listeners; +}; +using TopologyEventsPublisherPtr = std::shared_ptr<TopologyEventsPublisher>; } // 
namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp index 7897e6948f7..64d74e114d9 100644 --- a/src/mongo/client/sdam/topology_manager.cpp +++ b/src/mongo/client/sdam/topology_manager.cpp @@ -32,9 +32,9 @@ #include "mongo/client/sdam/topology_state_machine.h" #include "mongo/logv2/log.h" +#include "mongo/rpc/topology_version_gen.h" namespace mongo::sdam { - namespace { /* Compare topologyVersions to determine if the isMaster response's topologyVersion is stale @@ -57,16 +57,19 @@ bool isStaleTopologyVersion(boost::optional<TopologyVersion> lastTopologyVersion return false; } - } // namespace -TopologyManager::TopologyManager(SdamConfiguration config, ClockSource* clockSource) + +TopologyManager::TopologyManager(SdamConfiguration config, + ClockSource* clockSource, + TopologyEventsPublisherPtr eventsPublisher) : _config(std::move(config)), _clockSource(clockSource), _topologyDescription(std::make_unique<TopologyDescription>(_config)), - _topologyStateMachine(std::make_unique<TopologyStateMachine>(_config)) {} + _topologyStateMachine(std::make_unique<TopologyStateMachine>(_config)), + _topologyEventsPublisher(eventsPublisher) {} -void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) { +bool TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) { stdx::lock_guard<mongo::Mutex> lock(_mutex); boost::optional<IsMasterRTT> lastRTT; @@ -85,11 +88,11 @@ void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome if (isStaleTopologyVersion(lastTopologyVersion, newTopologyVersion)) { LOGV2( 23930, - "Ignoring this isMaster response because our topologyVersion: {lastTopologyVersion}is " + "Ignoring this isMaster response because our topologyVersion: {lastTopologyVersion} is " "fresher than the provided topologyVersion: {newTopologyVersion}", "lastTopologyVersion"_attr = lastTopologyVersion->toBSON(), "newTopologyVersion"_attr = 
newTopologyVersion->toBSON()); - return; + return false; } boost::optional<int> poolResetCounter = lastPoolResetCounter; @@ -98,17 +101,60 @@ void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome poolResetCounter = ++lastPoolResetCounter.get(); } - // newTopologyVersion will be null if the isMaster response did not provide one. auto newServerDescription = std::make_shared<ServerDescription>( _clockSource, isMasterOutcome, lastRTT, newTopologyVersion, poolResetCounter); - auto newTopologyDescription = std::make_unique<TopologyDescription>(*_topologyDescription); - _topologyStateMachine->onServerDescription(*newTopologyDescription, newServerDescription); - _topologyDescription = std::move(newTopologyDescription); + auto oldTopologyDescription = _topologyDescription; + _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription); + + // if we are equal to the old description, just install the new description without + // performing any actions on the state machine. 
+ auto isEqualToOldServerDescription = + (lastServerDescription && (*lastServerDescription->get()) == *newServerDescription); + if (isEqualToOldServerDescription) { + _topologyDescription->installServerDescription(newServerDescription); + } else { + _topologyStateMachine->onServerDescription(*_topologyDescription, newServerDescription); + } + + _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription); + return true; } const std::shared_ptr<TopologyDescription> TopologyManager::getTopologyDescription() const { stdx::lock_guard<mongo::Mutex> lock(_mutex); return _topologyDescription; } + +/** + * Replaces the stored description of hostAndPort with a clone carrying the new RTT, installs it + * into a copy of the current TopologyDescription and publishes the change. If hostAndPort is not + * in the current topology, only a log line is written. _mutex is held for the whole call. + */ +void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt) { + stdx::lock_guard<mongo::Mutex> lock(_mutex); + + auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort); + if (oldServerDescription) { + auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt); + + auto oldTopologyDescription = _topologyDescription; + _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription); + _topologyDescription->installServerDescription(newServerDescription); + + _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription); + + return; + } + + // otherwise, the server was removed from the topology. Nothing to do. + // Read _topologyDescription directly here: _mutex is already held by this function, and + // getTopologyDescription() would try to lock the same mutex a second time. + LOGV2(433301, + "Not updating RTT. Server {server} does not exist in {setName}", + "server"_attr = hostAndPort, + "setName"_attr = _topologyDescription->getSetName()); +}
 + +void TopologyManager::_publishTopologyDescriptionChanged( + const TopologyDescriptionPtr& oldTopologyDescription, + const TopologyDescriptionPtr& newTopologyDescription) const { + if (_topologyEventsPublisher) + _topologyEventsPublisher->onTopologyDescriptionChangedEvent( + newTopologyDescription->getId(), oldTopologyDescription, newTopologyDescription); +} }; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_manager.h b/src/mongo/client/sdam/topology_manager.h index 292d48b6e8e..5ae64d94644 100644 --- a/src/mongo/client/sdam/topology_manager.h +++ b/src/mongo/client/sdam/topology_manager.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2019-present MongoDB, Inc. + * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -31,6 +31,7 @@ #include "mongo/client/sdam/sdam_datatypes.h" #include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_listener.h" #include "mongo/client/sdam/topology_state_machine.h" namespace mongo::sdam { @@ -44,7 +45,9 @@ class TopologyManager { TopologyManager(const TopologyManager&) = delete; public: - TopologyManager(SdamConfiguration config, ClockSource* clockSource); + explicit TopologyManager(SdamConfiguration config, + ClockSource* clockSource, + TopologyEventsPublisherPtr eventsPublisher = nullptr); /** * This function atomically: @@ -57,7 +60,19 @@ public: * IsMasterOutcomes serially, as required by: * https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#process-one-ismaster-outcome-at-a-time */ - void onServerDescription(const IsMasterOutcome& isMasterOutcome); + bool onServerDescription(const IsMasterOutcome& isMasterOutcome); + + + /** 
+ * This function updates the RTT value for a server without executing any state machine actions. + * It atomically: + * 1. Clones the current TopologyDescription + * 2. Clones the ServerDescription corresponding to hostAndPort such that it contains the new + * RTT value. + * 3. Installs the cloned ServerDescription into the TopologyDescription from step 1 + * 4. Installs the cloned TopologyDescription as the current one. + */ + void onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt); /** * Get the current TopologyDescription. This is safe to call from multiple threads. @@ -65,10 +80,15 @@ public: const TopologyDescriptionPtr getTopologyDescription() const; private: + void _publishTopologyDescriptionChanged( + const TopologyDescriptionPtr& oldTopologyDescription, + const TopologyDescriptionPtr& newTopologyDescription) const; + mutable mongo::Mutex _mutex = MONGO_MAKE_LATCH("TopologyManager"); const SdamConfiguration _config; ClockSource* _clockSource; - std::shared_ptr<TopologyDescription> _topologyDescription; - std::unique_ptr<TopologyStateMachine> _topologyStateMachine; + TopologyDescriptionPtr _topologyDescription; + TopologyStateMachinePtr _topologyStateMachine; + TopologyEventsPublisherPtr _topologyEventsPublisher; }; } // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_state_machine.h b/src/mongo/client/sdam/topology_state_machine.h index abed9bc854f..fcb5b7e0c99 100644 --- a/src/mongo/client/sdam/topology_state_machine.h +++ b/src/mongo/client/sdam/topology_state_machine.h @@ -101,4 +101,5 @@ private: static inline auto kLogPrefix = "sdam : "; }; +using TopologyStateMachinePtr = std::unique_ptr<TopologyStateMachine>; } // namespace mongo::sdam |