summaryrefslogtreecommitdiff
path: root/src/mongo/client/sdam
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/sdam')
-rw-r--r--src/mongo/client/sdam/SConscript14
-rw-r--r--src/mongo/client/sdam/sdam.h37
-rw-r--r--src/mongo/client/sdam/sdam_configuration.cpp95
-rw-r--r--src/mongo/client/sdam/sdam_configuration.h100
-rw-r--r--src/mongo/client/sdam/sdam_datatypes.h6
-rw-r--r--src/mongo/client/sdam/server_description.cpp27
-rw-r--r--src/mongo/client/sdam/server_description.h7
-rw-r--r--src/mongo/client/sdam/server_selector.cpp267
-rw-r--r--src/mongo/client/sdam/server_selector.h193
-rw-r--r--src/mongo/client/sdam/server_selector_test.cpp463
-rw-r--r--src/mongo/client/sdam/topology_description.cpp65
-rw-r--r--src/mongo/client/sdam/topology_description.h72
-rw-r--r--src/mongo/client/sdam/topology_description_test.cpp90
-rw-r--r--src/mongo/client/sdam/topology_listener.cpp163
-rw-r--r--src/mongo/client/sdam/topology_listener.h77
-rw-r--r--src/mongo/client/sdam/topology_manager.cpp68
-rw-r--r--src/mongo/client/sdam/topology_manager.h30
-rw-r--r--src/mongo/client/sdam/topology_state_machine.h1
18 files changed, 1611 insertions, 164 deletions
diff --git a/src/mongo/client/sdam/SConscript b/src/mongo/client/sdam/SConscript
index 0a10d22332f..6cc5c642ad6 100644
--- a/src/mongo/client/sdam/SConscript
+++ b/src/mongo/client/sdam/SConscript
@@ -7,16 +7,21 @@ env = env.Clone()
env.Library(
target='sdam',
source=[
+ 'sdam_configuration.cpp',
'sdam_datatypes.cpp',
'server_description.cpp',
'topology_description.cpp',
+ 'topology_listener.cpp',
'topology_state_machine.cpp',
'topology_manager.cpp',
+ 'server_selector.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/repl/optime',
'$BUILD_DIR/mongo/util/clock_sources',
+ '$BUILD_DIR/mongo/client/read_preference',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/db/wire_version',
'$BUILD_DIR/mongo/rpc/metadata',
],
@@ -68,6 +73,15 @@ env.CppUnitTest(
)
env.CppUnitTest(
+ target='server_selector_test',
+ source=['server_selector_test.cpp'],
+ LIBDEPS=[
+ 'sdam',
+ 'sdam_test',
+ ],
+)
+
+env.CppUnitTest(
target='topology_state_machine_test',
source=['topology_state_machine_test.cpp'],
LIBDEPS=['sdam', 'sdam_test'],
diff --git a/src/mongo/client/sdam/sdam.h b/src/mongo/client/sdam/sdam.h
new file mode 100644
index 00000000000..16645e2418a
--- /dev/null
+++ b/src/mongo/client/sdam/sdam.h
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/client/sdam/sdam.h"
+#include "mongo/client/sdam/sdam_datatypes.h"
+#include "mongo/client/sdam/server_description.h"
+#include "mongo/client/sdam/server_selector.h"
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/client/sdam/topology_listener.h"
+#include "mongo/client/sdam/topology_manager.h"
diff --git a/src/mongo/client/sdam/sdam_configuration.cpp b/src/mongo/client/sdam/sdam_configuration.cpp
new file mode 100644
index 00000000000..d9852ddae94
--- /dev/null
+++ b/src/mongo/client/sdam/sdam_configuration.cpp
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "sdam_configuration.h"
+
+namespace mongo::sdam {
+SdamConfiguration::SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
+ TopologyType initialType,
+ mongo::Milliseconds heartBeatFrequencyMs,
+ boost::optional<std::string> setName)
+ : _seedList(seedList),
+ _initialType(initialType),
+ _heartBeatFrequencyMs(heartBeatFrequencyMs),
+ _setName(setName) {
+ uassert(ErrorCodes::InvalidSeedList,
+ "seed list size must be >= 1",
+ !seedList || (*seedList).size() >= 1);
+
+ uassert(ErrorCodes::InvalidSeedList,
+ "TopologyType Single must have exactly one entry in the seed list.",
+ _initialType != TopologyType::kSingle || (*seedList).size() == 1);
+
+ uassert(
+ ErrorCodes::InvalidTopologyType,
+ "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.",
+ !_setName ||
+ (_initialType == TopologyType::kReplicaSetNoPrimary ||
+ _initialType == TopologyType::kSingle));
+
+ uassert(ErrorCodes::TopologySetNameRequired,
+ "setName is required for ReplicaSetNoPrimary",
+ _initialType != TopologyType::kReplicaSetNoPrimary || _setName);
+
+ uassert(ErrorCodes::InvalidHeartBeatFrequency,
+ "topology heartbeat must be >= 500ms",
+ _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS);
+}
+
+const boost::optional<std::vector<ServerAddress>>& SdamConfiguration::getSeedList() const {
+ return _seedList;
+}
+
+TopologyType SdamConfiguration::getInitialType() const {
+ return _initialType;
+}
+
+Milliseconds SdamConfiguration::getHeartBeatFrequency() const {
+ return _heartBeatFrequencyMs;
+}
+
+const boost::optional<std::string>& SdamConfiguration::getSetName() const {
+ return _setName;
+}
+
+
+ServerSelectionConfiguration::ServerSelectionConfiguration(
+ const Milliseconds localThresholdMs, const Milliseconds serverSelectionTimeoutMs)
+ : _localThresholdMs(localThresholdMs), _serverSelectionTimeoutMs(serverSelectionTimeoutMs) {}
+
+Milliseconds ServerSelectionConfiguration::getLocalThresholdMs() const {
+ return _localThresholdMs;
+}
+
+Milliseconds ServerSelectionConfiguration::getServerSelectionTimeoutMs() const {
+ return _serverSelectionTimeoutMs;
+}
+Milliseconds ServerSelectionConfiguration::getHeartBeatFrequencyMs() const {
+ return _heartBeatFrequencyMs;
+}
+}; // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/sdam_configuration.h b/src/mongo/client/sdam/sdam_configuration.h
new file mode 100644
index 00000000000..895e52f4d87
--- /dev/null
+++ b/src/mongo/client/sdam/sdam_configuration.h
@@ -0,0 +1,100 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/client/sdam/sdam_datatypes.h"
+
+namespace mongo::sdam {
+class SdamConfiguration {
+public:
+ SdamConfiguration() : SdamConfiguration(boost::none){};
+
+ /**
+ * Initialize the TopologyDescription. This constructor may uassert if the provided
+ * configuration options are not valid according to the Server Discovery & Monitoring Spec.
+ *
+ * Initial Servers
+ * initial servers may be set to a seed list of one or more server addresses.
+ *
+ * Initial TopologyType
+ * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary.
+ *
+ * Initial setName
+ * The client's initial replica set name is required in order to initially configure the
+ * topology type as ReplicaSetNoPrimary.
+ *
+ * Allowed configuration combinations
+ * TopologyType Single cannot be used with multiple seeds.
+ * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single, are
+ * allowed.
+ */
+ explicit SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
+ TopologyType initialType = TopologyType::kUnknown,
+ Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs,
+ boost::optional<std::string> setName = boost::none);
+
+ const boost::optional<std::vector<ServerAddress>>& getSeedList() const;
+ TopologyType getInitialType() const;
+ Milliseconds getHeartBeatFrequency() const;
+ const boost::optional<std::string>& getSetName() const;
+
+ static constexpr Milliseconds kDefaultHeartbeatFrequencyMs = Seconds(10);
+ static constexpr Milliseconds kMinHeartbeatFrequencyMS = Milliseconds(500);
+ static constexpr Milliseconds kDefaultConnectTimeoutMS = Milliseconds(100);
+
+private:
+ boost::optional<std::vector<ServerAddress>> _seedList;
+ TopologyType _initialType;
+ Milliseconds _heartBeatFrequencyMs;
+ boost::optional<std::string> _setName;
+};
+
+class ServerSelectionConfiguration {
+public:
+ explicit ServerSelectionConfiguration(const Milliseconds localThresholdMs,
+ const Milliseconds serverSelectionTimeoutMs);
+
+ Milliseconds getLocalThresholdMs() const;
+ Milliseconds getServerSelectionTimeoutMs() const;
+ Milliseconds getHeartBeatFrequencyMs() const;
+
+ static constexpr Milliseconds kDefaultLocalThresholdMS = Milliseconds(15);
+ static constexpr Milliseconds kDefaultServerSelectionTimeoutMs = Milliseconds(30000);
+
+ static ServerSelectionConfiguration defaultConfiguration() {
+ return ServerSelectionConfiguration{kDefaultLocalThresholdMS,
+ kDefaultServerSelectionTimeoutMs};
+ }
+
+private:
+ Milliseconds _localThresholdMs = kDefaultLocalThresholdMS;
+ Milliseconds _serverSelectionTimeoutMs = kDefaultServerSelectionTimeoutMs;
+ Milliseconds _heartBeatFrequencyMs = SdamConfiguration::kDefaultHeartbeatFrequencyMs;
+};
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h
index d3f7e7f3b50..520d517da87 100644
--- a/src/mongo/client/sdam/sdam_datatypes.h
+++ b/src/mongo/client/sdam/sdam_datatypes.h
@@ -126,4 +126,10 @@ using ServerDescriptionPtr = std::shared_ptr<ServerDescription>;
class TopologyDescription;
using TopologyDescriptionPtr = std::shared_ptr<TopologyDescription>;
+
+class TopologyManager;
+using TopologyManagerPtr = std::unique_ptr<TopologyManager>;
+
+class TopologyListener;
+using TopologyListenerPtr = std::shared_ptr<TopologyListener>;
}; // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/server_description.cpp b/src/mongo/client/sdam/server_description.cpp
index 1674840a5a6..42dd37d0cdc 100644
--- a/src/mongo/client/sdam/server_description.cpp
+++ b/src/mongo/client/sdam/server_description.cpp
@@ -26,9 +26,9 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/client/sdam/server_description.h"
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include <algorithm>
#include <boost/algorithm/string.hpp>
@@ -137,6 +137,14 @@ void ServerDescription::saveTags(BSONObj tagsObj) {
}
}
+void ServerDescription::appendBsonTags(BSONObjBuilder& builder) const {
+ for (const auto& pair : _tags) {
+ const auto& key = pair.first;
+ const auto& value = pair.second;
+ builder.append(key, value);
+ }
+}
+
void ServerDescription::saveElectionId(BSONElement electionId) {
if (electionId.type() == jstOID) {
_electionId = electionId.OID();
@@ -399,6 +407,11 @@ BSONObj ServerDescription::toBson() const {
bson.append("arbiters", _arbiters);
bson.append("passives", _passives);
+ if (getTags().size()) {
+ BSONObjBuilder tagsBuilder(bson.subobjStart("tags"));
+ appendBsonTags(tagsBuilder);
+ }
+
return bson.obj();
}
@@ -414,6 +427,18 @@ std::string ServerDescription::toString() const {
return toBson().toString();
}
+ServerDescriptionPtr ServerDescription::cloneWithRTT(IsMasterRTT rtt) {
+ auto newServerDescription = std::make_shared<ServerDescription>(*this);
+ newServerDescription->_rtt = rtt;
+ return newServerDescription;
+}
+
+const boost::optional<TopologyDescriptionPtr> ServerDescription::getTopologyDescription() {
+ return (_topologyDescription)
+ ? boost::optional<TopologyDescriptionPtr>(_topologyDescription->lock())
+ : boost::none;
+}
+
bool operator==(const mongo::sdam::ServerDescription& a, const mongo::sdam::ServerDescription& b) {
return a.isEquivalent(b);
diff --git a/src/mongo/client/sdam/server_description.h b/src/mongo/client/sdam/server_description.h
index 6fe02a5a9d8..7d95331fa0e 100644
--- a/src/mongo/client/sdam/server_description.h
+++ b/src/mongo/client/sdam/server_description.h
@@ -78,6 +78,7 @@ public:
const boost::optional<ServerAddress>& getMe() const;
const boost::optional<std::string>& getSetName() const;
const std::map<std::string, std::string>& getTags() const;
+ void appendBsonTags(BSONObjBuilder& builder) const;
// network attributes
const boost::optional<std::string>& getError() const;
@@ -104,9 +105,11 @@ public:
const boost::optional<int>& getSetVersion() const;
const boost::optional<OID>& getElectionId() const;
const boost::optional<TopologyVersion>& getTopologyVersion() const;
+ const boost::optional<TopologyDescriptionPtr> getTopologyDescription();
BSONObj toBson() const;
std::string toString() const;
+ ServerDescriptionPtr cloneWithRTT(IsMasterRTT rtt);
private:
/**
@@ -202,6 +205,10 @@ private:
// pool for server. Incremented on network error or timeout.
int _poolResetCounter = 0;
+ // The topology description of that we are a part of
+ boost::optional<std::weak_ptr<TopologyDescription>> _topologyDescription;
+
+ friend class TopologyDescription;
friend class ServerDescriptionBuilder;
};
diff --git a/src/mongo/client/sdam/server_selector.cpp b/src/mongo/client/sdam/server_selector.cpp
new file mode 100644
index 00000000000..31bacb1b43e
--- /dev/null
+++ b/src/mongo/client/sdam/server_selector.cpp
@@ -0,0 +1,267 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "server_selector.h"
+
+#include <algorithm>
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/platform/random.h"
+#include "mongo/util/log.h"
+
+namespace mongo::sdam {
+ServerSelector::~ServerSelector() {}
+
+SdamServerSelector::SdamServerSelector(const ServerSelectionConfiguration& config)
+ : _config(config), _random(PseudoRandom(SecureRandom().nextInt64())) {}
+
+void SdamServerSelector::_getCandidateServers(std::vector<ServerDescriptionPtr>* result,
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria) {
+ // when querying the primary we don't need to consider tags
+ bool shouldTagFilter = true;
+
+ // TODO: check to see if we want to enforce minOpTime at all since
+ // it was effectively optional in the original implementation.
+ // TODO: the old version of the RSM does this, and many of
+ // the tests seem to rely on this behavior for correctness.
+ if (!criteria.minOpTime.isNull()) {
+ auto eligibleServers = topologyDescription->findServers([](const ServerDescriptionPtr& s) {
+ return (s->getType() == ServerType::kRSPrimary ||
+ s->getType() == ServerType::kRSSecondary);
+ });
+
+ auto beginIt = eligibleServers.begin();
+ auto endIt = eligibleServers.end();
+ auto maxIt = std::max_element(beginIt,
+ endIt,
+ [topologyDescription](const ServerDescriptionPtr& left,
+ const ServerDescriptionPtr& right) {
+ return left->getOpTime() < right->getOpTime();
+ });
+ if (maxIt != endIt) {
+ auto maxOpTime = (*maxIt)->getOpTime();
+ if (maxOpTime && maxOpTime < criteria.minOpTime) {
+ // ignore minOpTime
+ const_cast<ReadPreferenceSetting&>(criteria) = ReadPreferenceSetting(criteria.pref);
+ log() << "ignoring minOpTime for " << criteria.toString();
+ }
+ }
+ }
+
+ switch (criteria.pref) {
+ case ReadPreference::Nearest:
+ *result = topologyDescription->findServers(nearestFilter(criteria));
+ break;
+
+ case ReadPreference::SecondaryOnly:
+ *result = topologyDescription->findServers(secondaryFilter(criteria));
+ break;
+
+ case ReadPreference::PrimaryOnly: {
+ const auto primaryCriteria = ReadPreferenceSetting(criteria.pref);
+ *result = topologyDescription->findServers(primaryFilter(primaryCriteria));
+ shouldTagFilter = false;
+ break;
+ }
+
+ case ReadPreference::PrimaryPreferred: {
+ // ignore tags and max staleness for primary query
+ auto primaryCriteria = ReadPreferenceSetting(ReadPreference::PrimaryOnly);
+ _getCandidateServers(result, topologyDescription, primaryCriteria);
+ if (result->size()) {
+ shouldTagFilter = false;
+ break;
+ }
+
+ // keep tags and maxStaleness for secondary query
+ auto secondaryCriteria = criteria;
+ secondaryCriteria.pref = ReadPreference::SecondaryOnly;
+ _getCandidateServers(result, topologyDescription, secondaryCriteria);
+ break;
+ }
+
+ case ReadPreference::SecondaryPreferred: {
+ // keep tags and maxStaleness for secondary query
+ auto secondaryCriteria = criteria;
+ secondaryCriteria.pref = ReadPreference::SecondaryOnly;
+ _getCandidateServers(result, topologyDescription, secondaryCriteria);
+ if (result->size()) {
+ break;
+ }
+
+ // ignore tags and maxStaleness for primary query
+ shouldTagFilter = false;
+ auto primaryCriteria = ReadPreferenceSetting(ReadPreference::PrimaryOnly);
+ _getCandidateServers(result, topologyDescription, primaryCriteria);
+ break;
+ }
+
+ default:
+ MONGO_UNREACHABLE
+ }
+
+ if (shouldTagFilter) {
+ filterTags(result, criteria.tags);
+ }
+}
+
+boost::optional<std::vector<ServerDescriptionPtr>> SdamServerSelector::selectServers(
+ const TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) {
+
+ // If the topology wire version is invalid, raise an error
+ if (!topologyDescription->isWireVersionCompatible()) {
+ uasserted(ErrorCodes::IncompatibleServerVersion,
+ *topologyDescription->getWireVersionCompatibleError());
+ }
+
+ if (topologyDescription->getType() == TopologyType::kUnknown) {
+ return boost::none;
+ }
+
+ if (topologyDescription->getType() == TopologyType::kSingle) {
+ auto servers = topologyDescription->getServers();
+ return (servers.size() && servers[0]->getType() != ServerType::kUnknown)
+ ? boost::optional<std::vector<ServerDescriptionPtr>>{{servers[0]}}
+ : boost::none;
+ }
+
+ std::vector<ServerDescriptionPtr> results;
+ _getCandidateServers(&results, topologyDescription, criteria);
+
+ if (results.size()) {
+ ServerDescriptionPtr minServer =
+ *std::min_element(results.begin(), results.end(), LatencyWindow::rttCompareFn);
+
+ invariant(minServer->getRtt());
+ auto latencyWindow = LatencyWindow(*minServer->getRtt(), _config.getLocalThresholdMs());
+ latencyWindow.filterServers(&results);
+
+ // latency window should always leave at least one result
+ invariant(results.size());
+
+ return results;
+ }
+
+ return boost::none;
+}
+
+ServerDescriptionPtr SdamServerSelector::_randomSelect(
+ const std::vector<ServerDescriptionPtr>& servers) const {
+ return servers[_random.nextInt64(servers.size())];
+}
+
+boost::optional<ServerDescriptionPtr> SdamServerSelector::selectServer(
+ const TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) {
+ auto servers = selectServers(topologyDescription, criteria);
+ return servers ? boost::optional<ServerDescriptionPtr>(_randomSelect(*servers)) : boost::none;
+}
+
+bool SdamServerSelector::_containsAllTags(ServerDescriptionPtr server, const BSONObj& tags) {
+ auto serverTags = server->getTags();
+ for (auto& checkTag : tags) {
+ auto checkKey = checkTag.fieldName();
+ auto checkValue = checkTag.String();
+ auto pos = serverTags.find(checkKey);
+ if (pos == serverTags.end() || pos->second != checkValue) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void SdamServerSelector::filterTags(std::vector<ServerDescriptionPtr>* servers,
+ const TagSet& tagSet) {
+ const auto& checkTags = tagSet.getTagBSON();
+
+ if (checkTags.nFields() == 0)
+ return;
+
+ const auto predicate = [&](const ServerDescriptionPtr& s) {
+ auto it = checkTags.begin();
+ while (it != checkTags.end()) {
+ if (it->isABSONObj()) {
+ const BSONObj& tags = it->Obj();
+ if (_containsAllTags(s, tags)) {
+ // found a match -- don't remove the server
+ return false;
+ }
+ } else {
+ log() << "invalid tags specified for server selection; tags should be specified as "
+ "a bson Obj: "
+ << it->toString();
+ }
+ ++it;
+ }
+
+ // remove the server
+ return true;
+ };
+
+ servers->erase(std::remove_if(servers->begin(), servers->end(), predicate), servers->end());
+}
+
+bool SdamServerSelector::recencyFilter(const ReadPreferenceSetting& readPref,
+ const ServerDescriptionPtr& s) {
+ bool result = true;
+
+ // TODO: check to see if we want to enforce minOpTime at all since
+ // it was effectively optional in the original implementation.
+ if (!readPref.minOpTime.isNull()) {
+ result = result && (s->getOpTime() >= readPref.minOpTime);
+ }
+
+ if (readPref.maxStalenessSeconds.count()) {
+ auto topologyDescription = s->getTopologyDescription();
+ invariant(topologyDescription);
+ auto staleness = _calculateStaleness(*topologyDescription, s);
+ result = result && (staleness <= readPref.maxStalenessSeconds);
+ }
+
+ return result;
+}
+
+
+void LatencyWindow::filterServers(std::vector<ServerDescriptionPtr>* servers) {
+ servers->erase(std::remove_if(servers->begin(),
+ servers->end(),
+ [&](const ServerDescriptionPtr& s) {
+ // Servers that have made it to this stage are not ServerType
+ // == kUnknown, so they must have an associated latency.
+ invariant(s->getType() != ServerType::kUnknown);
+ invariant(s->getRtt());
+ return !this->isWithinWindow(*s->getRtt());
+ }),
+ servers->end());
+}
+
+bool LatencyWindow::isWithinWindow(IsMasterRTT latency) {
+ return lower <= latency && latency <= upper;
+}
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/server_selector.h b/src/mongo/client/sdam/server_selector.h
new file mode 100644
index 00000000000..9cb676bd3b6
--- /dev/null
+++ b/src/mongo/client/sdam/server_selector.h
@@ -0,0 +1,193 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+#include <functional>
+#include <vector>
+
+#include "mongo/client/read_preference.h"
+#include "mongo/client/sdam/sdam_configuration.h"
+#include "mongo/client/sdam/sdam_datatypes.h"
+#include "mongo/client/sdam/server_description.h"
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/platform/random.h"
+
+namespace mongo::sdam {
+/**
+ * This is the interface that allows one to select a server to satisfy a DB operation given a
+ * TopologyDescription and a ReadPreferenceSetting.
+ */
+class ServerSelector {
+public:
+ /**
+ * Finds a list of candidate servers according to the ReadPreferenceSetting.
+ */
+ virtual boost::optional<std::vector<ServerDescriptionPtr>> selectServers(
+ TopologyDescriptionPtr topologyDescription, const ReadPreferenceSetting& criteria) = 0;
+
+ /**
+ * Select a single server according to the ReadPreference and latency of the
+ * ServerDescription(s). The server is selected randomly from those that match the criteria.
+ */
+ virtual boost::optional<ServerDescriptionPtr> selectServer(
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria) = 0;
+
+ virtual ~ServerSelector();
+};
+using ServerSelectorPtr = std::unique_ptr<ServerSelector>;
+
+class SdamServerSelector : public ServerSelector {
+public:
+ explicit SdamServerSelector(const ServerSelectionConfiguration& config);
+
+ boost::optional<std::vector<ServerDescriptionPtr>> selectServers(
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria) override;
+
+ boost::optional<ServerDescriptionPtr> selectServer(
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria) override;
+
+ // remove servers that do not match the TagSet
+ void filterTags(std::vector<ServerDescriptionPtr>* servers, const TagSet& tagSet);
+
+private:
+ void _getCandidateServers(std::vector<ServerDescriptionPtr>* result,
+ const TopologyDescriptionPtr topologyDescription,
+ const ReadPreferenceSetting& criteria);
+
+ bool _containsAllTags(ServerDescriptionPtr server, const BSONObj& tags);
+
+ ServerDescriptionPtr _randomSelect(const std::vector<ServerDescriptionPtr>& servers) const;
+
+ // staleness for a ServerDescription is defined here:
+ // https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#maxstalenessseconds
+ Milliseconds _calculateStaleness(const TopologyDescriptionPtr& topologyDescription,
+ const ServerDescriptionPtr& serverDescription) {
+ if (serverDescription->getType() != ServerType::kRSSecondary)
+ return Milliseconds(0);
+
+ const Date_t& lastWriteDate = serverDescription->getLastWriteDate()
+ ? *serverDescription->getLastWriteDate()
+ : Date_t::min();
+
+ if (topologyDescription->getType() == TopologyType::kReplicaSetWithPrimary) {
+ // (S.lastUpdateTime - S.lastWriteDate) - (P.lastUpdateTime - P.lastWriteDate) +
+ // heartbeatFrequencyMS
+
+ // topologyType == kReplicaSetWithPrimary implies the validity of the primary server
+ // description.
+ invariant(topologyDescription->getPrimary());
+ const auto& primaryDescription = *topologyDescription->getPrimary();
+
+ const auto& primaryLastWriteDate = primaryDescription->getLastWriteDate()
+ ? *primaryDescription->getLastWriteDate()
+ : Date_t::min();
+
+ auto result = (serverDescription->getLastUpdateTime() - lastWriteDate) -
+ (primaryDescription->getLastUpdateTime() - primaryLastWriteDate) +
+ _config.getHeartBeatFrequencyMs();
+ return duration_cast<Milliseconds>(result);
+ } else if (topologyDescription->getType() == TopologyType::kReplicaSetNoPrimary) {
+ // SMax.lastWriteDate - S.lastWriteDate + heartbeatFrequencyMS
+ Date_t maxLastWriteDate = Date_t::min();
+
+ // identify secondary with max last write date.
+ for (const auto& s : topologyDescription->getServers()) {
+ if (s->getType() != ServerType::kRSSecondary)
+ continue;
+
+ const auto& sLastWriteDate =
+ s->getLastWriteDate() ? *s->getLastWriteDate() : Date_t::min();
+
+ if (sLastWriteDate > maxLastWriteDate) {
+ maxLastWriteDate = sLastWriteDate;
+ }
+ }
+
+ auto result = (maxLastWriteDate - lastWriteDate) + _config.getHeartBeatFrequencyMs();
+ return duration_cast<Milliseconds>(result);
+ } else {
+ // Not a replica set
+ return Milliseconds(0);
+ }
+ }
+
+ bool recencyFilter(const ReadPreferenceSetting& readPref, const ServerDescriptionPtr& s);
+
+ // A SelectionFilter is a higher order function used to filter out servers from the current
+ // Topology. It's return value is used as input to the TopologyDescription::findServers
+ // function, and is a function that takes a ServerDescriptionPtr and returns a bool indicating
+ // whether to keep this server or not based on the ReadPreference, server type, and recency
+ // metrics of the server.
+ using SelectionFilter = unique_function<std::function<bool(const ServerDescriptionPtr&)>(
+ const ReadPreferenceSetting&)>;
+
+ const SelectionFilter secondaryFilter = [this](const ReadPreferenceSetting& readPref) {
+ return [&](const ServerDescriptionPtr& s) {
+ return (s->getType() == ServerType::kRSSecondary) && recencyFilter(readPref, s);
+ };
+ };
+
+ const SelectionFilter primaryFilter = [this](const ReadPreferenceSetting& readPref) {
+ return [&](const ServerDescriptionPtr& s) {
+ return (s->getType() == ServerType::kRSPrimary) && recencyFilter(readPref, s);
+ };
+ };
+
+ const SelectionFilter nearestFilter = [this](const ReadPreferenceSetting& readPref) {
+ return [&](const ServerDescriptionPtr& s) {
+ return (s->getType() == ServerType::kRSPrimary ||
+ s->getType() == ServerType::kRSSecondary) &&
+ recencyFilter(readPref, s);
+ };
+ };
+
+ ServerSelectionConfiguration _config;
+ mutable PseudoRandom _random;
+};
+
+// This is used to filter out servers based on their current latency measurements.
+struct LatencyWindow {
+ const IsMasterRTT lower;
+ const IsMasterRTT upper;
+
+ explicit LatencyWindow(const IsMasterRTT lowerBound, const IsMasterRTT windowWidth)
+ : lower(lowerBound), upper(lowerBound + windowWidth) {}
+
+ bool isWithinWindow(IsMasterRTT latency);
+
+ // remove servers not in the latency window in-place.
+ void filterServers(std::vector<ServerDescriptionPtr>* servers);
+
+ static bool rttCompareFn(const ServerDescriptionPtr& a, const ServerDescriptionPtr& b) {
+ return a->getRtt() < b->getRtt();
+ }
+};
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/server_selector_test.cpp b/src/mongo/client/sdam/server_selector_test.cpp
new file mode 100644
index 00000000000..233c00b32a5
--- /dev/null
+++ b/src/mongo/client/sdam/server_selector_test.cpp
@@ -0,0 +1,463 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/client/sdam/server_selector.h"
+
+#include <boost/optional/optional_io.hpp>
+
+#include "mongo/client/sdam/sdam_test_base.h"
+#include "mongo/client/sdam/server_description_builder.h"
+#include "mongo/client/sdam/topology_description.h"
+#include "mongo/client/sdam/topology_manager.h"
+#include "mongo/db/wire_version.h"
+#include "mongo/util/system_clock_source.h"
+
+namespace mongo::sdam {
+
+class ServerSelectorTestFixture : public SdamTestFixture {
+public:
+ static inline const auto clockSource = SystemClockSource::get();
+ static inline const auto sdamConfiguration = SdamConfiguration({{"s0"}});
+ static inline const auto selectionConfig =
+ ServerSelectionConfiguration(Milliseconds(10), Milliseconds(10));
+
+ static constexpr auto SET_NAME = "set";
+ static constexpr int NUM_ITERATIONS = 1000;
+
+ struct TagSets {
+ static inline const auto eastProduction = BSON("dc"
+ << "east"
+ << "usage"
+ << "production");
+ static inline const auto westProduction = BSON("dc"
+ << "west"
+ << "usage"
+ << "production");
+ static inline const auto northTest = BSON("dc"
+ << "north"
+ << "usage"
+ << "test");
+ static inline const auto northProduction = BSON("dc"
+ << "north"
+ << "usage"
+ << "production");
+ static inline const auto production = BSON("usage"
+ << "production");
+
+ static inline const auto test = BSON("usage"
+ << "test");
+
+ static inline const auto integration = BSON("usage"
+ << "integration");
+
+ static inline const auto primary = BSON("tag"
+ << "primary");
+ static inline const auto secondary = BSON("tag"
+ << "secondary");
+
+ static inline const auto emptySet = TagSet{BSONArray(BSONObj())};
+ static inline const auto eastOrWestProductionSet =
+ TagSet(BSON_ARRAY(eastProduction << westProduction));
+ static inline const auto westProductionSet = TagSet(BSON_ARRAY(westProduction));
+ static inline const auto productionSet = TagSet(BSON_ARRAY(production));
+ static inline const auto testSet = TagSet(BSON_ARRAY(test));
+ static inline const auto integrationOrTestSet = TagSet(BSON_ARRAY(integration << test));
+ static inline const auto integrationSet = TagSet(BSON_ARRAY(integration));
+
+ static inline const auto primarySet = TagSet(BSON_ARRAY(primary));
+ static inline const auto secondarySet = TagSet(BSON_ARRAY(secondary));
+ };
+
+ static ServerDescriptionPtr make_with_latency(IsMasterRTT latency,
+ ServerAddress address,
+ ServerType serverType = ServerType::kRSPrimary,
+ std::map<std::string, std::string> tags = {}) {
+ auto builder = ServerDescriptionBuilder()
+ .withType(serverType)
+ .withAddress(address)
+ .withSetName(SET_NAME)
+ .withRtt(latency)
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastUpdateTime(Date_t::now());
+
+ for (auto it = tags.begin(); it != tags.end(); ++it) {
+ builder.withTag(it->first, it->second);
+ }
+
+ return builder.instance();
+ }
+
+ static auto makeServerDescriptionList() {
+ return std::vector<ServerDescriptionPtr>{
+ make_with_latency(Milliseconds(1),
+ "s1",
+ ServerType::kRSSecondary,
+ {{"dc", "east"}, {"usage", "production"}}),
+ make_with_latency(Milliseconds(1),
+ "s1-test",
+ ServerType::kRSSecondary,
+ {{"dc", "east"}, {"usage", "test"}}),
+ make_with_latency(Milliseconds(1),
+ "s2",
+ ServerType::kRSSecondary,
+ {{"dc", "west"}, {"usage", "production"}}),
+ make_with_latency(Milliseconds(1),
+ "s2-test",
+ ServerType::kRSSecondary,
+ {{"dc", "west"}, {"usage", "test"}}),
+ make_with_latency(Milliseconds(1),
+ "s3",
+ ServerType::kRSSecondary,
+ {{"dc", "north"}, {"usage", "production"}})};
+ };
+
+ SdamServerSelector selector = SdamServerSelector(selectionConfig);
+};
+
+TEST_F(ServerSelectorTestFixture, ShouldFilterCorrectlyByLatencyWindow) {
+ const auto delta = Milliseconds(10);
+ const auto windowWidth = Milliseconds(100);
+ const auto lowerBound = Milliseconds(100);
+
+ auto window = LatencyWindow(lowerBound, windowWidth);
+
+ std::vector<ServerDescriptionPtr> servers = {
+ make_with_latency(window.lower - delta, "less"),
+ make_with_latency(window.lower, "boundary-lower"),
+ make_with_latency(window.lower + delta, "within"),
+ make_with_latency(window.upper, "boundary-upper"),
+ make_with_latency(window.upper + delta, "greater")};
+
+ window.filterServers(&servers);
+
+ ASSERT_EQ(3, servers.size());
+ ASSERT_EQ("boundary-lower", servers[0]->getAddress());
+ ASSERT_EQ("within", servers[1]->getAddress());
+ ASSERT_EQ("boundary-upper", servers[2]->getAddress());
+}
+
+TEST_F(ServerSelectorTestFixture, ShouldThrowOnWireError) {
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+ auto oldServer = ServerDescriptionBuilder()
+ .withAddress(topologyDescription->getServers().back()->getAddress())
+ .withType(ServerType::kRSPrimary)
+ .withMaxWireVersion(WireVersion::RELEASE_2_4_AND_BEFORE)
+ .withMinWireVersion(WireVersion::RELEASE_2_4_AND_BEFORE)
+ .instance();
+ topologyDescription->installServerDescription(oldServer);
+
+ ASSERT(!topologyDescription->isWireVersionCompatible());
+ ASSERT_THROWS_CODE(selector.selectServers(topologyDescription, ReadPreferenceSetting()),
+ DBException,
+ ErrorCodes::IncompatibleServerVersion);
+}
+
+TEST_F(ServerSelectorTestFixture, ShouldReturnNoneIfTopologyUnknown) {
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+ ASSERT_EQ(TopologyType::kUnknown, topologyDescription->getType());
+ ASSERT_EQ(boost::none, selector.selectServers(topologyDescription, ReadPreferenceSetting()));
+}
+
+TEST_F(ServerSelectorTestFixture, ShouldSelectRandomlyWhenMultipleOptionsAreAvailable) {
+ TopologyStateMachine stateMachine(sdamConfiguration);
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+ const auto s0Latency = Milliseconds(1);
+ auto primary = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kRSPrimary)
+ .withRtt(s0Latency)
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withHost("s3")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, primary);
+
+ const auto s1Latency = Milliseconds((s0Latency + selectionConfig.getLocalThresholdMs()) / 2);
+ auto secondaryInLatencyWindow = make_with_latency(s1Latency, "s1", ServerType::kRSSecondary);
+ stateMachine.onServerDescription(*topologyDescription, secondaryInLatencyWindow);
+
+ // s2 is on the boundary of the latency window
+ const auto s2Latency = s0Latency + selectionConfig.getLocalThresholdMs();
+ auto secondaryOnBoundaryOfLatencyWindow =
+ make_with_latency(s2Latency, "s2", ServerType::kRSSecondary);
+ stateMachine.onServerDescription(*topologyDescription, secondaryOnBoundaryOfLatencyWindow);
+
+ // s3 should not be selected
+ const auto s3Latency = s2Latency + Milliseconds(10);
+ auto secondaryTooFar = make_with_latency(s3Latency, "s3", ServerType::kRSSecondary);
+ stateMachine.onServerDescription(*topologyDescription, secondaryTooFar);
+
+ std::map<ServerAddress, int> frequencyInfo{{"s0", 0}, {"s1", 0}, {"s2", 0}, {"s3", 0}};
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ auto server = selector.selectServer(topologyDescription,
+ ReadPreferenceSetting(ReadPreference::Nearest));
+ if (server) {
+ frequencyInfo[(*server)->getAddress()]++;
+ }
+ }
+
+ ASSERT(frequencyInfo["s0"]);
+ ASSERT(frequencyInfo["s1"]);
+ ASSERT(frequencyInfo["s2"]);
+ ASSERT_FALSE(frequencyInfo["s3"]);
+}
+
+TEST_F(ServerSelectorTestFixture, ShouldFilterByLastWriteTime) {
+ TopologyStateMachine stateMachine(sdamConfiguration);
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+ const int MAX_STALENESS = 60;
+ const auto sixtySeconds = Seconds(MAX_STALENESS);
+ const auto now = Date_t::now();
+
+
+ const auto d0 = now - Milliseconds(1000);
+ const auto s0 = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kRSPrimary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s0);
+
+ const auto d1 = now - Milliseconds(1000 * 5);
+ const auto s1 = ServerDescriptionBuilder()
+ .withAddress("s1")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d1)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s1);
+
+ // d2 is stale, so s2 should not be selected.
+ const auto d2 = now - sixtySeconds - sixtySeconds;
+ const auto s2 = ServerDescriptionBuilder()
+ .withAddress("s2")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d2)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s2);
+
+ const auto readPref =
+ ReadPreferenceSetting(ReadPreference::Nearest, TagSets::emptySet, sixtySeconds);
+
+ std::map<ServerAddress, int> frequencyInfo{{"s0", 0}, {"s1", 0}, {"s2", 0}};
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ auto server = selector.selectServer(topologyDescription, readPref);
+
+ if (server) {
+ frequencyInfo[(*server)->getAddress()]++;
+ }
+ }
+
+ ASSERT(frequencyInfo["s0"]);
+ ASSERT(frequencyInfo["s1"]);
+ ASSERT_FALSE(frequencyInfo["s2"]);
+}
+
+TEST_F(ServerSelectorTestFixture, ShouldSelectPreferredIfAvailable) {
+ TopologyStateMachine stateMachine(sdamConfiguration);
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+ const int MAX_STALENESS = 60;
+ const auto sixtySeconds = Seconds(MAX_STALENESS);
+ const auto now = Date_t::now();
+
+
+ const auto d0 = now - Milliseconds(1000);
+ const auto s0 = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kRSPrimary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .withTag("tag", "primary")
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s0);
+
+ const auto s1 = ServerDescriptionBuilder()
+ .withAddress("s1")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .withTag("tag", "secondary")
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s1);
+
+ const auto primaryPreferredTagSecondary =
+ ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSets::secondarySet);
+ auto result1 = selector.selectServer(topologyDescription, primaryPreferredTagSecondary);
+ ASSERT(result1 != boost::none);
+ ASSERT_EQ("s0", (*result1)->getAddress());
+
+ const auto secondaryPreferredWithTag =
+ ReadPreferenceSetting(ReadPreference::SecondaryPreferred, TagSets::secondarySet);
+ auto result2 = selector.selectServer(topologyDescription, secondaryPreferredWithTag);
+ ASSERT(result2 != boost::none);
+ ASSERT_EQ("s1", (*result2)->getAddress());
+
+ const auto secondaryPreferredNoTag = ReadPreferenceSetting(ReadPreference::SecondaryPreferred);
+ auto result3 = selector.selectServer(topologyDescription, secondaryPreferredNoTag);
+ ASSERT(result3 != boost::none);
+ ASSERT_EQ("s1", (*result2)->getAddress());
+}
+
+TEST_F(ServerSelectorTestFixture, ShouldSelectTaggedSecondaryIfPreferredPrimaryNotAvailable) {
+ TopologyStateMachine stateMachine(sdamConfiguration);
+ auto topologyDescription = std::make_shared<TopologyDescription>(sdamConfiguration);
+
+ const int MAX_STALENESS = 60;
+ const auto sixtySeconds = Seconds(MAX_STALENESS);
+ const auto now = Date_t::now();
+
+ const auto d0 = now - Milliseconds(1000);
+
+ const auto s0 = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kRSPrimary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .withTag("tag", "primary")
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s0);
+
+ // old primary unavailable
+ const auto s0_failed = ServerDescriptionBuilder()
+ .withAddress("s0")
+ .withType(ServerType::kUnknown)
+ .withSetName("set")
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s0_failed);
+
+ const auto s1 = ServerDescriptionBuilder()
+ .withAddress("s1")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .withTag("tag", "secondary")
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s1);
+
+ const auto s2 = ServerDescriptionBuilder()
+ .withAddress("s2")
+ .withType(ServerType::kRSSecondary)
+ .withRtt(selectionConfig.getLocalThresholdMs())
+ .withSetName("set")
+ .withHost("s0")
+ .withHost("s1")
+ .withHost("s2")
+ .withMinWireVersion(WireVersion::SUPPORTS_OP_MSG)
+ .withMaxWireVersion(WireVersion::LATEST_WIRE_VERSION)
+ .withLastWriteDate(d0)
+ .instance();
+ stateMachine.onServerDescription(*topologyDescription, s2);
+
+ const auto primaryPreferredTagSecondary =
+ ReadPreferenceSetting(ReadPreference::PrimaryPreferred, TagSets::secondarySet);
+ auto result1 = selector.selectServer(topologyDescription, primaryPreferredTagSecondary);
+ ASSERT(result1 != boost::none);
+ ASSERT_EQ("s1", (*result1)->getAddress());
+}
+
+TEST_F(ServerSelectorTestFixture, ShouldFilterByTags) {
+ auto tags = TagSets::productionSet;
+ auto servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(3, servers.size());
+
+ tags = TagSets::eastOrWestProductionSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(2, servers.size());
+
+ tags = TagSets::testSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(2, servers.size());
+
+ tags = TagSets::integrationOrTestSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(2, servers.size());
+
+ tags = TagSets::westProductionSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(1, servers.size());
+
+ tags = TagSets::integrationSet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(0, servers.size());
+
+ tags = TagSets::emptySet;
+ servers = makeServerDescriptionList();
+ selector.filterTags(&servers, tags);
+ ASSERT_EQ(makeServerDescriptionList().size(), servers.size());
+}
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_description.cpp b/src/mongo/client/sdam/topology_description.cpp
index d3783719864..f236f388e90 100644
--- a/src/mongo/client/sdam/topology_description.cpp
+++ b/src/mongo/client/sdam/topology_description.cpp
@@ -107,6 +107,9 @@ const boost::optional<ServerDescriptionPtr> TopologyDescription::findServerByAdd
boost::optional<ServerDescriptionPtr> TopologyDescription::installServerDescription(
const ServerDescriptionPtr& newServerDescription) {
+ LOG(2) << "(" << getSetName() << ") install ServerDescription "
+ << newServerDescription->toString();
+
boost::optional<ServerDescriptionPtr> previousDescription;
if (getType() == TopologyType::kSingle) {
// For Single, there is always one ServerDescription in TopologyDescription.servers;
@@ -131,6 +134,8 @@ boost::optional<ServerDescriptionPtr> TopologyDescription::installServerDescript
}
}
+ newServerDescription->_topologyDescription = shared_from_this();
+
checkWireCompatibilityVersions();
calculateLogicalSessionTimeout();
return previousDescription;
@@ -174,13 +179,12 @@ void TopologyDescription::checkWireCompatibilityVersions() {
break;
}
}
-
_compatibleError = (_compatible) ? boost::none : boost::make_optional(errorOss.str());
}
const std::string TopologyDescription::minimumRequiredMongoVersionString(int version) {
switch (version) {
- case RESUMABLE_INITIAL_SYNC:
+ case RESUMABLE_INITIAL_SYNC:
return "4.4";
case SHARDED_TRANSACTIONS:
return "4.2";
@@ -270,54 +274,15 @@ std::string TopologyDescription::toString() {
return toBSON().toString();
}
-////////////////////////
-// SdamConfiguration
-////////////////////////
-SdamConfiguration::SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
- TopologyType initialType,
- mongo::Milliseconds heartBeatFrequencyMs,
- boost::optional<std::string> setName)
- : _seedList(seedList),
- _initialType(initialType),
- _heartBeatFrequencyMs(heartBeatFrequencyMs),
- _setName(setName) {
- uassert(ErrorCodes::InvalidSeedList,
- "seed list size must be >= 1",
- !seedList || (*seedList).size() >= 1);
-
- uassert(ErrorCodes::InvalidSeedList,
- "TopologyType Single must have exactly one entry in the seed list.",
- _initialType != TopologyType::kSingle || (*seedList).size() == 1);
-
- uassert(
- ErrorCodes::InvalidTopologyType,
- "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.",
- !_setName ||
- (_initialType == TopologyType::kReplicaSetNoPrimary ||
- _initialType == TopologyType::kSingle));
-
- uassert(ErrorCodes::TopologySetNameRequired,
- "setName is required for ReplicaSetNoPrimary",
- _initialType != TopologyType::kReplicaSetNoPrimary || _setName);
-
- uassert(ErrorCodes::InvalidHeartBeatFrequency,
- "topology heartbeat must be >= 500ms",
- _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS);
-}
-const boost::optional<std::vector<ServerAddress>>& SdamConfiguration::getSeedList() const {
- return _seedList;
-}
-
-TopologyType SdamConfiguration::getInitialType() const {
- return _initialType;
-}
-
-Milliseconds SdamConfiguration::getHeartBeatFrequency() const {
- return _heartBeatFrequencyMs;
-}
+boost::optional<ServerDescriptionPtr> TopologyDescription::getPrimary() {
+ if (getType() != TopologyType::kReplicaSetWithPrimary) {
+ return boost::none;
+ }
-const boost::optional<std::string>& SdamConfiguration::getSetName() const {
- return _setName;
+ auto foundPrimaries = findServers(
+ [](const ServerDescriptionPtr& s) { return s->getType() == ServerType::kRSPrimary; });
+ invariant(foundPrimaries.size() == 1);
+ return foundPrimaries[0];
}
-}; // namespace mongo::sdam
+} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_description.h b/src/mongo/client/sdam/topology_description.h
index 5894ad0d20f..a0469eb118b 100644
--- a/src/mongo/client/sdam/topology_description.h
+++ b/src/mongo/client/sdam/topology_description.h
@@ -36,55 +36,13 @@
#include "mongo/bson/oid.h"
#include "mongo/client/read_preference.h"
+#include "mongo/client/sdam/sdam_configuration.h"
#include "mongo/client/sdam/sdam_datatypes.h"
#include "mongo/client/sdam/server_description.h"
#include "mongo/platform/basic.h"
namespace mongo::sdam {
-class SdamConfiguration {
-public:
- SdamConfiguration() : SdamConfiguration(boost::none){};
-
- /**
- * Initialize the TopologyDescription. This constructor may uassert if the provided
- * configuration options are not valid according to the Server Discovery & Monitoring Spec.
- *
- * Initial Servers
- * initial servers may be set to a seed list of one or more server addresses.
- *
- * Initial TopologyType
- * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary.
- *
- * Initial setName
- * The client's initial replica set name is required in order to initially configure the
- * topology type as ReplicaSetNoPrimary.
- *
- * Allowed configuration combinations
- * TopologyType Single cannot be used with multiple seeds.
- * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single, are
- * allowed.
- */
- SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList,
- TopologyType initialType = TopologyType::kUnknown,
- mongo::Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs,
- boost::optional<std::string> setName = boost::none);
-
- const boost::optional<std::vector<ServerAddress>>& getSeedList() const;
- TopologyType getInitialType() const;
- Milliseconds getHeartBeatFrequency() const;
- const boost::optional<std::string>& getSetName() const;
-
- static inline const mongo::Milliseconds kDefaultHeartbeatFrequencyMs = mongo::Seconds(10);
- static inline const mongo::Milliseconds kMinHeartbeatFrequencyMS = mongo::Milliseconds(500);
-
-private:
- boost::optional<std::vector<ServerAddress>> _seedList;
- TopologyType _initialType;
- mongo::Milliseconds _heartBeatFrequencyMs;
- boost::optional<std::string> _setName;
-};
-
-class TopologyDescription {
+class TopologyDescription : public std::enable_shared_from_this<TopologyDescription> {
public:
TopologyDescription() : TopologyDescription(SdamConfiguration()) {}
TopologyDescription(const TopologyDescription& source) = default;
@@ -113,6 +71,7 @@ public:
bool containsServerAddress(const ServerAddress& address) const;
std::vector<ServerDescriptionPtr> findServers(
std::function<bool(const ServerDescriptionPtr&)> predicate) const;
+ boost::optional<ServerDescriptionPtr> getPrimary();
/**
* Adds the given ServerDescription or swaps it with an existing one
@@ -129,12 +88,29 @@ public:
std::string toString();
private:
+ friend bool operator==(const TopologyDescription& lhs, const TopologyDescription& rhs) {
+ return std::tie(lhs._setName,
+ lhs._type,
+ lhs._maxSetVersion,
+ lhs._maxElectionId,
+ lhs._servers,
+ lhs._compatible,
+ lhs._logicalSessionTimeoutMinutes) ==
+ std::tie(rhs._setName,
+ rhs._type,
+ rhs._maxSetVersion,
+ rhs._maxElectionId,
+ rhs._servers,
+ rhs._compatible,
+ rhs._logicalSessionTimeoutMinutes);
+ }
+
/**
* Checks if all server descriptions are compatible with this server's WireVersion. If an
- * incompatible description is found, we set the topologyDescription's _compatible flag to false
- * and store an error message in _compatibleError. A ServerDescription which is not Unknown is
- * incompatible if:
- * minWireVersion > serverMaxWireVersion, or maxWireVersion < serverMinWireVersion
+ * incompatible description is found, we set the topologyDescription's _compatible flag to
+ * false and store an error message in _compatibleError. A ServerDescription which is not
+ * Unknown is incompatible if: minWireVersion > serverMaxWireVersion, or maxWireVersion <
+ * serverMinWireVersion
*/
void checkWireCompatibilityVersions();
diff --git a/src/mongo/client/sdam/topology_description_test.cpp b/src/mongo/client/sdam/topology_description_test.cpp
index 9f7a6a2dbee..d43be9642b8 100644
--- a/src/mongo/client/sdam/topology_description_test.cpp
+++ b/src/mongo/client/sdam/topology_description_test.cpp
@@ -26,8 +26,6 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
-
#include "mongo/client/sdam/sdam_test_base.h"
#include "mongo/client/sdam/topology_description.h"
@@ -36,7 +34,6 @@
#include "mongo/client/sdam/server_description.h"
#include "mongo/client/sdam/server_description_builder.h"
#include "mongo/db/wire_version.h"
-#include "mongo/logv2/log.h"
#include "mongo/unittest/death_test.h"
namespace mongo {
@@ -107,9 +104,10 @@ TEST_F(TopologyDescriptionTestFixture, ShouldAllowTypeSingleWithASingleSeed) {
}
TEST_F(TopologyDescriptionTestFixture, DoesNotAllowMultipleSeedsWithSingle) {
- ASSERT_THROWS_CODE(TopologyDescription({kTwoServersNormalCase, TopologyType::kSingle}),
- DBException,
- ErrorCodes::InvalidSeedList);
+ ASSERT_THROWS_CODE(
+ TopologyDescription(SdamConfiguration(kTwoServersNormalCase, TopologyType::kSingle)),
+ DBException,
+ ErrorCodes::InvalidSeedList);
}
TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) {
@@ -121,10 +119,10 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) {
}
TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowSettingTheReplicaSetNameWithWrongType) {
- ASSERT_THROWS_CODE(
- TopologyDescription({kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName}),
- DBException,
- ErrorCodes::InvalidTopologyType);
+ ASSERT_THROWS_CODE(TopologyDescription(SdamConfiguration(
+ kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName)),
+ DBException,
+ ErrorCodes::InvalidTopologyType);
}
TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowTopologyTypeRSNoPrimaryWithoutSetName) {
@@ -146,9 +144,8 @@ TEST_F(TopologyDescriptionTestFixture, ShouldOnlyAllowSingleAndRsNoPrimaryWithSe
topologyTypes.end());
for (const auto topologyType : topologyTypes) {
- LOGV2(20217,
- "Check TopologyType {topologyType} with setName value.",
- "topologyType"_attr = toString(topologyType));
+ unittest::log() << "Check TopologyType " << toString(topologyType)
+ << " with setName value.";
ASSERT_THROWS_CODE(
SdamConfiguration(kOneServer, topologyType, mongo::Seconds(10), kSetName),
DBException,
@@ -180,7 +177,7 @@ TEST_F(TopologyDescriptionTestFixture,
ShouldSetWireCompatibilityErrorForMinWireVersionWhenMinWireVersionIsGreater) {
const auto outgoingMaxWireVersion = WireSpec::instance().outgoing.maxWireVersion;
const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10));
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto serverDescriptionMinVersion = ServerDescriptionBuilder()
.withAddress(kOneServer[0])
.withMe(kOneServer[0])
@@ -188,16 +185,16 @@ TEST_F(TopologyDescriptionTestFixture,
.withMinWireVersion(outgoingMaxWireVersion + 1)
.instance();
- ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
- topologyDescription.installServerDescription(serverDescriptionMinVersion);
- ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
+ ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
+ topologyDescription->installServerDescription(serverDescriptionMinVersion);
+ ASSERT_NOT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
}
TEST_F(TopologyDescriptionTestFixture,
ShouldSetWireCompatibilityErrorForMinWireVersionWhenMaxWireVersionIsLess) {
const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion;
const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10));
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto serverDescriptionMaxVersion = ServerDescriptionBuilder()
.withAddress(kOneServer[0])
.withMe(kOneServer[0])
@@ -205,31 +202,31 @@ TEST_F(TopologyDescriptionTestFixture,
.withMaxWireVersion(outgoingMinWireVersion - 1)
.instance();
- ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
- topologyDescription.installServerDescription(serverDescriptionMaxVersion);
- ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
+ ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
+ topologyDescription->installServerDescription(serverDescriptionMaxVersion);
+ ASSERT_NOT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
}
TEST_F(TopologyDescriptionTestFixture, ShouldNotSetWireCompatibilityErrorWhenServerTypeIsUnknown) {
const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion;
const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10));
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto serverDescriptionMaxVersion =
ServerDescriptionBuilder().withMaxWireVersion(outgoingMinWireVersion - 1).instance();
- ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
- topologyDescription.installServerDescription(serverDescriptionMaxVersion);
- ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError());
+ ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
+ topologyDescription->installServerDescription(serverDescriptionMaxVersion);
+ ASSERT_EQUALS(boost::none, topologyDescription->getWireVersionCompatibleError());
}
TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllServerDescriptions) {
const auto config = SdamConfiguration(kThreeServers);
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto logicalSessionTimeouts = std::vector{300, 100, 200};
auto timeoutIt = logicalSessionTimeouts.begin();
const auto serverDescriptionsWithTimeouts = map<ServerDescriptionPtr, ServerDescriptionPtr>(
- topologyDescription.getServers(), [&timeoutIt](const ServerDescriptionPtr& description) {
+ topologyDescription->getServers(), [&timeoutIt](const ServerDescriptionPtr& description) {
auto newInstanceBuilder = ServerDescriptionBuilder()
.withType(ServerType::kRSSecondary)
.withAddress(description->getAddress())
@@ -240,26 +237,26 @@ TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllS
});
for (auto description : serverDescriptionsWithTimeouts) {
- topologyDescription.installServerDescription(description);
+ topologyDescription->installServerDescription(description);
}
int expectedLogicalSessionTimeout =
*std::min_element(logicalSessionTimeouts.begin(), logicalSessionTimeouts.end());
ASSERT_EQUALS(expectedLogicalSessionTimeout,
- topologyDescription.getLogicalSessionTimeoutMinutes());
+ topologyDescription->getLogicalSessionTimeoutMinutes());
}
TEST_F(TopologyDescriptionTestFixture,
ShouldSetLogicalSessionTimeoutToNoneIfAnyServerDescriptionHasNone) {
const auto config = SdamConfiguration(kThreeServers);
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
const auto logicalSessionTimeouts = std::vector{300, 100, 200};
auto timeoutIt = logicalSessionTimeouts.begin();
const auto serverDescriptionsWithTimeouts = map<ServerDescriptionPtr, ServerDescriptionPtr>(
- topologyDescription.getServers(), [&](const ServerDescriptionPtr& description) {
+ topologyDescription->getServers(), [&](const ServerDescriptionPtr& description) {
auto timeoutValue = (timeoutIt == logicalSessionTimeouts.begin())
? boost::none
: boost::make_optional(*timeoutIt);
@@ -275,19 +272,19 @@ TEST_F(TopologyDescriptionTestFixture,
});
for (auto description : serverDescriptionsWithTimeouts) {
- topologyDescription.installServerDescription(description);
+ topologyDescription->installServerDescription(description);
}
- ASSERT_EQUALS(boost::none, topologyDescription.getLogicalSessionTimeoutMinutes());
+ ASSERT_EQUALS(boost::none, topologyDescription->getLogicalSessionTimeoutMinutes());
}
TEST_F(TopologyDescriptionTestFixture, ShouldUpdateTopologyVersionOnSuccess) {
const auto config = SdamConfiguration(kThreeServers);
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
// Deafult topologyVersion is null
- ASSERT_EQUALS(topologyDescription.getServers().size(), 3);
- auto serverDescription = topologyDescription.getServers()[1];
+ ASSERT_EQUALS(topologyDescription->getServers().size(), 3);
+ auto serverDescription = topologyDescription->getServers()[1];
ASSERT(serverDescription->getTopologyVersion() == boost::none);
// Create new serverDescription with topologyVersion, topologyDescription should have the new
@@ -300,20 +297,19 @@ TEST_F(TopologyDescriptionTestFixture, ShouldUpdateTopologyVersionOnSuccess) {
.withTopologyVersion(TopologyVersion(processId, 1))
.instance();
- topologyDescription.installServerDescription(newDescription);
- ASSERT_EQUALS(topologyDescription.getServers().size(), 3);
- auto topologyVersion = topologyDescription.getServers()[1]->getTopologyVersion();
- ASSERT(topologyVersion->getProcessId() == processId);
- ASSERT(topologyVersion->getCounter() == 1);
+ topologyDescription->installServerDescription(newDescription);
+ ASSERT_EQUALS(topologyDescription->getServers().size(), 3);
+ auto topologyVersion = topologyDescription->getServers()[1]->getTopologyVersion();
+ ASSERT(topologyVersion == TopologyVersion(processId, 1));
}
TEST_F(TopologyDescriptionTestFixture, ShouldNotUpdateTopologyVersionOnError) {
const auto config = SdamConfiguration(kThreeServers);
- TopologyDescription topologyDescription(config);
+ const auto topologyDescription = std::make_shared<TopologyDescription>(config);
// Deafult topologyVersion is null
- ASSERT_EQUALS(topologyDescription.getServers().size(), 3);
- auto serverDescription = topologyDescription.getServers()[1];
+ ASSERT_EQUALS(topologyDescription->getServers().size(), 3);
+ auto serverDescription = topologyDescription->getServers()[1];
ASSERT(serverDescription->getTopologyVersion() == boost::none);
auto newDescription = ServerDescriptionBuilder()
@@ -321,9 +317,9 @@ TEST_F(TopologyDescriptionTestFixture, ShouldNotUpdateTopologyVersionOnError) {
.withError("error")
.instance();
- topologyDescription.installServerDescription(newDescription);
- ASSERT_EQUALS(topologyDescription.getServers().size(), 3);
- auto topologyVersion = topologyDescription.getServers()[1]->getTopologyVersion();
+ topologyDescription->installServerDescription(newDescription);
+ ASSERT_EQUALS(topologyDescription->getServers().size(), 3);
+ auto topologyVersion = topologyDescription->getServers()[1]->getTopologyVersion();
ASSERT(topologyVersion == boost::none);
}
}; // namespace sdam
diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp
new file mode 100644
index 00000000000..995685b71b9
--- /dev/null
+++ b/src/mongo/client/sdam/topology_listener.cpp
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/client/sdam/topology_listener.h"
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+#include "mongo/util/log.h"
+
+namespace mongo::sdam {
+void TopologyEventsPublisher::registerListener(TopologyListenerPtr listener) {
+ stdx::lock_guard lock(_mutex);
+ _listeners.push_back(listener);
+}
+
+void TopologyEventsPublisher::removeListener(TopologyListenerPtr listener) {
+ stdx::lock_guard lock(_mutex);
+ _listeners.erase(std::remove(_listeners.begin(), _listeners.end(), listener), _listeners.end());
+}
+
+void TopologyEventsPublisher::close() {
+ stdx::lock_guard lock(_mutex);
+ _listeners.clear();
+ _isClosed = true;
+}
+
+void TopologyEventsPublisher::onTopologyDescriptionChangedEvent(
+ UUID topologyId,
+ TopologyDescriptionPtr previousDescription,
+ TopologyDescriptionPtr newDescription) {
+ {
+ stdx::lock_guard lock(_eventQueueMutex);
+ EventPtr event = std::make_unique<Event>();
+ event->type = EventType::TOPOLOGY_DESCRIPTION_CHANGED;
+ event->topologyId = std::move(topologyId);
+ event->previousDescription = previousDescription;
+ event->newDescription = newDescription;
+ _eventQueue.push_back(std::move(event));
+ }
+ _scheduleNextDelivery();
+}
+
+void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) {
+ {
+ stdx::lock_guard lock(_eventQueueMutex);
+ EventPtr event = std::make_unique<Event>();
+ event->type = EventType::HEARTBEAT_SUCCESS;
+ event->duration = duration_cast<IsMasterRTT>(durationMs);
+ event->hostAndPort = hostAndPort;
+ event->reply = reply;
+ _eventQueue.push_back(std::move(event));
+ }
+ _scheduleNextDelivery();
+}
+
+void TopologyEventsPublisher::onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
+ Status errorStatus,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) {
+ {
+ stdx::lock_guard lock(_eventQueueMutex);
+ EventPtr event = std::make_unique<Event>();
+ event->type = EventType::HEARTBEAT_FAILURE;
+ event->duration = duration_cast<IsMasterRTT>(durationMs);
+ event->hostAndPort = hostAndPort;
+ event->reply = reply;
+ event->status = errorStatus;
+ _eventQueue.push_back(std::move(event));
+ }
+ _scheduleNextDelivery();
+}
+
+void TopologyEventsPublisher::_scheduleNextDelivery() {
+ // run nextDelivery async
+ _executor->schedule(
+ [self = shared_from_this()](const Status& status) { self->_nextDelivery(); });
+}
+
+void TopologyEventsPublisher::onServerPingFailedEvent(const ServerAddress& hostAndPort,
+ const Status& status) {}
+
+void TopologyEventsPublisher::onServerPingSucceededEvent(IsMasterRTT durationMS,
+ const ServerAddress& hostAndPort) {}
+
+// TODO: this could be done in batches if this is a bottleneck.
+void TopologyEventsPublisher::_nextDelivery() {
+ // get the next event to send
+ EventPtr nextEvent;
+ {
+ stdx::lock_guard lock(_eventQueueMutex);
+ if (!_eventQueue.size()) {
+ return;
+ }
+ nextEvent = std::move(_eventQueue.front());
+ _eventQueue.pop_front();
+ }
+
+ // release the lock before sending to avoid deadlock in the case there
+ // are events generated by sending the current one.
+ std::vector<TopologyListenerPtr> listeners;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (_isClosed) {
+ return;
+ }
+ listeners = _listeners;
+ }
+
+ // send to the listeners outside of the lock.
+ for (auto listener : listeners) {
+ _sendEvent(listener, *nextEvent);
+ }
+}
+
+void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Event& event) {
+ switch (event.type) {
+ case EventType::HEARTBEAT_SUCCESS:
+ listener->onServerHeartbeatSucceededEvent(
+ duration_cast<IsMasterRTT>(event.duration), event.hostAndPort, event.reply);
+ break;
+ case EventType::HEARTBEAT_FAILURE:
+ listener->onServerHeartbeatFailureEvent(duration_cast<IsMasterRTT>(event.duration),
+ event.status,
+ event.hostAndPort,
+ event.reply);
+ break;
+ case EventType::TOPOLOGY_DESCRIPTION_CHANGED:
+ // TODO: fix uuid or just remove
+ listener->onTopologyDescriptionChangedEvent(
+ UUID::gen(), event.previousDescription, event.newDescription);
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+}; // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h
index 0cf38ddf68d..dde1ae3b683 100644
--- a/src/mongo/client/sdam/topology_listener.h
+++ b/src/mongo/client/sdam/topology_listener.h
@@ -26,10 +26,13 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
#pragma once
+#include <deque>
+#include <memory>
+#include <vector>
#include "mongo/client/sdam/sdam_datatypes.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/util/uuid.h"
namespace mongo::sdam {
@@ -49,13 +52,18 @@ public:
TopologyDescriptionPtr previousDescription,
TopologyDescriptionPtr newDescription){};
+ virtual void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
+ Status errorStatus,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply){};
/**
* Called when a ServerHeartBeatSucceededEvent is published - A heartbeat sent to the server at
* hostAndPort succeeded. durationMS is the execution time of the event, including the time it
* took to send the message and recieve the reply from the server.
*/
virtual void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
- const ServerAddress& hostAndPort){};
+ const ServerAddress& hostAndPort,
+ const BSONObj reply){};
/*
* Called when a ServerPingFailedEvent is published - A monitoring ping to the server at
@@ -70,4 +78,69 @@ public:
virtual void onServerPingSucceededEvent(IsMasterRTT durationMS,
const ServerAddress& hostAndPort){};
};
+
+/**
+ * This class publishes TopologyListener events to a group of registered listeners.
+ *
+ * To publish an event to all registered listeners call the corresponding event function on the
+ * TopologyEventsPublisher instance.
+ */
+class TopologyEventsPublisher final : public TopologyListener,
+ public std::enable_shared_from_this<TopologyEventsPublisher> {
+public:
+ TopologyEventsPublisher(std::shared_ptr<executor::TaskExecutor> executor)
+ : _executor(executor){};
+ void registerListener(TopologyListenerPtr listener);
+ void removeListener(TopologyListenerPtr listener);
+ void close();
+
+ void onTopologyDescriptionChangedEvent(UUID topologyId,
+ TopologyDescriptionPtr previousDescription,
+ TopologyDescriptionPtr newDescription) override;
+ void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) override;
+ void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
+ Status errorStatus,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) override;
+ void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override;
+ void onServerPingSucceededEvent(IsMasterRTT durationMS,
+ const ServerAddress& hostAndPort) override;
+
+
+private:
+ enum class EventType {
+ HEARTBEAT_SUCCESS,
+ HEARTBEAT_FAILURE,
+ PING_SUCCESS,
+ PING_FAILURE,
+ TOPOLOGY_DESCRIPTION_CHANGED
+ };
+ struct Event {
+ EventType type;
+ ServerAddress hostAndPort;
+ IsMasterRTT duration;
+ BSONObj reply;
+ TopologyDescriptionPtr previousDescription;
+ TopologyDescriptionPtr newDescription;
+ boost::optional<UUID> topologyId;
+ Status status = Status::OK();
+ };
+ using EventPtr = std::unique_ptr<Event>;
+
+ void _sendEvent(TopologyListenerPtr listener, const TopologyEventsPublisher::Event& event);
+ void _nextDelivery();
+ void _scheduleNextDelivery();
+
+ // Lock acquisition order to avoid deadlock is _eventQueueMutex -> _mutex
+ Mutex _eventQueueMutex;
+ std::deque<EventPtr> _eventQueue;
+
+ Mutex _mutex;
+ bool _isClosed = false;
+ std::shared_ptr<executor::TaskExecutor> _executor;
+ std::vector<TopologyListenerPtr> _listeners;
+};
+using TopologyEventsPublisherPtr = std::shared_ptr<TopologyEventsPublisher>;
} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp
index 7897e6948f7..64d74e114d9 100644
--- a/src/mongo/client/sdam/topology_manager.cpp
+++ b/src/mongo/client/sdam/topology_manager.cpp
@@ -32,9 +32,9 @@
#include "mongo/client/sdam/topology_state_machine.h"
#include "mongo/logv2/log.h"
+#include "mongo/rpc/topology_version_gen.h"
namespace mongo::sdam {
-
namespace {
/* Compare topologyVersions to determine if the isMaster response's topologyVersion is stale
@@ -57,16 +57,19 @@ bool isStaleTopologyVersion(boost::optional<TopologyVersion> lastTopologyVersion
return false;
}
-
} // namespace
-TopologyManager::TopologyManager(SdamConfiguration config, ClockSource* clockSource)
+
+TopologyManager::TopologyManager(SdamConfiguration config,
+ ClockSource* clockSource,
+ TopologyEventsPublisherPtr eventsPublisher)
: _config(std::move(config)),
_clockSource(clockSource),
_topologyDescription(std::make_unique<TopologyDescription>(_config)),
- _topologyStateMachine(std::make_unique<TopologyStateMachine>(_config)) {}
+ _topologyStateMachine(std::make_unique<TopologyStateMachine>(_config)),
+ _topologyEventsPublisher(eventsPublisher) {}
-void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) {
+bool TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) {
stdx::lock_guard<mongo::Mutex> lock(_mutex);
boost::optional<IsMasterRTT> lastRTT;
@@ -85,11 +88,11 @@ void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome
if (isStaleTopologyVersion(lastTopologyVersion, newTopologyVersion)) {
LOGV2(
23930,
- "Ignoring this isMaster response because our topologyVersion: {lastTopologyVersion}is "
+ "Ignoring this isMaster response because our topologyVersion: {lastTopologyVersion} is "
"fresher than the provided topologyVersion: {newTopologyVersion}",
"lastTopologyVersion"_attr = lastTopologyVersion->toBSON(),
"newTopologyVersion"_attr = newTopologyVersion->toBSON());
- return;
+ return false;
}
boost::optional<int> poolResetCounter = lastPoolResetCounter;
@@ -98,17 +101,60 @@ void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome
poolResetCounter = ++lastPoolResetCounter.get();
}
- // newTopologyVersion will be null if the isMaster response did not provide one.
auto newServerDescription = std::make_shared<ServerDescription>(
_clockSource, isMasterOutcome, lastRTT, newTopologyVersion, poolResetCounter);
- auto newTopologyDescription = std::make_unique<TopologyDescription>(*_topologyDescription);
- _topologyStateMachine->onServerDescription(*newTopologyDescription, newServerDescription);
- _topologyDescription = std::move(newTopologyDescription);
+ auto oldTopologyDescription = _topologyDescription;
+ _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription);
+
+ // if we are equal to the old description, just install the new description without
+ // performing any actions on the state machine.
+ auto isEqualToOldServerDescription =
+ (lastServerDescription && (*lastServerDescription->get()) == *newServerDescription);
+ if (isEqualToOldServerDescription) {
+ _topologyDescription->installServerDescription(newServerDescription);
+ } else {
+ _topologyStateMachine->onServerDescription(*_topologyDescription, newServerDescription);
+ }
+
+ _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription);
+ return true;
}
const std::shared_ptr<TopologyDescription> TopologyManager::getTopologyDescription() const {
stdx::lock_guard<mongo::Mutex> lock(_mutex);
return _topologyDescription;
}
+
+void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt) {
+ stdx::lock_guard<mongo::Mutex> lock(_mutex);
+
+ auto oldServerDescription = _topologyDescription->findServerByAddress(hostAndPort);
+ if (oldServerDescription) {
+ auto newServerDescription = (*oldServerDescription)->cloneWithRTT(rtt);
+
+ auto oldTopologyDescription = _topologyDescription;
+ _topologyDescription = std::make_shared<TopologyDescription>(*_topologyDescription);
+ _topologyDescription->installServerDescription(newServerDescription);
+
+ _publishTopologyDescriptionChanged(oldTopologyDescription, _topologyDescription);
+
+ return;
+ }
+
+ // otherwise, the server was removed from the topology. Nothing to do.
+ LOGV2(433301,
+ str::stream() << "Not updating RTT. Server {server}" << hostAndPort
+ << " does not exist in ",
+ "server"_attr = hostAndPort,
+ "setName"_attr = getTopologyDescription()->getSetName());
+}
+
+void TopologyManager::_publishTopologyDescriptionChanged(
+ const TopologyDescriptionPtr& oldTopologyDescription,
+ const TopologyDescriptionPtr& newTopologyDescription) const {
+ if (_topologyEventsPublisher)
+ _topologyEventsPublisher->onTopologyDescriptionChangedEvent(
+ newTopologyDescription->getId(), oldTopologyDescription, newTopologyDescription);
+}
}; // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_manager.h b/src/mongo/client/sdam/topology_manager.h
index 292d48b6e8e..5ae64d94644 100644
--- a/src/mongo/client/sdam/topology_manager.h
+++ b/src/mongo/client/sdam/topology_manager.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2019-present MongoDB, Inc.
+ * Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -31,6 +31,7 @@
#include "mongo/client/sdam/sdam_datatypes.h"
#include "mongo/client/sdam/topology_description.h"
+#include "mongo/client/sdam/topology_listener.h"
#include "mongo/client/sdam/topology_state_machine.h"
namespace mongo::sdam {
@@ -44,7 +45,9 @@ class TopologyManager {
TopologyManager(const TopologyManager&) = delete;
public:
- TopologyManager(SdamConfiguration config, ClockSource* clockSource);
+ explicit TopologyManager(SdamConfiguration config,
+ ClockSource* clockSource,
+ TopologyEventsPublisherPtr eventsPublisher = nullptr);
/**
* This function atomically:
@@ -57,7 +60,19 @@ public:
* IsMasterOutcomes serially, as required by:
* https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#process-one-ismaster-outcome-at-a-time
*/
- void onServerDescription(const IsMasterOutcome& isMasterOutcome);
+ bool onServerDescription(const IsMasterOutcome& isMasterOutcome);
+
+
+ /**
+ * This function updates the RTT value for a server without executing any state machine actions.
+ * It atomically:
+ * 1. Clones the current TopologyDescription
+ * 2. Clones the ServerDescription corresponding to hostAndPort such that it contains the new
+ * RTT value.
+ * 3. Installs the cloned ServerDescription into the TopologyDescription from step 1
+ * 4. Installs the cloned TopologyDescription as the current one.
+ */
+ void onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT rtt);
/**
* Get the current TopologyDescription. This is safe to call from multiple threads.
@@ -65,10 +80,15 @@ public:
const TopologyDescriptionPtr getTopologyDescription() const;
private:
+ void _publishTopologyDescriptionChanged(
+ const TopologyDescriptionPtr& oldTopologyDescription,
+ const TopologyDescriptionPtr& newTopologyDescription) const;
+
mutable mongo::Mutex _mutex = MONGO_MAKE_LATCH("TopologyManager");
const SdamConfiguration _config;
ClockSource* _clockSource;
- std::shared_ptr<TopologyDescription> _topologyDescription;
- std::unique_ptr<TopologyStateMachine> _topologyStateMachine;
+ TopologyDescriptionPtr _topologyDescription;
+ TopologyStateMachinePtr _topologyStateMachine;
+ TopologyEventsPublisherPtr _topologyEventsPublisher;
};
} // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/topology_state_machine.h b/src/mongo/client/sdam/topology_state_machine.h
index abed9bc854f..fcb5b7e0c99 100644
--- a/src/mongo/client/sdam/topology_state_machine.h
+++ b/src/mongo/client/sdam/topology_state_machine.h
@@ -101,4 +101,5 @@ private:
static inline auto kLogPrefix = "sdam : ";
};
+using TopologyStateMachinePtr = std::unique_ptr<TopologyStateMachine>;
} // namespace mongo::sdam