summaryrefslogtreecommitdiff
path: root/src/mongo/client/streamable_replica_set_monitor.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/streamable_replica_set_monitor.h')
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.h163
1 files changed, 140 insertions, 23 deletions
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index 8c27301660e..9fbdf858f6e 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -38,7 +38,10 @@
#include "mongo/client/mongo_uri.h"
#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/client/sdam/sdam.h"
+#include "mongo/client/server_is_master_monitor.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/logger/log_component.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/duration.h"
#include "mongo/util/net/hostandport.h"
@@ -46,48 +49,162 @@
namespace mongo {
-class StreamableReplicaSetMonitor : public ReplicaSetMonitor {
- StreamableReplicaSetMonitor(const StreamableReplicaSetMonitor&) = delete;
- StreamableReplicaSetMonitor& operator=(const StreamableReplicaSetMonitor&) = delete;
+class BSONObj;
+class ReplicaSetMonitor;
+class ReplicaSetMonitorTest;
+struct ReadPreferenceSetting;
+using ReplicaSetMonitorPtr = std::shared_ptr<ReplicaSetMonitor>;
+
+/**
+ * Replica set monitor implementation backed by the classes in the mongo::sdam namespace.
+ *
+ * All methods perform the required synchronization to allow callers from multiple threads.
+ */
+class StreamableReplicaSetMonitor :
+ public ReplicaSetMonitor,
+ public sdam::TopologyListener,
+ public std::enable_shared_from_this<StreamableReplicaSetMonitor> {
+
+ StreamableReplicaSetMonitor(const ReplicaSetMonitor&) = delete;
+ StreamableReplicaSetMonitor& operator=(const ReplicaSetMonitor&) = delete;
public:
- StreamableReplicaSetMonitor(const MongoURI& uri);
+ class Refresher;
+
+ static constexpr auto kExpeditedRefreshPeriod = Milliseconds(500);
+ static constexpr auto kCheckTimeout = Seconds(5);
+
+ StreamableReplicaSetMonitor(const MongoURI& uri,
+ std::shared_ptr<executor::TaskExecutor> executor);
+
+ void init();
- void init() override;
+ void drop();
- void drop() override;
+ static ReplicaSetMonitorPtr make(const MongoURI& uri,
+ std::shared_ptr<executor::TaskExecutor> executor = nullptr);
- SemiFuture<HostAndPort> getHostOrRefresh(
- const ReadPreferenceSetting& readPref,
- Milliseconds maxWait = kDefaultFindHostTimeout) override;
+ SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
+ Milliseconds maxWait = kDefaultFindHostTimeout);
SemiFuture<std::vector<HostAndPort>> getHostsOrRefresh(
- const ReadPreferenceSetting& readPref,
- Milliseconds maxWait = kDefaultFindHostTimeout) override;
+ const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout);
+
+ HostAndPort getMasterOrUassert();
+
+ void failedHost(const HostAndPort& host, const Status& status);
+ void failedHost(const HostAndPort& host, BSONObj bson, const Status& status);
+
+ bool isPrimary(const HostAndPort& host) const;
+
+ bool isHostUp(const HostAndPort& host) const;
+
+ int getMinWireVersion() const;
+
+ int getMaxWireVersion() const;
+
+ std::string getName() const;
+
+ std::string getServerAddress() const;
+
+ const MongoURI& getOriginalUri() const;
+
+ bool contains(const HostAndPort& server) const;
+
+ void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const;
+
+ bool isKnownToHaveGoodPrimary() const;
+
+private:
+ class StreamableReplicaSetMonitorQueryProcessor;
+ using StreamableReplicaSetMontiorQueryProcessorPtr =
+ std::shared_ptr<StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor>;
+
+ struct HostQuery {
+ Date_t deadline;
+ executor::TaskExecutor::CallbackHandle deadlineHandle;
+ ReadPreferenceSetting criteria;
+ Date_t start = Date_t::now();
+ bool done = false;
+ Promise<std::vector<HostAndPort>> promise;
+ };
+ using HostQueryPtr = std::shared_ptr<HostQuery>;
+
+ SemiFuture<std::vector<HostAndPort>> _enqueueOutstandingQuery(
+ WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline);
+
+ std::vector<HostAndPort> _extractHosts(
+ const std::vector<sdam::ServerDescriptionPtr>& serverDescriptions);
+ boost::optional<std::vector<HostAndPort>> _getHosts(const TopologyDescriptionPtr& topology,
+ const ReadPreferenceSetting& criteria);
+ boost::optional<std::vector<HostAndPort>> _getHosts(const ReadPreferenceSetting& criteria);
+
+ void onTopologyDescriptionChangedEvent(UUID topologyId,
+ sdam::TopologyDescriptionPtr previousDescription,
+ sdam::TopologyDescriptionPtr newDescription) override;
+
+ void onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs,
+ const sdam::ServerAddress& hostAndPort,
+ const BSONObj reply) override;
+
+ void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
+ Status errorStatus,
+ const ServerAddress& hostAndPort,
+ const BSONObj reply) override;
+
+ void onServerPingFailedEvent(const sdam::ServerAddress& hostAndPort,
+ const Status& status) override;
+
+ void onServerPingSucceededEvent(sdam::IsMasterRTT durationMS,
+ const sdam::ServerAddress& hostAndPort) override;
+
+ // Get a pointer to the current primary's ServerDescription
+ // To ensure a consistent view of the Topology either _currentPrimary or _currentTopology should
+ // be called (not both) since the topology can change between the function invocations.
+ boost::optional<sdam::ServerDescriptionPtr> _currentPrimary() const;
- HostAndPort getMasterOrUassert() override;
+ // Get the current TopologyDescription
+ // Note that most functions will want to save the result of this function once per computation
+ // so that we are operating on a consistent read-only view of the topology.
+ sdam::TopologyDescriptionPtr _currentTopology() const;
- void failedHost(const HostAndPort& host, const Status& status) override;
+ std::string _logPrefix();
- bool isPrimary(const HostAndPort& host) const override;
+ void _failOutstandingWithStatus(WithLock, Status status);
+ bool _hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription,
+ sdam::TopologyDescriptionPtr newDescription);
- bool isHostUp(const HostAndPort& host) const override;
+ Status _makeUnsatisfiedReadPrefError(const ReadPreferenceSetting& criteria) const;
+ Status _makeReplicaSetMonitorRemovedError() const;
- int getMinWireVersion() const override;
+ // Try to satisfy the outstanding queries for this instance with the given topology information.
+ void _processOutstanding(const TopologyDescriptionPtr& topologyDescription);
- int getMaxWireVersion() const override;
+ sdam::SdamConfiguration _sdamConfig;
+ sdam::TopologyManagerPtr _topologyManager;
+ sdam::ServerSelectorPtr _serverSelector;
+ sdam::TopologyEventsPublisherPtr _eventsPublisher;
+ ServerIsMasterMonitorPtr _isMasterMonitor;
- std::string getName() const override;
+ // This object will be registered as a TopologyListener if there are
+ // any outstanding queries for this RSM instance.
+ StreamableReplicaSetMontiorQueryProcessorPtr _queryProcessor;
- std::string getServerAddress() const override;
+ const MongoURI _uri;
- const MongoURI& getOriginalUri() const override;
+ std::shared_ptr<executor::TaskExecutor> _executor;
- bool contains(const HostAndPort& server) const override;
+ AtomicWord<bool> _isDropped{true};
- void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const override;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicaSetMonitor");
+ std::vector<HostQueryPtr> _outstandingQueries;
+ mutable PseudoRandom _random;
- bool isKnownToHaveGoodPrimary() const override;
+ static inline const auto kServerSelectionConfig =
+ sdam::ServerSelectionConfiguration::defaultConfiguration();
+ static inline const auto kDefaultLogLevel = logger::LogSeverity::Debug(1);
+ static inline const auto kLowerLogLevel = kDefaultLogLevel.lessSevere();
+ static constexpr auto kLogPrefix = "[ReplicaSetMonitor]";
};
} // namespace mongo