diff options
author | Lamont Nelson <lamont.nelson@mongodb.com> | 2019-11-08 21:36:55 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-08 21:36:55 +0000 |
commit | 124ad1c022f20dbeade4d67947e328dfe4b04e20 (patch) | |
tree | 01ae45e79f6f888ad4f15e78f2759d42c03975eb | |
parent | f02841245d47c0118f18aa1f1ee88c72ddf012ab (diff) | |
download | mongo-124ad1c022f20dbeade4d67947e328dfe4b04e20.tar.gz |
SERVER-43331 Implement State Machine for Server Discovery and Monitoring Spec
19 files changed, 3455 insertions, 0 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 168b24c5f9e..1dca1fea005 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -326,6 +326,10 @@ error_codes: - {code: 290,name: TransactionExceededLifetimeLimitSeconds,categories: [ExceededTimeLimitError]} - {code: 291,name: NoQueryExecutionPlans} - {code: 292,name: QueryExceededMemoryLimitNoDiskUseAllowed} + - {code: 293,name: InvalidSeedList} + - {code: 294,name: InvalidTopologyType} + - {code: 295,name: InvalidHeartBeatFrequency} + - {code: 296,name: TopologySetNameRequired} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 097e1916a11..a50eba71e70 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -4,6 +4,11 @@ Import('env') env = env.Clone() +env.SConscript( + dirs=['sdam'], + exports=['env'] +) + # Contains only the core ConnectionString functionality, *not* the ability to call connect() and # return a DBClientBase* back. For that you need to link against the 'clientdriver_network' library. env.Library( diff --git a/src/mongo/client/sdam/SConscript b/src/mongo/client/sdam/SConscript new file mode 100644 index 00000000000..19c8760fc3f --- /dev/null +++ b/src/mongo/client/sdam/SConscript @@ -0,0 +1,50 @@ +# -*- mode: python -*- + +Import('env') + +env = env.Clone() + +env.Library( + target='sdam', + source=[ + 'sdam_datatypes.cpp', + 'server_description.cpp', + 'topology_description.cpp', + 'topology_state_machine.cpp', + 'topology_manager.cpp' + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/repl/optime', + '$BUILD_DIR/mongo/util/clock_sources', + '$BUILD_DIR/mongo/db/wire_version' + ], +) + +env.Library( + target='sdam_test', + source=[ + 'server_description_builder.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], +) + +env.CppUnitTest( + target='topology_description_test', + source=['topology_description_test.cpp'], + LIBDEPS=['sdam', 'sdam_test'], +) + +env.CppUnitTest( + target='server_description_test', + source=['server_description_test.cpp'], + LIBDEPS=['sdam', 'sdam_test'], +) + +env.CppUnitTest( + target='topology_state_machine_test', + source=['topology_state_machine_test.cpp'], + LIBDEPS=['sdam', 'sdam_test'], +) diff --git a/src/mongo/client/sdam/sdam_datatypes.cpp b/src/mongo/client/sdam/sdam_datatypes.cpp new file mode 100644 index 00000000000..d9cf9657171 --- /dev/null +++ b/src/mongo/client/sdam/sdam_datatypes.cpp @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/client/sdam/sdam_datatypes.h" + +namespace mongo::sdam { +std::string toString(const ServerType serverType) { + switch (serverType) { + case ServerType::kStandalone: + return "Standalone"; + case ServerType::kMongos: + return "Mongos"; + case ServerType::kRSPrimary: + return "RSPrimary"; + case ServerType::kRSSecondary: + return "RSSecondary"; + case ServerType::kRSArbiter: + return "RSArbiter"; + case ServerType::kRSOther: + return "RSOther"; + case ServerType::kRSGhost: + return "RSGhost"; + case ServerType::kUnknown: + return "Unknown"; + default: + MONGO_UNREACHABLE; + } +} + +std::ostream& operator<<(std::ostream& os, const ServerType serverType) { + os << toString(serverType); + return os; +} + +const std::vector<ServerType> allServerTypes() { + static auto const result = std::vector<ServerType>{ServerType::kStandalone, + ServerType::kMongos, + ServerType::kRSPrimary, + ServerType::kRSSecondary, + ServerType::kRSArbiter, + ServerType::kRSOther, + ServerType::kRSGhost, + ServerType::kUnknown}; + return result; +} + + +std::string toString(const TopologyType topologyType) { + switch (topologyType) { + case TopologyType::kReplicaSetNoPrimary: + return "ReplicaSetNoPrimary"; + case TopologyType::kReplicaSetWithPrimary: + return "ReplicaSetWithPrimary"; + case TopologyType::kSharded: + return "Sharded"; + case TopologyType::kUnknown: + return "Unknown"; + case TopologyType::kSingle: + return "Single"; + default: + MONGO_UNREACHABLE + } +} + +std::ostream& operator<<(std::ostream& os, const TopologyType topologyType) { + os << toString(topologyType); + return os; +} + +const std::vector<TopologyType> allTopologyTypes() { + static auto const result = std::vector<TopologyType>{TopologyType::kSingle, + TopologyType::kReplicaSetNoPrimary, + TopologyType::kReplicaSetWithPrimary, + TopologyType::kSharded, + TopologyType::kUnknown}; + return result; +} + +const ServerAddress& IsMasterOutcome::getServer() const { + return _server; +} +bool IsMasterOutcome::isSuccess() const { + return _success; +} +const boost::optional<BSONObj>& IsMasterOutcome::getResponse() const { + return _response; +} +const boost::optional<IsMasterRTT>& IsMasterOutcome::getRtt() const { + return _rtt; +} +const std::string& IsMasterOutcome::getErrorMsg() const { + return _errorMsg; +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h new file mode 100644 index 00000000000..d8b37df42d2 --- /dev/null +++ b/src/mongo/client/sdam/sdam_datatypes.h @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <chrono> +#include <string> + +#include "mongo/bson/bsonobj.h" +#include "mongo/util/duration.h" + + +/** + * The data structures in this file are defined in the "Server Discovery & Monitoring" + * specification, which governs how topology changes are detected in a cluster. See + * https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst + * for more information. + */ +namespace mongo::sdam { +enum class TopologyType { + kSingle, + kReplicaSetNoPrimary, + kReplicaSetWithPrimary, + kSharded, + kUnknown +}; +const std::vector<TopologyType> allTopologyTypes(); +std::string toString(const TopologyType topologyType); +std::ostream& operator<<(std::ostream& os, const TopologyType topologyType); + +enum class ServerType { + kStandalone, + kMongos, + kRSPrimary, + kRSSecondary, + kRSArbiter, + kRSOther, + kRSGhost, + kUnknown +}; +const std::vector<ServerType> allServerTypes(); +std::string toString(const ServerType serverType); +std::ostream& operator<<(std::ostream& os, const ServerType serverType); + +using ServerAddress = std::string; +using IsMasterRTT = mongo::Nanoseconds; + +// The result of an attempt to call the "ismaster" command on a server. +class IsMasterOutcome { + IsMasterOutcome() = delete; + +public: + // success constructor + IsMasterOutcome(ServerAddress server, BSONObj response, IsMasterRTT rtt) + : _server(std::move(server)), _success(true), _response(response), _rtt(rtt) {} + + // failure constructor + IsMasterOutcome(ServerAddress server, std::string errorMsg) + : _server(std::move(server)), _success(false), _errorMsg(errorMsg) {} + + const ServerAddress& getServer() const; + bool isSuccess() const; + const boost::optional<BSONObj>& getResponse() const; + const boost::optional<IsMasterRTT>& getRtt() const; + const std::string& getErrorMsg() const; + +private: + ServerAddress _server; + // indicating the success or failure of the attempt + bool _success; + // an error message in case of failure + std::string _errorMsg; + // a document containing the command response (or boost::none if it failed) + boost::optional<BSONObj> _response; + // the round trip time to execute the command (or null if it failed) + boost::optional<IsMasterRTT> _rtt; +}; + +class ServerDescription; +using ServerDescriptionPtr = std::shared_ptr<ServerDescription>; + +class TopologyDescription; +using TopologyDescriptionPtr = std::shared_ptr<TopologyDescription>; +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/sdam_test_base.h b/src/mongo/client/sdam/sdam_test_base.h new file mode 100644 index 00000000000..3532e0588b7 --- /dev/null +++ b/src/mongo/client/sdam/sdam_test_base.h @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include <map> +#include <ostream> +#include <set> +#include <string> +#include <type_traits> +#include <vector> + +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/client/sdam/server_description.h" + + +/** + * The following facilitates writing tests in the Server Discovery And Monitoring (sdam) namespace. + */ +namespace mongo { +template <typename T> +std::ostream& operator<<(std::ostream& os, const std::vector<T>& s) { + os << "["; + size_t i = 0; + for (const auto& item : s) { + os << item; + if (i != s.size() - 1) + os << ", "; + } + os << "]"; + return os; +} + +template <typename T> +std::ostream& operator<<(std::ostream& os, const std::set<T>& s) { + os << "{"; + size_t i = 0; + for (const auto& item : s) { + os << item; + if (i != s.size() - 1) + os << ", "; + } + os << "}"; + return os; +} + +template <typename K, typename V> +std::ostream& operator<<(std::ostream& os, const std::map<K, V>& m) { + os << "{"; + size_t i = 0; + for (const auto& item : m) { + os << item.first << ": " << item.second; + if (i != m.size() - 1) + os << ", "; + } + os << "}"; + return os; +} + +template std::ostream& operator<<(std::ostream& os, + const std::vector<mongo::sdam::ServerDescriptionPtr>& v); +template std::ostream& operator<<(std::ostream& os, const std::set<std::string>& s); +template std::ostream& operator<<(std::ostream& os, const std::map<std::string, std::string>& m); +}; // namespace mongo + +// We include this here because the ASSERT_EQUALS needs to have the operator<< defined +// beforehand for the types used in the tests. +#include "mongo/unittest/unittest.h" +namespace mongo { +namespace sdam { +using mongo::operator<<; + +class SdamTestFixture : public mongo::unittest::Test { +protected: + template <typename T, typename U> + std::vector<U> map(std::vector<T> source, std::function<U(const T&)> f) { + std::vector<U> result; + std::transform(source.begin(), + source.end(), + std::back_inserter(result), + [f](const auto& item) { return f(item); }); + return result; + } + + template <typename T, typename U> + std::set<U> mapSet(std::vector<T> source, std::function<U(const T&)> f) { + auto v = map<T, U>(source, f); + std::set<U> result(v.begin(), v.end()); + return result; + } +}; +} // namespace sdam +} // namespace mongo diff --git a/src/mongo/client/sdam/server_description.cpp b/src/mongo/client/sdam/server_description.cpp new file mode 100644 index 00000000000..21726e4ee1a --- /dev/null +++ b/src/mongo/client/sdam/server_description.cpp @@ -0,0 +1,364 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/client/sdam/server_description.h" +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include <algorithm> +#include <boost/algorithm/string.hpp> +#include <boost/optional.hpp> +#include <set> + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/oid.h" +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/util/duration.h" +#include "mongo/util/log.h" + + +namespace mongo::sdam { +ServerDescription::ServerDescription(ClockSource* clockSource, + const IsMasterOutcome& isMasterOutcome, + boost::optional<IsMasterRTT> lastRtt) + : ServerDescription(isMasterOutcome.getServer()) { + if (isMasterOutcome.isSuccess()) { + const auto response = *isMasterOutcome.getResponse(); + + // type must be parsed before RTT is calculated. + parseTypeFromIsMaster(response); + calculateRtt(*isMasterOutcome.getRtt(), lastRtt); + + _lastUpdateTime = clockSource->now(); + _minWireVersion = response["minWireVersion"].numberInt(); + _maxWireVersion = response["maxWireVersion"].numberInt(); + + saveLastWriteInfo(response.getObjectField("lastWrite")); + saveHosts(response); + saveTags(response.getObjectField("tags")); + saveElectionId(response.getField("electionId")); + + auto lsTimeoutField = response.getField("logicalSessionTimeoutMinutes"); + if (lsTimeoutField.type() == BSONType::NumberInt) { + _logicalSessionTimeoutMinutes = lsTimeoutField.numberInt(); + } + + auto setVersionField = response.getField("setVersion"); + if (setVersionField.type() == BSONType::NumberInt) { + _setVersion = response["setVersion"].numberInt(); + } + + auto setNameField = response.getField("setName"); + if (setNameField.type() == BSONType::String) { + _setName = response["setName"].str(); + } + + auto primaryField = response.getField("primary"); + if (primaryField.type() == BSONType::String) { + _primary = response.getStringField("primary"); + } + } else { + _error = isMasterOutcome.getErrorMsg(); + } +} + +void ServerDescription::storeHostListIfPresent(const std::string key, + const BSONObj response, + std::set<ServerAddress>& destination) { + if (response.hasField(key)) { + auto hostsBsonArray = response[key].Array(); + std::transform(hostsBsonArray.begin(), + hostsBsonArray.end(), + std::inserter(destination, destination.begin()), + [](const BSONElement e) { return boost::to_lower_copy(e.String()); }); + } +} + +void ServerDescription::saveHosts(const BSONObj response) { + if (response.hasField("me")) { + auto me = response.getField("me"); + _me = boost::to_lower_copy(me.str()); + } + + storeHostListIfPresent("hosts", response, _hosts); + storeHostListIfPresent("passives", response, _passives); + storeHostListIfPresent("arbiters", response, _arbiters); +} + +void ServerDescription::saveTags(BSONObj tagsObj) { + const auto keys = tagsObj.getFieldNames<std::set<std::string>>(); + for (const auto key : keys) { + _tags[key] = tagsObj.getStringField(key); + } +} + +void ServerDescription::saveElectionId(BSONElement electionId) { + if (electionId.type() == jstOID) { + _electionId = electionId.OID(); + } +} + +void ServerDescription::calculateRtt(const IsMasterRTT currentRtt, + const boost::optional<IsMasterRTT> lastRtt) { + if (getType() == ServerType::kUnknown) { + // if a server's type is Unknown, it's RTT is null + // see: + // https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#roundtriptime + return; + } + + if (lastRtt) { + // new_rtt = alpha * x + (1 - alpha) * old_rtt + _rtt = IsMasterRTT(static_cast<IsMasterRTT::rep>(kRttAlpha * currentRtt.count() + + (1 - kRttAlpha) * lastRtt.get().count())); + } else { + _rtt = currentRtt; + } +} + +void ServerDescription::saveLastWriteInfo(BSONObj lastWriteBson) { + const auto lastWriteDateField = lastWriteBson.getField("lastWriteDate"); + if (lastWriteDateField.type() == BSONType::Date) { + _lastWriteDate = lastWriteDateField.date(); + } + + const auto opTimeParse = + repl::OpTime::parseFromOplogEntry(lastWriteBson.getObjectField("opTime")); + if (opTimeParse.isOK()) { + _opTime = opTimeParse.getValue(); + } +} + +void ServerDescription::parseTypeFromIsMaster(const BSONObj isMaster) { + ServerType t; + bool hasSetName = isMaster.hasField("setName"); + + if (isMaster.getField("ok").numberInt() != 1) { + t = ServerType::kUnknown; + } else if (!hasSetName && !isMaster.hasField("msg") && !isMaster.getBoolField("isreplicaset")) { + t = ServerType::kStandalone; + } else if (kIsDbGrid == isMaster.getStringField("msg")) { + t = ServerType::kMongos; + } else if (hasSetName && isMaster.getBoolField("ismaster")) { + t = ServerType::kRSPrimary; + } else if (hasSetName && isMaster.getBoolField("secondary")) { + t = ServerType::kRSSecondary; + } else if (hasSetName && isMaster.getBoolField("arbiterOnly")) { + t = ServerType::kRSArbiter; + } else if (hasSetName && isMaster.getBoolField("hidden")) { + t = ServerType::kRSOther; + } else if (isMaster.getBoolField("isreplicaset")) { + t = ServerType::kRSGhost; + } else { + error() << "unknown server type from successful ismaster reply: " << isMaster.toString(); + t = ServerType::kUnknown; + } + _type = t; +} + +const ServerAddress& ServerDescription::getAddress() const { + return _address; +} + +const boost::optional<std::string>& ServerDescription::getError() const { + return _error; +} + +const boost::optional<IsMasterRTT>& ServerDescription::getRtt() const { + return _rtt; +} + +const boost::optional<mongo::Date_t>& ServerDescription::getLastWriteDate() const { + return _lastWriteDate; +} + +const boost::optional<repl::OpTime>& ServerDescription::getOpTime() const { + return _opTime; +} + +ServerType ServerDescription::getType() const { + return _type; +} + +const boost::optional<ServerAddress>& ServerDescription::getMe() const { + return _me; +} + +const std::set<ServerAddress>& ServerDescription::getHosts() const { + return _hosts; +} + +const std::set<ServerAddress>& ServerDescription::getPassives() const { + return _passives; +} + +const std::set<ServerAddress>& ServerDescription::getArbiters() const { + return _arbiters; +} + +const std::map<std::string, std::string>& ServerDescription::getTags() const { + return _tags; +} + +const boost::optional<std::string>& ServerDescription::getSetName() const { + return _setName; +} + +const boost::optional<int>& ServerDescription::getSetVersion() const { + return _setVersion; +} + +const boost::optional<mongo::OID>& ServerDescription::getElectionId() const { + return _electionId; +} + +const boost::optional<ServerAddress>& ServerDescription::getPrimary() const { + return _primary; +} + +const mongo::Date_t ServerDescription::getLastUpdateTime() const { + return *_lastUpdateTime; +} + +const boost::optional<int>& ServerDescription::getLogicalSessionTimeoutMinutes() const { + return _logicalSessionTimeoutMinutes; +} + +bool ServerDescription::isEquivalent(const ServerDescription& other) const { + auto otherValues = std::tie(other._type, + other._minWireVersion, + other._maxWireVersion, + other._me, + other._hosts, + other._passives, + other._arbiters, + other._tags, + other._setName, + other._setVersion, + other._electionId, + other._primary, + other._logicalSessionTimeoutMinutes); + auto thisValues = std::tie(_type, + _minWireVersion, + _maxWireVersion, + _me, + _hosts, + _passives, + _arbiters, + _tags, + _setName, + _setVersion, + _electionId, + _primary, + _logicalSessionTimeoutMinutes); + return thisValues == otherValues; +} + +bool ServerDescription::isDataBearingServer() const { + return kDataServerTypes.find(_type) != kDataServerTypes.end(); +} + +// output server description to bson. This is primarily used for debugging. +BSONObj ServerDescription::toBson() const { + BSONObjBuilder bson; + bson.append("address", _address); + if (_rtt) { + bson.append("roundTripTime", durationCount<Microseconds>(*_rtt)); + } + + if (_lastWriteDate) { + bson.appendDate("lastWriteDate", *_lastWriteDate); + } + + if (_opTime) { + bson.append("opTime", _opTime->toBSON()); + } + + { + using mongo::sdam::toString; + bson.append("type", toString(_type)); + } + + bson.append("minWireVersion", _minWireVersion); + bson.append("maxWireVersion", _maxWireVersion); + if (_me) { + bson.append("me", *_me); + } + if (_setName) { + bson.append("setName", *_setName); + } + if (_setVersion) { + bson.append("setVersion", *_setVersion); + } + if (_electionId) { + bson.append("electionId", *_electionId); + } + if (_primary) { + bson.append("primary", *_primary); + } + if (_lastUpdateTime) { + bson.append("lastUpdateTime", *_lastUpdateTime); + } + if (_logicalSessionTimeoutMinutes) { + bson.append("logicalSessionTimeoutMinutes", *_logicalSessionTimeoutMinutes); + } + return bson.obj(); +} + +int ServerDescription::getMinWireVersion() const { + return _minWireVersion; +} + +int ServerDescription::getMaxWireVersion() const { + return _maxWireVersion; +} + +std::string ServerDescription::toString() const { + return toBson().toString(); +} + + +bool operator==(const mongo::sdam::ServerDescription& a, const mongo::sdam::ServerDescription& b) { + return a.isEquivalent(b); +} + +bool operator!=(const mongo::sdam::ServerDescription& a, const mongo::sdam::ServerDescription& b) { + return !(a == b); +} + +std::ostream& operator<<(std::ostream& os, const ServerDescription& description) { + BSONObj obj = description.toBson(); + os << obj.toString(); + return os; +} + +std::ostream& operator<<(std::ostream& os, const ServerDescriptionPtr& description) { + os << *description; + return os; +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_description.h b/src/mongo/client/sdam/server_description.h new file mode 100644 index 00000000000..0d69ad86f2c --- /dev/null +++ b/src/mongo/client/sdam/server_description.h @@ -0,0 +1,199 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include <boost/algorithm/string.hpp> +#include <boost/optional.hpp> +#include <map> +#include <ostream> +#include <set> +#include <utility> + +#include "mongo/bson/oid.h" +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/db/repl/optime.h" +#include "mongo/platform/basic.h" +#include "mongo/util/clock_source.h" + +namespace mongo::sdam { +class ServerDescription { + ServerDescription() = delete; + +public: + /** + * Construct an unknown ServerDescription with default values except the server's address. + */ + ServerDescription(ServerAddress address) + : _address(std::move(address)), _type(ServerType::kUnknown) { + boost::to_lower(_address); + } + + /** + * Build a new ServerDescription according to the rules of the SDAM spec based on the + * last RTT to the server and isMaster response. + */ + ServerDescription(ClockSource* clockSource, + const IsMasterOutcome& isMasterOutcome, + boost::optional<IsMasterRTT> lastRtt = boost::none); + + /** + * This determines if a server description is equivalent according to the Server Discovery and + * Monitoring specification. Members marked with (=) are used to determine equality. Note that + * these members do not include RTT or the server's address. + */ + bool isEquivalent(const ServerDescription& other) const; + + // server identity + const ServerAddress& getAddress() const; + ServerType getType() const; + const boost::optional<ServerAddress>& getMe() const; + const boost::optional<std::string>& getSetName() const; + const std::map<std::string, std::string>& getTags() const; + + // network attributes + const boost::optional<std::string>& getError() const; + const boost::optional<IsMasterRTT>& getRtt() const; + const boost::optional<int>& getLogicalSessionTimeoutMinutes() const; + + // server capabilities + int getMinWireVersion() const; + int getMaxWireVersion() const; + bool isDataBearingServer() const; + + // server 'time' + const Date_t getLastUpdateTime() const; + const boost::optional<Date_t>& getLastWriteDate() const; + const boost::optional<repl::OpTime>& getOpTime() const; + + // topology membership + const boost::optional<ServerAddress>& getPrimary() const; + const std::set<ServerAddress>& getHosts() const; + const std::set<ServerAddress>& getPassives() const; + const std::set<ServerAddress>& getArbiters() const; + const boost::optional<int>& getSetVersion() const; + const boost::optional<OID>& getElectionId() const; + + BSONObj toBson() const; + std::string toString() const; + +private: + /** + * Classify the server's type based on the ismaster response. + * @param isMaster - reply information for the ismaster command + */ + void parseTypeFromIsMaster(const BSONObj isMaster); + + + void calculateRtt(const IsMasterRTT currentRtt, const boost::optional<IsMasterRTT> lastRtt); + void saveLastWriteInfo(BSONObj lastWriteBson); + + void storeHostListIfPresent(const std::string key, + const BSONObj response, + std::set<ServerAddress>& destination); + void saveHosts(const BSONObj response); + void saveTags(BSONObj tagsObj); + void saveElectionId(BSONElement electionId); + + static inline const std::set<ServerType> kDataServerTypes{ServerType::kMongos, + ServerType::kRSPrimary, + ServerType::kRSSecondary, + ServerType::kStandalone}; + + static inline const std::string kIsDbGrid = "isdbgrid"; + static inline const double kRttAlpha = 0.2; + + // address: the hostname or IP, and the port number, that the client connects to. Note that this + // is not the server's ismaster.me field, in the case that the server reports an address + // different from the address the client uses. + ServerAddress _address; + + // error: information about the last error related to this server. Default null. + boost::optional<std::string> _error; + + // roundTripTime: the duration of the ismaster call. Default null. + boost::optional<IsMasterRTT> _rtt; + + // lastWriteDate: a 64-bit BSON datetime or null. The "lastWriteDate" from the server's most + // recent ismaster response. + boost::optional<Date_t> _lastWriteDate; + + // opTime: an ObjectId or null. The last opTime reported by the server; an ObjectId or null. + // (Only mongos and shard servers record this field when monitoring config servers as replica + // sets.) + boost::optional<repl::OpTime> _opTime; + + // (=) type: a ServerType enum value. Default Unknown. + ServerType _type; + + // (=) minWireVersion, maxWireVersion: the wire protocol version range supported by the server. + // Both default to 0. Use min and maxWireVersion only to determine compatibility. + int _minWireVersion = 0; + int _maxWireVersion = 0; + + // (=) me: The hostname or IP, and the port number, that this server was configured with in the + // replica set. Default null. + boost::optional<ServerAddress> _me; + + // (=) hosts, passives, arbiters: Sets of addresses. This server's opinion of the replica set's + // members, if any. These hostnames are normalized to lower-case. Default empty. The client + // monitors all three types of servers in a replica set. + std::set<ServerAddress> _hosts; + std::set<ServerAddress> _passives; + std::set<ServerAddress> _arbiters; + + // (=) tags: map from string to string. Default empty. + std::map<std::string, std::string> _tags; + + // (=) setName: string or null. Default null. + boost::optional<std::string> _setName; + + // (=) setVersion: integer or null. Default null. + boost::optional<int> _setVersion; + + // (=) electionId: an ObjectId, if this is a MongoDB 2.6+ replica set member that believes it is + // primary. See using setVersion and electionId to detect stale primaries. Default null. + boost::optional<OID> _electionId; + + // (=) primary: an address. This server's opinion of who the primary is. Default null. + boost::optional<ServerAddress> _primary; + + // lastUpdateTime: when this server was last checked. Default "infinity ago". + boost::optional<Date_t> _lastUpdateTime = Date_t::min(); + + // (=) logicalSessionTimeoutMinutes: integer or null. Default null. + boost::optional<int> _logicalSessionTimeoutMinutes; + + friend class ServerDescriptionBuilder; +}; + +bool operator==(const mongo::sdam::ServerDescription& a, const mongo::sdam::ServerDescription& b); +bool operator!=(const mongo::sdam::ServerDescription& a, const mongo::sdam::ServerDescription& b); +std::ostream& operator<<(std::ostream& os, const ServerDescriptionPtr& description); +std::ostream& operator<<(std::ostream& os, const ServerDescription& description); +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_description_builder.cpp b/src/mongo/client/sdam/server_description_builder.cpp new file mode 100644 index 00000000000..16c40b025b8 --- /dev/null +++ b/src/mongo/client/sdam/server_description_builder.cpp @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/sdam/server_description_builder.h" + +namespace mongo::sdam { +ServerDescriptionPtr ServerDescriptionBuilder::instance() const { + return _instance; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withAddress(const ServerAddress& address) { + _instance->_address = address; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withError(const std::string& error) { + _instance->_error = error; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withRtt(const IsMasterRTT& rtt) { + _instance->_rtt = rtt; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withLastWriteDate(const Date_t& lastWriteDate) { + _instance->_lastWriteDate = lastWriteDate; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withOpTime(const repl::OpTime opTime) { + _instance->_opTime = opTime; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withType(const ServerType type) { + _instance->_type = type; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withMinWireVersion(int minVersion) { + _instance->_minWireVersion = minVersion; + return *this; +} +ServerDescriptionBuilder& ServerDescriptionBuilder::withMaxWireVersion(int maxVersion) { + _instance->_maxWireVersion = maxVersion; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withMe(const ServerAddress& me) { + _instance->_me = boost::to_lower_copy(me); + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withHost(const ServerAddress& host) { + _instance->_hosts.emplace(boost::to_lower_copy(host)); + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withPassive(const ServerAddress& passive) { + _instance->_passives.emplace(boost::to_lower_copy(passive)); + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withArbiter(const ServerAddress& arbiter) { + _instance->_arbiters.emplace(boost::to_lower_copy(arbiter)); + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withTag(const std::string key, + const std::string value) { + _instance->_tags[key] = value; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withSetName(const std::string setName) { + _instance->_setName = std::move(setName); + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withSetVersion(const int setVersion) { + _instance->_setVersion = setVersion; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withElectionId(const OID& electionId) { + _instance->_electionId = electionId; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withPrimary(const ServerAddress& primary) { + _instance->_primary = primary; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withLastUpdateTime( + const Date_t& lastUpdateTime) { + _instance->_lastUpdateTime = lastUpdateTime; + return *this; +} + +ServerDescriptionBuilder& ServerDescriptionBuilder::withLogicalSessionTimeoutMinutes( + const boost::optional<int> logicalSessionTimeoutMinutes) { + _instance->_logicalSessionTimeoutMinutes = logicalSessionTimeoutMinutes; + return *this; +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_description_builder.h b/src/mongo/client/sdam/server_description_builder.h new file mode 100644 index 00000000000..f9ddf7e1a41 --- /dev/null +++ b/src/mongo/client/sdam/server_description_builder.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once +#include "mongo/client/sdam/server_description.h" + +namespace mongo::sdam { + +/** + * This class is used in the unit tests to construct ServerDescription instances. For production + * code, ServerDescription instances should be constructed using its constructors. + */ +class ServerDescriptionBuilder { +public: + ServerDescriptionBuilder() = default; + + /** + * Return the configured ServerDescription instance. + */ + ServerDescriptionPtr instance() const; + + // server identity + ServerDescriptionBuilder& withAddress(const ServerAddress& address); + ServerDescriptionBuilder& withType(const ServerType type); + ServerDescriptionBuilder& withMe(const ServerAddress& me); + ServerDescriptionBuilder& withTag(const std::string key, const std::string value); + ServerDescriptionBuilder& withSetName(const std::string setName); + + // network attributes + ServerDescriptionBuilder& withRtt(const IsMasterRTT& rtt); + ServerDescriptionBuilder& withError(const std::string& error); + ServerDescriptionBuilder& withLogicalSessionTimeoutMinutes( + const boost::optional<int> logicalSessionTimeoutMinutes); + + // server capabilities + ServerDescriptionBuilder& withMinWireVersion(int minVersion); + ServerDescriptionBuilder& withMaxWireVersion(int maxVersion); + + // server 'time' + ServerDescriptionBuilder& withLastWriteDate(const Date_t& lastWriteDate); + ServerDescriptionBuilder& withOpTime(const repl::OpTime opTime); + ServerDescriptionBuilder& withLastUpdateTime(const Date_t& lastUpdateTime); + + // topology membership + ServerDescriptionBuilder& withPrimary(const ServerAddress& primary); + ServerDescriptionBuilder& withHost(const ServerAddress& host); + ServerDescriptionBuilder& withPassive(const ServerAddress& passive); + ServerDescriptionBuilder& withArbiter(const ServerAddress& arbiter); + ServerDescriptionBuilder& withSetVersion(const int setVersion); + ServerDescriptionBuilder& withElectionId(const OID& electionId); + +private: + constexpr static auto kServerAddressNotSet = "address.not.set:1234"; + ServerDescriptionPtr _instance = + std::shared_ptr<ServerDescription>(new ServerDescription(kServerAddressNotSet)); +}; +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/server_description_test.cpp b/src/mongo/client/sdam/server_description_test.cpp new file mode 100644 index 00000000000..263392dfe6f --- /dev/null +++ b/src/mongo/client/sdam/server_description_test.cpp @@ -0,0 +1,474 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/sdam/sdam_test_base.h" + +#include <boost/algorithm/string.hpp> +#include <boost/optional/optional_io.hpp> +#include <ostream> +#include <set> + +#include "mongo/client/sdam/server_description.h" +#include "mongo/client/sdam/server_description_builder.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/optime.h" +#include "mongo/util/system_clock_source.h" + +namespace mongo::sdam { +TEST(ServerDescriptionTest, ShouldNormalizeAddress) { + ServerDescription a("foo:1234"); + ServerDescription b("FOo:1234"); + ASSERT_EQUALS(a.getAddress(), b.getAddress()); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareDefaultValuesAsEqual) { + auto a = ServerDescription("foo:1234"); + auto b = ServerDescription("foo:1234"); + ASSERT_EQUALS(a, b); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareDifferentAddressButSameServerTypeAsEqual) { + // Note: The SDAM specification does not prescribe how to compare server descriptions with + // different addresses for equality. We choose that two descriptions are considered equal if + // their addresses are different. + auto a = *ServerDescriptionBuilder() + .withAddress("foo:1234") + .withType(ServerType::kStandalone) + .instance(); + auto b = *ServerDescriptionBuilder() + .withAddress("bar:1234") + .withType(ServerType::kStandalone) + .instance(); + ASSERT_EQUALS(a, b); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareServerTypes) { + auto a = *ServerDescriptionBuilder().withType(ServerType::kStandalone).instance(); + auto b = *ServerDescriptionBuilder().withType(ServerType::kRSSecondary).instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareMinWireVersion) { + auto a = *ServerDescriptionBuilder().withMinWireVersion(1).instance(); + auto b = *ServerDescriptionBuilder().withMinWireVersion(2).instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareMaxWireVersion) { + auto a = *ServerDescriptionBuilder().withMaxWireVersion(1).instance(); + auto b = *ServerDescriptionBuilder().withMaxWireVersion(2).instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareMeValues) { + auto a = *ServerDescriptionBuilder().withMe("foo").instance(); + auto b = *ServerDescriptionBuilder().withMe("bar").instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareHosts) { + auto a = *ServerDescriptionBuilder().withHost("foo").instance(); + auto b = *ServerDescriptionBuilder().withHost("bar").instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldComparePassives) { + auto a = *ServerDescriptionBuilder().withPassive("foo").instance(); + auto b = *ServerDescriptionBuilder().withPassive("bar").instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareArbiters) { + auto a = *ServerDescriptionBuilder().withArbiter("foo").instance(); + auto b = *ServerDescriptionBuilder().withArbiter("bar").instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareMultipleHostsOrderDoesntMatter) { + auto a = *ServerDescriptionBuilder().withHost("foo").withHost("bar").instance(); + auto b = *ServerDescriptionBuilder().withHost("bar").withHost("foo").instance(); + ASSERT_EQUALS(a, b); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareMultiplePassivesOrderDoesntMatter) { + auto a = *ServerDescriptionBuilder().withPassive("foo").withPassive("bar").instance(); + auto b = *ServerDescriptionBuilder().withPassive("bar").withPassive("foo").instance(); + ASSERT_EQUALS(a, b); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareMultipleArbitersOrderDoesntMatter) { + auto a = *ServerDescriptionBuilder().withArbiter("foo").withArbiter("bar").instance(); + auto b = *ServerDescriptionBuilder().withArbiter("bar").withArbiter("foo").instance(); + ASSERT_EQUALS(a, b); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareTags) { + auto a = *ServerDescriptionBuilder().withTag("foo", "bar").instance(); + auto b = *ServerDescriptionBuilder().withTag("baz", "buz").instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareSetName) { + auto a = *ServerDescriptionBuilder().withSetName("foo").instance(); + auto b = *ServerDescriptionBuilder().withSetName("bar").instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareSetVersion) { + auto a = *ServerDescriptionBuilder().withSetVersion(1).instance(); + auto b = *ServerDescriptionBuilder().withSetVersion(2).instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareElectionId) { + auto a = *ServerDescriptionBuilder().withElectionId(OID::max()).instance(); + auto b = *ServerDescriptionBuilder().withElectionId(OID("000000000000000000000000")).instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldComparePrimary) { + auto a = *ServerDescriptionBuilder().withPrimary("foo:1234").instance(); + auto b = *ServerDescriptionBuilder().withPrimary("bar:1234").instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + +TEST(ServerDescriptionEqualityTest, ShouldCompareLogicalSessionTimeout) { + auto a = *ServerDescriptionBuilder().withLogicalSessionTimeoutMinutes(1).instance(); + auto b = *ServerDescriptionBuilder().withLogicalSessionTimeoutMinutes(2).instance(); + ASSERT_NOT_EQUALS(a, b); + ASSERT_EQUALS(a, a); +} + + +class ServerDescriptionTestFixture : public SdamTestFixture { +protected: + // returns a set containing the elements in the given bson array with lowercase values. + std::set<std::string> toHostSet(std::vector<BSONElement> bsonArray) { + return mapSet<BSONElement, std::string>( + bsonArray, [](const BSONElement& e) { return boost::to_lower_copy(e.String()); }); + } + + std::map<std::string, std::string> toStringMap(BSONObj bsonObj) { + std::map<std::string, std::string> result; + const auto keys = bsonObj.getFieldNames<std::set<std::string>>(); + std::transform(keys.begin(), + keys.end(), + std::inserter(result, result.begin()), + [bsonObj](const std::string& key) { + return std::pair<const std::string, std::string>( + key, bsonObj.getStringField(key)); + }); + return result; + } + + static BSONObjBuilder okBuilder() { + return std::move(BSONObjBuilder().append("ok", 1)); + } + + static inline const auto clockSource = SystemClockSource::get(); + + static inline const auto kBsonOk = okBuilder().obj(); + static inline const auto kBsonMissingOk = BSONObjBuilder().obj(); + static inline const auto kBsonMongos = okBuilder().append("msg", "isdbgrid").obj(); + static inline const auto kBsonRsPrimary = + okBuilder().append("ismaster", true).append("setName", "foo").obj(); + static inline const auto kBsonRsSecondary = + okBuilder().append("secondary", true).append("setName", "foo").obj(); + static inline const auto kBsonRsArbiter = + okBuilder().append("arbiterOnly", true).append("setName", "foo").obj(); + static inline const auto kBsonRsOther = + okBuilder().append("hidden", true).append("setName", "foo").obj(); + static inline const auto kBsonRsGhost = okBuilder().append("isreplicaset", true).obj(); + static inline const auto kBsonWireVersion = + okBuilder().append("minWireVersion", 1).append("maxWireVersion", 2).obj(); + static inline const auto kBsonTags = + okBuilder() + .append("tags", BSONObjBuilder().append("foo", "bar").append("baz", "buz").obj()) + .obj(); + static inline const mongo::repl::OpTime kOpTime = + mongo::repl::OpTime(Timestamp(1568848910), 24); + static inline const Date_t kLastWriteDate = + dateFromISOString("2019-09-18T23:21:50Z").getValue(); + static inline const auto kBsonLastWrite = + okBuilder() + .append("lastWrite", + BSONObjBuilder() + .appendTimeT("lastWriteDate", kLastWriteDate.toTimeT()) + .append("opTime", kOpTime.toBSON()) + .obj()) + .obj(); + static inline const auto kBsonHostNames = okBuilder() + .append("me", "Me:1234") + .appendArray("hosts", + BSON_ARRAY("Foo:1234" + << "Bar:1234")) + .appendArray("arbiters", + BSON_ARRAY("Baz:1234" + << "Buz:1234")) + .appendArray("passives", + BSON_ARRAY("Biz:1234" + << "Boz:1234")) + .obj(); + static inline const auto kBsonSetVersionName = + okBuilder().append("setVersion", 1).append("setName", "bar").obj(); + static inline const auto kBsonElectionId = okBuilder().append("electionId", OID::max()).obj(); + static inline const auto kBsonPrimary = okBuilder().append("primary", "foo:1234").obj(); + static inline const auto kBsonLogicalSessionTimeout = + okBuilder().append("logicalSessionTimeoutMinutes", 1).obj(); +}; + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsUnknownForIsMasterError) { + auto response = IsMasterOutcome("foo:1234", "an error occurred"); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kUnknown, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsUnknownIfOkMissing) { + auto response = IsMasterOutcome("foo:1234", kBsonMissingOk, IsMasterRTT::min()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kUnknown, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsStandalone) { + // No "msg: isdbgrid", no setName, and no "isreplicaset: true". + auto response = IsMasterOutcome("foo:1234", kBsonOk, IsMasterRTT::min()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kStandalone, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsMongos) { + // contains "msg: isdbgrid" + auto response = IsMasterOutcome("foo:1234", kBsonMongos, IsMasterRTT::min()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kMongos, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsRSPrimary) { + // "ismaster: true", "setName" in response + auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, IsMasterRTT::min()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kRSPrimary, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsRSSecondary) { + // "secondary: true", "setName" in response + auto response = IsMasterOutcome("foo:1234", kBsonRsSecondary, IsMasterRTT::min()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kRSSecondary, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsArbiter) { + // "arbiterOnly: true", "setName" in response. + auto response = IsMasterOutcome("foo:1234", kBsonRsArbiter, IsMasterRTT::min()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kRSArbiter, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsOther) { + // "hidden: true", "setName" in response, or not primary, secondary, nor arbiter + auto response = IsMasterOutcome("foo:1234", kBsonRsOther, IsMasterRTT::min()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kRSOther, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldParseTypeAsGhost) { + // "isreplicaset: true" in response. + auto response = IsMasterOutcome("foo:1234", kBsonRsGhost, IsMasterRTT::min()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(ServerType::kRSGhost, description.getType()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreErrorDescription) { + auto errorMsg = "an error occurred"; + auto response = IsMasterOutcome("foo:1234", errorMsg); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(errorMsg, *description.getError()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreRTTWithNoPreviousLatency) { + auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, IsMasterRTT::max()); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(IsMasterRTT::max(), *description.getRtt()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreRTTNullWhenServerTypeIsUnknown) { + auto response = IsMasterOutcome("foo:1234", kBsonMissingOk, IsMasterRTT::max()); + auto description = ServerDescription(clockSource, response, boost::none); + ASSERT_EQUALS(boost::none, description.getRtt()); +} + +TEST_F(ServerDescriptionTestFixture, + ShouldStoreMovingAverageRTTWhenChangingFromOneKnownServerTypeToAnother) { + auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(40)); + auto lastServerDescription = ServerDescriptionBuilder() + .withType(ServerType::kRSSecondary) + .withRtt(mongo::Milliseconds(20)) + .instance(); + auto description = ServerDescription(clockSource, response, lastServerDescription->getRtt()); + ASSERT_EQUALS(24, durationCount<mongo::Milliseconds>(*description.getRtt())); + + auto response2 = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(30)); + auto description2 = ServerDescription(clockSource, response2, description.getRtt()); + ASSERT_EQUALS(25, durationCount<mongo::Milliseconds>(*description2.getRtt())); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreLastWriteDate) { + auto response = IsMasterOutcome("foo:1234", kBsonLastWrite, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(kLastWriteDate, description.getLastWriteDate()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreOpTime) { + auto response = IsMasterOutcome("foo:1234", kBsonLastWrite, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(kOpTime, description.getOpTime()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreLastUpdateTime) { + auto testStart = clockSource->now(); + auto response = IsMasterOutcome("foo:1234", kBsonRsPrimary, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_GREATER_THAN_OR_EQUALS(description.getLastUpdateTime(), testStart); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreHostNamesAsLowercase) { + auto response = IsMasterOutcome("FOO:1234", kBsonHostNames, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + + ASSERT_EQUALS("foo:1234", description.getAddress()); + + ASSERT_EQUALS(boost::to_lower_copy(std::string(kBsonHostNames.getStringField("me"))), + *description.getMe()); + + auto expectedHosts = toHostSet(kBsonHostNames.getField("hosts").Array()); + ASSERT_EQUALS(expectedHosts, description.getHosts()); + + auto expectedPassives = toHostSet(kBsonHostNames.getField("passives").Array()); + ASSERT_EQUALS(expectedPassives, description.getPassives()); + + auto expectedArbiters = toHostSet(kBsonHostNames.getField("arbiters").Array()); + ASSERT_EQUALS(expectedArbiters, description.getArbiters()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreMinMaxWireVersion) { + auto response = IsMasterOutcome("foo:1234", kBsonWireVersion, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(kBsonWireVersion["minWireVersion"].Int(), description.getMinWireVersion()); + ASSERT_EQUALS(kBsonWireVersion["maxWireVersion"].Int(), description.getMaxWireVersion()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreTags) { + auto response = IsMasterOutcome("foo:1234", kBsonTags, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(toStringMap(kBsonTags["tags"].Obj()), description.getTags()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreSetVersionAndName) { + auto response = IsMasterOutcome("foo:1234", kBsonSetVersionName, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(kBsonSetVersionName.getIntField("setVersion"), description.getSetVersion()); + ASSERT_EQUALS(std::string(kBsonSetVersionName.getStringField("setName")), + description.getSetName()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreElectionId) { + auto response = IsMasterOutcome("foo:1234", kBsonElectionId, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(kBsonElectionId.getField("electionId").OID(), description.getElectionId()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStorePrimary) { + auto response = IsMasterOutcome("foo:1234", kBsonPrimary, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(std::string(kBsonPrimary.getStringField("primary")), description.getPrimary()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreLogicalSessionTimeout) { + auto response = + IsMasterOutcome("foo:1234", kBsonLogicalSessionTimeout, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(kBsonLogicalSessionTimeout.getIntField("logicalSessionTimeoutMinutes"), + description.getLogicalSessionTimeoutMinutes()); +} + + +TEST_F(ServerDescriptionTestFixture, ShouldStoreServerAddressOnError) { + auto response = IsMasterOutcome("foo:1234", "an error occurred"); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(std::string("foo:1234"), description.getAddress()); +} + +TEST_F(ServerDescriptionTestFixture, ShouldStoreCorrectDefaultValuesOnSuccess) { + auto response = IsMasterOutcome("foo:1234", kBsonOk, mongo::Milliseconds(40)); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(boost::none, description.getError()); + ASSERT_EQUALS(boost::none, description.getLastWriteDate()); + ASSERT_EQUALS(0, description.getMinWireVersion()); + ASSERT_EQUALS(0, description.getMaxWireVersion()); + ASSERT_EQUALS(boost::none, description.getMe()); + ASSERT_EQUALS(static_cast<size_t>(0), description.getHosts().size()); + ASSERT_EQUALS(static_cast<size_t>(0), description.getPassives().size()); + ASSERT_EQUALS(static_cast<size_t>(0), description.getTags().size()); + ASSERT_EQUALS(boost::none, description.getSetName()); + ASSERT_EQUALS(boost::none, description.getSetVersion()); + ASSERT_EQUALS(boost::none, description.getElectionId()); + ASSERT_EQUALS(boost::none, description.getPrimary()); + ASSERT_EQUALS(boost::none, description.getLogicalSessionTimeoutMinutes()); +} + + +TEST_F(ServerDescriptionTestFixture, ShouldStoreCorrectDefaultValuesOnFailure) { + auto response = IsMasterOutcome("foo:1234", "an error occurred"); + auto description = ServerDescription(clockSource, response); + ASSERT_EQUALS(boost::none, description.getLastWriteDate()); + ASSERT_EQUALS(ServerType::kUnknown, description.getType()); + ASSERT_EQUALS(0, description.getMinWireVersion()); + ASSERT_EQUALS(0, description.getMaxWireVersion()); + ASSERT_EQUALS(boost::none, description.getMe()); + ASSERT_EQUALS(static_cast<size_t>(0), description.getHosts().size()); + ASSERT_EQUALS(static_cast<size_t>(0), description.getPassives().size()); + ASSERT_EQUALS(static_cast<size_t>(0), description.getTags().size()); + ASSERT_EQUALS(boost::none, description.getSetName()); + ASSERT_EQUALS(boost::none, description.getSetVersion()); + ASSERT_EQUALS(boost::none, description.getElectionId()); + ASSERT_EQUALS(boost::none, description.getPrimary()); + ASSERT_EQUALS(boost::none, description.getLogicalSessionTimeoutMinutes()); +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_description.cpp b/src/mongo/client/sdam/topology_description.cpp new file mode 100644 index 00000000000..45f6491c466 --- /dev/null +++ b/src/mongo/client/sdam/topology_description.cpp @@ -0,0 +1,282 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/sdam/topology_description.h" + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/client/sdam/server_description.h" +#include "mongo/db/wire_version.h" +#include "mongo/util/log.h" + +namespace mongo::sdam { +//////////////////////// +// TopologyDescription +//////////////////////// +TopologyDescription::TopologyDescription(SdamConfiguration config) + : _type(config.getInitialType()), _setName(config.getSetName()) { + if (auto seeds = config.getSeedList()) { + _servers.clear(); + for (auto address : *seeds) { + _servers.push_back(std::make_shared<ServerDescription>(address)); + } + } +} + +const UUID& TopologyDescription::getId() const { + return _id; +} + +TopologyType TopologyDescription::getType() const { + return _type; +} + +const boost::optional<std::string>& TopologyDescription::getSetName() const { + return _setName; +} + +const boost::optional<int>& TopologyDescription::getMaxSetVersion() const { + return _maxSetVersion; +} + +const boost::optional<OID>& TopologyDescription::getMaxElectionId() const { + return _maxElectionId; +} + +const std::vector<ServerDescriptionPtr>& TopologyDescription::getServers() const { + return _servers; +} + +bool TopologyDescription::isWireVersionCompatible() const { + return _compatible; +} + +const boost::optional<std::string>& TopologyDescription::getWireVersionCompatibleError() const { + return _compatibleError; +} + +const boost::optional<int>& TopologyDescription::getLogicalSessionTimeoutMinutes() const { + return _logicalSessionTimeoutMinutes; +} + +void TopologyDescription::setType(TopologyType type) { + _type = type; +} + +bool TopologyDescription::containsServerAddress(const ServerAddress& address) const { + return findServerByAddress(address) != boost::none; +} + +std::vector<ServerDescriptionPtr> TopologyDescription::findServers( + std::function<bool(const ServerDescriptionPtr&)> predicate) const { + std::vector<ServerDescriptionPtr> result; + std::copy_if(_servers.begin(), _servers.end(), std::back_inserter(result), predicate); + return result; +} + +const boost::optional<ServerDescriptionPtr> TopologyDescription::findServerByAddress( + ServerAddress address) const { + auto results = findServers([address](const ServerDescriptionPtr& serverDescription) { + return serverDescription->getAddress() == address; + }); + return (results.size() > 0) ? boost::make_optional(results.front()) : boost::none; +} + +boost::optional<ServerDescriptionPtr> TopologyDescription::installServerDescription( + const ServerDescriptionPtr& newServerDescription) { + boost::optional<ServerDescriptionPtr> previousDescription; + if (getType() == TopologyType::kSingle) { + // For Single, there is always one ServerDescription in TopologyDescription.servers; + // the ServerDescription in TopologyDescription.servers MUST be replaced with the new + // ServerDescription. + invariant(_servers.size() == 1); + previousDescription = _servers[0]; + _servers[0] = std::shared_ptr<ServerDescription>(newServerDescription); + } else { + for (auto it = _servers.begin(); it != _servers.end(); ++it) { + const auto& currentDescription = *it; + if (currentDescription->getAddress() == newServerDescription->getAddress()) { + previousDescription = *it; + *it = std::shared_ptr<ServerDescription>(newServerDescription); + break; + } + } + + if (!previousDescription) { + _servers.push_back(std::shared_ptr<ServerDescription>(newServerDescription)); + } + } + + checkWireCompatibilityVersions(); + calculateLogicalSessionTimeout(); + return previousDescription; +} + +void TopologyDescription::removeServerDescription(const ServerAddress& serverAddress) { + auto it = std::find_if( + _servers.begin(), _servers.end(), [serverAddress](const ServerDescriptionPtr& description) { + return description->getAddress() == serverAddress; + }); + if (it != _servers.end()) { + _servers.erase(it); + } +} + +void TopologyDescription::checkWireCompatibilityVersions() { + const WireVersionInfo supportedWireVersion = WireSpec::instance().outgoing; + std::ostringstream errorOss; + + _compatible = true; + for (const auto& serverDescription : _servers) { + if (serverDescription->getType() == ServerType::kUnknown) { + continue; + } + + if (serverDescription->getMinWireVersion() > supportedWireVersion.maxWireVersion) { + _compatible = false; + errorOss << "Server at " << serverDescription->getAddress() << " requires wire version " + << serverDescription->getMinWireVersion() + << " but this version of mongo only supports up to " + << supportedWireVersion.maxWireVersion << "."; + break; + } else if (serverDescription->getMaxWireVersion() < supportedWireVersion.minWireVersion) { + _compatible = false; + const auto& mongoVersion = + minimumRequiredMongoVersionString(supportedWireVersion.minWireVersion); + errorOss << "Server at " << serverDescription->getAddress() << " requires wire version " + << serverDescription->getMaxWireVersion() + << " but this version of mongo requires at least " + << supportedWireVersion.minWireVersion << " (MongoDB " << mongoVersion << ")."; + break; + } + } + + _compatibleError = (_compatible) ? boost::none : boost::make_optional(errorOss.str()); +} + +const std::string TopologyDescription::minimumRequiredMongoVersionString(int version) { + switch (version) { + case PLACEHOLDER_FOR_44: + return "4.4"; + case SHARDED_TRANSACTIONS: + return "4.2"; + case REPLICA_SET_TRANSACTIONS: + return "4.0"; + case SUPPORTS_OP_MSG: + return "3.6"; + case COMMANDS_ACCEPT_WRITE_CONCERN: + return "3.4"; + case BATCH_COMMANDS: + return "3.2"; + case FIND_COMMAND: + return "3.2"; + case RELEASE_2_7_7: + return "3.0"; + case AGG_RETURNS_CURSORS: + return "2.6"; + case RELEASE_2_4_AND_BEFORE: + return "2.4"; + default: + MONGO_UNREACHABLE; + } +} + +void TopologyDescription::calculateLogicalSessionTimeout() { + int min = INT_MAX; + bool foundNone = false; + bool hasDataBearingServer = false; + + invariant(_servers.size() > 0); + for (auto description : _servers) { + if (!description->isDataBearingServer()) { + continue; + } + hasDataBearingServer = true; + + auto logicalSessionTimeout = description->getLogicalSessionTimeoutMinutes(); + if (!logicalSessionTimeout) { + foundNone = true; + break; + } + min = std::min(*logicalSessionTimeout, min); + } + _logicalSessionTimeoutMinutes = + (foundNone || !hasDataBearingServer) ? boost::none : boost::make_optional(min); +} + + +//////////////////////// +// SdamConfiguration +//////////////////////// +SdamConfiguration::SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList, + TopologyType initialType, + mongo::Milliseconds heartBeatFrequencyMs, + boost::optional<std::string> setName) + : _seedList(seedList), + _initialType(initialType), + _heartBeatFrequencyMs(heartBeatFrequencyMs), + _setName(setName) { + uassert(ErrorCodes::InvalidSeedList, + "seed list size must be >= 1", + !seedList || (*seedList).size() >= 1); + + uassert(ErrorCodes::InvalidSeedList, + "TopologyType Single must have exactly one entry in the seed list.", + _initialType != TopologyType::kSingle || (*seedList).size() == 1); + + uassert( + ErrorCodes::InvalidTopologyType, + "Only ToplogyTypes ReplicaSetNoPrimary and Single are allowed when a setName is provided.", + !_setName || + (_initialType == TopologyType::kReplicaSetNoPrimary || + _initialType == TopologyType::kSingle)); + + uassert(ErrorCodes::TopologySetNameRequired, + "setName is required for ReplicaSetNoPrimary", + _initialType != TopologyType::kReplicaSetNoPrimary || _setName); + + uassert(ErrorCodes::InvalidHeartBeatFrequency, + "topology heartbeat must be >= 500ms", + _heartBeatFrequencyMs >= kMinHeartbeatFrequencyMS); +} + +const boost::optional<std::vector<ServerAddress>>& SdamConfiguration::getSeedList() const { + return _seedList; +} + +TopologyType SdamConfiguration::getInitialType() const { + return _initialType; +} + +Milliseconds SdamConfiguration::getHeartBeatFrequency() const { + return _heartBeatFrequencyMs; +} + +const boost::optional<std::string>& SdamConfiguration::getSetName() const { + return _setName; +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_description.h b/src/mongo/client/sdam/topology_description.h new file mode 100644 index 00000000000..b39bd60663b --- /dev/null +++ b/src/mongo/client/sdam/topology_description.h @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once +#include <memory> +#include <string> +#include <unordered_set> + +#include "boost/optional/optional.hpp" + +#include "mongo/bson/oid.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/client/sdam/server_description.h" +#include "mongo/platform/basic.h" + +namespace mongo::sdam { +class SdamConfiguration { +public: + SdamConfiguration() : SdamConfiguration(boost::none){}; + + /** + * Initialize the TopologyDescription. This constructor may uassert if the provided + * configuration options are not valid according to the Server Discovery & Monitoring Spec. + * + * Initial Servers + * initial servers may be set to a seed list of one or more server addresses. + * + * Initial TopologyType + * The initial TopologyType may be set to Single, Unknown, or ReplicaSetNoPrimary. + * + * Initial setName + * The client's initial replica set name is required in order to initially configure the + * topology type as ReplicaSetNoPrimary. + * + * Allowed configuration combinations + * TopologyType Single cannot be used with multiple seeds. + * If setName is not null, only TopologyType ReplicaSetNoPrimary and Single, are + * allowed. + */ + SdamConfiguration(boost::optional<std::vector<ServerAddress>> seedList, + TopologyType initialType = TopologyType::kUnknown, + mongo::Milliseconds heartBeatFrequencyMs = kDefaultHeartbeatFrequencyMs, + boost::optional<std::string> setName = boost::none); + + const boost::optional<std::vector<ServerAddress>>& getSeedList() const; + TopologyType getInitialType() const; + Milliseconds getHeartBeatFrequency() const; + const boost::optional<std::string>& getSetName() const; + + static inline const mongo::Milliseconds kDefaultHeartbeatFrequencyMs = mongo::Seconds(10); + static inline const mongo::Milliseconds kMinHeartbeatFrequencyMS = mongo::Milliseconds(500); + +private: + boost::optional<std::vector<ServerAddress>> _seedList; + TopologyType _initialType; + mongo::Milliseconds _heartBeatFrequencyMs; + boost::optional<std::string> _setName; +}; + +class TopologyDescription { +public: + TopologyDescription() : TopologyDescription(SdamConfiguration()) {} + TopologyDescription(const TopologyDescription& source) = default; + + /** + * Initialize the TopologyDescription with the given configuration. + */ + TopologyDescription(SdamConfiguration config); + + const UUID& getId() const; + TopologyType getType() const; + const boost::optional<std::string>& getSetName() const; + + const boost::optional<int>& getMaxSetVersion() const; + const boost::optional<OID>& getMaxElectionId() const; + + const std::vector<ServerDescriptionPtr>& getServers() const; + + bool isWireVersionCompatible() const; + const boost::optional<std::string>& getWireVersionCompatibleError() const; + + const boost::optional<int>& getLogicalSessionTimeoutMinutes() const; + const Milliseconds& getHeartBeatFrequency() const; + + const boost::optional<ServerDescriptionPtr> findServerByAddress(ServerAddress address) const; + bool containsServerAddress(const ServerAddress& address) const; + std::vector<ServerDescriptionPtr> findServers( + std::function<bool(const ServerDescriptionPtr&)> predicate) const; + + /** + * Adds the given ServerDescription or swaps it with an existing one + * using the description's ServerAddress as the lookup key. If present, the previous server + * description is returned. + */ + boost::optional<ServerDescriptionPtr> installServerDescription( + const ServerDescriptionPtr& newServerDescription); + void removeServerDescription(const ServerAddress& serverAddress); + + void setType(TopologyType type); + +private: + /** + * Checks if all server descriptions are compatible with this server's WireVersion. If an + * incompatible description is found, we set the topologyDescription's _compatible flag to false + * and store an error message in _compatibleError. A ServerDescription which is not Unknown is + * incompatible if: + * minWireVersion > serverMaxWireVersion, or maxWireVersion < serverMinWireVersion + */ + void checkWireCompatibilityVersions(); + + /** + * Used in error string for wire compatibility check. + * + * Source: + * https://github.com/mongodb/specifications/blob/master/source/wireversion-featurelist.rst + */ + const std::string minimumRequiredMongoVersionString(int version); + + /** + * From Server Discovery and Monitoring: + * Updates the TopologyDescription.logicalSessionTimeoutMinutes to the smallest + * logicalSessionTimeoutMinutes value among ServerDescriptions of all data-bearing server types. + * If any have a null logicalSessionTimeoutMinutes, then + * TopologyDescription.logicalSessionTimeoutMinutes is set to null. + * + * https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#logical-session-timeout + */ + void calculateLogicalSessionTimeout(); + + // unique id for this topology + UUID _id = UUID::gen(); + + // a TopologyType enum value. + TopologyType _type = TopologyType::kUnknown; + + // setName: the replica set name. Default null. + boost::optional<std::string> _setName; + + // maxSetVersion: an integer or null. The largest setVersion ever reported by a primary. + // Default null. + boost::optional<int> _maxSetVersion; + + // maxElectionId: an ObjectId or null. The largest electionId ever reported by a primary. + // Default null. + boost::optional<OID> _maxElectionId; + + // servers: a set of ServerDescription instances. Default contains one server: + // "localhost:27017", ServerType Unknown. + std::vector<ServerDescriptionPtr> _servers{ + std::make_shared<ServerDescription>("localhost:27017")}; + + // compatible: a boolean. False if any server's wire protocol version range is incompatible with + // the client's. Default true. + bool _compatible = true; + + // compatibilityError: a string. The error message if "compatible" is false, otherwise null. + boost::optional<std::string> _compatibleError; + + // logicalSessionTimeoutMinutes: integer or null. Default null. + boost::optional<int> _logicalSessionTimeoutMinutes; + + friend class TopologyStateMachine; +}; +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_description_test.cpp b/src/mongo/client/sdam/topology_description_test.cpp new file mode 100644 index 00000000000..68c18e0f560 --- /dev/null +++ b/src/mongo/client/sdam/topology_description_test.cpp @@ -0,0 +1,298 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/sdam/sdam_test_base.h" +#include "mongo/client/sdam/topology_description.h" + +#include <boost/optional/optional_io.hpp> + +#include "mongo/client/sdam/server_description.h" +#include "mongo/client/sdam/server_description_builder.h" +#include "mongo/db/wire_version.h" +#include "mongo/unittest/death_test.h" + +namespace mongo { +template std::ostream& operator<<(std::ostream& os, + const std::vector<mongo::sdam::ServerAddress>& s); + +namespace sdam { +using mongo::operator<<; + +class TopologyDescriptionTestFixture : public SdamTestFixture { +protected: + void assertDefaultConfig(const TopologyDescription& topologyDescription); + + static inline const auto kSetName = std::string("mySetName"); + + static inline const std::vector<ServerAddress> kOneServer{"foo:1234"}; + static inline const std::vector<ServerAddress> kTwoServersVaryCase{"FoO:1234", "BaR:1234"}; + static inline const std::vector<ServerAddress> kTwoServersNormalCase{"foo:1234", "bar:1234"}; + static inline const std::vector<ServerAddress> kThreeServers{ + "foo:1234", "bar:1234", "baz:1234"}; + + static inline const auto kDefaultConfig = SdamConfiguration(); + static inline const auto kSingleSeedConfig = + SdamConfiguration(kOneServer, TopologyType::kSingle); +}; + +void TopologyDescriptionTestFixture::assertDefaultConfig( + const TopologyDescription& topologyDescription) { + ASSERT_EQUALS(boost::none, topologyDescription.getSetName()); + ASSERT_EQUALS(boost::none, topologyDescription.getMaxElectionId()); + + auto expectedDefaultServer = ServerDescription("localhost:27017"); + ASSERT_EQUALS(expectedDefaultServer, *topologyDescription.getServers().front()); + ASSERT_EQUALS(static_cast<std::size_t>(1), topologyDescription.getServers().size()); + + ASSERT_EQUALS(true, topologyDescription.isWireVersionCompatible()); + ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + ASSERT_EQUALS(boost::none, topologyDescription.getLogicalSessionTimeoutMinutes()); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldHaveCorrectDefaultValues) { + assertDefaultConfig(TopologyDescription(kDefaultConfig)); + assertDefaultConfig(TopologyDescription()); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldNormalizeInitialSeedList) { + auto config = SdamConfiguration(kTwoServersVaryCase); + TopologyDescription topologyDescription(config); + + auto expectedAddresses = kTwoServersNormalCase; + + auto serverAddresses = map<ServerDescriptionPtr, ServerAddress>( + topologyDescription.getServers(), + [](const ServerDescriptionPtr& description) { return description->getAddress(); }); + + ASSERT_EQUALS(expectedAddresses, serverAddresses); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldAllowTypeSingleWithASingleSeed) { + TopologyDescription topologyDescription(kSingleSeedConfig); + + ASSERT(TopologyType::kSingle == topologyDescription.getType()); + + auto servers = map<ServerDescriptionPtr, ServerAddress>( + topologyDescription.getServers(), + [](const ServerDescriptionPtr& desc) { return desc->getAddress(); }); + ASSERT_EQUALS(kOneServer, servers); +} + +TEST_F(TopologyDescriptionTestFixture, DoesNotAllowMultipleSeedsWithSingle) { + ASSERT_THROWS_CODE( + { + auto config = SdamConfiguration(kTwoServersNormalCase, TopologyType::kSingle); + TopologyDescription topologyDescription(config); + }, + DBException, + ErrorCodes::InvalidSeedList); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldSetTheReplicaSetName) { + auto expectedSetName = kSetName; + auto config = SdamConfiguration( + kOneServer, TopologyType::kReplicaSetNoPrimary, mongo::Seconds(10), expectedSetName); + TopologyDescription topologyDescription(config); + ASSERT_EQUALS(expectedSetName, *topologyDescription.getSetName()); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowSettingTheReplicaSetNameWithWrongType) { + ASSERT_THROWS_CODE( + { + auto config = + SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10), kSetName); + TopologyDescription topologyDescription(config); + }, + DBException, + ErrorCodes::InvalidTopologyType); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowTopologyTypeRSNoPrimaryWithoutSetName) { + ASSERT_THROWS_CODE( + { + SdamConfiguration( + kOneServer, TopologyType::kReplicaSetNoPrimary, mongo::Seconds(10), boost::none); + }, + DBException, + ErrorCodes::TopologySetNameRequired); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldOnlyAllowSingleAndRsNoPrimaryWithSetName) { + auto topologyTypes = allTopologyTypes(); + topologyTypes.erase(std::remove_if(topologyTypes.begin(), + topologyTypes.end(), + [](const TopologyType& topologyType) { + return topologyType == TopologyType::kSingle || + topologyType == TopologyType::kReplicaSetNoPrimary; + }), + topologyTypes.end()); + + for (const auto topologyType : topologyTypes) { + ASSERT_THROWS_CODE( + { + std::cout << "Check TopologyType " << toString(topologyType) + << " with setName value." << std::endl; + auto config = + SdamConfiguration(kOneServer, topologyType, mongo::Seconds(10), kSetName); + // This is here to ensure that the compiler actually generates code for the above + // statement. + std::cout << "Test failed for topologyType " << config.getInitialType() + << std::endl; + MONGO_UNREACHABLE; + }, + DBException, + ErrorCodes::InvalidTopologyType); + } +} + +TEST_F(TopologyDescriptionTestFixture, ShouldDefaultHeartbeatToTenSecs) { + SdamConfiguration config; + ASSERT_EQUALS(SdamConfiguration::kDefaultHeartbeatFrequencyMs, config.getHeartBeatFrequency()); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldAllowSettingTheHeartbeatFrequency) { + const auto expectedHeartbeatFrequency = mongo::Milliseconds(20000); + SdamConfiguration config(boost::none, TopologyType::kUnknown, expectedHeartbeatFrequency); + ASSERT_EQUALS(expectedHeartbeatFrequency, config.getHeartBeatFrequency()); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldNotAllowChangingTheHeartbeatFrequencyBelow500Ms) { + auto belowThresholdFrequency = + mongo::Milliseconds(SdamConfiguration::kMinHeartbeatFrequencyMS.count() - 1); + ASSERT_THROWS_CODE( + { SdamConfiguration config(boost::none, TopologyType::kUnknown, belowThresholdFrequency); }, + DBException, + ErrorCodes::InvalidHeartBeatFrequency); +} + +TEST_F(TopologyDescriptionTestFixture, + ShouldSetWireCompatibilityErrorForMinWireVersionWhenMinWireVersionIsGreater) { + const auto outgoingMaxWireVersion = WireSpec::instance().outgoing.maxWireVersion; + const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); + TopologyDescription topologyDescription(config); + const auto serverDescriptionMinVersion = ServerDescriptionBuilder() + .withAddress(kOneServer[0]) + .withMe(kOneServer[0]) + .withType(ServerType::kRSSecondary) + .withMinWireVersion(outgoingMaxWireVersion + 1) + .instance(); + + ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + topologyDescription.installServerDescription(serverDescriptionMinVersion); + ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); +} + +TEST_F(TopologyDescriptionTestFixture, + ShouldSetWireCompatibilityErrorForMinWireVersionWhenMaxWireVersionIsLess) { + const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion; + const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); + TopologyDescription topologyDescription(config); + const auto serverDescriptionMaxVersion = ServerDescriptionBuilder() + .withAddress(kOneServer[0]) + .withMe(kOneServer[0]) + .withType(ServerType::kRSSecondary) + .withMaxWireVersion(outgoingMinWireVersion - 1) + .instance(); + + ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + topologyDescription.installServerDescription(serverDescriptionMaxVersion); + ASSERT_NOT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldNotSetWireCompatibilityErrorWhenServerTypeIsUnknown) { + const auto outgoingMinWireVersion = WireSpec::instance().outgoing.minWireVersion; + const auto config = SdamConfiguration(kOneServer, TopologyType::kUnknown, mongo::Seconds(10)); + TopologyDescription topologyDescription(config); + const auto serverDescriptionMaxVersion = + ServerDescriptionBuilder().withMaxWireVersion(outgoingMinWireVersion - 1).instance(); + + ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); + topologyDescription.installServerDescription(serverDescriptionMaxVersion); + ASSERT_EQUALS(boost::none, topologyDescription.getWireVersionCompatibleError()); +} + +TEST_F(TopologyDescriptionTestFixture, ShouldSetLogicalSessionTimeoutToMinOfAllServerDescriptions) { + const auto config = SdamConfiguration(kThreeServers); + TopologyDescription topologyDescription(config); + + const auto logicalSessionTimeouts = std::vector{300, 100, 200}; + auto timeoutIt = logicalSessionTimeouts.begin(); + const auto serverDescriptionsWithTimeouts = map<ServerDescriptionPtr, ServerDescriptionPtr>( + topologyDescription.getServers(), [&timeoutIt](const ServerDescriptionPtr& description) { + auto newInstanceBuilder = ServerDescriptionBuilder() + .withType(ServerType::kRSSecondary) + .withAddress(description->getAddress()) + .withMe(description->getAddress()) + .withLogicalSessionTimeoutMinutes(*timeoutIt); + timeoutIt++; + return newInstanceBuilder.instance(); + }); + + for (auto description : serverDescriptionsWithTimeouts) { + topologyDescription.installServerDescription(description); + } + + int expectedLogicalSessionTimeout = + *std::min_element(logicalSessionTimeouts.begin(), logicalSessionTimeouts.end()); + ASSERT_EQUALS(expectedLogicalSessionTimeout, + topologyDescription.getLogicalSessionTimeoutMinutes()); +} + + +TEST_F(TopologyDescriptionTestFixture, + ShouldSetLogicalSessionTimeoutToNoneIfAnyServerDescriptionHasNone) { + const auto config = SdamConfiguration(kThreeServers); + TopologyDescription topologyDescription(config); + + const auto logicalSessionTimeouts = std::vector{300, 100, 200}; + auto timeoutIt = logicalSessionTimeouts.begin(); + + const auto serverDescriptionsWithTimeouts = map<ServerDescriptionPtr, ServerDescriptionPtr>( + topologyDescription.getServers(), [&](const ServerDescriptionPtr& description) { + auto timeoutValue = (timeoutIt == logicalSessionTimeouts.begin()) + ? boost::none + : boost::make_optional(*timeoutIt); + + auto newInstance = ServerDescriptionBuilder() + .withType(ServerType::kRSSecondary) + .withAddress(description->getAddress()) + .withMe(description->getAddress()) + .withLogicalSessionTimeoutMinutes(timeoutValue) + .instance(); + ++timeoutIt; + return newInstance; + }); + + for (auto description : serverDescriptionsWithTimeouts) { + topologyDescription.installServerDescription(description); + } + + ASSERT_EQUALS(boost::none, topologyDescription.getLogicalSessionTimeoutMinutes()); +} +}; // namespace sdam +}; // namespace mongo diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp new file mode 100644 index 00000000000..c74190f61fb --- /dev/null +++ b/src/mongo/client/sdam/topology_manager.cpp @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/sdam/topology_manager.h" + +#include "mongo/client/sdam/topology_state_machine.h" + +namespace mongo::sdam { +TopologyManager::TopologyManager(SdamConfiguration config, ClockSource* clockSource) + : _config(std::move(config)), + _clockSource(clockSource), + _topologyDescription(std::make_unique<TopologyDescription>(_config)), + _topologyStateMachine(std::make_unique<TopologyStateMachine>(_config)) {} + +void TopologyManager::onServerDescription(const IsMasterOutcome& isMasterOutcome) { + stdx::lock_guard<mongo::Mutex> lock(_mutex); + + const auto& lastServerDescription = + _topologyDescription->findServerByAddress(isMasterOutcome.getServer()); + boost::optional<IsMasterRTT> lastRTT = + (lastServerDescription) ? (*lastServerDescription)->getRtt() : boost::none; + + auto newServerDescription = + std::make_shared<ServerDescription>(_clockSource, isMasterOutcome, lastRTT); + + auto newTopologyDescription = std::make_unique<TopologyDescription>(*_topologyDescription); + _topologyStateMachine->onServerDescription(*newTopologyDescription, newServerDescription); + _topologyDescription = std::move(newTopologyDescription); +} + +const std::shared_ptr<TopologyDescription> TopologyManager::getTopologyDescription() const { + stdx::lock_guard<mongo::Mutex> lock(_mutex); + return _topologyDescription; +} +}; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_manager.h b/src/mongo/client/sdam/topology_manager.h new file mode 100644 index 00000000000..f52729b0946 --- /dev/null +++ b/src/mongo/client/sdam/topology_manager.h @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once +#include <memory> + +#include "mongo/client/sdam/sdam_datatypes.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/client/sdam/topology_state_machine.h" + +namespace mongo::sdam { +/** + * This class serves as the public interface to the functionality described in the Service Discovery + * and Monitoring spec: + * https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst + */ +class TopologyManager { + TopologyManager() = delete; + TopologyManager(const TopologyManager&) = delete; + +public: + TopologyManager(SdamConfiguration config, ClockSource* clockSource); + + /** + * This function atomically: + * 1. Clones the current TopologyDescription + * 2. Executes the state machine logic given the cloned TopologyDescription and provided + * IsMasterOutcome (containing the new ServerDescription). + * 3. Installs the cloned (and possibly modified) TopologyDescription as the current one. + * + * Multiple threads may call this function concurrently. However, the manager will process the + * IsMasterOutcomes serially, as required by: + * https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#process-one-ismaster-outcome-at-a-time + */ + void onServerDescription(const IsMasterOutcome& isMasterOutcome); + + /** + * Get the current TopologyDescription. This is safe to call from multiple threads. + */ + const TopologyDescriptionPtr getTopologyDescription() const; + +private: + mutable mongo::Mutex _mutex = mongo::Mutex(StringData("TopologyManager")); + const SdamConfiguration _config; + ClockSource* _clockSource; + std::shared_ptr<TopologyDescription> _topologyDescription; + std::unique_ptr<TopologyStateMachine> _topologyStateMachine; +}; +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_state_machine.cpp b/src/mongo/client/sdam/topology_state_machine.cpp new file mode 100644 index 00000000000..91224427796 --- /dev/null +++ b/src/mongo/client/sdam/topology_state_machine.cpp @@ -0,0 +1,397 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/client/sdam/topology_state_machine.h" + +#include <functional> +#include <ostream> + +#include "mongo/client/sdam/sdam_test_base.h" +#include "mongo/util/log.h" + +namespace mongo::sdam { +TopologyStateMachine::TopologyStateMachine(const SdamConfiguration& config) : _config(config) { + initTransitionTable(); +} + +// This is used to make the syntax in initTransitionTable less verbose. +// Since we have enum class for TopologyType and ServerType there are no implicit int conversions. +template <typename T> +inline int idx(T enumType) { + return static_cast<int>(enumType); +} + +/** + * This function encodes the transition table specified in + * https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#topologytype-table + */ +void mongo::sdam::TopologyStateMachine::initTransitionTable() { + using namespace std::placeholders; + + // init the table to No-ops + const TransitionAction NO_OP([](const TopologyDescription&, const ServerDescriptionPtr&) {}); + _stt.resize(allTopologyTypes().size() + 1); + for (auto& row : _stt) { + row.resize(allServerTypes().size() + 1, NO_OP); + } + + // From TopologyType: Unknown + _stt[idx(TopologyType::kUnknown)][idx(ServerType::kStandalone)] = + std::bind(&TopologyStateMachine::updateUnknownWithStandalone, this, _1, _2); + _stt[idx(TopologyType::kUnknown)][idx(ServerType::kMongos)] = + setTopologyTypeAction(TopologyType::kSharded); + _stt[idx(TopologyType::kUnknown)][idx(ServerType::kRSPrimary)] = + setTopologyTypeAndUpdateRSFromPrimary(TopologyType::kReplicaSetWithPrimary); + + { + const auto serverTypes = std::vector<ServerType>{ + ServerType::kRSSecondary, ServerType::kRSArbiter, ServerType::kRSOther}; + for (auto newServerType : serverTypes) { + _stt[idx(TopologyType::kUnknown)][idx(newServerType)] = std::bind( + &TopologyStateMachine::setTopologyTypeAndUpdateRSWithoutPrimary, this, _1, _2); + } + } + + // From TopologyType: Sharded + { + const auto serverTypes = std::vector<ServerType>{ServerType::kStandalone, + ServerType::kRSPrimary, + ServerType::kRSSecondary, + ServerType::kRSArbiter, + ServerType::kRSOther, + ServerType::kRSGhost}; + for (auto newServerType : serverTypes) { + _stt[idx(TopologyType::kSharded)][idx(newServerType)] = + std::bind(&TopologyStateMachine::removeAndStopMonitoring, this, _1, _2); + } + } + + // From TopologyType: ReplicaSetNoPrimary + { + const auto serverTypes = + std::vector<ServerType>{ServerType::kStandalone, ServerType::kMongos}; + for (auto serverType : serverTypes) { + _stt[idx(TopologyType::kReplicaSetNoPrimary)][idx(serverType)] = + std::bind(&TopologyStateMachine::removeAndStopMonitoring, this, _1, _2); + } + } + + _stt[idx(TopologyType::kReplicaSetNoPrimary)][idx(ServerType::kRSPrimary)] = + setTopologyTypeAndUpdateRSFromPrimary(TopologyType::kReplicaSetWithPrimary); + + { + const auto serverTypes = std::vector<ServerType>{ + ServerType::kRSSecondary, ServerType::kRSArbiter, ServerType::kRSOther}; + for (auto serverType : serverTypes) { + _stt[idx(TopologyType::kReplicaSetNoPrimary)][idx(serverType)] = + std::bind(&TopologyStateMachine::updateRSWithoutPrimary, this, _1, _2); + } + } + + // From TopologyType: ReplicaSetWithPrimary + { + const auto serverTypes = + std::vector<ServerType>{ServerType::kUnknown, ServerType::kRSGhost}; + for (auto serverType : serverTypes) { + _stt[idx(TopologyType::kReplicaSetWithPrimary)][idx(serverType)] = + std::bind(&TopologyStateMachine::checkIfHasPrimary, this, _1, _2); + } + } + + { + const auto serverTypes = + std::vector<ServerType>{ServerType::kStandalone, ServerType::kMongos}; + for (auto serverType : serverTypes) { + _stt[idx(TopologyType::kReplicaSetWithPrimary)][idx(serverType)] = + std::bind(&TopologyStateMachine::removeAndCheckIfHasPrimary, this, _1, _2); + } + } + + _stt[idx(TopologyType::kReplicaSetWithPrimary)][idx(ServerType::kRSPrimary)] = + std::bind(&TopologyStateMachine::updateRSFromPrimary, this, _1, _2); + + { + const auto serverTypes = std::vector<ServerType>{ + ServerType::kRSSecondary, ServerType::kRSArbiter, ServerType::kRSOther}; + for (auto serverType : serverTypes) { + _stt[idx(TopologyType::kReplicaSetWithPrimary)][idx(serverType)] = + std::bind(&TopologyStateMachine::updateRSWithPrimaryFromMember, this, _1, _2); + } + } +} + +void TopologyStateMachine::onServerDescription(TopologyDescription& topologyDescription, + const ServerDescriptionPtr& serverDescription) { + if (!topologyDescription.containsServerAddress(serverDescription->getAddress())) { + LOG(0) << kLogPrefix << "ignoring ismaster reply from server that is not in the topology: " + << serverDescription->getAddress() << std::endl; + return; + } + + installServerDescription(topologyDescription, serverDescription, false); + + if (topologyDescription.getType() != TopologyType::kSingle) { + auto& action = _stt[idx(topologyDescription.getType())][idx(serverDescription->getType())]; + action(topologyDescription, serverDescription); + } +} + +void TopologyStateMachine::updateUnknownWithStandalone( + TopologyDescription& topologyDescription, const ServerDescriptionPtr& serverDescription) { + if (!topologyDescription.containsServerAddress(serverDescription->getAddress())) + return; + + if (_config.getSeedList() && (*_config.getSeedList()).size() == 1) { + modifyTopologyType(topologyDescription, TopologyType::kSingle); + } else { + removeServerDescription(topologyDescription, serverDescription->getAddress()); + } +} + +void TopologyStateMachine::updateRSWithoutPrimary(TopologyDescription& topologyDescription, + const ServerDescriptionPtr& serverDescription) { + const auto& serverDescAddress = serverDescription->getAddress(); + + if (!topologyDescription.containsServerAddress(serverDescAddress)) + return; + + const auto& currentSetName = topologyDescription.getSetName(); + const auto& serverDescSetName = serverDescription->getSetName(); + if (currentSetName == boost::none) { + modifySetName(topologyDescription, serverDescSetName); + } else if (currentSetName != serverDescSetName) { + removeServerDescription(topologyDescription, serverDescription->getAddress()); + return; + } + + addUnknownServers(topologyDescription, serverDescription); + + if (serverDescAddress != serverDescription->getMe()) { + removeServerDescription(topologyDescription, serverDescription->getAddress()); + } +} + +void TopologyStateMachine::addUnknownServers(TopologyDescription& topologyDescription, + const ServerDescriptionPtr& serverDescription) { + const std::set<ServerAddress>* addressSets[3]{&serverDescription->getHosts(), + &serverDescription->getPassives(), + &serverDescription->getArbiters()}; + for (const auto addresses : addressSets) { + for (const auto& addressFromSet : *addresses) { + if (!topologyDescription.containsServerAddress(addressFromSet)) { + installServerDescription( + topologyDescription, std::make_shared<ServerDescription>(addressFromSet), true); + } + } + } +} + +void TopologyStateMachine::updateRSWithPrimaryFromMember( + TopologyDescription& topologyDescription, const ServerDescriptionPtr& serverDescription) { + const auto& serverDescAddress = serverDescription->getAddress(); + if (!topologyDescription.containsServerAddress(serverDescAddress)) { + return; + } + + invariant(serverDescription->getSetName() != boost::none); + if (topologyDescription.getSetName() != serverDescription->getSetName()) { + removeAndCheckIfHasPrimary(topologyDescription, serverDescription); + return; + } + + if (serverDescription->getAddress() != serverDescription->getMe()) { + removeAndCheckIfHasPrimary(topologyDescription, serverDescription); + return; + } + + auto primaries = topologyDescription.findServers([](const ServerDescriptionPtr& description) { + return description->getType() == ServerType::kRSPrimary; + }); + if (primaries.size() == 0) { + modifyTopologyType(topologyDescription, TopologyType::kReplicaSetNoPrimary); + } +} + +void TopologyStateMachine::updateRSFromPrimary(TopologyDescription& topologyDescription, + const ServerDescriptionPtr& serverDescription) { + const auto& serverDescAddress = serverDescription->getAddress(); + if (!topologyDescription.containsServerAddress(serverDescAddress)) { + return; + } + + auto topologySetName = topologyDescription.getSetName(); + auto serverDescSetName = serverDescription->getSetName(); + if (!topologySetName && serverDescSetName) { + modifySetName(topologyDescription, serverDescSetName); + } else if (topologySetName != serverDescSetName) { + // We found a primary but it doesn't have the setName + // provided by the user or previously discovered. + removeAndCheckIfHasPrimary(topologyDescription, serverDescription); + return; + } + + auto serverDescSetVersion = serverDescription->getSetVersion(); + auto serverDescElectionId = serverDescription->getElectionId(); + auto topologyMaxSetVersion = topologyDescription.getMaxSetVersion(); + auto topologyMaxElectionId = topologyDescription.getMaxElectionId(); + if (serverDescSetVersion && serverDescElectionId) { + if (topologyMaxSetVersion && topologyMaxElectionId && + ((topologyMaxSetVersion > serverDescSetVersion) || + (topologyMaxSetVersion == serverDescSetVersion && + (*topologyMaxElectionId).compare(*serverDescElectionId) > 0))) { + // stale primary + installServerDescription( + topologyDescription, std::make_shared<ServerDescription>(serverDescAddress), false); + checkIfHasPrimary(topologyDescription, serverDescription); + return; + } + modifyMaxElectionId(topologyDescription, *serverDescription->getElectionId()); + } + + if (serverDescSetVersion && + (!topologyMaxSetVersion || (serverDescSetVersion > topologyMaxSetVersion))) { + modifyMaxSetVersion(topologyDescription, *serverDescSetVersion); + } + + auto oldPrimaries = topologyDescription.findServers( + [serverDescAddress](const ServerDescriptionPtr& description) { + return (description->getAddress() != serverDescAddress && + description->getType() == ServerType::kRSPrimary); + }); + invariant(oldPrimaries.size() <= 1); + for (const auto& server : oldPrimaries) { + installServerDescription( + topologyDescription, std::make_shared<ServerDescription>(server->getAddress()), false); + } + + addUnknownServers(topologyDescription, serverDescription); + for (const auto& currentServerDescription : topologyDescription.getServers()) { + const auto currentServerAddress = currentServerDescription->getAddress(); + auto hosts = serverDescription->getHosts().find(currentServerAddress); + auto passives = serverDescription->getPassives().find(currentServerAddress); + auto arbiters = serverDescription->getArbiters().find(currentServerAddress); + + if (hosts == serverDescription->getHosts().end() && + passives == serverDescription->getPassives().end() && + arbiters == serverDescription->getArbiters().end()) { + removeServerDescription(topologyDescription, currentServerDescription->getAddress()); + } + } + + checkIfHasPrimary(topologyDescription, serverDescription); +} + +void TopologyStateMachine::removeAndStopMonitoring(TopologyDescription& topologyDescription, + const ServerDescriptionPtr& serverDescription) { + removeServerDescription(topologyDescription, serverDescription->getAddress()); +} + +void TopologyStateMachine::checkIfHasPrimary(TopologyDescription& topologyDescription, + const ServerDescriptionPtr& serverDescription) { + auto foundPrimaries = + topologyDescription.findServers([](const ServerDescriptionPtr& description) { + return description->getType() == ServerType::kRSPrimary; + }); + if (foundPrimaries.size() > 0) { + modifyTopologyType(topologyDescription, TopologyType::kReplicaSetWithPrimary); + } else { + modifyTopologyType(topologyDescription, TopologyType::kReplicaSetNoPrimary); + } +} + +void TopologyStateMachine::removeAndCheckIfHasPrimary( + TopologyDescription& topologyDescription, const ServerDescriptionPtr& serverDescription) { + // Since serverDescription is passed by reference, make a copy of the ServerDescription + // shared_ptr so that the underlying pointer is still valid for the call to checkIfHasPrimary. + ServerDescriptionPtr serverDescriptionNoGC(serverDescription); + removeAndStopMonitoring(topologyDescription, serverDescriptionNoGC); + checkIfHasPrimary(topologyDescription, serverDescriptionNoGC); +} + +TransitionAction TopologyStateMachine::setTopologyTypeAction(TopologyType type) { + return [this, type](TopologyDescription& topologyDescription, + const ServerDescriptionPtr& newServerDescription) { + modifyTopologyType(topologyDescription, type); + }; +} + +TransitionAction TopologyStateMachine::setTopologyTypeAndUpdateRSFromPrimary(TopologyType type) { + return [this, type](TopologyDescription& topologyDescription, + const ServerDescriptionPtr& newServerDescription) { + modifyTopologyType(topologyDescription, type); + updateRSFromPrimary(topologyDescription, newServerDescription); + }; +} + +void TopologyStateMachine::setTopologyTypeAndUpdateRSWithoutPrimary( + TopologyDescription& topologyDescription, const ServerDescriptionPtr& serverDescription) { + modifyTopologyType(topologyDescription, TopologyType::kReplicaSetNoPrimary); + updateRSWithoutPrimary(topologyDescription, serverDescription); +} + +void TopologyStateMachine::removeServerDescription(TopologyDescription& topologyDescription, + const ServerAddress serverAddress) { + topologyDescription.removeServerDescription(serverAddress); + LOG(0) << kLogPrefix << "server '" << serverAddress << "' was removed from the topology." + << std::endl; +} + +void TopologyStateMachine::modifyTopologyType(TopologyDescription& topologyDescription, + TopologyType topologyType) { + topologyDescription._type = topologyType; + LOG(0) << kLogPrefix << "the topology type was set to " << toString(topologyType) << std::endl; +} + +void TopologyStateMachine::modifySetName(TopologyDescription& topologyDescription, + const boost::optional<std::string>& setName) { + topologyDescription._setName = setName; + LOG(0) << kLogPrefix << "the topology setName was set to " << ((setName) ? *setName : "[null]") + << std::endl; +} + +void TopologyStateMachine::installServerDescription(TopologyDescription& topologyDescription, + ServerDescriptionPtr newServerDescription, + bool newServer) { + topologyDescription.installServerDescription(newServerDescription); + LOG(1) << kLogPrefix << ((newServer) ? "installed new" : "updated existing") + << " server description: " << newServerDescription->toString() << std::endl; +} + +void TopologyStateMachine::modifyMaxElectionId(TopologyDescription& topologyDescription, + const OID& newMaxElectionId) { + topologyDescription._maxElectionId = newMaxElectionId; + LOG(0) << kLogPrefix << "topology max election id set to " << newMaxElectionId << std::endl; +} + +void TopologyStateMachine::modifyMaxSetVersion(TopologyDescription& topologyDescription, + int& newMaxSetVersion) { + topologyDescription._maxSetVersion = newMaxSetVersion; + LOG(0) << kLogPrefix << "topology max set version set to " << newMaxSetVersion << std::endl; +} +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_state_machine.h b/src/mongo/client/sdam/topology_state_machine.h new file mode 100644 index 00000000000..abed9bc854f --- /dev/null +++ b/src/mongo/client/sdam/topology_state_machine.h @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include <vector> + +#include "mongo/client/sdam/server_description.h" +#include "mongo/client/sdam/topology_description.h" +#include "mongo/platform/mutex.h" + +namespace mongo::sdam { +// Actions that mutate the state of the topology description via events. +using TransitionAction = std::function<void(TopologyDescription&, const ServerDescriptionPtr&)>; + +// indexed by ServerType +using StateTransitionTableRow = std::vector<TransitionAction>; + +/** + * StateTransitionTable[t][s] returns the action to + * take given that the topology currently has type t, and we receive a ServerDescription + * with type s. + */ +using StateTransitionTable = std::vector<StateTransitionTableRow>; + +class TopologyStateMachine { +public: + TopologyStateMachine(const SdamConfiguration& config); + + /** + * Provides input to the state machine, and triggers the correct action based on the current + * TopologyDescription and the incoming ServerDescription. The topologyDescription instance may + * be modified as a result. + */ + void onServerDescription(TopologyDescription& topologyDescription, + const ServerDescriptionPtr& serverDescription); + +private: + void initTransitionTable(); + + // State machine actions + // These are implemented, in an almost verbatim fashion, from the description + // here: + // https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#actions + void updateUnknownWithStandalone(TopologyDescription&, const ServerDescriptionPtr&); + void updateRSWithoutPrimary(TopologyDescription&, const ServerDescriptionPtr&); + void updateRSWithPrimaryFromMember(TopologyDescription&, const ServerDescriptionPtr&); + void updateRSFromPrimary(TopologyDescription&, const ServerDescriptionPtr&); + void removeAndStopMonitoring(TopologyDescription&, const ServerDescriptionPtr&); + void checkIfHasPrimary(TopologyDescription&, const ServerDescriptionPtr&); + void removeAndCheckIfHasPrimary(TopologyDescription&, const ServerDescriptionPtr&); + void setTopologyTypeAndUpdateRSWithoutPrimary(TopologyDescription&, + const ServerDescriptionPtr&); + TransitionAction setTopologyTypeAction(TopologyType type); + TransitionAction setTopologyTypeAndUpdateRSFromPrimary(TopologyType type); + + void addUnknownServers(TopologyDescription& topologyDescription, + const ServerDescriptionPtr& serverDescription); + + // The functions below mutate the state of the topology description + void installServerDescription(TopologyDescription& topologyDescription, + ServerDescriptionPtr newServerDescription, + bool newServer); + void removeServerDescription(TopologyDescription& topologyDescription, + const ServerAddress serverAddress); + + void modifyTopologyType(TopologyDescription& topologyDescription, TopologyType topologyType); + void modifySetName(TopologyDescription& topologyDescription, + const boost::optional<std::string>& setName); + + void modifyMaxElectionId(TopologyDescription& topologyDescription, const OID& newMaxElectionId); + void modifyMaxSetVersion(TopologyDescription& topologyDescription, int& newMaxSetVersion); + + StateTransitionTable _stt; + SdamConfiguration _config; + + static inline auto kLogPrefix = "sdam : "; +}; +} // namespace mongo::sdam diff --git a/src/mongo/client/sdam/topology_state_machine_test.cpp b/src/mongo/client/sdam/topology_state_machine_test.cpp new file mode 100644 index 00000000000..34629594200 --- /dev/null +++ b/src/mongo/client/sdam/topology_state_machine_test.cpp @@ -0,0 +1,390 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#include "mongo/client/sdam/topology_state_machine.h" + +#include <boost/optional/optional_io.hpp> + +#include "mongo/client/sdam/sdam_test_base.h" +#include "mongo/client/sdam/server_description.h" +#include "mongo/client/sdam/server_description_builder.h" +#include "mongo/client/sdam/topology_description.h" + +namespace mongo::sdam { +class TopologyStateMachineTestFixture : public SdamTestFixture { +protected: + static inline const auto kReplicaSetName = "replica_set"; + static inline const auto kLocalServer = "localhost:123"; + static inline const auto kLocalServer2 = "localhost:456"; + + static inline const auto kTwoSeedConfig = + SdamConfiguration(std::vector<ServerAddress>{kLocalServer, kLocalServer2}, + TopologyType::kUnknown, + mongo::Milliseconds(500)); + static inline const auto kTwoSeedReplicaSetNoPrimaryConfig = + SdamConfiguration(std::vector<ServerAddress>{kLocalServer, kLocalServer2}, + TopologyType::kReplicaSetNoPrimary, + mongo::Milliseconds(500), + std::string("setName")); + static inline const auto kSingleConfig = + SdamConfiguration(std::vector<ServerAddress>{kLocalServer}, TopologyType::kSingle); + + // Given we in 'starting' state with initial config 'initialConfig'. We receive a + // ServerDescription with type 'incoming', and expected the ending topology state to be + // 'ending'. + struct TopologyTypeTestCase { + SdamConfiguration initialConfig; + TopologyType starting; + ServerType incoming; + TopologyType ending; + }; + + // This function sets up the test scenario defined by the given TopologyTypeTestCase. It + // simulates receiving a ServerDescription, and asserts that the final topology type is in the + // correct state. + void assertTopologyTypeTestCase(TopologyTypeTestCase testCase) { + TopologyStateMachine stateMachine(testCase.initialConfig); + + // setup the initial state + TopologyDescription topologyDescription(testCase.initialConfig); + topologyDescription.setType(testCase.starting); + + // create new ServerDescription and + auto serverDescriptionBuilder = + ServerDescriptionBuilder().withType(testCase.incoming).withAddress(kLocalServer); + + // update the known hosts in the ServerDescription + if (testCase.initialConfig.getSeedList()) { + for (auto address : *testCase.initialConfig.getSeedList()) { + serverDescriptionBuilder.withHost(address); + } + } + + // set the primary if we are creating one + if (testCase.incoming == ServerType::kRSPrimary) { + serverDescriptionBuilder.withPrimary(kLocalServer); + } + + // set the replica set name if appropriate + const std::vector<ServerType>& replicaSetServerTypes = std::vector<ServerType>{ + ServerType::kRSOther, ServerType::kRSSecondary, ServerType::kRSArbiter}; + if (std::find(replicaSetServerTypes.begin(), + replicaSetServerTypes.end(), + testCase.incoming) != replicaSetServerTypes.end()) { + serverDescriptionBuilder.withSetName(kReplicaSetName); + } + + const auto serverDescription = serverDescriptionBuilder.instance(); + + // simulate the ServerDescription being received + stateMachine.onServerDescription(topologyDescription, serverDescription); + + ASSERT_EQUALS(topologyDescription.getType(), testCase.ending); + } + + std::vector<ServerType> allServerTypesExceptPrimary() { + auto allExceptPrimary = allServerTypes(); + allExceptPrimary.erase( + std::remove_if(allExceptPrimary.begin(), + allExceptPrimary.end(), + [](const ServerType t) { return t == ServerType::kRSPrimary; }), + allExceptPrimary.end()); + return allExceptPrimary; + } +}; + +TEST_F(TopologyStateMachineTestFixture, ShouldInstallServerDescriptionInSingleTopology) { + TopologyStateMachine stateMachine(kSingleConfig); + TopologyDescription topologyDescription(kSingleConfig); + + auto updatedMeAddress = "foo:1234"; + auto serverDescription = ServerDescriptionBuilder() + .withAddress(kLocalServer) + .withMe(updatedMeAddress) + .withType(ServerType::kStandalone) + .instance(); + + stateMachine.onServerDescription(topologyDescription, serverDescription); + ASSERT_EQUALS(static_cast<size_t>(1), topologyDescription.getServers().size()); + + auto result = topologyDescription.findServerByAddress(kLocalServer); + ASSERT(result); + ASSERT_EQUALS(serverDescription, *result); +} + +TEST_F(TopologyStateMachineTestFixture, ShouldRemoveServerDescriptionIfNotInHostsList) { + const auto primary = (*kTwoSeedConfig.getSeedList()).front(); + const auto expectedRemovedServer = (*kTwoSeedConfig.getSeedList()).back(); + + TopologyStateMachine stateMachine(kTwoSeedConfig); + TopologyDescription topologyDescription(kTwoSeedConfig); + + auto serverDescription = ServerDescriptionBuilder() + .withAddress(primary) + .withType(ServerType::kRSPrimary) + .withPrimary(primary) + .withHost(primary) + .instance(); + + ASSERT_EQUALS(static_cast<size_t>(2), topologyDescription.getServers().size()); + stateMachine.onServerDescription(topologyDescription, serverDescription); + ASSERT_EQUALS(static_cast<size_t>(1), topologyDescription.getServers().size()); + ASSERT_EQUALS(serverDescription, topologyDescription.getServers().front()); +} + +TEST_F(TopologyStateMachineTestFixture, + ShouldRemoveNonPrimaryServerWhenTopologyIsReplicaSetNoPrimaryAndMeDoesntMatchAddress) { + const auto serverAddress = (*kTwoSeedReplicaSetNoPrimaryConfig.getSeedList()).front(); + const auto expectedRemainingServerAddress = + (*kTwoSeedReplicaSetNoPrimaryConfig.getSeedList()).back(); + const auto me = std::string("foo") + serverAddress; + + TopologyStateMachine stateMachine(kTwoSeedReplicaSetNoPrimaryConfig); + TopologyDescription topologyDescription(kTwoSeedReplicaSetNoPrimaryConfig); + + auto serverDescription = ServerDescriptionBuilder() + .withAddress(serverAddress) + .withMe(me) + .withType(ServerType::kRSSecondary) + .instance(); + + ASSERT_EQUALS(static_cast<size_t>(2), topologyDescription.getServers().size()); + stateMachine.onServerDescription(topologyDescription, serverDescription); + ASSERT_EQUALS(static_cast<size_t>(1), topologyDescription.getServers().size()); + ASSERT_EQUALS(expectedRemainingServerAddress, + topologyDescription.getServers().front()->getAddress()); +} + +TEST_F(TopologyStateMachineTestFixture, + ShouldAddServerDescriptionIfInHostsListButNotInTopologyDescription) { + const auto primary = (*kTwoSeedConfig.getSeedList()).front(); + const auto secondary = (*kTwoSeedConfig.getSeedList()).back(); + const auto newHost = ServerAddress("newhost:123"); + + TopologyStateMachine stateMachine(kTwoSeedConfig); + TopologyDescription topologyDescription(kTwoSeedConfig); + + auto serverDescription = ServerDescriptionBuilder() + .withAddress(primary) + .withType(ServerType::kRSPrimary) + .withPrimary(primary) + .withHost(primary) + .withHost(secondary) + .withHost(newHost) + .instance(); + + ASSERT_EQUALS(static_cast<size_t>(2), topologyDescription.getServers().size()); + stateMachine.onServerDescription(topologyDescription, serverDescription); + ASSERT_EQUALS(static_cast<size_t>(3), topologyDescription.getServers().size()); + + auto newHostResult = topologyDescription.findServerByAddress(newHost); + ASSERT(newHostResult); + ASSERT_EQUALS(newHost, (*newHostResult)->getAddress()); + ASSERT_EQUALS(ServerType::kUnknown, (*newHostResult)->getType()); +} + +TEST_F(TopologyStateMachineTestFixture, ShouldSaveNewMaxSetVersion) { + const auto primary = (*kTwoSeedConfig.getSeedList()).front(); + + TopologyDescription topologyDescription(kTwoSeedConfig); + TopologyStateMachine stateMachine(kTwoSeedConfig); + + auto serverDescription = ServerDescriptionBuilder() + .withType(ServerType::kRSPrimary) + .withPrimary(primary) + .withMe(primary) + .withAddress(primary) + .withHost(primary) + .withSetVersion(100) + .instance(); + + stateMachine.onServerDescription(topologyDescription, serverDescription); + ASSERT_EQUALS(100, topologyDescription.getMaxSetVersion()); + + auto serverDescriptionEvenBiggerSetVersion = ServerDescriptionBuilder() + .withType(ServerType::kRSPrimary) + .withPrimary(primary) + .withMe(primary) + .withAddress(primary) + .withHost(primary) + .withSetVersion(200) + .instance(); + + stateMachine.onServerDescription(topologyDescription, serverDescriptionEvenBiggerSetVersion); + ASSERT_EQUALS(200, topologyDescription.getMaxSetVersion()); +} + +TEST_F(TopologyStateMachineTestFixture, ShouldSaveNewMaxElectionId) { + const auto primary = (*kTwoSeedConfig.getSeedList()).front(); + TopologyDescription topologyDescription(kTwoSeedConfig); + TopologyStateMachine stateMachine(kTwoSeedConfig); + + const OID oidOne(std::string("000000000000000000000001")); + const OID oidTwo(std::string("000000000000000000000002")); + + auto serverDescription = ServerDescriptionBuilder() + .withType(ServerType::kRSPrimary) + .withPrimary(primary) + .withMe(primary) + .withAddress(primary) + .withHost(primary) + .withSetVersion(1) + .withElectionId(oidOne) + .instance(); + + stateMachine.onServerDescription(topologyDescription, serverDescription); + ASSERT_EQUALS(oidOne, topologyDescription.getMaxElectionId()); + + auto serverDescriptionEvenBiggerElectionId = ServerDescriptionBuilder() + .withType(ServerType::kRSPrimary) + .withPrimary(primary) + .withMe(primary) + .withAddress(primary) + .withHost(primary) + .withSetVersion(1) + .withElectionId(oidTwo) + .instance(); + + stateMachine.onServerDescription(topologyDescription, serverDescriptionEvenBiggerElectionId); + ASSERT_EQUALS(oidTwo, topologyDescription.getMaxElectionId()); +} + +// The following two tests (ShouldNotUpdateToplogyType, ShouldUpdateToCorrectToplogyType) assert +// that the topology type is correct given an initial state and a ServerType. Together, they +// cover all the cases specified in the SDAM spec here: +// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#topologytype-table + +TEST_F(TopologyStateMachineTestFixture, ShouldNotUpdateToplogyType) { + using T = TopologyTypeTestCase; + + // test cases that should not change TopologyType + std::vector<TopologyTypeTestCase> testCases{ + T{kTwoSeedConfig, TopologyType::kUnknown, ServerType::kUnknown, TopologyType::kUnknown}, + T{kTwoSeedConfig, TopologyType::kUnknown, ServerType::kStandalone, TopologyType::kUnknown}, + T{kTwoSeedConfig, TopologyType::kUnknown, ServerType::kRSGhost, TopologyType::kUnknown}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetNoPrimary, + ServerType::kUnknown, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetNoPrimary, + ServerType::kUnknown, + TopologyType::kReplicaSetNoPrimary}, + }; + for (auto serverType : allServerTypes()) { + testCases.push_back( + T{kTwoSeedConfig, TopologyType::kSharded, serverType, TopologyType::kSharded}); + } + + const auto& allExceptPrimary = allServerTypesExceptPrimary(); + for (auto serverType : allExceptPrimary) { + testCases.push_back(T{kTwoSeedConfig, + TopologyType::kReplicaSetNoPrimary, + serverType, + TopologyType::kReplicaSetNoPrimary}); + } + + int count = 0; + for (auto testCase : testCases) { + std::cout << "case " << ++count << " starting TopologyType: " << toString(testCase.starting) + << "; incoming ServerType: " << toString(testCase.incoming) + << "; expect ending TopologyType: " << toString(testCase.ending) << std::endl; + + assertTopologyTypeTestCase(testCase); + } +} + +TEST_F(TopologyStateMachineTestFixture, ShouldUpdateToCorrectToplogyType) { + using T = TopologyTypeTestCase; + + // test cases that should change TopologyType + const std::vector<TopologyTypeTestCase> testCases{ + T{kTwoSeedConfig, TopologyType::kUnknown, ServerType::kMongos, TopologyType::kSharded}, + T{kTwoSeedConfig, + TopologyType::kUnknown, + ServerType::kRSPrimary, + TopologyType::kReplicaSetWithPrimary}, + T{kTwoSeedConfig, + TopologyType::kUnknown, + ServerType::kRSSecondary, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kUnknown, + ServerType::kRSArbiter, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kUnknown, + ServerType::kRSOther, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetNoPrimary, + ServerType::kRSPrimary, + TopologyType::kReplicaSetWithPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetWithPrimary, + ServerType::kUnknown, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetWithPrimary, + ServerType::kStandalone, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetWithPrimary, + ServerType::kMongos, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetWithPrimary, + ServerType::kRSPrimary, + TopologyType::kReplicaSetWithPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetWithPrimary, + ServerType::kRSSecondary, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetWithPrimary, + ServerType::kRSOther, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetWithPrimary, + ServerType::kRSArbiter, + TopologyType::kReplicaSetNoPrimary}, + T{kTwoSeedConfig, + TopologyType::kReplicaSetWithPrimary, + ServerType::kRSGhost, + TopologyType::kReplicaSetNoPrimary}}; + + int count = 0; + for (auto testCase : testCases) { + std::cout << "case " << ++count << " starting TopologyType: " << toString(testCase.starting) + << "; incoming ServerType: " << toString(testCase.incoming) + << "; expect ending TopologyType: " << toString(testCase.ending) << std::endl; + + assertTopologyTypeTestCase(testCase); + } +} +} // namespace mongo::sdam |