diff options
-rw-r--r-- | src/mongo/client/SConscript | 7 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.cpp | 301 | ||||
-rw-r--r-- | src/mongo/client/scanning_replica_set_monitor.cpp | 2 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_description.cpp | 4 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_manager.cpp | 7 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.cpp | 7 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.h | 5 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor_query_processor.cpp | 2 |
8 files changed, 170 insertions, 165 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index dd7c410279c..d8323999d08 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -194,14 +194,13 @@ clientDriverEnv.Library( env.Idlc('global_conn_pool.idl')[0], 'replica_set_change_notifier.cpp', 'replica_set_monitor.cpp', - 'scanning_replica_set_monitor.cpp', - 'streamable_replica_set_monitor.cpp', 'replica_set_monitor_manager.cpp', + 'scanning_replica_set_monitor.cpp', env.Idlc('replica_set_monitor_params.idl')[0], - 'server_ping_monitor.cpp', - 'server_is_master_monitor.cpp', 'streamable_replica_set_monitor.cpp', 'streamable_replica_set_monitor_query_processor.cpp', + 'server_is_master_monitor.cpp', + 'server_ping_monitor.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/write_concern_options', diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index 193a799806d..57749b142a7 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -38,8 +38,9 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/connection_string.h" #include "mongo/client/mongo_uri.h" -#include "mongo/client/scanning_replica_set_monitor.h" #include "mongo/client/replica_set_monitor.h" +#include "mongo/client/replica_set_monitor_params_gen.h" +#include "mongo/client/scanning_replica_set_monitor.h" #include "mongo/client/streamable_replica_set_monitor.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface_factory.h" @@ -47,196 +48,196 @@ #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/logv2/log.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/util/map_util.h" -#include "mongo/client/replica_set_monitor_params_gen.h" - namespace mongo { +namespace mongo { - using std::set; - using std::shared_ptr; - using std::string; - using std::vector; +using std::set; +using std::shared_ptr; +using std::string; +using std::vector; - using executor::NetworkInterface; - using executor::NetworkInterfaceThreadPool; - using executor::TaskExecutor; - using executor::TaskExecutorPool; - using executor::ThreadPoolTaskExecutor; +using executor::NetworkInterface; +using executor::NetworkInterfaceThreadPool; +using executor::TaskExecutor; +using executor::TaskExecutorPool; +using executor::ThreadPoolTaskExecutor; - namespace { - const auto getGlobalRSMMonitorManager = - ServiceContext::declareDecoration<ReplicaSetMonitorManager>(); - } // namespace +namespace { +const auto getGlobalRSMMonitorManager = + ServiceContext::declareDecoration<ReplicaSetMonitorManager>(); +} // namespace - ReplicaSetMonitorManager::~ReplicaSetMonitorManager() { - shutdown(); - } +ReplicaSetMonitorManager::~ReplicaSetMonitorManager() { + shutdown(); +} - ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() { - return &getGlobalRSMMonitorManager(getGlobalServiceContext()); - } +ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() { + return &getGlobalRSMMonitorManager(getGlobalServiceContext()); +} - shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData setName) { - stdx::lock_guard<Latch> lk(_mutex); +shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData setName) { + stdx::lock_guard<Latch> lk(_mutex); - if (auto monitor = _monitors[setName].lock()) { - return monitor; - } else { - return shared_ptr<ReplicaSetMonitor>(); - } + if (auto monitor = _monitors[setName].lock()) { + return monitor; + } else { + return shared_ptr<ReplicaSetMonitor>(); } +} - void ReplicaSetMonitorManager::_setupTaskExecutorInLock() { - if (_isShutdown || _taskExecutor) { - // do not restart taskExecutor if is in shutdown - return; - } - - // construct task executor - auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); - auto net = executor::makeNetworkInterface( - "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList)); - auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); - _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); - _taskExecutor->startup(); +void ReplicaSetMonitorManager::_setupTaskExecutorInLock() { + if (_isShutdown || _taskExecutor) { + // do not restart taskExecutor if is in shutdown + return; } - namespace { - void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) { - uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b); + // construct task executor + auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); + auto net = executor::makeNetworkInterface( + "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList)); + auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); + _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); + _taskExecutor->startup(); +} + +namespace { +void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) { + uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b); +} +} // namespace + +shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( + const ConnectionString& connStr) { + return getOrCreateMonitor(MongoURI(connStr)); +} + +shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const MongoURI& uri) { + invariant(uri.type() == ConnectionString::SET); + stdx::lock_guard<Latch> lk(_mutex); + uassert(ErrorCodes::ShutdownInProgress, + str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown", + !_isShutdown); + + _setupTaskExecutorInLock(); + const auto& setName = uri.getSetName(); + auto monitor = _monitors[setName].lock(); + if (monitor) { + uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode()); + return monitor; } - } // namespace - shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( - const ConnectionString& connStr) { - return getOrCreateMonitor(MongoURI(connStr)); + LOGV2(20186, "Starting new replica set monitor for {uri}", "uri"_attr = uri.toString()); + + std::shared_ptr<ReplicaSetMonitor> newMonitor; + if (disableStreamableReplicaSetMonitor.load()) { + newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri); + newMonitor->init(); + } else { + newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor()); } + _monitors[setName] = newMonitor; + return newMonitor; +} - shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( - const MongoURI& uri) { - invariant(uri.type() == ConnectionString::SET); - stdx::lock_guard<Latch> lk(_mutex); - uassert(ErrorCodes::ShutdownInProgress, - str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown", - !_isShutdown); - - _setupTaskExecutorInLock(); - const auto& setName = uri.getSetName(); - auto monitor = _monitors[setName].lock(); - if (monitor) { - uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode()); - return monitor; - } +vector<string> ReplicaSetMonitorManager::getAllSetNames() { + vector<string> allNames; - log() << "Starting new replica set monitor for " << uri.toString(); + stdx::lock_guard<Latch> lk(_mutex); - std::shared_ptr<ReplicaSetMonitor> newMonitor; - if (disableStreamableReplicaSetMonitor.load()) { - newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri); - newMonitor->init(); - } else { - newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor()); - } - _monitors[setName] = newMonitor; - return newMonitor; + for (const auto& entry : _monitors) { + allNames.push_back(entry.first); } - vector<string> ReplicaSetMonitorManager::getAllSetNames() { - vector<string> allNames; - - stdx::lock_guard<Latch> lk(_mutex); + return allNames; +} - for (const auto& entry : _monitors) { - allNames.push_back(entry.first); +void ReplicaSetMonitorManager::removeMonitor(StringData setName) { + stdx::lock_guard<Latch> lk(_mutex); + ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName); + if (it != _monitors.end()) { + if (auto monitor = it->second.lock()) { + monitor->drop(); } - - return allNames; + _monitors.erase(it); + LOGV2( + 20187, "Removed ReplicaSetMonitor for replica set {setName}", "setName"_attr = setName); } +} - void ReplicaSetMonitorManager::removeMonitor(StringData setName) { +void ReplicaSetMonitorManager::shutdown() { + decltype(_monitors) monitors; + decltype(_taskExecutor) taskExecutor; + { stdx::lock_guard<Latch> lk(_mutex); - ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName); - if (it != _monitors.end()) { - if (auto monitor = it->second.lock()) { - monitor->drop(); - } - _monitors.erase(it); - log() << "Removed ReplicaSetMonitor for replica set " << setName; + if (std::exchange(_isShutdown, true)) { + return; } - } - void ReplicaSetMonitorManager::shutdown() { - decltype(_monitors) monitors; - decltype(_taskExecutor) taskExecutor; - { - stdx::lock_guard<Latch> lk(_mutex); - if (std::exchange(_isShutdown, true)) { - return; - } - - monitors = std::exchange(_monitors, {}); - taskExecutor = std::exchange(_taskExecutor, {}); - } + monitors = std::exchange(_monitors, {}); + taskExecutor = std::exchange(_taskExecutor, {}); + } - if (taskExecutor) { - LOG(1) << "Shutting down task executor used for monitoring replica sets"; - taskExecutor->shutdown(); - } + if (taskExecutor) { + LOGV2_DEBUG(20188, 1, "Shutting down task executor used for monitoring replica sets"); + taskExecutor->shutdown(); + } - for (auto& [name, monitor] : monitors) { - auto anchor = monitor.lock(); - if (!anchor) { - continue; - } - anchor->drop(); + for (auto& [name, monitor] : monitors) { + auto anchor = monitor.lock(); + if (!anchor) { + continue; } + anchor->drop(); + } - if (taskExecutor) { - taskExecutor->join(); - } + if (taskExecutor) { + taskExecutor->join(); } +} - void ReplicaSetMonitorManager::removeAllMonitors() { - shutdown(); +void ReplicaSetMonitorManager::removeAllMonitors() { + shutdown(); - { - stdx::lock_guard<Latch> lk(_mutex); - _isShutdown = false; - } + { + stdx::lock_guard<Latch> lk(_mutex); + _isShutdown = false; } - - void ReplicaSetMonitorManager::report(BSONObjBuilder * builder, bool forFTDC) { - // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the - // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and - // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook - // potentially calling ShardRegistry::updateConfigServerConnectionString. - auto setNames = getAllSetNames(); - - BSONObjBuilder setStats( - builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets")); - - for (const auto& setName : setNames) { - auto monitor = getMonitor(setName); - if (!monitor) { - continue; - } - monitor->appendInfo(setStats, forFTDC); +} + +void ReplicaSetMonitorManager::report(BSONObjBuilder* builder, bool forFTDC) { + // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the + // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and + // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook + // potentially calling ShardRegistry::updateConfigServerConnectionString. + auto setNames = getAllSetNames(); + + BSONObjBuilder setStats( + builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets")); + + for (const auto& setName : setNames) { + auto monitor = getMonitor(setName); + if (!monitor) { + continue; } + monitor->appendInfo(setStats, forFTDC); } +} - std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor() { - invariant(_taskExecutor); - return _taskExecutor; - } +std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor() { + invariant(_taskExecutor); + return _taskExecutor; +} - ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() { - return _notifier; - } +ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() { + return _notifier; +} - bool ReplicaSetMonitorManager::isShutdown() const { - stdx::lock_guard<Latch> lk(_mutex); - return _isShutdown; - } +bool ReplicaSetMonitorManager::isShutdown() const { + stdx::lock_guard<Latch> lk(_mutex); + return _isShutdown; +} } // namespace mongo diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp index d29259d4bb7..af019559cef 100644 --- a/src/mongo/client/scanning_replica_set_monitor.cpp +++ b/src/mongo/client/scanning_replica_set_monitor.cpp @@ -173,7 +173,7 @@ ScanningReplicaSetMonitor::ScanningReplicaSetMonitor(const MongoURI& uri) : ScanningReplicaSetMonitor( std::make_shared<SetState>(uri, &ReplicaSetMonitorManager::get()->getNotifier(), - ReplicaSetMonitorManager::get()->getExecutor())) {} + ReplicaSetMonitorManager::get()->getExecutor().get())) {} void ScanningReplicaSetMonitor::init() { if (areRefreshRetriesDisabledForTest()) { diff --git a/src/mongo/client/sdam/topology_description.cpp b/src/mongo/client/sdam/topology_description.cpp index ecf5a2b389d..ac25e940ef4 100644 --- a/src/mongo/client/sdam/topology_description.cpp +++ b/src/mongo/client/sdam/topology_description.cpp @@ -31,6 +31,7 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/client/sdam/server_description.h" #include "mongo/db/wire_version.h" +#include "mongo/logv2/log.h" namespace mongo::sdam { //////////////////////// @@ -107,8 +108,7 @@ const boost::optional<ServerDescriptionPtr> TopologyDescription::findServerByAdd boost::optional<ServerDescriptionPtr> TopologyDescription::installServerDescription( const ServerDescriptionPtr& newServerDescription) { - LOG(2) << "(" << getSetName() << ") install ServerDescription " - << newServerDescription->toString(); + LOGV2_DEBUG(4333202, 2, "install server description {description}", "description"_attr = newServerDescription->toString()); boost::optional<ServerDescriptionPtr> previousDescription; if (getType() == TopologyType::kSingle) { diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp index 64d74e114d9..81de9cf16f6 100644 --- a/src/mongo/client/sdam/topology_manager.cpp +++ b/src/mongo/client/sdam/topology_manager.cpp @@ -30,6 +30,8 @@ #include "mongo/client/sdam/topology_manager.h" +#include <string> + #include "mongo/client/sdam/topology_state_machine.h" #include "mongo/logv2/log.h" #include "mongo/rpc/topology_version_gen.h" @@ -143,9 +145,8 @@ void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT } // otherwise, the server was removed from the topology. Nothing to do. - LOGV2(433301, - str::stream() << "Not updating RTT. Server {server}" << hostAndPort - << " does not exist in ", + LOGV2(4333201, + "Not updating RTT. Server {server} does not exist in {setName}", "server"_attr = hostAndPort, "setName"_attr = getTopologyDescription()->getSetName()); } diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index abe5e23f3dd..f716bb71d67 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -112,8 +112,9 @@ int32_t pingTimeMillis(const ServerDescriptionPtr& serverDescription) { constexpr auto kZeroMs = Milliseconds(0); } // namespace -StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri, std::shared_ptr<TaskExecutor> executor) : - _serverSelector(std::make_unique<SdamServerSelector>(kServerSelectionConfig)), +StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri, + std::shared_ptr<TaskExecutor> executor) + : _serverSelector(std::make_unique<SdamServerSelector>(kServerSelectionConfig)), _queryProcessor(std::make_shared<StreamableReplicaSetMonitorQueryProcessor>()), _uri(uri), _executor(executor), @@ -591,4 +592,6 @@ Status StreamableReplicaSetMonitor::_makeReplicaSetMonitorRemovedError() const { return Status(ErrorCodes::ReplicaSetMonitorRemoved, str::stream() << "ReplicaSetMonitor for set " << getName() << " is removed"); } + +void StreamableReplicaSetMonitor::runScanForMockReplicaSet() {} } // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 9fbdf858f6e..1a72da2aecf 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -60,8 +60,8 @@ using ReplicaSetMonitorPtr = std::shared_ptr<ReplicaSetMonitor>; * * All methods perform the required synchronization to allow callers from multiple threads. */ -class StreamableReplicaSetMonitor : - public ReplicaSetMonitor, +class StreamableReplicaSetMonitor + : public ReplicaSetMonitor, public sdam::TopologyListener, public std::enable_shared_from_this<StreamableReplicaSetMonitor> { @@ -114,6 +114,7 @@ public: void appendInfo(BSONObjBuilder& b, bool forFTDC = false) const; bool isKnownToHaveGoodPrimary() const; + void runScanForMockReplicaSet() override; private: class StreamableReplicaSetMonitorQueryProcessor; diff --git a/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp b/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp index 2cb0633036d..84ea2489b3a 100644 --- a/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor_query_processor.cpp @@ -53,7 +53,7 @@ void StreamableReplicaSetMonitor::StreamableReplicaSetMonitorQueryProcessor:: const auto& setName = newDescription->getSetName(); if (setName) { auto replicaSetMonitor = std::static_pointer_cast<StreamableReplicaSetMonitor>( - globalRSMonitorManager.getMonitor(*setName)); + ReplicaSetMonitorManager::get()->getMonitor(*setName)); if (!replicaSetMonitor) { LOG(kLogLevel) << "could not find rsm instance " << *setName << " for query processing."; |