diff options
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.cpp | 80 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.h | 10 |
2 files changed, 72 insertions, 18 deletions
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 0621cbc8793..9edd169b578 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -82,6 +82,11 @@ bool secondaryPredicate(const ServerDescriptionPtr& server) { return server->getType() == ServerType::kRSSecondary; } +bool primaryOrSecondaryPredicate(const ServerDescriptionPtr& server) { + const auto serverType = server->getType(); + return serverType == ServerType::kRSPrimary || serverType == ServerType::kRSSecondary; +} + std::string readPrefToStringWithMinOpTime(const ReadPreferenceSetting& readPref) { BSONObjBuilder builder; readPref.toInnerBSON(&builder); @@ -226,12 +231,6 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::getHostsOrRefr // try to satisfy query immediately auto immediateResult = _getHosts(criteria); if (immediateResult) { - LOGV2_DEBUG(4333211, - kLowerLogLevel, - "RSM {setName} getHosts: {readPref} -> {result}", - "readPref"_attr = readPrefToStringWithMinOpTime(criteria), - "setName"_attr = getName(), - "result"_attr = hostListToString(immediateResult)); return {*immediateResult}; } @@ -478,34 +477,79 @@ sdam::TopologyDescriptionPtr StreamableReplicaSetMonitor::_currentTopology() con return _topologyManager->getTopologyDescription(); } +void StreamableReplicaSetMonitor::_setConfirmedNotifierState( + WithLock, const ServerDescriptionPtr& primaryDescription) { + invariant(primaryDescription && primaryDescription->getType() == sdam::ServerType::kRSPrimary); + + const auto& hosts = primaryDescription->getHosts(); + const auto& passives = primaryDescription->getPassives(); + + // TODO SERVER-45395: remove need for HostAndPort conversion + std::set<HostAndPort> confirmedHosts; + std::transform(hosts.begin(), + hosts.end(), + std::inserter(confirmedHosts, confirmedHosts.end()), + [](const ServerAddress& addr) -> HostAndPort { return HostAndPort(addr); }); + + std::set<HostAndPort> confirmedPassives; + std::transform(passives.begin(), + passives.end(), + std::inserter(confirmedPassives, confirmedPassives.end()), + [](const ServerAddress& addr) -> HostAndPort { return HostAndPort(addr); }); + + confirmedHosts.insert(confirmedPassives.begin(), confirmedPassives.end()); + + _confirmedNotifierState = ChangeNotifierState{ + HostAndPort(primaryDescription->getAddress()), + confirmedPassives, + ConnectionString::forReplicaSet( + getName(), std::vector<HostAndPort>(confirmedHosts.begin(), confirmedHosts.end()))}; +} + void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( UUID topologyId, TopologyDescriptionPtr previousDescription, TopologyDescriptionPtr newDescription) { + stdx::lock_guard lock(_mutex); + if (_isDropped.load()) + return; - // notify external components, if there are membership - // changes in the topology. + // Notify external components if there are membership changes in the topology. if (_hasMembershipChange(previousDescription, newDescription)) { LOGV2(4333213, "RSM {setName} Topology Change: {topologyDescription}", "setName"_attr = getName(), "topologyDescription"_attr = newDescription->toString()); - // TODO SERVER-45395: remove when HostAndPort conversion is done - std::vector<HostAndPort> servers = _extractHosts(newDescription->getServers()); - - auto connectionString = ConnectionString::forReplicaSet(getName(), servers); auto maybePrimary = newDescription->getPrimary(); if (maybePrimary) { - // TODO SERVER-45395: remove need for HostAndPort conversion - auto hostList = _extractHosts(newDescription->findServers(secondaryPredicate)); - std::set<HostAndPort> secondaries(hostList.begin(), hostList.end()); + _setConfirmedNotifierState(lock, *maybePrimary); - auto primaryAddress = HostAndPort((*maybePrimary)->getAddress()); ReplicaSetMonitorManager::get()->getNotifier().onConfirmedSet( - connectionString, primaryAddress, secondaries); + _confirmedNotifierState->connectionString, + _confirmedNotifierState->primaryAddress, + _confirmedNotifierState->passives); } else { - ReplicaSetMonitorManager::get()->getNotifier().onPossibleSet(connectionString); + if (_confirmedNotifierState) { + const auto& connectionString = _confirmedNotifierState->connectionString; + ReplicaSetMonitorManager::get()->getNotifier().onPossibleSet(connectionString); + } else { + // No confirmed hosts yet, just send list of hosts that are routable base on type. + const auto& primaryAndSecondaries = + newDescription->findServers(primaryOrSecondaryPredicate); + if (primaryAndSecondaries.size() == 0) { + LOGV2_DEBUG(4645401, + kLowerLogLevel, + "Skip publishing unconfirmed replica set members since there are " + "no primaries or secondaries in the new topology", + "replicaSetName"_attr = getName()); + return; + } + + const auto connectionString = ConnectionString::forReplicaSet( + getName(), _extractHosts(primaryAndSecondaries)); + ReplicaSetMonitorManager::get()->getNotifier().onPossibleSet(connectionString); + } } } } diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 59006e0221c..a20f9a80931 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -131,6 +131,14 @@ private: }; using HostQueryPtr = std::shared_ptr<HostQuery>; + // Information collected from the primary ServerDescription to be published via the + // ReplicaSetChangeNotifier + struct ChangeNotifierState { + HostAndPort primaryAddress; + std::set<HostAndPort> passives; + ConnectionString connectionString; + }; + SemiFuture<std::vector<HostAndPort>> _enqueueOutstandingQuery( WithLock, const ReadPreferenceSetting& criteria, const Date_t& deadline); @@ -174,6 +182,7 @@ private: void _failOutstandingWithStatus(WithLock, Status status); bool _hasMembershipChange(sdam::TopologyDescriptionPtr oldDescription, sdam::TopologyDescriptionPtr newDescription); + void _setConfirmedNotifierState(WithLock, const ServerDescriptionPtr& primaryDescription); Status _makeUnsatisfiedReadPrefError(const ReadPreferenceSetting& criteria) const; Status _makeReplicaSetMonitorRemovedError() const; @@ -199,6 +208,7 @@ private: mutable Mutex _mutex = MONGO_MAKE_LATCH("ReplicaSetMonitor"); std::vector<HostQueryPtr> _outstandingQueries; + boost::optional<ChangeNotifierState> _confirmedNotifierState; mutable PseudoRandom _random; static inline const auto kServerSelectionConfig = |