summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/replication_info.cpp8
-rw-r--r--src/mongo/executor/connection_pool.cpp1
-rw-r--r--src/mongo/s/SConscript2
-rw-r--r--src/mongo/s/sharding_initialization.cpp48
-rw-r--r--src/mongo/s/sharding_task_executor_pool.idl55
-rw-r--r--src/mongo/s/sharding_task_executor_pool_controller.cpp249
-rw-r--r--src/mongo/s/sharding_task_executor_pool_controller.h186
8 files changed, 487 insertions, 63 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index a9531a25cac..eda9c1c6315 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1693,6 +1693,7 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/saslauth',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/query_exec',
+ "$BUILD_DIR/mongo/util/fail_point",
'oplog',
'oplogreader',
'repl_coordinator_interface',
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index 855356c8c9e..e3f26e73513 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -58,10 +58,14 @@
#include "mongo/executor/network_interface.h"
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
#include "mongo/util/map_util.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(waitInIsMaster);
+
using std::unique_ptr;
using std::list;
using std::string;
@@ -236,6 +240,10 @@ public:
const BSONObj& cmdObj,
BSONObjBuilder& result) final {
CommandHelpers::handleMarkKillOnClientDisconnect(opCtx);
+
+ // TODO Unwind after SERVER-41070
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, waitInIsMaster);
+
/* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not
authenticated.
*/
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 492db4c5e14..c139d40fe88 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -806,6 +806,7 @@ void ConnectionPool::SpecificPool::triggerShutdown(const Status& status) {
processFailure(status);
_droppedProcessingPool.clear();
+ _eventTimer->cancelTimeout();
}
// Drop connections and fail all requests
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 6a7aaf77606..00a42bcc683 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -104,6 +104,7 @@ env.Library(
target='sharding_initialization',
source=[
'sharding_initialization.cpp',
+ 'sharding_task_executor_pool_controller.cpp',
env.Idlc('sharding_task_executor_pool.idl')[0],
'client/sharding_connection_hook.cpp',
'client/sharding_network_connection_hook.cpp',
@@ -118,6 +119,7 @@ env.Library(
LIBDEPS_PRIVATE=[
"$BUILD_DIR/mongo/idl/server_parameter",
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
+ '$BUILD_DIR/mongo/executor/connection_pool_executor',
'coreshard',
'sharding_task_executor',
],
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 83fd96367c8..ed9364ea133 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -65,6 +65,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/sharding_task_executor.h"
+#include "mongo/s/sharding_task_executor_pool_controller.h"
#include "mongo/s/sharding_task_executor_pool_gen.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
@@ -171,52 +172,11 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
return {ErrorCodes::BadValue, "Unrecognized connection string."};
}
- // We don't set the ConnectionPool's static const variables to be the default value in
- // MONGO_EXPORT_STARTUP_SERVER_PARAMETER because it's not guaranteed to be initialized.
- // The following code is a workaround.
ConnectionPool::Options connPoolOptions;
+ connPoolOptions.controller = std::make_shared<ShardingTaskExecutorPoolController>();
- connPoolOptions.minConnections = gShardingTaskExecutorPoolMinConnections;
- connPoolOptions.maxConnections = (gShardingTaskExecutorPoolMaxConnections >= 0)
- ? gShardingTaskExecutorPoolMaxConnections
- : ConnectionPool::kDefaultMaxConns;
- connPoolOptions.maxConnecting = (gShardingTaskExecutorPoolMaxConnecting >= 0)
- ? gShardingTaskExecutorPoolMaxConnecting
- : ConnectionPool::kDefaultMaxConnecting;
-
- connPoolOptions.hostTimeout = Milliseconds(gShardingTaskExecutorPoolHostTimeoutMS);
- connPoolOptions.refreshRequirement =
- Milliseconds(gShardingTaskExecutorPoolRefreshRequirementMS);
- connPoolOptions.refreshTimeout = Milliseconds(gShardingTaskExecutorPoolRefreshTimeoutMS);
-
- if (connPoolOptions.refreshRequirement <= connPoolOptions.refreshTimeout) {
- auto newRefreshTimeout = connPoolOptions.refreshRequirement - Milliseconds(1);
- warning() << "ShardingTaskExecutorPoolRefreshRequirementMS ("
- << connPoolOptions.refreshRequirement
- << ") set below ShardingTaskExecutorPoolRefreshTimeoutMS ("
- << connPoolOptions.refreshTimeout
- << "). Adjusting ShardingTaskExecutorPoolRefreshTimeoutMS to "
- << newRefreshTimeout;
- connPoolOptions.refreshTimeout = newRefreshTimeout;
- }
-
- if (connPoolOptions.hostTimeout <=
- connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout) {
- auto newHostTimeout =
- connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout + Milliseconds(1);
- warning() << "ShardingTaskExecutorPoolHostTimeoutMS (" << connPoolOptions.hostTimeout
- << ") set below ShardingTaskExecutorPoolRefreshRequirementMS ("
- << connPoolOptions.refreshRequirement
- << ") + ShardingTaskExecutorPoolRefreshTimeoutMS ("
- << connPoolOptions.refreshTimeout
- << "). Adjusting ShardingTaskExecutorPoolHostTimeoutMS to " << newHostTimeout;
- connPoolOptions.hostTimeout = newHostTimeout;
- }
-
- auto network = executor::makeNetworkInterface("ShardRegistry",
- std::make_unique<ShardingNetworkConnectionHook>(),
- hookBuilder(),
- connPoolOptions);
+ auto network = executor::makeNetworkInterface(
+ "ShardRegistry", std::make_unique<ShardingNetworkConnectionHook>(), hookBuilder());
auto networkPtr = network.get();
auto executorPool = makeShardingTaskExecutorPool(
std::move(network), hookBuilder, connPoolOptions, taskExecutorPoolSize);
diff --git a/src/mongo/s/sharding_task_executor_pool.idl b/src/mongo/s/sharding_task_executor_pool.idl
index 390dbc5a17b..dbced832b1b 100644
--- a/src/mongo/s/sharding_task_executor_pool.idl
+++ b/src/mongo/s/sharding_task_executor_pool.idl
@@ -28,49 +28,66 @@
global:
cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/s/sharding_task_executor_pool_controller.h"
server_parameters:
ShardingTaskExecutorPoolMinSize:
description: <-
The minimum number of connections for each executor in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolMinConnections"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.minConnections"
+ validator:
+ gte: 0
default: 1
ShardingTaskExecutorPoolMaxSize:
description: <-
The maximum number of connections for each executor in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolMaxConnections"
- default: -1
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnections"
+ validator:
+ gte: 1
+ default: 32767
ShardingTaskExecutorPoolMaxConnecting:
description: <-
The maximum number of in-flight connections for each executor
in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolMaxConnecting"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnecting"
+ validator:
+ gte: 1
default: 2
ShardingTaskExecutorPoolHostTimeoutMS:
description: <-
The timeout for dropping a host for each executor in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolHostTimeoutMS"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.hostTimeoutMS"
+ validator:
+ callback: "ShardingTaskExecutorPoolController::validateHostTimeout"
+ gte: 1
default: 300000 # 5mins
ShardingTaskExecutorPoolRefreshRequirementMS:
description: <-
The timeout before a connection needs to be refreshed for each executor
in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolRefreshRequirementMS"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.toRefreshTimeoutMS"
+ validator:
+ gte: 1
default: 60000 # 1min
ShardingTaskExecutorPoolRefreshTimeoutMS:
description: <-
The timeout for refreshing a connection for each executor in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolRefreshTimeoutMS"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.pendingTimeoutMS"
+ validator:
+ callback: "ShardingTaskExecutorPoolController::validatePendingTimeout"
+ gte: 1
default: 20000 # 20secs
+ ShardingTaskExecutorPoolReplicaSetMatching:
+ description: <-
+ Enables ReplicaSet member connection matching.
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.matchingStrategyString"
+ on_update: "ShardingTaskExecutorPoolController::onUpdateMatchingStrategy"
+ default: "disabled"
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
new file mode 100644
index 00000000000..77d694e2f16
--- /dev/null
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -0,0 +1,249 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kConnectionPool
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/s/sharding_task_executor_pool_controller.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+Status ShardingTaskExecutorPoolController::validateHostTimeout(const int& hostTimeoutMS) {
+ auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load();
+ auto pendingTimeoutMS = gParameters.pendingTimeoutMS.load();
+ if (hostTimeoutMS >= (toRefreshTimeoutMS + pendingTimeoutMS)) {
+ return Status::OK();
+ }
+
+ std::string msg = str::stream()
+ << "ShardingTaskExecutorPoolHostTimeoutMS (" << hostTimeoutMS
+ << ") set below ShardingTaskExecutorPoolRefreshRequirementMS (" << toRefreshTimeoutMS
+ << ") + ShardingTaskExecutorPoolRefreshTimeoutMS (" << pendingTimeoutMS << ").";
+ return Status(ErrorCodes::BadValue, msg);
+}
+
+Status ShardingTaskExecutorPoolController::validatePendingTimeout(const int& pendingTimeoutMS) {
+ auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load();
+ if (pendingTimeoutMS < toRefreshTimeoutMS) {
+ return Status::OK();
+ }
+
+ std::string msg = str::stream()
+ << "ShardingTaskExecutorPoolRefreshRequirementMS (" << toRefreshTimeoutMS
+ << ") set below ShardingTaskExecutorPoolRefreshTimeoutMS (" << pendingTimeoutMS << ").";
+ return Status(ErrorCodes::BadValue, msg);
+}
+
+Status ShardingTaskExecutorPoolController::onUpdateMatchingStrategy(const std::string& str) {
+ // TODO Fix up after SERVER-40224
+ if (str == "disabled") {
+ gParameters.matchingStrategy.store(MatchingStrategy::kDisabled);
+ } else if (str == "matchPrimaryNode") {
+ gParameters.matchingStrategy.store(MatchingStrategy::kMatchPrimaryNode);
+ // TODO Reactivate once the prediction pattern is fixed in SERVER-41602
+ //} else if (str == "matchBusiestNode") {
+ // gParameters.matchingStrategy.store(MatchingStrategy::kMatchBusiestNode);
+ } else {
+ return Status{ErrorCodes::BadValue,
+ str::stream() << "Unrecognized matching strategy '" << str << "'"};
+ }
+
+ return Status::OK();
+}
+
+void ShardingTaskExecutorPoolController::_addGroup(WithLock,
+ const ReplicaSetChangeNotifier::State& state) {
+ // Replace the last group
+ auto& group = _hostGroups[state.connStr.getSetName()];
+ group = std::make_shared<HostGroupData>();
+ group->state = state;
+
+ // Mark each host with this group
+ for (auto& host : state.connStr.getServers()) {
+ _hostGroupsByHost[host] = group;
+ }
+}
+
+void ShardingTaskExecutorPoolController::_removeGroup(WithLock, const std::string& name) {
+ auto it = _hostGroups.find(name);
+ if (it == _hostGroups.end()) {
+ return;
+ }
+
+ auto& hostGroup = it->second;
+ for (auto& host : hostGroup->state.connStr.getServers()) {
+ _hostGroupsByHost.erase(host);
+ }
+ _hostGroups.erase(it);
+}
+
+class ShardingTaskExecutorPoolController::ReplicaSetChangeListener final
+ : public ReplicaSetChangeNotifier::Listener {
+public:
+ explicit ReplicaSetChangeListener(ShardingTaskExecutorPoolController* controller)
+ : _controller(controller) {}
+
+ void onFoundSet(const Key& key) override {
+ // Do nothing
+ }
+
+ void onConfirmedSet(const State& state) override {
+ stdx::lock_guard lk(_controller->_mutex);
+
+ _controller->_removeGroup(lk, state.connStr.getSetName());
+ _controller->_addGroup(lk, state);
+ }
+
+ void onPossibleSet(const State& state) override {
+ // Do nothing
+ }
+
+ void onDroppedSet(const Key& key) override {
+ stdx::lock_guard lk(_controller->_mutex);
+
+ _controller->_removeGroup(lk, key);
+ }
+
+private:
+ ShardingTaskExecutorPoolController* const _controller;
+};
+
+void ShardingTaskExecutorPoolController::init(ConnectionPool* parent) {
+ ControllerInterface::init(parent);
+ _listener = ReplicaSetMonitor::getNotifier().makeListener<ReplicaSetChangeListener>(this);
+}
+
+auto ShardingTaskExecutorPoolController::updateHost(const SpecificPool* pool,
+ const HostAndPort& host,
+ const HostState& stats) -> HostGroup {
+ stdx::lock_guard lk(_mutex);
+
+ auto& data = _poolData[pool];
+
+ const size_t minConns = gParameters.minConnections.load();
+ const size_t maxConns = gParameters.maxConnections.load();
+
+ // Update the target for just the pool first
+ data.target = stats.requests + stats.active;
+
+ if (data.target < minConns) {
+ data.target = minConns;
+ } else if (data.target > maxConns) {
+ data.target = maxConns;
+ }
+
+ data.isAbleToShutdown = stats.health.isExpired;
+
+ // If the pool isn't in a group, we can return now
+ auto it = _hostGroupsByHost.find(host);
+ if (it == _hostGroupsByHost.end()) {
+ return {{host}, data.isAbleToShutdown};
+ }
+
+ // If the pool has a group, then update the group
+ auto& hostGroup = it->second;
+ data.hostGroup = hostGroup;
+
+ // Make sure we're part of the group
+ hostGroup->pools.insert(pool);
+
+ switch (gParameters.matchingStrategy.load()) {
+ case MatchingStrategy::kMatchPrimaryNode: {
+ if (hostGroup->state.primary == host) {
+ hostGroup->target = data.target;
+ }
+ } break;
+ case MatchingStrategy::kMatchBusiestNode: {
+ hostGroup->target = std::max(hostGroup->target, data.target);
+ } break;
+ case MatchingStrategy::kDisabled: {
+ // Nothing
+ } break;
+ };
+
+ if (hostGroup->target < minConns) {
+ hostGroup->target = minConns;
+ } else if (hostGroup->target > maxConns) {
+ hostGroup->target = maxConns;
+ }
+
+ auto shouldShutdown = data.isAbleToShutdown &&
+ std::all_of(hostGroup->pools.begin(), hostGroup->pools.end(), [&](auto otherPool) {
+ return _poolData[otherPool].isAbleToShutdown;
+ });
+ return {hostGroup->state.connStr.getServers(), shouldShutdown};
+}
+
+void ShardingTaskExecutorPoolController::removeHost(const SpecificPool* pool) {
+ stdx::lock_guard lk(_mutex);
+ auto it = _poolData.find(pool);
+ if (it == _poolData.end()) {
+ // It's possible that a host immediately needs to go before it updates even once
+ return;
+ }
+
+ auto& data = it->second;
+ if (auto hostGroup = data.hostGroup.lock()) {
+ hostGroup->pools.erase(pool);
+ }
+ _poolData.erase(it);
+}
+
+auto ShardingTaskExecutorPoolController::getControls(const SpecificPool* pool)
+ -> ConnectionControls {
+ stdx::lock_guard lk(_mutex);
+ auto& data = _poolData[pool];
+
+ const size_t maxPending = gParameters.maxConnecting.load();
+
+ auto hostGroup = data.hostGroup.lock();
+ if (!hostGroup || gParameters.matchingStrategy.load() == MatchingStrategy::kDisabled) {
+ return {maxPending, data.target};
+ }
+
+ auto target = std::max(data.target, hostGroup->target);
+ return {maxPending, target};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::hostTimeout() const {
+ return Milliseconds{gParameters.hostTimeoutMS.load()};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::pendingTimeout() const {
+ return Milliseconds{gParameters.pendingTimeoutMS.load()};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::toRefreshTimeout() const {
+ return Milliseconds{gParameters.toRefreshTimeoutMS.load()};
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.h b/src/mongo/s/sharding_task_executor_pool_controller.h
new file mode 100644
index 00000000000..d61f707de73
--- /dev/null
+++ b/src/mongo/s/sharding_task_executor_pool_controller.h
@@ -0,0 +1,186 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status.h"
+#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/executor/connection_pool.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+
+namespace mongo {
+
+/**
+ * A special Controller for the sharding ConnectionPool
+ *
+ * This class has two special members:
+ * * A global set of synchronized Parameters for the ShardingTaskExecutorPool server parameters
+ * * A ReplicaSetChangeListener to inform it of changes to replica set membership
+ *
+ * When the MatchingStrategy from its Parameters is kDisabled, this class operates much like the
+ * LimitController but with its limits allowed to shift at runtime (via Parameters).
+ *
+ * When the MatchingStrategy is kMatchPrimaryNode, the limits are obeyed but, when the pool for a
+ * primary member calls updateHost, it can increase the targetConnections for the pool of each other
+ * member of its replica set. Note that this will, at time of writing, follow the "hosts" field
+ * from the primary isMaster combined with the seed list for the replica set. If the seed list were
+ * to include arbiters or hidden members, then they would also be subject to these constraints.
+ *
+ * When the MatchingStrategy is kMatchBusiestNode, it operates like kMatchPrimaryNode, but any pool
+ * can be responsible for increasing the targetConnections of each member of its set.
+ *
+ * Note that, in essence, there are three outside elements that can mutate the state of this class:
+ * * The ReplicaSetChangeNotifier can notify the listener which updates the host groups
+ * The ServerParameters can update the Parameters which will be used in the next update
+ * * The SpecificPools for its ConnectionPool can updateHost with their individual States
+ */
+class ShardingTaskExecutorPoolController final
+ : public executor::ConnectionPool::ControllerInterface {
+ class ReplicaSetChangeListener;
+
+public:
+ using ConnectionPool = executor::ConnectionPool;
+
+ enum class MatchingStrategy {
+ kDisabled,
+ kMatchPrimaryNode,
+ kMatchBusiestNode,
+ };
+
+ class Parameters {
+ public:
+ AtomicWord<int> minConnections;
+ AtomicWord<int> maxConnections;
+ AtomicWord<int> maxConnecting;
+
+ AtomicWord<int> hostTimeoutMS;
+ AtomicWord<int> pendingTimeoutMS;
+ AtomicWord<int> toRefreshTimeoutMS;
+
+ synchronized_value<std::string> matchingStrategyString;
+ AtomicWord<MatchingStrategy> matchingStrategy;
+ };
+
+ static inline Parameters gParameters;
+
+ /**
+ * Validate that hostTimeoutMS is greater than the sum of pendingTimeoutMS and
+ * toRefreshTimeoutMS
+ */
+ static Status validateHostTimeout(const int& hostTimeoutMS);
+
+ /**
+ * Validate that pendingTimeoutMS is less than toRefreshTimeoutMS
+ */
+ static Status validatePendingTimeout(const int& pendingTimeoutMS);
+
+ /**
+ * Matches the matching strategy string against a set of literals
+ * and either sets gParameters.matchingStrategy or returns !Status::isOK().
+ */
+ static Status onUpdateMatchingStrategy(const std::string& str);
+
+ ShardingTaskExecutorPoolController() = default;
+ ShardingTaskExecutorPoolController& operator=(ShardingTaskExecutorPoolController&&) = delete;
+
+ void init(ConnectionPool* parent) override;
+
+ HostGroup updateHost(const SpecificPool* pool,
+ const HostAndPort& host,
+ const HostState& stats) override;
+ void removeHost(const SpecificPool* pool) override;
+
+ ConnectionControls getControls(const SpecificPool* pool) override;
+
+ Milliseconds hostTimeout() const override;
+ Milliseconds pendingTimeout() const override;
+ Milliseconds toRefreshTimeout() const override;
+
+ StringData name() const override {
+ return "ShardingTaskExecutorPoolController"_sd;
+ }
+
+private:
+ void _addGroup(WithLock, const ReplicaSetChangeNotifier::State& state);
+ void _removeGroup(WithLock, const std::string& key);
+
+ /**
+ * HostGroup is a shared state for a set of hosts (a replica set).
+ *
+ * When the ReplicaSetChangeListener is informed of a change to a replica set,
+ * it creates a new HostGroup and fills it into _hostGroups[setName] and
+ * _hostGroupsByHost[memberHost]. This does not immediately affect the results of getControls.
+ *
+ * When a SpecificPool calls updateHost, it checks _hostGroupsByHost to see if it belongs to
+ * any group and pushes itself into hostData for that group. It then will update target for its
+ * group according to the MatchingStrategy. It will also set shouldShutdown to true if every
+ * member of the group has shouldShutdown at true.
+ *
+ * Note that a HostData can find itself orphaned from its HostGroup during a reconfig.
+ */
+ struct HostGroupData {
+ // The ReplicaSet state for this set
+ ReplicaSetChangeNotifier::State state;
+
+ // Pointer index for each pool in the set
+ stdx::unordered_set<const SpecificPool*> pools;
+
+ // The number of connections that all hosts in the group should maintain
+ size_t target = 0;
+ };
+
+ /**
+ * HostData represents the current state for a specific HostAndPort/SpecificPool.
+ *
+ * It is mutated by updateHost/removeHost and used along with Parameters to form Controls
+ * for getControls.
+ */
+ struct HostData {
+ // The HostGroup associated with this pool.
+ // Note that this will be invalid if there was a replica set change
+ std::weak_ptr<HostGroupData> hostGroup;
+
+ // The number of connections the host should maintain
+ size_t target = 0;
+
+ // This host is able to shutdown
+ bool isAbleToShutdown = false;
+ };
+
+ ReplicaSetChangeListenerHandle _listener;
+
+ stdx::mutex _mutex;
+
+ stdx::unordered_map<const SpecificPool*, HostData> _poolData;
+ stdx::unordered_map<std::string, std::shared_ptr<HostGroupData>> _hostGroups;
+ stdx::unordered_map<HostAndPort, std::shared_ptr<HostGroupData>> _hostGroupsByHost;
+};
+} // namespace mongo