diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_info.cpp | 8 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 48 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool.idl | 55 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool_controller.cpp | 249 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool_controller.h | 186 |
8 files changed, 487 insertions, 63 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index a9531a25cac..eda9c1c6315 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1693,6 +1693,7 @@ env.Library( '$BUILD_DIR/mongo/db/auth/saslauth', '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/query_exec', + "$BUILD_DIR/mongo/util/fail_point", 'oplog', 'oplogreader', 'repl_coordinator_interface', diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index 855356c8c9e..e3f26e73513 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -58,10 +58,14 @@ #include "mongo/executor/network_interface.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" #include "mongo/util/map_util.h" namespace mongo { +MONGO_FAIL_POINT_DEFINE(waitInIsMaster); + using std::unique_ptr; using std::list; using std::string; @@ -236,6 +240,10 @@ public: const BSONObj& cmdObj, BSONObjBuilder& result) final { CommandHelpers::handleMarkKillOnClientDisconnect(opCtx); + + // TODO Unwind after SERVER-41070 + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, waitInIsMaster); + /* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not authenticated. 
*/ diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 492db4c5e14..c139d40fe88 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -806,6 +806,7 @@ void ConnectionPool::SpecificPool::triggerShutdown(const Status& status) { processFailure(status); _droppedProcessingPool.clear(); + _eventTimer->cancelTimeout(); } // Drop connections and fail all requests diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 6a7aaf77606..00a42bcc683 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -104,6 +104,7 @@ env.Library( target='sharding_initialization', source=[ 'sharding_initialization.cpp', + 'sharding_task_executor_pool_controller.cpp', env.Idlc('sharding_task_executor_pool.idl')[0], 'client/sharding_connection_hook.cpp', 'client/sharding_network_connection_hook.cpp', @@ -118,6 +119,7 @@ env.Library( LIBDEPS_PRIVATE=[ "$BUILD_DIR/mongo/idl/server_parameter", '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + '$BUILD_DIR/mongo/executor/connection_pool_executor', 'coreshard', 'sharding_task_executor', ], diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 83fd96367c8..ed9364ea133 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -65,6 +65,7 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/sharding_task_executor.h" +#include "mongo/s/sharding_task_executor_pool_controller.h" #include "mongo/s/sharding_task_executor_pool_gen.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" @@ -171,52 +172,11 @@ Status initializeGlobalShardingState(OperationContext* opCtx, return {ErrorCodes::BadValue, "Unrecognized connection string."}; } - // We don't set the ConnectionPool's static const variables to be the default value in - // MONGO_EXPORT_STARTUP_SERVER_PARAMETER because it's not 
guaranteed to be initialized. - // The following code is a workaround. ConnectionPool::Options connPoolOptions; + connPoolOptions.controller = std::make_shared<ShardingTaskExecutorPoolController>(); - connPoolOptions.minConnections = gShardingTaskExecutorPoolMinConnections; - connPoolOptions.maxConnections = (gShardingTaskExecutorPoolMaxConnections >= 0) - ? gShardingTaskExecutorPoolMaxConnections - : ConnectionPool::kDefaultMaxConns; - connPoolOptions.maxConnecting = (gShardingTaskExecutorPoolMaxConnecting >= 0) - ? gShardingTaskExecutorPoolMaxConnecting - : ConnectionPool::kDefaultMaxConnecting; - - connPoolOptions.hostTimeout = Milliseconds(gShardingTaskExecutorPoolHostTimeoutMS); - connPoolOptions.refreshRequirement = - Milliseconds(gShardingTaskExecutorPoolRefreshRequirementMS); - connPoolOptions.refreshTimeout = Milliseconds(gShardingTaskExecutorPoolRefreshTimeoutMS); - - if (connPoolOptions.refreshRequirement <= connPoolOptions.refreshTimeout) { - auto newRefreshTimeout = connPoolOptions.refreshRequirement - Milliseconds(1); - warning() << "ShardingTaskExecutorPoolRefreshRequirementMS (" - << connPoolOptions.refreshRequirement - << ") set below ShardingTaskExecutorPoolRefreshTimeoutMS (" - << connPoolOptions.refreshTimeout - << "). Adjusting ShardingTaskExecutorPoolRefreshTimeoutMS to " - << newRefreshTimeout; - connPoolOptions.refreshTimeout = newRefreshTimeout; - } - - if (connPoolOptions.hostTimeout <= - connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout) { - auto newHostTimeout = - connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout + Milliseconds(1); - warning() << "ShardingTaskExecutorPoolHostTimeoutMS (" << connPoolOptions.hostTimeout - << ") set below ShardingTaskExecutorPoolRefreshRequirementMS (" - << connPoolOptions.refreshRequirement - << ") + ShardingTaskExecutorPoolRefreshTimeoutMS (" - << connPoolOptions.refreshTimeout - << "). 
Adjusting ShardingTaskExecutorPoolHostTimeoutMS to " << newHostTimeout; - connPoolOptions.hostTimeout = newHostTimeout; - } - - auto network = executor::makeNetworkInterface("ShardRegistry", - std::make_unique<ShardingNetworkConnectionHook>(), - hookBuilder(), - connPoolOptions); + auto network = executor::makeNetworkInterface( + "ShardRegistry", std::make_unique<ShardingNetworkConnectionHook>(), hookBuilder()); auto networkPtr = network.get(); auto executorPool = makeShardingTaskExecutorPool( std::move(network), hookBuilder, connPoolOptions, taskExecutorPoolSize); diff --git a/src/mongo/s/sharding_task_executor_pool.idl b/src/mongo/s/sharding_task_executor_pool.idl index 390dbc5a17b..dbced832b1b 100644 --- a/src/mongo/s/sharding_task_executor_pool.idl +++ b/src/mongo/s/sharding_task_executor_pool.idl @@ -28,49 +28,66 @@ global: cpp_namespace: "mongo" + cpp_includes: + - "mongo/s/sharding_task_executor_pool_controller.h" server_parameters: ShardingTaskExecutorPoolMinSize: description: <- The minimum number of connections for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolMinConnections" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.minConnections" + validator: + gte: 0 default: 1 ShardingTaskExecutorPoolMaxSize: description: <- The maximum number of connections for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolMaxConnections" - default: -1 + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnections" + validator: + gte: 1 + default: 32767 ShardingTaskExecutorPoolMaxConnecting: description: <- The maximum number of in-flight connections for each executor in the pool for the sharding grid. 
- set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolMaxConnecting" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnecting" + validator: + gte: 1 default: 2 ShardingTaskExecutorPoolHostTimeoutMS: description: <- The timeout for dropping a host for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolHostTimeoutMS" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.hostTimeoutMS" + validator: + callback: "ShardingTaskExecutorPoolController::validateHostTimeout" + gte: 1 default: 300000 # 5mins ShardingTaskExecutorPoolRefreshRequirementMS: description: <- The timeout before a connection needs to be refreshed for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolRefreshRequirementMS" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.toRefreshTimeoutMS" + validator: + gte: 1 default: 60000 # 1min ShardingTaskExecutorPoolRefreshTimeoutMS: description: <- The timeout for refreshing a connection for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolRefreshTimeoutMS" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.pendingTimeoutMS" + validator: + callback: "ShardingTaskExecutorPoolController::validatePendingTimeout" + gte: 1 default: 20000 # 20secs + ShardingTaskExecutorPoolReplicaSetMatching: + description: <- + Enables ReplicaSet member connection matching. 
+ set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.matchingStrategyString" + on_update: "ShardingTaskExecutorPoolController::onUpdateMatchingStrategy" + default: "disabled" diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp new file mode 100644 index 00000000000..77d694e2f16 --- /dev/null +++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp @@ -0,0 +1,249 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kConnectionPool + +#include "mongo/platform/basic.h" + +#include "mongo/client/replica_set_monitor.h" +#include "mongo/s/sharding_task_executor_pool_controller.h" +#include "mongo/util/log.h" + +namespace mongo { + +Status ShardingTaskExecutorPoolController::validateHostTimeout(const int& hostTimeoutMS) { + auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load(); + auto pendingTimeoutMS = gParameters.pendingTimeoutMS.load(); + if (hostTimeoutMS >= (toRefreshTimeoutMS + pendingTimeoutMS)) { + return Status::OK(); + } + + std::string msg = str::stream() + << "ShardingTaskExecutorPoolHostTimeoutMS (" << hostTimeoutMS + << ") set below ShardingTaskExecutorPoolRefreshRequirementMS (" << toRefreshTimeoutMS + << ") + ShardingTaskExecutorPoolRefreshTimeoutMS (" << pendingTimeoutMS << ")."; + return Status(ErrorCodes::BadValue, msg); +} + +Status ShardingTaskExecutorPoolController::validatePendingTimeout(const int& pendingTimeoutMS) { + auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load(); + if (pendingTimeoutMS < toRefreshTimeoutMS) { + return Status::OK(); + } + + std::string msg = str::stream() + << "ShardingTaskExecutorPoolRefreshRequirementMS (" << toRefreshTimeoutMS + << ") set below ShardingTaskExecutorPoolRefreshTimeoutMS (" << pendingTimeoutMS << ")."; + return Status(ErrorCodes::BadValue, msg); +} + +Status ShardingTaskExecutorPoolController::onUpdateMatchingStrategy(const std::string& str) { + // TODO Fix up after SERVER-40224 + if (str == "disabled") { + gParameters.matchingStrategy.store(MatchingStrategy::kDisabled); + } else if (str == "matchPrimaryNode") { + gParameters.matchingStrategy.store(MatchingStrategy::kMatchPrimaryNode); + // TODO Reactivate once the prediction pattern is fixed in SERVER-41602 + //} else if (str == "matchBusiestNode") { + // gParameters.matchingStrategy.store(MatchingStrategy::kMatchBusiestNode); + } else { + return 
Status{ErrorCodes::BadValue, + str::stream() << "Unrecognized matching strategy '" << str << "'"}; + } + + return Status::OK(); +} + +void ShardingTaskExecutorPoolController::_addGroup(WithLock, + const ReplicaSetChangeNotifier::State& state) { + // Replace the last group + auto& group = _hostGroups[state.connStr.getSetName()]; + group = std::make_shared<HostGroupData>(); + group->state = state; + + // Mark each host with this group + for (auto& host : state.connStr.getServers()) { + _hostGroupsByHost[host] = group; + } +} + +void ShardingTaskExecutorPoolController::_removeGroup(WithLock, const std::string& name) { + auto it = _hostGroups.find(name); + if (it == _hostGroups.end()) { + return; + } + + auto& hostGroup = it->second; + for (auto& host : hostGroup->state.connStr.getServers()) { + _hostGroupsByHost.erase(host); + } + _hostGroups.erase(it); +} + +class ShardingTaskExecutorPoolController::ReplicaSetChangeListener final + : public ReplicaSetChangeNotifier::Listener { +public: + explicit ReplicaSetChangeListener(ShardingTaskExecutorPoolController* controller) + : _controller(controller) {} + + void onFoundSet(const Key& key) override { + // Do nothing + } + + void onConfirmedSet(const State& state) override { + stdx::lock_guard lk(_controller->_mutex); + + _controller->_removeGroup(lk, state.connStr.getSetName()); + _controller->_addGroup(lk, state); + } + + void onPossibleSet(const State& state) override { + // Do nothing + } + + void onDroppedSet(const Key& key) override { + stdx::lock_guard lk(_controller->_mutex); + + _controller->_removeGroup(lk, key); + } + +private: + ShardingTaskExecutorPoolController* const _controller; +}; + +void ShardingTaskExecutorPoolController::init(ConnectionPool* parent) { + ControllerInterface::init(parent); + _listener = ReplicaSetMonitor::getNotifier().makeListener<ReplicaSetChangeListener>(this); +} + +auto ShardingTaskExecutorPoolController::updateHost(const SpecificPool* pool, + const HostAndPort& host, + const 
HostState& stats) -> HostGroup { + stdx::lock_guard lk(_mutex); + + auto& data = _poolData[pool]; + + const size_t minConns = gParameters.minConnections.load(); + const size_t maxConns = gParameters.maxConnections.load(); + + // Update the target for just the pool first + data.target = stats.requests + stats.active; + + if (data.target < minConns) { + data.target = minConns; + } else if (data.target > maxConns) { + data.target = maxConns; + } + + data.isAbleToShutdown = stats.health.isExpired; + + // If the pool isn't in a group, we can return now + auto it = _hostGroupsByHost.find(host); + if (it == _hostGroupsByHost.end()) { + return {{host}, data.isAbleToShutdown}; + } + + // If the pool has a group, then update the group + auto& hostGroup = it->second; + data.hostGroup = hostGroup; + + // Make sure we're part of the group + hostGroup->pools.insert(pool); + + switch (gParameters.matchingStrategy.load()) { + case MatchingStrategy::kMatchPrimaryNode: { + if (hostGroup->state.primary == host) { + hostGroup->target = data.target; + } + } break; + case MatchingStrategy::kMatchBusiestNode: { + hostGroup->target = std::max(hostGroup->target, data.target); + } break; + case MatchingStrategy::kDisabled: { + // Nothing + } break; + }; + + if (hostGroup->target < minConns) { + hostGroup->target = minConns; + } else if (hostGroup->target > maxConns) { + hostGroup->target = maxConns; + } + + auto shouldShutdown = data.isAbleToShutdown && + std::all_of(hostGroup->pools.begin(), hostGroup->pools.end(), [&](auto otherPool) { + return _poolData[otherPool].isAbleToShutdown; + }); + return {hostGroup->state.connStr.getServers(), shouldShutdown}; +} + +void ShardingTaskExecutorPoolController::removeHost(const SpecificPool* pool) { + stdx::lock_guard lk(_mutex); + auto it = _poolData.find(pool); + if (it == _poolData.end()) { + // It's possible that a host immediately needs to go before it updates even once + return; + } + + auto& data = it->second; + if (auto hostGroup = 
data.hostGroup.lock()) { + hostGroup->pools.erase(pool); + } + _poolData.erase(it); +} + +auto ShardingTaskExecutorPoolController::getControls(const SpecificPool* pool) + -> ConnectionControls { + stdx::lock_guard lk(_mutex); + auto& data = _poolData[pool]; + + const size_t maxPending = gParameters.maxConnecting.load(); + + auto hostGroup = data.hostGroup.lock(); + if (!hostGroup || gParameters.matchingStrategy.load() == MatchingStrategy::kDisabled) { + return {maxPending, data.target}; + } + + auto target = std::max(data.target, hostGroup->target); + return {maxPending, target}; +} + +Milliseconds ShardingTaskExecutorPoolController::hostTimeout() const { + return Milliseconds{gParameters.hostTimeoutMS.load()}; +} + +Milliseconds ShardingTaskExecutorPoolController::pendingTimeout() const { + return Milliseconds{gParameters.pendingTimeoutMS.load()}; +} + +Milliseconds ShardingTaskExecutorPoolController::toRefreshTimeout() const { + return Milliseconds{gParameters.toRefreshTimeoutMS.load()}; +} + +} // namespace mongo diff --git a/src/mongo/s/sharding_task_executor_pool_controller.h b/src/mongo/s/sharding_task_executor_pool_controller.h new file mode 100644 index 00000000000..d61f707de73 --- /dev/null +++ b/src/mongo/s/sharding_task_executor_pool_controller.h @@ -0,0 +1,186 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" +#include "mongo/client/replica_set_change_notifier.h" +#include "mongo/executor/connection_pool.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/unordered_map.h" + +namespace mongo { + +/** + * A special Controller for the sharding ConnectionPool + * + * This class has two special members: + * * A global set of synchronized Parameters for the ShardingTaskExecutorPool server parameters + * * A ReplicaSetChangeListener to inform it of changes to replica set membership + * + * When the MatchingStrategy from its Parameters is kDisabled, this class operates much like the + * LimitController but with its limits allowed to shift at runtime (via Parameters). + * + * When the MatchingStrategy is kMatchPrimaryNode, the limits are obeyed but, when the pool for a + * primary member calls updateHost, it can increase the targetConnections for the pool of each other + * member of its replica set. Note that this will, at time of writing, follow the "hosts" field + * from the primary isMaster combined with the seed list for the replica set. 
If the seed list were + * to include arbiters or hidden members, then they would also be subject to these constraints. + * + * When the MatchingStrategy is kMatchBusiestNode, it operates like kMatchPrimaryNode, but any pool + * can be responsible for increasing the targetConnections of each member of its set. + * + * Note that, in essence, there are three outside elements that can mutate the state of this class: + * * The ReplicaSetChangeNotifier can notify the listener which updates the host groups + * * The ServerParameters can update the Parameters which will be used in the next update + * * The SpecificPools for its ConnectionPool can updateHost with their individual States + */ +class ShardingTaskExecutorPoolController final + : public executor::ConnectionPool::ControllerInterface { + class ReplicaSetChangeListener; + +public: + using ConnectionPool = executor::ConnectionPool; + + enum class MatchingStrategy { + kDisabled, + kMatchPrimaryNode, + kMatchBusiestNode, + }; + + class Parameters { + public: + AtomicWord<int> minConnections; + AtomicWord<int> maxConnections; + AtomicWord<int> maxConnecting; + + AtomicWord<int> hostTimeoutMS; + AtomicWord<int> pendingTimeoutMS; + AtomicWord<int> toRefreshTimeoutMS; + + synchronized_value<std::string> matchingStrategyString; + AtomicWord<MatchingStrategy> matchingStrategy; + }; + + static inline Parameters gParameters; + + /** + * Validate that hostTimeoutMS is greater than the sum of pendingTimeoutMS and + * toRefreshTimeoutMS + */ + static Status validateHostTimeout(const int& hostTimeoutMS); + + /** + * Validate that pendingTimeoutMS is less than toRefreshTimeoutMS + */ + static Status validatePendingTimeout(const int& pendingTimeoutMS); + + /** + * Matches the matching strategy string against a set of literals + * and either sets gParameters.matchingStrategy or returns !Status::isOK(). 
+ */ + static Status onUpdateMatchingStrategy(const std::string& str); + + ShardingTaskExecutorPoolController() = default; + ShardingTaskExecutorPoolController& operator=(ShardingTaskExecutorPoolController&&) = delete; + + void init(ConnectionPool* parent) override; + + HostGroup updateHost(const SpecificPool* pool, + const HostAndPort& host, + const HostState& stats) override; + void removeHost(const SpecificPool* pool) override; + + ConnectionControls getControls(const SpecificPool* pool) override; + + Milliseconds hostTimeout() const override; + Milliseconds pendingTimeout() const override; + Milliseconds toRefreshTimeout() const override; + + StringData name() const override { + return "ShardingTaskExecutorPoolController"_sd; + } + +private: + void _addGroup(WithLock, const ReplicaSetChangeNotifier::State& state); + void _removeGroup(WithLock, const std::string& key); + + /** + * HostGroup is a shared state for a set of hosts (a replica set). + * + * When the ReplicaSetChangeListener is informed of a change to a replica set, + * it creates a new HostGroup and fills it into _hostGroups[setName] and + * _hostGroupsByHost[memberHost]. This does not immediately affect the results of getControls. + * + * When a SpecificPool calls updateHost, it checks _hostGroupsByHost to see if it belongs to + * any group and pushes itself into hostData for that group. It then will update target for its + * group according to the MatchingStrategy. It will also set shouldShutdown to true if every + * member of the group has shouldShutdown at true. + * + * Note that a HostData can find itself orphaned from its HostGroup during a reconfig. 
+ */ + struct HostGroupData { + // The ReplicaSet state for this set + ReplicaSetChangeNotifier::State state; + + // Pointer index for each pool in the set + stdx::unordered_set<const SpecificPool*> pools; + + // The number of connections that all hosts in the group should maintain + size_t target = 0; + }; + + /** + * HostData represents the current state for a specific HostAndPort/SpecificPool. + * + * It is mutated by updateHost/removeHost and used along with Parameters to form Controls + * for getControls. + */ + struct HostData { + // The HostGroup associated with this pool. + // Note that this will be invalid if there was a replica set change + std::weak_ptr<HostGroupData> hostGroup; + + // The number of connections the host should maintain + size_t target = 0; + + // This host is able to shutdown + bool isAbleToShutdown = false; + }; + + ReplicaSetChangeListenerHandle _listener; + + stdx::mutex _mutex; + + stdx::unordered_map<const SpecificPool*, HostData> _poolData; + stdx::unordered_map<std::string, std::shared_ptr<HostGroupData>> _hostGroups; + stdx::unordered_map<HostAndPort, std::shared_ptr<HostGroupData>> _hostGroupsByHost; +}; +} // namespace mongo |