diff options
Diffstat (limited to 'src/mongo/client/replica_set_monitor_manager.cpp')
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.cpp | 305 |
1 files changed, 150 insertions, 155 deletions
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index 1e81bbbb688..193a799806d 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2020-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -38,8 +38,8 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/connection_string.h" #include "mongo/client/mongo_uri.h" -#include "mongo/client/replica_set_monitor_params_gen.h" #include "mongo/client/scanning_replica_set_monitor.h" +#include "mongo/client/replica_set_monitor.h" #include "mongo/client/streamable_replica_set_monitor.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface_factory.h" @@ -47,201 +47,196 @@ #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor.h" -#include "mongo/logv2/log.h" #include "mongo/platform/mutex.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/util/map_util.h" +#include "mongo/client/replica_set_monitor_params_gen.h" -namespace mongo { + namespace mongo { -using std::set; -using std::shared_ptr; -using std::string; -using std::vector; + using std::set; + using std::shared_ptr; + using std::string; + using std::vector; -using executor::NetworkInterface; -using executor::NetworkInterfaceThreadPool; -using executor::TaskExecutor; -using executor::TaskExecutorPool; -using executor::ThreadPoolTaskExecutor; + using executor::NetworkInterface; + using executor::NetworkInterfaceThreadPool; + using executor::TaskExecutor; + using executor::TaskExecutorPool; + using executor::ThreadPoolTaskExecutor; -namespace { -const auto getGlobalRSMMonitorManager = - ServiceContext::declareDecoration<ReplicaSetMonitorManager>(); -} // namespace + namespace { + const auto getGlobalRSMMonitorManager = + ServiceContext::declareDecoration<ReplicaSetMonitorManager>(); + } // namespace -ReplicaSetMonitorManager::~ReplicaSetMonitorManager() { - shutdown(); -} + ReplicaSetMonitorManager::~ReplicaSetMonitorManager() { + shutdown(); + } -ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() { - return &getGlobalRSMMonitorManager(getGlobalServiceContext()); -} + ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() { + return &getGlobalRSMMonitorManager(getGlobalServiceContext()); + } -shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData setName) { - stdx::lock_guard<Latch> lk(_mutex); + shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData setName) { + stdx::lock_guard<Latch> lk(_mutex); - if (auto monitor = _monitors[setName].lock()) { - return monitor; - } else { - return shared_ptr<ReplicaSetMonitor>(); + if (auto monitor = _monitors[setName].lock()) { + return monitor; + } else { + return shared_ptr<ReplicaSetMonitor>(); + } } -} -void ReplicaSetMonitorManager::_setupTaskExecutorInLock() { - if (_isShutdown || _taskExecutor) { - // do not restart taskExecutor if is in shutdown - return; - } + void ReplicaSetMonitorManager::_setupTaskExecutorInLock() { + if (_isShutdown || _taskExecutor) { + // do not restart taskExecutor if is in shutdown + return; + } - // construct task executor - auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); - auto net = executor::makeNetworkInterface( - "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList)); - auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); - _taskExecutor = std::make_unique<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); - _taskExecutor->startup(); -} - -namespace { -void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) { - uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b); -} -} // namespace - -shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( - const ConnectionString& connStr) { - return getOrCreateMonitor(MongoURI(connStr)); -} - -shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const MongoURI& uri) { - invariant(uri.type() == ConnectionString::SET); - - stdx::lock_guard<Latch> lk(_mutex); - uassert(ErrorCodes::ShutdownInProgress, - str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown", - !_isShutdown); - - _setupTaskExecutorInLock(); - const auto& setName = uri.getSetName(); - auto monitor = _monitors[setName].lock(); - if (monitor) { - uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode()); - return monitor; + // construct task executor + auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); + auto net = executor::makeNetworkInterface( + "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList)); + auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); + _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); + _taskExecutor->startup(); } - LOGV2(20186, "Starting new replica set monitor for {uri}", "uri"_attr = uri.toString()); + namespace { + void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) { + uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b); + } + } // namespace - if (disableStreamableReplicaSetMonitor.load()) { - auto newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri); - _monitors[setName] = newMonitor; - newMonitor->init(); - return newMonitor; - } else { - uasserted(31451, "StreamableReplicaSetMonitor is not yet implemented"); + shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( + const ConnectionString& connStr) { + return getOrCreateMonitor(MongoURI(connStr)); } -} -vector<string> ReplicaSetMonitorManager::getAllSetNames() { - vector<string> allNames; + shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor( + const MongoURI& uri) { + invariant(uri.type() == ConnectionString::SET); + stdx::lock_guard<Latch> lk(_mutex); + uassert(ErrorCodes::ShutdownInProgress, + str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown", + !_isShutdown); + + _setupTaskExecutorInLock(); + const auto& setName = uri.getSetName(); + auto monitor = _monitors[setName].lock(); + if (monitor) { + uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode()); + return monitor; + } - stdx::lock_guard<Latch> lk(_mutex); + log() << "Starting new replica set monitor for " << uri.toString(); - for (const auto& entry : _monitors) { - allNames.push_back(entry.first); + std::shared_ptr<ReplicaSetMonitor> newMonitor; + if (disableStreamableReplicaSetMonitor.load()) { + newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri); + newMonitor->init(); + } else { + newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor()); + } + _monitors[setName] = newMonitor; + return newMonitor; } - return allNames; -} + vector<string> ReplicaSetMonitorManager::getAllSetNames() { + vector<string> allNames; + + stdx::lock_guard<Latch> lk(_mutex); -void ReplicaSetMonitorManager::removeMonitor(StringData setName) { - stdx::lock_guard<Latch> lk(_mutex); - ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName); - if (it != _monitors.end()) { - if (auto monitor = it->second.lock()) { - monitor->drop(); + for (const auto& entry : _monitors) { + allNames.push_back(entry.first); } - _monitors.erase(it); - LOGV2( - 20187, "Removed ReplicaSetMonitor for replica set {setName}", "setName"_attr = setName); + + return allNames; } -} -void ReplicaSetMonitorManager::shutdown() { - decltype(_monitors) monitors; - decltype(_taskExecutor) taskExecutor; - { + void ReplicaSetMonitorManager::removeMonitor(StringData setName) { stdx::lock_guard<Latch> lk(_mutex); - if (std::exchange(_isShutdown, true)) { - return; + ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName); + if (it != _monitors.end()) { + if (auto monitor = it->second.lock()) { + monitor->drop(); + } + _monitors.erase(it); + log() << "Removed ReplicaSetMonitor for replica set " << setName; } - - monitors = std::exchange(_monitors, {}); - taskExecutor = std::exchange(_taskExecutor, {}); } - if (taskExecutor) { - LOGV2_DEBUG(20188, 1, "Shutting down task executor used for monitoring replica sets"); - taskExecutor->shutdown(); - } + void ReplicaSetMonitorManager::shutdown() { + decltype(_monitors) monitors; + decltype(_taskExecutor) taskExecutor; + { + stdx::lock_guard<Latch> lk(_mutex); + if (std::exchange(_isShutdown, true)) { + return; + } + + monitors = std::exchange(_monitors, {}); + taskExecutor = std::exchange(_taskExecutor, {}); + } - if (monitors.size()) { - LOGV2(20189, "Dropping all ongoing scans against replica sets"); - } - for (auto& [name, monitor] : monitors) { - auto anchor = monitor.lock(); - if (!anchor) { - continue; + if (taskExecutor) { + LOG(1) << "Shutting down task executor used for monitoring replica sets"; + taskExecutor->shutdown(); } - anchor->drop(); - } + for (auto& [name, monitor] : monitors) { + auto anchor = monitor.lock(); + if (!anchor) { + continue; + } + anchor->drop(); + } - if (taskExecutor) { - taskExecutor->join(); + if (taskExecutor) { + taskExecutor->join(); + } } -} -void ReplicaSetMonitorManager::removeAllMonitors() { - shutdown(); + void ReplicaSetMonitorManager::removeAllMonitors() { + shutdown(); - { - stdx::lock_guard<Latch> lk(_mutex); - _isShutdown = false; - } -} - -void ReplicaSetMonitorManager::report(BSONObjBuilder* builder, bool forFTDC) { - // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the - // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and - // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook potentially - // calling ShardRegistry::updateConfigServerConnectionString. - auto setNames = getAllSetNames(); - - BSONObjBuilder setStats( - builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets")); - - for (const auto& setName : setNames) { - auto monitor = getMonitor(setName); - if (!monitor) { - continue; + { + stdx::lock_guard<Latch> lk(_mutex); + _isShutdown = false; } - monitor->appendInfo(setStats, forFTDC); } -} -TaskExecutor* ReplicaSetMonitorManager::getExecutor() { - invariant(_taskExecutor); - return _taskExecutor.get(); -} + void ReplicaSetMonitorManager::report(BSONObjBuilder * builder, bool forFTDC) { + // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the + // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and + // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook + // potentially calling ShardRegistry::updateConfigServerConnectionString. + auto setNames = getAllSetNames(); + + BSONObjBuilder setStats( + builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets")); + + for (const auto& setName : setNames) { + auto monitor = getMonitor(setName); + if (!monitor) { + continue; + } + monitor->appendInfo(setStats, forFTDC); + } + } -ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() { - return _notifier; -} + std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor() { + invariant(_taskExecutor); + return _taskExecutor; + } -bool ReplicaSetMonitorManager::isShutdown() const { - stdx::lock_guard<Latch> lk(_mutex); - return _isShutdown; -} + ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() { + return _notifier; + } + bool ReplicaSetMonitorManager::isShutdown() const { + stdx::lock_guard<Latch> lk(_mutex); + return _isShutdown; + } } // namespace mongo |