summaryrefslogtreecommitdiff
path: root/src/mongo/client/replica_set_monitor_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/replica_set_monitor_manager.cpp')
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp305
1 files changed, 150 insertions, 155 deletions
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index 1e81bbbb688..193a799806d 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2020-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -38,8 +38,8 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/mongo_uri.h"
-#include "mongo/client/replica_set_monitor_params_gen.h"
#include "mongo/client/scanning_replica_set_monitor.h"
+#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/streamable_replica_set_monitor.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_factory.h"
@@ -47,201 +47,196 @@
#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
-#include "mongo/logv2/log.h"
#include "mongo/platform/mutex.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/util/map_util.h"
+#include "mongo/client/replica_set_monitor_params_gen.h"
-namespace mongo {
+ namespace mongo {
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::vector;
+ using std::set;
+ using std::shared_ptr;
+ using std::string;
+ using std::vector;
-using executor::NetworkInterface;
-using executor::NetworkInterfaceThreadPool;
-using executor::TaskExecutor;
-using executor::TaskExecutorPool;
-using executor::ThreadPoolTaskExecutor;
+ using executor::NetworkInterface;
+ using executor::NetworkInterfaceThreadPool;
+ using executor::TaskExecutor;
+ using executor::TaskExecutorPool;
+ using executor::ThreadPoolTaskExecutor;
-namespace {
-const auto getGlobalRSMMonitorManager =
- ServiceContext::declareDecoration<ReplicaSetMonitorManager>();
-} // namespace
+ namespace {
+ const auto getGlobalRSMMonitorManager =
+ ServiceContext::declareDecoration<ReplicaSetMonitorManager>();
+ } // namespace
-ReplicaSetMonitorManager::~ReplicaSetMonitorManager() {
- shutdown();
-}
+ ReplicaSetMonitorManager::~ReplicaSetMonitorManager() {
+ shutdown();
+ }
-ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() {
- return &getGlobalRSMMonitorManager(getGlobalServiceContext());
-}
+ ReplicaSetMonitorManager* ReplicaSetMonitorManager::get() {
+ return &getGlobalRSMMonitorManager(getGlobalServiceContext());
+ }
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData setName) {
- stdx::lock_guard<Latch> lk(_mutex);
+ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getMonitor(StringData setName) {
+ stdx::lock_guard<Latch> lk(_mutex);
- if (auto monitor = _monitors[setName].lock()) {
- return monitor;
- } else {
- return shared_ptr<ReplicaSetMonitor>();
+ if (auto monitor = _monitors[setName].lock()) {
+ return monitor;
+ } else {
+ return shared_ptr<ReplicaSetMonitor>();
+ }
}
-}
-void ReplicaSetMonitorManager::_setupTaskExecutorInLock() {
- if (_isShutdown || _taskExecutor) {
- // do not restart taskExecutor if is in shutdown
- return;
- }
+ void ReplicaSetMonitorManager::_setupTaskExecutorInLock() {
+ if (_isShutdown || _taskExecutor) {
+ // do not restart taskExecutor if is in shutdown
+ return;
+ }
- // construct task executor
- auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
- auto net = executor::makeNetworkInterface(
- "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList));
- auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
- _taskExecutor = std::make_unique<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
- _taskExecutor->startup();
-}
-
-namespace {
-void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) {
- uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b);
-}
-} // namespace
-
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
- const ConnectionString& connStr) {
- return getOrCreateMonitor(MongoURI(connStr));
-}
-
-shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const MongoURI& uri) {
- invariant(uri.type() == ConnectionString::SET);
-
- stdx::lock_guard<Latch> lk(_mutex);
- uassert(ErrorCodes::ShutdownInProgress,
- str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown",
- !_isShutdown);
-
- _setupTaskExecutorInLock();
- const auto& setName = uri.getSetName();
- auto monitor = _monitors[setName].lock();
- if (monitor) {
- uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode());
- return monitor;
+ // construct task executor
+ auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
+ auto net = executor::makeNetworkInterface(
+ "ReplicaSetMonitor-TaskExecutor", nullptr, std::move(hookList));
+ auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
+ _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _taskExecutor->startup();
}
- LOGV2(20186, "Starting new replica set monitor for {uri}", "uri"_attr = uri.toString());
+ namespace {
+ void uassertNotMixingSSL(transport::ConnectSSLMode a, transport::ConnectSSLMode b) {
+ uassert(51042, "Mixing ssl modes with a single replica set is disallowed", a == b);
+ }
+ } // namespace
- if (disableStreamableReplicaSetMonitor.load()) {
- auto newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri);
- _monitors[setName] = newMonitor;
- newMonitor->init();
- return newMonitor;
- } else {
- uasserted(31451, "StreamableReplicaSetMonitor is not yet implemented");
+ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
+ const ConnectionString& connStr) {
+ return getOrCreateMonitor(MongoURI(connStr));
}
-}
-vector<string> ReplicaSetMonitorManager::getAllSetNames() {
- vector<string> allNames;
+ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(
+ const MongoURI& uri) {
+ invariant(uri.type() == ConnectionString::SET);
+ stdx::lock_guard<Latch> lk(_mutex);
+ uassert(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Unable to get monitor for '" << uri << "' due to shutdown",
+ !_isShutdown);
+
+ _setupTaskExecutorInLock();
+ const auto& setName = uri.getSetName();
+ auto monitor = _monitors[setName].lock();
+ if (monitor) {
+ uassertNotMixingSSL(monitor->getOriginalUri().getSSLMode(), uri.getSSLMode());
+ return monitor;
+ }
- stdx::lock_guard<Latch> lk(_mutex);
+ log() << "Starting new replica set monitor for " << uri.toString();
- for (const auto& entry : _monitors) {
- allNames.push_back(entry.first);
+ std::shared_ptr<ReplicaSetMonitor> newMonitor;
+ if (disableStreamableReplicaSetMonitor.load()) {
+ newMonitor = std::make_shared<ScanningReplicaSetMonitor>(uri);
+ newMonitor->init();
+ } else {
+ newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor());
+ }
+ _monitors[setName] = newMonitor;
+ return newMonitor;
}
- return allNames;
-}
+ vector<string> ReplicaSetMonitorManager::getAllSetNames() {
+ vector<string> allNames;
+
+ stdx::lock_guard<Latch> lk(_mutex);
-void ReplicaSetMonitorManager::removeMonitor(StringData setName) {
- stdx::lock_guard<Latch> lk(_mutex);
- ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName);
- if (it != _monitors.end()) {
- if (auto monitor = it->second.lock()) {
- monitor->drop();
+ for (const auto& entry : _monitors) {
+ allNames.push_back(entry.first);
}
- _monitors.erase(it);
- LOGV2(
- 20187, "Removed ReplicaSetMonitor for replica set {setName}", "setName"_attr = setName);
+
+ return allNames;
}
-}
-void ReplicaSetMonitorManager::shutdown() {
- decltype(_monitors) monitors;
- decltype(_taskExecutor) taskExecutor;
- {
+ void ReplicaSetMonitorManager::removeMonitor(StringData setName) {
stdx::lock_guard<Latch> lk(_mutex);
- if (std::exchange(_isShutdown, true)) {
- return;
+ ReplicaSetMonitorsMap::const_iterator it = _monitors.find(setName);
+ if (it != _monitors.end()) {
+ if (auto monitor = it->second.lock()) {
+ monitor->drop();
+ }
+ _monitors.erase(it);
+ log() << "Removed ReplicaSetMonitor for replica set " << setName;
}
-
- monitors = std::exchange(_monitors, {});
- taskExecutor = std::exchange(_taskExecutor, {});
}
- if (taskExecutor) {
- LOGV2_DEBUG(20188, 1, "Shutting down task executor used for monitoring replica sets");
- taskExecutor->shutdown();
- }
+ void ReplicaSetMonitorManager::shutdown() {
+ decltype(_monitors) monitors;
+ decltype(_taskExecutor) taskExecutor;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (std::exchange(_isShutdown, true)) {
+ return;
+ }
+
+ monitors = std::exchange(_monitors, {});
+ taskExecutor = std::exchange(_taskExecutor, {});
+ }
- if (monitors.size()) {
- LOGV2(20189, "Dropping all ongoing scans against replica sets");
- }
- for (auto& [name, monitor] : monitors) {
- auto anchor = monitor.lock();
- if (!anchor) {
- continue;
+ if (taskExecutor) {
+ LOG(1) << "Shutting down task executor used for monitoring replica sets";
+ taskExecutor->shutdown();
}
- anchor->drop();
- }
+ for (auto& [name, monitor] : monitors) {
+ auto anchor = monitor.lock();
+ if (!anchor) {
+ continue;
+ }
+ anchor->drop();
+ }
- if (taskExecutor) {
- taskExecutor->join();
+ if (taskExecutor) {
+ taskExecutor->join();
+ }
}
-}
-void ReplicaSetMonitorManager::removeAllMonitors() {
- shutdown();
+ void ReplicaSetMonitorManager::removeAllMonitors() {
+ shutdown();
- {
- stdx::lock_guard<Latch> lk(_mutex);
- _isShutdown = false;
- }
-}
-
-void ReplicaSetMonitorManager::report(BSONObjBuilder* builder, bool forFTDC) {
- // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the
- // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and
- // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook potentially
- // calling ShardRegistry::updateConfigServerConnectionString.
- auto setNames = getAllSetNames();
-
- BSONObjBuilder setStats(
- builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets"));
-
- for (const auto& setName : setNames) {
- auto monitor = getMonitor(setName);
- if (!monitor) {
- continue;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _isShutdown = false;
}
- monitor->appendInfo(setStats, forFTDC);
}
-}
-TaskExecutor* ReplicaSetMonitorManager::getExecutor() {
- invariant(_taskExecutor);
- return _taskExecutor.get();
-}
+ void ReplicaSetMonitorManager::report(BSONObjBuilder * builder, bool forFTDC) {
+ // Don't hold _mutex the whole time to avoid ever taking a monitor's mutex while holding the
+ // manager's mutex. Otherwise we could get a deadlock between the manager's, monitor's, and
+ // ShardRegistry's mutex due to the ReplicaSetMonitor's AsynchronousConfigChangeHook
+ // potentially calling ShardRegistry::updateConfigServerConnectionString.
+ auto setNames = getAllSetNames();
+
+ BSONObjBuilder setStats(
+ builder->subobjStart(forFTDC ? "replicaSetPingTimesMillis" : "replicaSets"));
+
+ for (const auto& setName : setNames) {
+ auto monitor = getMonitor(setName);
+ if (!monitor) {
+ continue;
+ }
+ monitor->appendInfo(setStats, forFTDC);
+ }
+ }
-ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
- return _notifier;
-}
+ std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor() {
+ invariant(_taskExecutor);
+ return _taskExecutor;
+ }
-bool ReplicaSetMonitorManager::isShutdown() const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _isShutdown;
-}
+ ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
+ return _notifier;
+ }
+ bool ReplicaSetMonitorManager::isShutdown() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _isShutdown;
+ }
} // namespace mongo