author     Ben Caimano <ben.caimano@10gen.com>   2019-04-25 17:33:34 -0400
committer  Ben Caimano <ben.caimano@10gen.com>   2019-06-19 14:40:45 -0400
commit     f2b968173b5ac64ba55cf85703e7eb3fefea9c44 (patch)
tree       72674f8c00b93330d453fd55ba1ebe4dc807e963
parent     629f276dbe0a6a65dc51ad237cef31a7e0c516d8 (diff)
download   mongo-f2b968173b5ac64ba55cf85703e7eb3fefea9c44.tar.gz
SERVER-39817 SERVER-39819 SERVER-38920 Add ConnectionPoolControllers
See JIRA tickets listed for individual commits that landed on v4.3
-rw-r--r--  jstests/noPassthrough/drop_connections_sharded.js          9
-rw-r--r--  jstests/noPassthrough/predictive_connpool.js              159
-rw-r--r--  jstests/noPassthrough/set_step_params.js                  270
-rw-r--r--  src/mongo/db/repl/SConscript                                1
-rw-r--r--  src/mongo/db/repl/replication_info.cpp                      8
-rw-r--r--  src/mongo/executor/connection_pool.cpp                    603
-rw-r--r--  src/mongo/executor/connection_pool.h                      154
-rw-r--r--  src/mongo/s/SConscript                                      2
-rw-r--r--  src/mongo/s/sharding_initialization.cpp                    49
-rw-r--r--  src/mongo/s/sharding_task_executor_pool.idl                55
-rw-r--r--  src/mongo/s/sharding_task_executor_pool_controller.cpp    249
-rw-r--r--  src/mongo/s/sharding_task_executor_pool_controller.h      186
12 files changed, 1468 insertions, 277 deletions
diff --git a/jstests/noPassthrough/drop_connections_sharded.js b/jstests/noPassthrough/drop_connections_sharded.js
index 7ac8e2e8d4d..dd90f587b42 100644
--- a/jstests/noPassthrough/drop_connections_sharded.js
+++ b/jstests/noPassthrough/drop_connections_sharded.js
@@ -6,7 +6,14 @@
(function() {
"use strict";
- const st = new ShardingTest({shards: 1, rs0: {nodes: 3}, mongos: 1});
+ // TODO ShardingTaskExecutorPoolMinSize is set to 0 so that we can clearly observe
+ // dropConnections. This will no longer be necessary with SERVER-41460
+ const st = new ShardingTest({
+ config: {nodes: 1},
+ shards: 1,
+ rs0: {nodes: 3},
+ mongos: [{setParameter: {ShardingTaskExecutorPoolMinSize: 0}}]
+ });
const mongos = st.s0;
const rst = st.rs0;
const primary = rst.getPrimary();
diff --git a/jstests/noPassthrough/predictive_connpool.js b/jstests/noPassthrough/predictive_connpool.js
new file mode 100644
index 00000000000..c38d01601e2
--- /dev/null
+++ b/jstests/noPassthrough/predictive_connpool.js
@@ -0,0 +1,159 @@
+load("jstests/libs/parallelTester.js");
+
+/**
+ * @tags: [requires_sharding]
+ */
+
+(function() {
+ "use strict";
+
+ const st = new ShardingTest({mongos: 1, shards: 1, rs: {nodes: 2, protocolVersion: 1}});
+ const kDbName = 'test';
+ const mongosClient = st.s;
+ const mongos = mongosClient.getDB(kDbName);
+ const rst = st.rs0;
+ const primary = rst.getPrimary();
+ const secondary = rst.getSecondaries()[0];
+
+ const cfg = primary.getDB('local').system.replset.findOne();
+ const allHosts = cfg.members.map(x => x.host);
+ const primaryOnly = [primary.name];
+ const secondaryOnly = [secondary.name];
+
+ function configureReplSetFailpoint(name, modeValue) {
+ st.rs0.nodes.forEach(function(node) {
+ assert.commandWorked(node.getDB("admin").runCommand({
+ configureFailPoint: name,
+ mode: modeValue,
+ data: {shouldCheckForInterrupt: true},
+ }));
+ });
+ }
+
+ var threads = [];
+
+ function launchFinds({times, readPref, shouldFail}) {
+ jsTestLog("Starting " + times + " connections");
+ for (var i = 0; i < times; i++) {
+ var thread = new Thread(function(connStr, readPref, dbName, shouldFail) {
+ var client = new Mongo(connStr);
+ const ret = client.getDB(dbName).runCommand(
+ {find: "test", limit: 1, "$readPreference": {mode: readPref}});
+
+ if (shouldFail) {
+ assert.commandFailed(ret);
+ } else {
+ assert.commandWorked(ret);
+ }
+ }, st.s.host, readPref, kDbName, shouldFail);
+ thread.start();
+ threads.push(thread);
+ }
+ }
+
+ function updateSetParameters(params) {
+ var cmd = Object.assign({"setParameter": 1}, params);
+ assert.commandWorked(mongos.adminCommand(cmd));
+ }
+
+ function dropConnections() {
+ assert.commandWorked(mongos.adminCommand({dropConnections: 1, hostAndPort: allHosts}));
+ }
+
+ var currentCheckNum = 0;
+ function hasConnPoolStats(args) {
+ const checkNum = currentCheckNum++;
+ jsTestLog("Check #" + checkNum + ": " + tojson(args));
+ var {ready, pending, active, hosts, isAbsent} = args;
+
+ ready = ready ? ready : 0;
+ pending = pending ? pending : 0;
+ active = active ? active : 0;
+ hosts = hosts ? hosts : allHosts;
+
+ function checkStats(res, host) {
+ var stats = res.hosts[host];
+ if (!stats) {
+ jsTestLog("Connection stats for " + host + " are absent");
+ return isAbsent;
+ }
+
+ jsTestLog("Connection stats for " + host + ": " + tojson(stats));
+ return stats.available == ready && stats.refreshing == pending && stats.inUse == active;
+ }
+
+ function checkAllStats() {
+ var res = mongos.adminCommand({connPoolStats: 1});
+ return hosts.map(host => checkStats(res, host)).every(x => x);
+ }
+
+ assert.soon(checkAllStats, "Check #" + checkNum + " failed", 10000);
+
+ jsTestLog("Check #" + checkNum + " successful");
+ }
+
+ function checkConnPoolStats() {
+ const ret = mongos.runCommand({"connPoolStats": 1});
+ const poolStats = ret["pools"]["NetworkInterfaceTL-TaskExecutorPool-0"];
+ jsTestLog(poolStats);
+ }
+
+ function walkThroughBehavior({primaryFollows, secondaryFollows}) {
+ // Start pooling with a ping
+ mongos.adminCommand({multicast: {ping: 0}});
+ checkConnPoolStats();
+
+ // Block connections from finishing
+ configureReplSetFailpoint("waitInFindBeforeMakingBatch", "alwaysOn");
+
+ // Launch a bunch of primary finds
+ launchFinds({times: 10, readPref: "primary"});
+
+ // Confirm we follow
+ hasConnPoolStats({active: 10, hosts: primaryOnly});
+ if (secondaryFollows) {
+ hasConnPoolStats({ready: 10, hosts: secondaryOnly});
+ }
+ checkConnPoolStats();
+
+ // Launch a bunch of secondary finds
+ launchFinds({times: 20, readPref: "secondary"});
+
+ // Confirm we follow
+ hasConnPoolStats({active: 20, hosts: secondaryOnly});
+ if (primaryFollows) {
+ hasConnPoolStats({ready: 10, active: 10, hosts: primaryOnly});
+ }
+ checkConnPoolStats();
+
+ configureReplSetFailpoint("waitInFindBeforeMakingBatch", "off");
+
+ dropConnections();
+ }
+
+ assert.writeOK(mongos.test.insert({x: 1}));
+ assert.writeOK(mongos.test.insert({x: 2}));
+ assert.writeOK(mongos.test.insert({x: 3}));
+ st.rs0.awaitReplication();
+
+ jsTestLog("Following disabled");
+ walkThroughBehavior({primaryFollows: false, secondaryFollows: false});
+
+ jsTestLog("Following primary node");
+ updateSetParameters({ShardingTaskExecutorPoolReplicaSetMatching: "matchPrimaryNode"});
+ walkThroughBehavior({primaryFollows: false, secondaryFollows: true});
+
+ // jsTestLog("Following busiest node");
+ // updateSetParameters({ShardingTaskExecutorPoolReplicaSetMatching: "matchBusiestNode"});
+ // walkThroughBehavior({primaryFollows: true, secondaryFollows: true});
+
+ jsTestLog("Reseting to disabled");
+ updateSetParameters({ShardingTaskExecutorPoolReplicaSetMatching: "disabled"});
+ walkThroughBehavior({primaryFollows: false, secondaryFollows: false});
+
+ threads.forEach(function(thread) {
+ thread.join();
+ });
+
+ st.stop();
+})();
diff --git a/jstests/noPassthrough/set_step_params.js b/jstests/noPassthrough/set_step_params.js
new file mode 100644
index 00000000000..948ec5c117e
--- /dev/null
+++ b/jstests/noPassthrough/set_step_params.js
@@ -0,0 +1,270 @@
+load("jstests/libs/parallelTester.js");
+
+/**
+ * @tags: [requires_replication, requires_sharding]
+ */
+
+(function() {
+ "use strict";
+
+ const kDbName = 'test';
+
+ const minConns = 4;
+ var stepParams = {
+ ShardingTaskExecutorPoolMinSize: minConns,
+ ShardingTaskExecutorPoolMaxSize: 10,
+ ShardingTaskExecutorPoolMaxConnecting: 5,
+ ShardingTaskExecutorPoolHostTimeoutMS: 300000,
+ ShardingTaskExecutorPoolRefreshRequirementMS: 60000,
+ ShardingTaskExecutorPoolRefreshTimeoutMS: 20000,
+ ShardingTaskExecutorPoolReplicaSetMatching: "disabled",
+ };
+
+ const st = new ShardingTest({
+ config: {nodes: 1},
+ shards: 1,
+ rs0: {nodes: 1},
+ mongos: [{setParameter: stepParams}],
+ });
+ const mongos = st.s0;
+ const rst = st.rs0;
+ const primary = rst.getPrimary();
+
+ const cfg = primary.getDB('local').system.replset.findOne();
+ const allHosts = cfg.members.map(x => x.host);
+ const mongosDB = mongos.getDB(kDbName);
+ const primaryOnly = [primary.name];
+
+ function configureReplSetFailpoint(name, modeValue) {
+ st.rs0.nodes.forEach(function(node) {
+ assert.commandWorked(node.getDB("admin").runCommand({
+ configureFailPoint: name,
+ mode: modeValue,
+ data: {shouldCheckForInterrupt: true},
+ }));
+ });
+ }
+
+ var threads = [];
+ function launchFinds({times, readPref, shouldFail}) {
+ jsTestLog("Starting " + times + " connections");
+ for (var i = 0; i < times; i++) {
+ var thread = new Thread(function(connStr, readPref, dbName, shouldFail) {
+ var client = new Mongo(connStr);
+ const ret = client.getDB(dbName).runCommand(
+ {find: "test", limit: 1, "$readPreference": {mode: readPref}});
+
+ if (shouldFail) {
+ assert.commandFailed(ret);
+ } else {
+ assert.commandWorked(ret);
+ }
+ }, st.s.host, readPref, kDbName, shouldFail);
+ thread.start();
+ threads.push(thread);
+ }
+ }
+
+ var currentCheckNum = 0;
+ function hasConnPoolStats(args) {
+ const checkNum = currentCheckNum++;
+ jsTestLog("Check #" + checkNum + ": " + tojson(args));
+ var {ready, pending, active, hosts, isAbsent} = args;
+
+ ready = ready ? ready : 0;
+ pending = pending ? pending : 0;
+ active = active ? active : 0;
+ hosts = hosts ? hosts : allHosts;
+
+ function checkStats(res, host) {
+ var stats = res.hosts[host];
+ if (!stats) {
+ jsTestLog("Connection stats for " + host + " are absent");
+ return isAbsent;
+ }
+
+ jsTestLog("Connection stats for " + host + ": " + tojson(stats));
+ return stats.available == ready && stats.refreshing == pending && stats.inUse == active;
+ }
+
+ function checkAllStats() {
+ var res = mongos.adminCommand({connPoolStats: 1});
+ return hosts.map(host => checkStats(res, host)).every(x => x);
+ }
+
+ assert.soon(checkAllStats, "Check #" + checkNum + " failed", 10000);
+
+ jsTestLog("Check #" + checkNum + " successful");
+ }
+
+ function updateSetParameters(params) {
+ var cmd = Object.assign({"setParameter": 1}, params);
+ assert.commandWorked(mongos.adminCommand(cmd));
+ }
+
+ function dropConnections() {
+ assert.commandWorked(mongos.adminCommand({dropConnections: 1, hostAndPort: allHosts}));
+ }
+
+ function resetPools() {
+ dropConnections();
+ mongos.adminCommand({multicast: {ping: 0}});
+ hasConnPoolStats({ready: 4});
+ dropConnections();
+ hasConnPoolStats({});
+ }
+
+ function runSubTest(name, fun) {
+ jsTestLog("Running test for " + name);
+
+ resetPools();
+
+ fun();
+
+ updateSetParameters(stepParams);
+ }
+
+ assert.writeOK(mongosDB.test.insert({x: 1}));
+ assert.writeOK(mongosDB.test.insert({x: 2}));
+ assert.writeOK(mongosDB.test.insert({x: 3}));
+ st.rs0.awaitReplication();
+
+ runSubTest("MinSize", function() {
+ // Launch an initial find to trigger to min
+ launchFinds({times: 1, readPref: "primary"});
+ hasConnPoolStats({ready: minConns});
+
+ // Increase by one
+ updateSetParameters({ShardingTaskExecutorPoolMinSize: 5});
+ hasConnPoolStats({ready: 5});
+
+ // Increase to MaxSize
+ updateSetParameters({ShardingTaskExecutorPoolMinSize: 10});
+ hasConnPoolStats({ready: 10});
+
+ // Decrease to zero
+ updateSetParameters({ShardingTaskExecutorPoolMinSize: 0});
+ });
+
+ runSubTest("MaxSize", function() {
+ configureReplSetFailpoint("waitInFindBeforeMakingBatch", "alwaysOn");
+
+ // Launch 10 blocked finds
+ launchFinds({times: 10, readPref: "primary"});
+ hasConnPoolStats({active: 10, hosts: primaryOnly});
+
+ // Increase by 5 and Launch another 4 blocked finds
+ updateSetParameters({ShardingTaskExecutorPoolMaxSize: 15});
+ launchFinds({times: 4, readPref: "primary"});
+ hasConnPoolStats({active: 14, hosts: primaryOnly});
+
+ // Launch yet another 2, these should add only 1 connection
+ launchFinds({times: 2, readPref: "primary"});
+ hasConnPoolStats({active: 15, hosts: primaryOnly});
+
+ configureReplSetFailpoint("waitInFindBeforeMakingBatch", "off");
+ hasConnPoolStats({ready: 15, pending: 0, hosts: primaryOnly});
+ });
+
+ // Test maxConnecting
+ runSubTest("MaxConnecting", function() {
+ const maxPending1 = 2;
+ const maxPending2 = 4;
+ const conns = 6;
+
+ updateSetParameters({
+ ShardingTaskExecutorPoolMaxSize: 100,
+ ShardingTaskExecutorPoolMaxConnecting: maxPending1,
+ });
+
+ configureReplSetFailpoint("waitInIsMaster", "alwaysOn");
+ configureReplSetFailpoint("waitInFindBeforeMakingBatch", "alwaysOn");
+
+ // Go to the limit of maxConnecting, so we're stuck here
+ launchFinds({times: maxPending1, readPref: "primary"});
+ hasConnPoolStats({pending: maxPending1});
+
+ // More won't run right now
+ launchFinds({times: conns - maxPending1, readPref: "primary"});
+ hasConnPoolStats({pending: maxPending1});
+
+ // If we increase our limit, it should fill in some of the connections
+ updateSetParameters({ShardingTaskExecutorPoolMaxConnecting: maxPending2});
+ hasConnPoolStats({pending: maxPending2});
+
+ // Dropping the limit doesn't cause us to drop pending
+ updateSetParameters({ShardingTaskExecutorPoolMaxConnecting: maxPending1});
+ hasConnPoolStats({pending: maxPending2});
+
+ // Release our pending and walk away
+ configureReplSetFailpoint("waitInIsMaster", "off");
+ hasConnPoolStats({active: conns});
+ configureReplSetFailpoint("waitInFindBeforeMakingBatch", "off");
+ });
+
+ runSubTest("Timeouts", function() {
+ const conns = minConns;
+ const pendingTimeoutMS = 5000;
+ const toRefreshTimeoutMS = 1000;
+ const idleTimeoutMS1 = 20000;
+ const idleTimeoutMS2 = 15500;
+
+ // Updating separately since the validation depends on existing params
+ updateSetParameters({
+ ShardingTaskExecutorPoolRefreshTimeoutMS: pendingTimeoutMS,
+ });
+ updateSetParameters({
+ ShardingTaskExecutorPoolRefreshRequirementMS: toRefreshTimeoutMS,
+ });
+ updateSetParameters({
+ ShardingTaskExecutorPoolHostTimeoutMS: idleTimeoutMS1,
+ });
+
+ // Make ready connections
+ configureReplSetFailpoint("waitInFindBeforeMakingBatch", "alwaysOn");
+ launchFinds({times: conns, readPref: "primary"});
+ configureReplSetFailpoint("waitInFindBeforeMakingBatch", "off");
+ hasConnPoolStats({ready: conns});
+
+ // Block refreshes and wait for the toRefresh timeout
+ configureReplSetFailpoint("waitInIsMaster", "alwaysOn");
+ sleep(toRefreshTimeoutMS);
+
+ // Confirm that we're in pending for all of our conns
+ hasConnPoolStats({pending: conns});
+
+ // Set our min conns to 0 to make sure we don't refresh after pending timeout
+ updateSetParameters({
+ ShardingTaskExecutorPoolMinSize: 0,
+ });
+
+ // Wait for our pending timeout
+ sleep(pendingTimeoutMS);
+ hasConnPoolStats({});
+
+ configureReplSetFailpoint("waitInIsMaster", "off");
+
+ // Reset the min conns to make sure normal refresh doesn't extend the timeout
+ updateSetParameters({
+ ShardingTaskExecutorPoolMinSize: minConns,
+ });
+
+ // Wait for our host timeout and confirm the pool drops
+ sleep(idleTimeoutMS1);
+ hasConnPoolStats({isAbsent: true});
+
+ // Reset the pool
+ resetPools();
+
+ // Sleep for a shorter timeout and then update so we're already expired
+ sleep(idleTimeoutMS2);
+ updateSetParameters({ShardingTaskExecutorPoolHostTimeoutMS: idleTimeoutMS2});
+ hasConnPoolStats({isAbsent: true});
+ });
+
+ threads.forEach(function(thread) {
+ thread.join();
+ });
+
+ st.stop();
+})();
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index a9531a25cac..eda9c1c6315 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1693,6 +1693,7 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/saslauth',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/query_exec',
+ "$BUILD_DIR/mongo/util/fail_point",
'oplog',
'oplogreader',
'repl_coordinator_interface',
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index 855356c8c9e..e3f26e73513 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -58,10 +58,14 @@
#include "mongo/executor/network_interface.h"
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
#include "mongo/util/map_util.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(waitInIsMaster);
+
using std::unique_ptr;
using std::list;
using std::string;
@@ -236,6 +240,10 @@ public:
const BSONObj& cmdObj,
BSONObjBuilder& result) final {
CommandHelpers::handleMarkKillOnClientDisconnect(opCtx);
+
+ // TODO Unwind after SERVER-41070
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, waitInIsMaster);
+
/* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not
authenticated.
*/
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 3c4454110c4..034c369afd4 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -33,6 +33,8 @@
#include "mongo/executor/connection_pool.h"
+#include <fmt/format.h>
+
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
@@ -42,6 +44,8 @@
#include "mongo/util/lru_cache.h"
#include "mongo/util/scopeguard.h"
+using namespace fmt::literals;
+
// One interesting implementation note herein concerns how setup() and
// refresh() are invoked outside of the global lock, but setTimeout is not.
// This implementation detail simplifies mocks, allowing them to return
@@ -82,6 +86,84 @@ size_t ConnectionPool::ConnectionInterface::getGeneration() const {
return _generation;
}
+void ConnectionPool::ControllerInterface::init(ConnectionPool* pool) {
+ invariant(pool);
+
+ LOG(2) << "Controller for " << pool->_name << " is " << name();
+ _pool = pool;
+}
+
+std::string ConnectionPool::ConnectionControls::toString() const {
+ return "{{ maxPending: {}, target: {}, }}"_format(maxPendingConnections, targetConnections);
+}
+
+std::string ConnectionPool::HostState::toString() const {
+ return "{{ requests: {}, ready: {}, pending: {}, active: {}, isExpired: {} }}"_format(
+ requests, ready, pending, active, health.isExpired);
+}
+
+/**
+ * Standard controller for the ConnectionPool
+ *
+ * This class uses the Options struct in the ConnectionPool to determine its controls.
+ */
+class ConnectionPool::LimitController final : public ConnectionPool::ControllerInterface {
+public:
+ HostGroup updateHost(const SpecificPool* pool,
+ const HostAndPort& host,
+ const HostState& stats) override {
+ stdx::lock_guard lk(_mutex);
+ auto& data = _poolData[pool];
+
+ const auto minConns = getPool()->_options.minConnections;
+ const auto maxConns = getPool()->_options.maxConnections;
+
+ data.target = stats.requests + stats.active;
+ if (data.target < minConns) {
+ data.target = minConns;
+ } else if (data.target > maxConns) {
+ data.target = maxConns;
+ }
+
+ return {{host}, stats.health.isExpired};
+ }
+ void removeHost(const SpecificPool* pool) override {
+ stdx::lock_guard lk(_mutex);
+ _poolData.erase(pool);
+ }
+
+ ConnectionControls getControls(const SpecificPool* pool) override {
+ stdx::lock_guard lk(_mutex);
+ const auto& data = _poolData[pool];
+
+ return {
+ getPool()->_options.maxConnecting, data.target,
+ };
+ }
+
+ Milliseconds hostTimeout() const override {
+ return getPool()->_options.hostTimeout;
+ }
+ Milliseconds pendingTimeout() const override {
+ return getPool()->_options.refreshTimeout;
+ }
+ Milliseconds toRefreshTimeout() const override {
+ return getPool()->_options.refreshRequirement;
+ }
+
+ StringData name() const override {
+ return "LimitController"_sd;
+ }
+
+protected:
+ struct Data {
+ size_t target = 0;
+ };
+
+ stdx::mutex _mutex;
+ stdx::unordered_map<const SpecificPool*, Data> _poolData;
+};
+
/**
* A pool for a specific HostAndPort
*
@@ -91,6 +173,8 @@ size_t ConnectionPool::ConnectionInterface::getGeneration() const {
*/
class ConnectionPool::SpecificPool final
: public std::enable_shared_from_this<ConnectionPool::SpecificPool> {
+ static constexpr int kDiagnosticLogLevel = 3;
+
public:
/**
* Whenever a function enters a specific pool, the function needs to be guarded by the lock.
@@ -120,15 +204,30 @@ public:
~SpecificPool();
/**
+ * Create and initialize a SpecificPool
+ */
+ static auto make(std::shared_ptr<ConnectionPool> parent,
+ const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode);
+
+ /**
+ * Triggers a controller update, potentially changes the request timer,
+ * and maybe delists from pool
+ *
+ * This should only be called by the ConnectionPool or StateLock
+ */
+ void updateState();
+
+ /**
* Gets a connection from the specific pool. Sinks a unique_lock from the
* parent to preserve the lock on _mutex
*/
Future<ConnectionHandle> getConnection(Milliseconds timeout);
/**
- * Triggers the shutdown procedure. This function marks the state as kInShutdown
- * and calls processFailure below with the status provided. This may not immediately
- * delist or destruct this pool. However, both will happen eventually as ConnectionHandles
+ * Triggers the shutdown procedure. This function sets isShutdown to true
+ * and calls processFailure below with the status provided. This immediately removes this pool
+ * from the ConnectionPool. The actual destruction will happen eventually as ConnectionHandles
* are deleted.
*/
void triggerShutdown(const Status& status);
@@ -141,37 +240,43 @@ public:
void processFailure(const Status& status);
/**
- * Returns a connection to a specific pool. Sinks a unique_lock from the
- * parent to preserve the lock on _mutex
- */
- void returnConnection(ConnectionInterface* connection);
-
- /**
* Returns the number of connections currently checked out of the pool.
*/
- size_t inUseConnections();
+ size_t inUseConnections() const;
/**
* Returns the number of available connections in the pool.
*/
- size_t availableConnections();
+ size_t availableConnections() const;
/**
* Returns the number of in progress connections in the pool.
*/
- size_t refreshingConnections();
+ size_t refreshingConnections() const;
/**
* Returns the total number of connections ever created in this pool.
*/
- size_t createdConnections();
+ size_t createdConnections() const;
/**
* Returns the total number of connections currently open that belong to
* this pool. This is the sum of refreshingConnections, availableConnections,
* and inUseConnections.
*/
- size_t openConnections();
+ size_t openConnections() const;
+
+ /**
+ * Returns the number of unfulfilled requests pending.
+ */
+ size_t requestsPending() const;
+
+ /**
+ * Returns the HostAndPort for this pool.
+ */
+ const HostAndPort& host() const {
+ return _hostAndPort;
+ }
/**
* Return true if the tags on the specific pool match the passed in tags
@@ -195,8 +300,6 @@ public:
}
}
- void spawnConnections();
-
template <typename CallableT>
void runOnExecutor(CallableT&& cb) {
ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) //
@@ -207,8 +310,6 @@ public:
});
}
- void updateState();
-
private:
using OwnedConnection = std::shared_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
@@ -222,13 +323,20 @@ private:
ConnectionHandle makeHandle(ConnectionInterface* connection);
+ /**
+ * Establishes connections until the ControllerInterface's target is met.
+ */
+ void spawnConnections();
+
void finishRefresh(ConnectionInterface* connPtr, Status status);
void addToReady(OwnedConnection conn);
void fulfillRequests();
- // This internal helper is used both by get and by fulfillRequests and differs in that it
+ void returnConnection(ConnectionInterface* connPtr);
+
+ // This internal helper is used both by get and by _fulfillRequests and differs in that it
// skips some bookkeeping that the other callers do on their own
ConnectionHandle tryGetConnection();
@@ -238,6 +346,15 @@ private:
OwnedConnection takeFromProcessingPool(ConnectionInterface* connection);
+ // Update the health struct and related variables
+ void updateHealth();
+
+ // Update the event timer for this host pool
+ void updateEventTimer();
+
+ // Update the controller and potentially change the controls
+ void updateController();
+
private:
const std::shared_ptr<ConnectionPool> _parent;
@@ -250,43 +367,36 @@ private:
OwnershipPool _checkedOutPool;
std::vector<Request> _requests;
+ Date_t _lastActiveTime;
- std::shared_ptr<TimerInterface> _requestTimer;
- Date_t _requestTimerExpiration;
- size_t _generation;
- bool _inFulfillRequests;
- bool _inSpawnConnections;
+ std::shared_ptr<TimerInterface> _eventTimer;
+ Date_t _eventTimerExpiration;
+ Date_t _hostExpiration;
- size_t _created;
+ // The _generation is the set of connection objects we believe are healthy.
+ // It increases when we process a failure. If a connection is from a previous generation,
+ // it will be discarded on return/refresh.
+ size_t _generation = 0;
- transport::Session::TagMask _tags = transport::Session::kPending;
-
- /**
- * The current state of the pool
- *
- * The pool begins in a running state. Moves to idle when no requests
- * are pending and no connections are checked out. It finally enters
- * shutdown after hostTimeout has passed (and waits there for current
- * refreshes to process out).
- *
- * At any point a new request sets the state back to running and
- * restarts all timers.
- */
- enum class State {
- // The pool is active
- kRunning,
+ bool _inFulfillRequests = false;
+ bool _inControlLoop = false;
- // No current activity, waiting for hostTimeout to pass
- kIdle,
+ size_t _created = 0;
- // hostTimeout is passed, we're waiting for any processing
- // connections to finish before shutting down
- kInShutdown,
- };
+ transport::Session::TagMask _tags = transport::Session::kPending;
- State _state;
+ HostHealth _health;
};
+auto ConnectionPool::SpecificPool::make(std::shared_ptr<ConnectionPool> parent,
+ const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode) {
+ auto pool = std::make_shared<SpecificPool>(std::move(parent), hostAndPort, sslMode);
+ pool->updateEventTimer();
+ pool->updateHealth();
+ return pool;
+}
+
const Status ConnectionPool::kConnectionStateUnknown =
Status(ErrorCodes::InternalError, "Connection is in an unknown state");
@@ -294,12 +404,18 @@ ConnectionPool::ConnectionPool(std::shared_ptr<DependentTypeFactoryInterface> im
std::string name,
Options options)
: _name(std::move(name)),
- _options(std::move(options)),
_factory(std::move(impl)),
+ _options(std::move(options)),
+ _controller(std::move(_options.controller)),
_manager(options.egressTagCloserManager) {
if (_manager) {
_manager->add(this);
}
+
+ if (!_controller) {
+ _controller = std::make_shared<LimitController>();
+ }
+ _controller->init(this);
}
ConnectionPool::~ConnectionPool() {
@@ -393,9 +509,10 @@ SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPo
transport::ConnectSSLMode sslMode,
Milliseconds timeout) {
stdx::lock_guard lk(_mutex);
+
auto& pool = _pools[hostAndPort];
if (!pool) {
- pool = std::make_shared<SpecificPool>(shared_from_this(), hostAndPort, sslMode);
+ pool = SpecificPool::make(shared_from_this(), hostAndPort, sslMode);
} else {
pool->fassertSSLModeIs(sslMode);
}
@@ -439,65 +556,73 @@ ConnectionPool::SpecificPool::SpecificPool(std::shared_ptr<ConnectionPool> paren
: _parent(std::move(parent)),
_sslMode(sslMode),
_hostAndPort(hostAndPort),
- _readyPool(std::numeric_limits<size_t>::max()),
- _generation(0),
- _inFulfillRequests(false),
- _inSpawnConnections(false),
- _created(0),
- _state(State::kRunning) {
+ _readyPool(std::numeric_limits<size_t>::max()) {
invariant(_parent);
- _requestTimer = _parent->_factory->makeTimer();
+ _eventTimer = _parent->_factory->makeTimer();
}
ConnectionPool::SpecificPool::~SpecificPool() {
- DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();)
+ DESTRUCTOR_GUARD(_eventTimer->cancelTimeout();)
invariant(_requests.empty());
invariant(_checkedOutPool.empty());
}
-size_t ConnectionPool::SpecificPool::inUseConnections() {
+size_t ConnectionPool::SpecificPool::inUseConnections() const {
return _checkedOutPool.size();
}
-size_t ConnectionPool::SpecificPool::availableConnections() {
+size_t ConnectionPool::SpecificPool::availableConnections() const {
return _readyPool.size();
}
-size_t ConnectionPool::SpecificPool::refreshingConnections() {
+size_t ConnectionPool::SpecificPool::refreshingConnections() const {
return _processingPool.size();
}
-size_t ConnectionPool::SpecificPool::createdConnections() {
+size_t ConnectionPool::SpecificPool::createdConnections() const {
return _created;
}
-size_t ConnectionPool::SpecificPool::openConnections() {
+size_t ConnectionPool::SpecificPool::openConnections() const {
return _checkedOutPool.size() + _readyPool.size() + _processingPool.size();
}
+size_t ConnectionPool::SpecificPool::requestsPending() const {
+ return _requests.size();
+}
+
Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection(
Milliseconds timeout) {
- invariant(_state != State::kInShutdown);
- auto conn = tryGetConnection();
+ // Reset our activity timestamp
+ auto now = _parent->_factory->now();
+ _lastActiveTime = now;
- if (conn) {
- return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn));
+ // If we do not have requests, then we can fulfill immediately
+ if (_requests.size() == 0) {
+ auto conn = tryGetConnection();
+
+ if (conn) {
+ LOG(kDiagnosticLogLevel) << "Requesting new connection to " << _hostAndPort
+ << "--using existing idle connection";
+ return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn));
+ }
}
- if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
- timeout = _parent->_options.refreshTimeout;
+ auto pendingTimeout = _parent->_controller->pendingTimeout();
+ if (timeout < Milliseconds(0) || timeout > pendingTimeout) {
+ timeout = pendingTimeout;
}
+ LOG(kDiagnosticLogLevel) << "Requesting new connection to " << _hostAndPort << " with timeout "
+ << timeout;
- const auto expiration = _parent->_factory->now() + timeout;
+ const auto expiration = now + timeout;
auto pf = makePromiseFuture<ConnectionHandle>();
_requests.push_back(make_pair(expiration, std::move(pf.promise)));
std::push_heap(begin(_requests), end(_requests), RequestComparator{});
- runOnExecutor([ this, anchor = shared_from_this() ]() { spawnConnections(); });
-
return std::move(pf.future);
}
@@ -505,7 +630,10 @@ auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -
auto deleter = [ this, anchor = shared_from_this() ](ConnectionInterface * connection) {
runOnExecutor([this, connection]() {
stdx::lock_guard lk(_parent->_mutex);
+
returnConnection(connection);
+
+ _lastActiveTime = _parent->_factory->now();
updateState();
});
};
@@ -547,34 +675,38 @@ void ConnectionPool::SpecificPool::finishRefresh(ConnectionInterface* connPtr, S
auto conn = takeFromProcessingPool(connPtr);
// If we're in shutdown, we don't need refreshed connections
- if (_state == State::kInShutdown)
+ if (_health.isShutdown) {
return;
+ }
// If we've exceeded the time limit, start a new connect,
// rather than failing all operations. We do this because the
// various callers have their own time limit which is unrelated
// to our internal one.
if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
- LOG(0) << "Pending connection to host " << _hostAndPort
- << " did not complete within the connection timeout,"
- << " retrying with a new connection;" << openConnections()
- << " connections to that host remain open";
- spawnConnections();
+ LOG(kDiagnosticLogLevel) << "Pending connection to host " << _hostAndPort
+ << " did not complete within the connection timeout,"
+ << " retrying with a new connection;" << openConnections()
+ << " connections to that host remain open";
return;
}
// Pass a failure on through
if (!status.isOK()) {
+ LOG(kDiagnosticLogLevel) << "Connection failed to " << _hostAndPort << " due to "
+ << redact(status);
processFailure(status);
return;
}
// If the host and port were dropped, let this lapse and spawn new connections
- if (conn->getGeneration() != _generation) {
- spawnConnections();
+ if (!conn || conn->getGeneration() != _generation) {
+ LOG(kDiagnosticLogLevel) << "Dropping late refreshed connection to " << _hostAndPort;
return;
}
+ LOG(kDiagnosticLogLevel) << "Finishing connection refresh for " << _hostAndPort;
+
// If the connection refreshed successfully, throw it back in the ready pool
addToReady(std::move(conn));
@@ -582,7 +714,7 @@ void ConnectionPool::SpecificPool::finishRefresh(ConnectionInterface* connPtr, S
}
void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr) {
- auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement;
+ auto needsRefreshTP = connPtr->getLastUsed() + _parent->_controller->toRefreshTimeout();
auto conn = takeFromPool(_checkedOutPool, connPtr);
invariant(conn);
@@ -592,10 +724,11 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
return;
}
- if (!conn->getStatus().isOK()) {
+ if (auto status = conn->getStatus(); !status.isOK()) {
// TODO: alert via some callback if the host is bad
- log() << "Ending connection to host " << _hostAndPort << " due to bad connection status; "
- << openConnections() << " connections to that host remain open";
+ log() << "Ending connection to host " << _hostAndPort
+ << " due to bad connection status: " << redact(status) << "; " << openConnections()
+ << " connections to that host remain open";
return;
}
@@ -603,8 +736,9 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
if (needsRefreshTP <= now) {
// If we need to refresh this connection
+ auto controls = _parent->_controller->getControls(this);
if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
- _parent->_options.minConnections) {
+ controls.targetConnections) {
// If we already have minConnections, just let the connection lapse
log() << "Ending idle connection to host " << _hostAndPort
<< " because the pool meets constraints; " << openConnections()
@@ -614,7 +748,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
_processingPool[connPtr] = std::move(conn);
- connPtr->refresh(_parent->_options.refreshTimeout,
+ LOG(kDiagnosticLogLevel) << "Refreshing connection to " << _hostAndPort;
+ connPtr->refresh(_parent->_controller->pendingTimeout(),
guardCallback([this](auto conn, auto status) {
finishRefresh(std::move(conn), std::move(status));
}));
@@ -623,6 +758,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
}
// If it's fine as it is, just put it in the ready queue
+ LOG(kDiagnosticLogLevel) << "Returning ready connection to " << _hostAndPort;
addToReady(std::move(conn));
fulfillRequests();
@@ -638,37 +774,44 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) {
// Our strategy for refreshing connections is to check them out and
// immediately check them back in (which kicks off the refresh logic in
// returnConnection
- connPtr->setTimeout(_parent->_options.refreshRequirement, guardCallback([this, connPtr]() {
- auto conn = takeFromPool(_readyPool, connPtr);
+ auto returnConnectionFunc = guardCallback([this, connPtr]() {
+ LOG(kDiagnosticLogLevel) << "Triggered refresh timeout for " << _hostAndPort;
+ auto conn = takeFromPool(_readyPool, connPtr);
- // We've already been checked out. We don't need to refresh
- // ourselves.
- if (!conn)
- return;
+ // We've already been checked out. We don't need to refresh ourselves.
+ if (!conn)
+ return;
- // If we're in shutdown, we don't need to refresh connections
- if (_state == State::kInShutdown)
- return;
+ // If we're in shutdown, we don't need to refresh connections
+ if (_health.isShutdown)
+ return;
- _checkedOutPool[connPtr] = std::move(conn);
+ _checkedOutPool[connPtr] = std::move(conn);
- connPtr->indicateSuccess();
+ connPtr->indicateSuccess();
- returnConnection(connPtr);
- }));
+ returnConnection(connPtr);
+ });
+ connPtr->setTimeout(_parent->_controller->toRefreshTimeout(), std::move(returnConnectionFunc));
}
// Sets state to shutdown and kicks off the failure protocol to tank existing connections
void ConnectionPool::SpecificPool::triggerShutdown(const Status& status) {
- _state = State::kInShutdown;
- _droppedProcessingPool.clear();
+ _health.isShutdown = true;
+
+ LOG(2) << "Delisting connection pool for " << _hostAndPort;
+ _parent->_controller->removeHost(this);
+ _parent->_pools.erase(_hostAndPort);
+
processFailure(status);
+
+ _droppedProcessingPool.clear();
+ _eventTimer->cancelTimeout();
}
// Drop connections and fail all requests
void ConnectionPool::SpecificPool::processFailure(const Status& status) {
- // Bump the generation so we don't reuse any pending or checked out
- // connections
+ // Bump the generation so we don't reuse any pending or checked out connections
_generation++;
if (!_readyPool.empty() || !_processingPool.empty()) {
@@ -686,24 +829,24 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status) {
// Migrate processing connections to the dropped pool
for (auto&& x : _processingPool) {
- if (_state != State::kInShutdown) {
- // If we're just dropping the pool, we can reuse them later
- _droppedProcessingPool[x.first] = std::move(x.second);
- }
+ // If we're just dropping the pool, we can reuse them later
+ _droppedProcessingPool[x.first] = std::move(x.second);
}
_processingPool.clear();
- // Move the requests out so they aren't visible
- // in other threads
- decltype(_requests) requestsToFail;
- {
- using std::swap;
- swap(requestsToFail, _requests);
+ // Mark ourselves as failed so we don't immediately respawn
+ _health.isFailed = true;
+
+ if (_requests.empty()) {
+ return;
}
- for (auto& request : requestsToFail) {
+ for (auto& request : _requests) {
request.second.setError(status);
}
+
+ LOG(kDiagnosticLogLevel) << "Failing requests to " << _hostAndPort;
+ _requests.clear();
}
// fulfills as many outstanding requests as possible
@@ -715,8 +858,10 @@ void ConnectionPool::SpecificPool::fulfillRequests() {
_inFulfillRequests = true;
auto guard = makeGuard([&] { _inFulfillRequests = false; });
-
while (_requests.size()) {
+ // Marking this as our newest active time
+ _lastActiveTime = _parent->_factory->now();
+
// Caution: If this returns with a value, it's important that we not throw until we've
// emplaced the promise (as returning a connection would attempt to take the lock and would
// deadlock).
@@ -735,37 +880,38 @@ void ConnectionPool::SpecificPool::fulfillRequests() {
promise.emplaceValue(std::move(conn));
}
-
- spawnConnections();
}
-// spawn enough connections to satisfy open requests and minpool, while
-// honoring maxpool
+// spawn enough connections to satisfy open requests and minpool, while honoring maxpool
void ConnectionPool::SpecificPool::spawnConnections() {
- // If some other thread (possibly this thread) is spawning connections,
- // don't keep padding the callstack.
- if (_inSpawnConnections)
+ if (_health.isFailed) {
+ LOG(kDiagnosticLogLevel)
+ << "Pool for " << _hostAndPort
+ << " has failed recently. Postponing any attempts to spawn connections";
return;
+ }
- _inSpawnConnections = true;
- auto guard = makeGuard([&] { _inSpawnConnections = false; });
+ auto controls = _parent->_controller->getControls(this);
+ LOG(kDiagnosticLogLevel) << "Comparing connection state for " << _hostAndPort
+ << " to Controls: " << controls;
- // We want minConnections <= outstanding requests <= maxConnections
- auto target = [&] {
- return std::max(
- _parent->_options.minConnections,
- std::min(_requests.size() + _checkedOutPool.size(), _parent->_options.maxConnections));
- };
+ auto pendingConnections = refreshingConnections();
+ if (pendingConnections >= controls.maxPendingConnections) {
+ return;
+ }
- // While all of our inflight connections are less than our target
- while ((_state != State::kInShutdown) &&
- (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) &&
- (_processingPool.size() < _parent->_options.maxConnecting)) {
- if (_readyPool.empty() && _processingPool.empty()) {
- auto severity = MONGO_GET_LIMITED_SEVERITY(_hostAndPort, Seconds{1}, 0, 2);
- LOG(severity) << "Connecting to " << _hostAndPort;
- }
+ auto totalConnections = openConnections();
+ if (totalConnections >= controls.targetConnections) {
+ return;
+ }
+
+ auto severity = MONGO_GET_LIMITED_SEVERITY(_hostAndPort, Seconds{1}, 0, 2);
+ LOG(severity) << "Connecting to " << _hostAndPort;
+ auto allowance = std::min(controls.targetConnections - totalConnections,
+ controls.maxPendingConnections - pendingConnections);
+ LOG(kDiagnosticLogLevel) << "Spawning " << allowance << " connections to " << _hostAndPort;
+ for (decltype(allowance) i = 0; i < allowance; ++i) {
OwnedConnection handle;
try {
// make a new connection and put it in processing
@@ -776,17 +922,13 @@ void ConnectionPool::SpecificPool::spawnConnections() {
}
_processingPool[handle.get()] = handle;
-
++_created;
// Run the setup callback
- handle->setup(_parent->_options.refreshTimeout,
+ handle->setup(_parent->_controller->pendingTimeout(),
guardCallback([this](auto conn, auto status) {
finishRefresh(std::move(conn), std::move(status));
}));
-
- // Note that this assumes that the refreshTimeout is sound for the
- // setupTimeout
}
}
@@ -806,96 +948,145 @@ ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::take
ConnectionInterface* connPtr) {
auto conn = takeFromPool(_processingPool, connPtr);
if (conn) {
- invariant(_state != State::kInShutdown);
+ invariant(!_health.isShutdown);
return conn;
}
return takeFromPool(_droppedProcessingPool, connPtr);
}
+void ConnectionPool::SpecificPool::updateHealth() {
+ const auto now = _parent->_factory->now();
-// Updates our state and manages the request timer
-void ConnectionPool::SpecificPool::updateState() {
- if (_state == State::kInShutdown) {
- // If we're in shutdown, there is nothing to update. Our clients are all gone.
- if (_processingPool.empty()) {
- // If we have no more clients that require access to us, delist from the parent pool
- LOG(2) << "Delisting connection pool for " << _hostAndPort;
- _parent->_pools.erase(_hostAndPort);
- }
- return;
+ // We're expired if we have no sign of connection use and are past our expiry
+ _health.isExpired = _requests.empty() && _checkedOutPool.empty() && (_hostExpiration <= now);
+
+ // We're failed until we get new requests or our timer triggers
+ if (_health.isFailed) {
+ _health.isFailed = _requests.empty();
}
+}
- if (_requests.size()) {
- // We have some outstanding requests, we're live
+void ConnectionPool::SpecificPool::updateEventTimer() {
+ const auto now = _parent->_factory->now();
- // If we were already running and the timer is the same as it was
- // before, nothing to do
- if (_state == State::kRunning && _requestTimerExpiration == _requests.front().first)
- return;
+ // If our pending event has triggered, then schedule a retry as the next event
+ auto nextEventTime = _eventTimerExpiration;
+ if (nextEventTime <= now) {
+ nextEventTime = now + kHostRetryTimeout;
+ }
- _state = State::kRunning;
+ // If our expiration comes before our next event, then it is the next event
+ if (_requests.empty() && _checkedOutPool.empty()) {
+ _hostExpiration = _lastActiveTime + _parent->_controller->hostTimeout();
+ if ((_hostExpiration > now) && (_hostExpiration < nextEventTime)) {
+ nextEventTime = _hostExpiration;
+ }
+ }
- _requestTimer->cancelTimeout();
+ // If a request would timeout before the next event, then it is the next event
+ if (_requests.size() && (_requests.front().first < nextEventTime)) {
+ nextEventTime = _requests.front().first;
+ }
- _requestTimerExpiration = _requests.front().first;
+ // If our timer is already set to the next event, then we're done
+ if (nextEventTime == _eventTimerExpiration) {
+ return;
+ }
- auto timeout = _requests.front().first - _parent->_factory->now();
+ _eventTimerExpiration = nextEventTime;
+ // TODO Our timeout can be a Date_t after SERVER-41459
+ const auto timeout = _eventTimerExpiration - now;
- // We set a timer for the most recent request, then invoke each timed
- // out request we couldn't service
- _requestTimer->setTimeout(
- timeout, guardCallback([this]() {
- auto now = _parent->_factory->now();
+ _eventTimer->cancelTimeout();
- while (_requests.size()) {
- auto& x = _requests.front();
+ // Set our event timer to timeout requests, refresh the state, and potentially expire this pool
+ auto deferredStateUpdateFunc = guardCallback([this]() {
+ auto now = _parent->_factory->now();
- if (x.first <= now) {
- auto promise = std::move(x.second);
- std::pop_heap(begin(_requests), end(_requests), RequestComparator{});
- _requests.pop_back();
+ _health.isFailed = false;
- promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "Couldn't get a connection within the time limit"));
- } else {
- break;
- }
- }
- }));
- } else if (_checkedOutPool.size()) {
- // If we have no requests, but someone's using a connection, we just
- // hang around until the next request or a return
+ while (_requests.size() && (_requests.front().first <= now)) {
+ std::pop_heap(begin(_requests), end(_requests), RequestComparator{});
- _requestTimer->cancelTimeout();
- _state = State::kRunning;
- _requestTimerExpiration = _requestTimerExpiration.max();
- } else {
- // If we don't have any live requests and no one has checked out connections
+ auto& request = _requests.back();
+ request.second.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ "Couldn't get a connection within the time limit"));
+ _requests.pop_back();
- // If we used to be idle, just bail
- if (_state == State::kIdle)
- return;
+ // Since we've failed a request, we've interacted with external users
+ _lastActiveTime = now;
+ }
+ });
+ _eventTimer->setTimeout(timeout, std::move(deferredStateUpdateFunc));
+}
- _state = State::kIdle;
+void ConnectionPool::SpecificPool::updateController() {
+ if (std::exchange(_inControlLoop, true)) {
+ return;
+ }
+ const auto guard = makeGuard([&] { _inControlLoop = false; });
- _requestTimer->cancelTimeout();
+ auto& controller = *_parent->_controller;
- _requestTimerExpiration = _parent->_factory->now() + _parent->_options.hostTimeout;
+ // Update our own state
+ HostState state{
+ _health,
+ requestsPending(),
+ refreshingConnections(),
+ availableConnections(),
+ inUseConnections(),
+ };
+ LOG(kDiagnosticLogLevel) << "Updating controller for " << _hostAndPort
+ << " with State: " << state;
+ auto hostGroup = controller.updateHost(this, _hostAndPort, std::move(state));
+
+ // If we can shutdown, then do so
+ if (hostGroup.canShutdown) {
+ for (const auto& host : hostGroup.hosts) {
+ auto it = _parent->_pools.find(host);
+ if (it == _parent->_pools.end()) {
+ continue;
+ }
+
+ auto& pool = it->second;
+
+ // At the moment, controllers will never mark for shutdown a pool with active
+ // connections or pending requests. isExpired is never true if these invariants are
+ // false. That's not to say that it's a terrible idea, but if this happens then we
+ // should review what it means to be expired.
+
+ invariant(pool->_checkedOutPool.empty());
+ invariant(pool->_requests.empty());
+
+ pool->triggerShutdown(Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Pool for " << host << " has expired."));
+ }
+ return;
+ }
- auto timeout = _parent->_options.hostTimeout;
- // Set the shutdown timer, this gets reset on any request
- _requestTimer->setTimeout(
- timeout, guardCallback([this]() {
- if (_state != State::kIdle)
- return;
+ // Make sure all related hosts exist
+ for (const auto& host : hostGroup.hosts) {
+ if (auto& pool = _parent->_pools[host]; !pool) {
+ pool = SpecificPool::make(_parent, host, _sslMode);
+ }
+ }
- triggerShutdown(
- Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "Connection pool has been idle for longer than the host timeout"));
- }));
+ runOnExecutor([ this, anchor = shared_from_this() ]() { spawnConnections(); });
+}
+
+// Updates our state and manages the request timer
+void ConnectionPool::SpecificPool::updateState() {
+ if (_health.isShutdown) {
+ // If we're in shutdown, there is nothing to update. Our clients are all gone.
+ LOG(kDiagnosticLogLevel) << _hostAndPort << " is dead";
+ return;
}
+
+ updateEventTimer();
+ updateHealth();
+ updateController();
}
} // namespace executor
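
For readers skimming the hunk above: the default LimitController boils down to one rule per host. A minimal standalone sketch of that rule follows; the function and parameter names are illustrative, not from the patch.

#include <algorithm>
#include <cstddef>

// Illustrative only: mirrors LimitController::updateHost above, where the per-host
// connection target is the current demand (outstanding requests plus checked-out
// connections), clamped to ConnectionPool::Options::{min,max}Connections.
std::size_t defaultConnectionTarget(std::size_t requests,
                                    std::size_t active,
                                    std::size_t minConnections,
                                    std::size_t maxConnections) {
    return std::clamp(requests + active, minConnections, maxConnections);
}
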
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 9fb22e2c71e..6eca977ca1e 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -63,24 +63,28 @@ struct ConnectionPoolStats;
* HostAndPort. See comments on the various Options for how the pool operates.
*/
class ConnectionPool : public EgressTagCloser, public std::enable_shared_from_this<ConnectionPool> {
- class SpecificPool;
+ class LimitController;
public:
+ class SpecificPool;
+
class ConnectionInterface;
class DependentTypeFactoryInterface;
class TimerInterface;
+ class ControllerInterface;
using ConnectionHandleDeleter = stdx::function<void(ConnectionInterface* connection)>;
using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>;
- static constexpr Milliseconds kDefaultHostTimeout = Minutes(5);
static constexpr size_t kDefaultMaxConns = std::numeric_limits<size_t>::max();
static constexpr size_t kDefaultMinConns = 1;
static constexpr size_t kDefaultMaxConnecting = 2;
+ static constexpr Milliseconds kDefaultHostTimeout = Minutes(5);
static constexpr Milliseconds kDefaultRefreshRequirement = Minutes(1);
static constexpr Milliseconds kDefaultRefreshTimeout = Seconds(20);
+ static constexpr Milliseconds kHostRetryTimeout = Seconds(1);
static const Status kConnectionStateUnknown;
@@ -137,6 +141,80 @@ public:
* Connections created through this connection pool will not attempt to authenticate.
*/
bool skipAuthentication = false;
+
+ std::shared_ptr<ControllerInterface> controller;
+ };
+
+ /**
+ * A set of flags describing the health of a host pool
+ */
+ struct HostHealth {
+ /**
+ * The pool is expired and can be shutdown by updateController
+ *
+ * This flag is set to true when there have been no connection requests or in use
+ * connections for ControllerInterface::hostTimeout().
+ *
+ * This flag is set to false whenever a connection is requested.
+ */
+ bool isExpired = false;
+
+ /**
+ * The pool has processed a failure and will not spawn new connections until requested
+ *
+ * This flag is set to true by processFailure(), and thus also triggerShutdown().
+ *
+ * This flag is set to false whenever a connection is requested.
+ *
+ * As a further note, this prevents us from spamming a failed host with connection
+ * attempts. If an external user believes a host should be available, they can request
+ * again.
+ */
+ bool isFailed = false;
+
+ /**
+ * The pool is shutdown and will never be called by the ConnectionPool again.
+ *
+ * This flag is set to true by triggerShutdown() or updateController(). It is never unset.
+ */
+ bool isShutdown = false;
+ };
+
+ /**
+ * The state of connection pooling for a single host
+ *
+ * This should only be constructed by the SpecificPool.
+ */
+ struct HostState {
+ HostHealth health;
+ size_t requests = 0;
+ size_t pending = 0;
+ size_t ready = 0;
+ size_t active = 0;
+
+ std::string toString() const;
+ };
+
+ /**
+ * A simple set of controls to direct a single host
+ *
+ * This should only be constructed by a ControllerInterface
+ */
+ struct ConnectionControls {
+ size_t maxPendingConnections = kDefaultMaxConnecting;
+ size_t targetConnections = 0;
+
+ std::string toString() const;
+ };
+
+ /**
+ * A set of hosts and a flag canShutdown for if the group can shutdown
+ *
+ * This should only be constructed by a ControllerInterface
+ */
+ struct HostGroup {
+ std::vector<HostAndPort> hosts;
+ bool canShutdown = false;
};
explicit ConnectionPool(std::shared_ptr<DependentTypeFactoryInterface> impl,
@@ -169,11 +247,10 @@ public:
private:
std::string _name;
- // Options are set at startup and never changed at run time, so these are
- // accessed outside the lock
- const Options _options;
-
const std::shared_ptr<DependentTypeFactoryInterface> _factory;
+ Options _options;
+
+ std::shared_ptr<ControllerInterface> _controller;
// The global mutex for specific pool access and the generation counter
mutable stdx::mutex _mutex;
@@ -321,6 +398,71 @@ private:
};
/**
+ * An implementation of ControllerInterface directs the behavior of a SpecificPool
+ *
+ * Generally speaking, a Controller will be given HostState via updateState and then return Controls
+ * via getControls. A Controller is expected to not directly mutate its SpecificPool, including via
+ * its ConnectionPool pointer. A Controller is expected to be given to only one ConnectionPool.
+ */
+class ConnectionPool::ControllerInterface {
+public:
+ using SpecificPool = typename ConnectionPool::SpecificPool;
+ using HostState = typename ConnectionPool::HostState;
+ using ConnectionControls = typename ConnectionPool::ConnectionControls;
+ using HostGroup = typename ConnectionPool::HostGroup;
+
+ virtual ~ControllerInterface() = default;
+
+ /**
+ * Initialize this ControllerInterface using the given ConnectionPool
+ *
+ * ConnectionPools provide access to Executors and other DTF-provided objects.
+ */
+ virtual void init(ConnectionPool* parent);
+
+ /**
+ * Inform this Controller of a new State for a pool/host
+ *
+ * This function returns information about all hosts tied to this one. This function is also
+ * expected to handle never-before-seen pools.
+ */
+ virtual HostGroup updateHost(const SpecificPool* pool,
+ const HostAndPort& host,
+ const HostState& stats) = 0;
+
+ /**
+ * Inform this Controller that a pool is no longer tracked
+ */
+ virtual void removeHost(const SpecificPool* pool) = 0;
+
+ /**
+ * Get controls for the given pool
+ */
+ virtual ConnectionControls getControls(const SpecificPool* pool) = 0;
+
+ /**
+ * Get the various timeouts that this controller suggests
+ */
+ virtual Milliseconds hostTimeout() const = 0;
+ virtual Milliseconds pendingTimeout() const = 0;
+ virtual Milliseconds toRefreshTimeout() const = 0;
+
+ /**
+ * Get the name for this controller
+ *
+ * This function is intended to provide increased visibility into which controller is in use
+ */
+ virtual StringData name() const = 0;
+
+ const ConnectionPool* getPool() const {
+ return _pool;
+ }
+
+protected:
+ ConnectionPool* _pool = nullptr;
+};
+
+/**
* Implementation interface for the connection pool
*
* This factory provides generators for connections, timers and a clock for the
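
Taken together, the ControllerInterface above is the extension point this patch introduces: a controller is fed a HostState for each pool, hands back a HostGroup plus ConnectionControls, and supplies the three pool timeouts. Below is a hedged sketch of a conforming implementation, a controller that always aims for a fixed number of connections per host. The class name and the fixed target are invented for illustration; only the overridden signatures and the kDefault* constants come from the header above.

#include "mongo/executor/connection_pool.h"

namespace mongo {
namespace executor {

// Illustrative sketch, not part of this patch: keep a fixed connection target
// for every host, independent of load.
class FixedTargetController final : public ConnectionPool::ControllerInterface {
public:
    explicit FixedTargetController(size_t target) : _target(target) {}

    HostGroup updateHost(const SpecificPool* pool,
                         const HostAndPort& host,
                         const HostState& stats) override {
        // Each host is treated independently; the group is just the host itself,
        // and it may shut down once the pool reports itself as expired.
        return {{host}, stats.health.isExpired};
    }

    void removeHost(const SpecificPool* pool) override {
        // Nothing to clean up; no per-pool state is kept.
    }

    ConnectionControls getControls(const SpecificPool* pool) override {
        // Default pending limit, fixed target.
        return {ConnectionPool::kDefaultMaxConnecting, _target};
    }

    Milliseconds hostTimeout() const override {
        return ConnectionPool::kDefaultHostTimeout;
    }
    Milliseconds pendingTimeout() const override {
        return ConnectionPool::kDefaultRefreshTimeout;
    }
    Milliseconds toRefreshTimeout() const override {
        return ConnectionPool::kDefaultRefreshRequirement;
    }

    StringData name() const override {
        return "FixedTargetController"_sd;
    }

private:
    const size_t _target;
};

}  // namespace executor
}  // namespace mongo
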
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 6a7aaf77606..00a42bcc683 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -104,6 +104,7 @@ env.Library(
target='sharding_initialization',
source=[
'sharding_initialization.cpp',
+ 'sharding_task_executor_pool_controller.cpp',
env.Idlc('sharding_task_executor_pool.idl')[0],
'client/sharding_connection_hook.cpp',
'client/sharding_network_connection_hook.cpp',
@@ -118,6 +119,7 @@ env.Library(
LIBDEPS_PRIVATE=[
"$BUILD_DIR/mongo/idl/server_parameter",
'$BUILD_DIR/mongo/executor/thread_pool_task_executor',
+ '$BUILD_DIR/mongo/executor/connection_pool_executor',
'coreshard',
'sharding_task_executor',
],
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 6982c7a4687..0b542f920dc 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -64,6 +64,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/sharding_task_executor.h"
+#include "mongo/s/sharding_task_executor_pool_controller.h"
#include "mongo/s/sharding_task_executor_pool_gen.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -171,53 +172,11 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
return {ErrorCodes::BadValue, "Unrecognized connection string."};
}
- // We don't set the ConnectionPool's static const variables to be the default value in
- // MONGO_EXPORT_STARTUP_SERVER_PARAMETER because it's not guaranteed to be initialized.
- // The following code is a workaround.
ConnectionPool::Options connPoolOptions;
+ connPoolOptions.controller = std::make_shared<ShardingTaskExecutorPoolController>();
- connPoolOptions.minConnections = gShardingTaskExecutorPoolMinConnections;
- connPoolOptions.maxConnections = (gShardingTaskExecutorPoolMaxConnections >= 0)
- ? gShardingTaskExecutorPoolMaxConnections
- : ConnectionPool::kDefaultMaxConns;
- connPoolOptions.maxConnecting = (gShardingTaskExecutorPoolMaxConnecting >= 0)
- ? gShardingTaskExecutorPoolMaxConnecting
- : ConnectionPool::kDefaultMaxConnecting;
-
- connPoolOptions.hostTimeout = Milliseconds(gShardingTaskExecutorPoolHostTimeoutMS);
- connPoolOptions.refreshRequirement =
- Milliseconds(gShardingTaskExecutorPoolRefreshRequirementMS);
- connPoolOptions.refreshTimeout = Milliseconds(gShardingTaskExecutorPoolRefreshTimeoutMS);
-
- if (connPoolOptions.refreshRequirement <= connPoolOptions.refreshTimeout) {
- auto newRefreshTimeout = connPoolOptions.refreshRequirement - Milliseconds(1);
- warning() << "ShardingTaskExecutorPoolRefreshRequirementMS ("
- << connPoolOptions.refreshRequirement
- << ") set below ShardingTaskExecutorPoolRefreshTimeoutMS ("
- << connPoolOptions.refreshTimeout
- << "). Adjusting ShardingTaskExecutorPoolRefreshTimeoutMS to "
- << newRefreshTimeout;
- connPoolOptions.refreshTimeout = newRefreshTimeout;
- }
-
- if (connPoolOptions.hostTimeout <=
- connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout) {
- auto newHostTimeout =
- connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout + Milliseconds(1);
- warning() << "ShardingTaskExecutorPoolHostTimeoutMS (" << connPoolOptions.hostTimeout
- << ") set below ShardingTaskExecutorPoolRefreshRequirementMS ("
- << connPoolOptions.refreshRequirement
- << ") + ShardingTaskExecutorPoolRefreshTimeoutMS ("
- << connPoolOptions.refreshTimeout
- << "). Adjusting ShardingTaskExecutorPoolHostTimeoutMS to " << newHostTimeout;
- connPoolOptions.hostTimeout = newHostTimeout;
- }
-
- auto network =
- executor::makeNetworkInterface("ShardRegistry",
- stdx::make_unique<ShardingNetworkConnectionHook>(),
- hookBuilder(),
- connPoolOptions);
+ auto network = executor::makeNetworkInterface(
+ "ShardRegistry", std::make_unique<ShardingNetworkConnectionHook>(), hookBuilder());
auto networkPtr = network.get();
auto executorPool = makeShardingTaskExecutorPool(
std::move(network), hookBuilder, connPoolOptions, taskExecutorPoolSize);
diff --git a/src/mongo/s/sharding_task_executor_pool.idl b/src/mongo/s/sharding_task_executor_pool.idl
index 390dbc5a17b..dbced832b1b 100644
--- a/src/mongo/s/sharding_task_executor_pool.idl
+++ b/src/mongo/s/sharding_task_executor_pool.idl
@@ -28,49 +28,66 @@
global:
cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/s/sharding_task_executor_pool_controller.h"
server_parameters:
ShardingTaskExecutorPoolMinSize:
description: <-
The minimum number of connections for each executor in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolMinConnections"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.minConnections"
+ validator:
+ gte: 0
default: 1
ShardingTaskExecutorPoolMaxSize:
description: <-
The maximum number of connections for each executor in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolMaxConnections"
- default: -1
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnections"
+ validator:
+ gte: 1
+ default: 32767
ShardingTaskExecutorPoolMaxConnecting:
description: <-
The maximum number of in-flight connections for each executor
in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolMaxConnecting"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnecting"
+ validator:
+ gte: 1
default: 2
ShardingTaskExecutorPoolHostTimeoutMS:
description: <-
The timeout for dropping a host for each executor in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolHostTimeoutMS"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.hostTimeoutMS"
+ validator:
+ callback: "ShardingTaskExecutorPoolController::validateHostTimeout"
+ gte: 1
default: 300000 # 5mins
ShardingTaskExecutorPoolRefreshRequirementMS:
description: <-
The timeout before a connection needs to be refreshed for each executor
in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolRefreshRequirementMS"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.toRefreshTimeoutMS"
+ validator:
+ gte: 1
default: 60000 # 1min
ShardingTaskExecutorPoolRefreshTimeoutMS:
description: <-
The timeout for refreshing a connection for each executor in the pool for the sharding grid.
- set_at: [ startup ]
- cpp_vartype: "int"
- cpp_varname: "gShardingTaskExecutorPoolRefreshTimeoutMS"
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.pendingTimeoutMS"
+ validator:
+ callback: "ShardingTaskExecutorPoolController::validatePendingTimeout"
+ gte: 1
default: 20000 # 20secs
+ ShardingTaskExecutorPoolReplicaSetMatching:
+ description: <-
+ Enables ReplicaSet member connection matching.
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.matchingStrategyString"
+ on_update: "ShardingTaskExecutorPoolController::onUpdateMatchingStrategy"
+ default: "disabled"
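+ # Note (illustrative usage): since these parameters allow set_at runtime, they can be
+ # changed on a live mongos with the setParameter command, e.g.
+ #   db.adminCommand({setParameter: 1, ShardingTaskExecutorPoolReplicaSetMatching: "matchPrimaryNode"})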
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
new file mode 100644
index 00000000000..77d694e2f16
--- /dev/null
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -0,0 +1,249 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kConnectionPool
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/s/sharding_task_executor_pool_controller.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
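+// With the defaults in sharding_task_executor_pool.idl, both validations below hold:
+// hostTimeoutMS (300000) >= toRefreshTimeoutMS (60000) + pendingTimeoutMS (20000), and
+// pendingTimeoutMS (20000) < toRefreshTimeoutMS (60000).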
+Status ShardingTaskExecutorPoolController::validateHostTimeout(const int& hostTimeoutMS) {
+ auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load();
+ auto pendingTimeoutMS = gParameters.pendingTimeoutMS.load();
+ if (hostTimeoutMS >= (toRefreshTimeoutMS + pendingTimeoutMS)) {
+ return Status::OK();
+ }
+
+ std::string msg = str::stream()
+ << "ShardingTaskExecutorPoolHostTimeoutMS (" << hostTimeoutMS
+ << ") set below ShardingTaskExecutorPoolRefreshRequirementMS (" << toRefreshTimeoutMS
+ << ") + ShardingTaskExecutorPoolRefreshTimeoutMS (" << pendingTimeoutMS << ").";
+ return Status(ErrorCodes::BadValue, msg);
+}
+
+Status ShardingTaskExecutorPoolController::validatePendingTimeout(const int& pendingTimeoutMS) {
+ auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load();
+ if (pendingTimeoutMS < toRefreshTimeoutMS) {
+ return Status::OK();
+ }
+
+ std::string msg = str::stream()
+ << "ShardingTaskExecutorPoolRefreshRequirementMS (" << toRefreshTimeoutMS
+ << ") set below ShardingTaskExecutorPoolRefreshTimeoutMS (" << pendingTimeoutMS << ").";
+ return Status(ErrorCodes::BadValue, msg);
+}
+
+Status ShardingTaskExecutorPoolController::onUpdateMatchingStrategy(const std::string& str) {
+ // TODO Fix up after SERVER-40224
+ if (str == "disabled") {
+ gParameters.matchingStrategy.store(MatchingStrategy::kDisabled);
+ } else if (str == "matchPrimaryNode") {
+ gParameters.matchingStrategy.store(MatchingStrategy::kMatchPrimaryNode);
+ // TODO Reactivate once the prediction pattern is fixed in SERVER-41602
+ //} else if (str == "matchBusiestNode") {
+ // gParameters.matchingStrategy.store(MatchingStrategy::kMatchBusiestNode);
+ } else {
+ return Status{ErrorCodes::BadValue,
+ str::stream() << "Unrecognized matching strategy '" << str << "'"};
+ }
+
+ return Status::OK();
+}
+
+void ShardingTaskExecutorPoolController::_addGroup(WithLock,
+ const ReplicaSetChangeNotifier::State& state) {
+ // Replace any previous group for this set name
+ auto& group = _hostGroups[state.connStr.getSetName()];
+ group = std::make_shared<HostGroupData>();
+ group->state = state;
+
+ // Mark each host with this group
+ for (auto& host : state.connStr.getServers()) {
+ _hostGroupsByHost[host] = group;
+ }
+}
+
+void ShardingTaskExecutorPoolController::_removeGroup(WithLock, const std::string& name) {
+ auto it = _hostGroups.find(name);
+ if (it == _hostGroups.end()) {
+ return;
+ }
+
+ auto& hostGroup = it->second;
+ for (auto& host : hostGroup->state.connStr.getServers()) {
+ _hostGroupsByHost.erase(host);
+ }
+ _hostGroups.erase(it);
+}
+
+class ShardingTaskExecutorPoolController::ReplicaSetChangeListener final
+ : public ReplicaSetChangeNotifier::Listener {
+public:
+ explicit ReplicaSetChangeListener(ShardingTaskExecutorPoolController* controller)
+ : _controller(controller) {}
+
+ void onFoundSet(const Key& key) override {
+ // Do nothing
+ }
+
+ void onConfirmedSet(const State& state) override {
+ stdx::lock_guard lk(_controller->_mutex);
+
+ _controller->_removeGroup(lk, state.connStr.getSetName());
+ _controller->_addGroup(lk, state);
+ }
+
+ void onPossibleSet(const State& state) override {
+ // Do nothing
+ }
+
+ void onDroppedSet(const Key& key) override {
+ stdx::lock_guard lk(_controller->_mutex);
+
+ _controller->_removeGroup(lk, key);
+ }
+
+private:
+ ShardingTaskExecutorPoolController* const _controller;
+};
+
+void ShardingTaskExecutorPoolController::init(ConnectionPool* parent) {
+ ControllerInterface::init(parent);
+ _listener = ReplicaSetMonitor::getNotifier().makeListener<ReplicaSetChangeListener>(this);
+}
+
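+// Worked example (hypothetical numbers): with kMatchPrimaryNode and a three-member set
+// {A: primary, B, C}, if A's pool reports requests + active = 10 while B and C report 2,
+// the group target becomes 10 (clamped to [minConnections, maxConnections]), so
+// getControls() asks every member's pool to maintain 10 connections rather than 2.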
+auto ShardingTaskExecutorPoolController::updateHost(const SpecificPool* pool,
+ const HostAndPort& host,
+ const HostState& stats) -> HostGroup {
+ stdx::lock_guard lk(_mutex);
+
+ auto& data = _poolData[pool];
+
+ const size_t minConns = gParameters.minConnections.load();
+ const size_t maxConns = gParameters.maxConnections.load();
+
+ // Update the target for just the pool first
+ data.target = stats.requests + stats.active;
+
+ if (data.target < minConns) {
+ data.target = minConns;
+ } else if (data.target > maxConns) {
+ data.target = maxConns;
+ }
+
+ data.isAbleToShutdown = stats.health.isExpired;
+
+ // If the pool isn't in a group, we can return now
+ auto it = _hostGroupsByHost.find(host);
+ if (it == _hostGroupsByHost.end()) {
+ return {{host}, data.isAbleToShutdown};
+ }
+
+ // If the pool has a group, then update the group
+ auto& hostGroup = it->second;
+ data.hostGroup = hostGroup;
+
+ // Make sure we're part of the group
+ hostGroup->pools.insert(pool);
+
+ switch (gParameters.matchingStrategy.load()) {
+ case MatchingStrategy::kMatchPrimaryNode: {
+ if (hostGroup->state.primary == host) {
+ hostGroup->target = data.target;
+ }
+ } break;
+ case MatchingStrategy::kMatchBusiestNode: {
+ hostGroup->target = std::max(hostGroup->target, data.target);
+ } break;
+ case MatchingStrategy::kDisabled: {
+ // Nothing
+ } break;
+ };
+
+ if (hostGroup->target < minConns) {
+ hostGroup->target = minConns;
+ } else if (hostGroup->target > maxConns) {
+ hostGroup->target = maxConns;
+ }
+
+ auto shouldShutdown = data.isAbleToShutdown &&
+ std::all_of(hostGroup->pools.begin(), hostGroup->pools.end(), [&](auto otherPool) {
+ return _poolData[otherPool].isAbleToShutdown;
+ });
+ return {hostGroup->state.connStr.getServers(), shouldShutdown};
+}
+
+void ShardingTaskExecutorPoolController::removeHost(const SpecificPool* pool) {
+ stdx::lock_guard lk(_mutex);
+ auto it = _poolData.find(pool);
+ if (it == _poolData.end()) {
+ // It's possible that a pool is removed before it has ever called updateHost
+ return;
+ }
+
+ auto& data = it->second;
+ if (auto hostGroup = data.hostGroup.lock()) {
+ hostGroup->pools.erase(pool);
+ }
+ _poolData.erase(it);
+}
+
+auto ShardingTaskExecutorPoolController::getControls(const SpecificPool* pool)
+ -> ConnectionControls {
+ stdx::lock_guard lk(_mutex);
+ auto& data = _poolData[pool];
+
+ const size_t maxPending = gParameters.maxConnecting.load();
+
+ auto hostGroup = data.hostGroup.lock();
+ if (!hostGroup || gParameters.matchingStrategy.load() == MatchingStrategy::kDisabled) {
+ return {maxPending, data.target};
+ }
+
+ auto target = std::max(data.target, hostGroup->target);
+ return {maxPending, target};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::hostTimeout() const {
+ return Milliseconds{gParameters.hostTimeoutMS.load()};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::pendingTimeout() const {
+ return Milliseconds{gParameters.pendingTimeoutMS.load()};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::toRefreshTimeout() const {
+ return Milliseconds{gParameters.toRefreshTimeoutMS.load()};
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.h b/src/mongo/s/sharding_task_executor_pool_controller.h
new file mode 100644
index 00000000000..d61f707de73
--- /dev/null
+++ b/src/mongo/s/sharding_task_executor_pool_controller.h
@@ -0,0 +1,186 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status.h"
+#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/executor/connection_pool.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+
+namespace mongo {
+
+/**
+ * A special Controller for the sharding ConnectionPool
+ *
+ * This class has two special members:
+ * * A global set of synchronized Parameters for the ShardingTaskExecutorPool server parameters
+ * * A ReplicaSetChangeListener to inform it of changes to replica set membership
+ *
+ * When the MatchingStrategy from its Parameters is kDisabled, this class operates much like the
+ * LimitController but with its limits allowed to shift at runtime (via Parameters).
+ *
+ * When the MatchingStrategy is kMatchPrimaryNode, the limits are obeyed but, when the pool for a
+ * primary member calls updateHost, it can increase the targetConnections for the pool of each other
+ * member of its replica set. Note that this will, at time of writing, follow the "hosts" field
+ * from the primary isMaster combined with the seed list for the replica set. If the seed list were
+ * to include arbiters or hidden members, then they would also be subject to these constraints.
+ *
+ * When the MatchingStrategy is kMatchBusiestNode, it operates like kMatchPrimaryNode, but any pool
+ * can be responsible for increasing the targetConnections of each member of its set.
+ *
+ * Note that, in essence, there are three outside elements that can mutate the state of this class:
+ * * The ReplicaSetChangeNotifier can notify the listener which updates the host groups
+ * * The ServerParameters can update the Parameters which will be used in the next update
+ * * The SpecificPools for its ConnectionPool can updateHost with their individual States
+ */
+class ShardingTaskExecutorPoolController final
+ : public executor::ConnectionPool::ControllerInterface {
+ class ReplicaSetChangeListener;
+
+public:
+ using ConnectionPool = executor::ConnectionPool;
+
+ enum class MatchingStrategy {
+ kDisabled,
+ kMatchPrimaryNode,
+ kMatchBusiestNode,
+ };
+
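+ /**
+ * Runtime-adjustable knobs for this controller, populated by the ShardingTaskExecutorPool*
+ * server parameters defined in sharding_task_executor_pool.idl.
+ */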
+ class Parameters {
+ public:
+ AtomicWord<int> minConnections;
+ AtomicWord<int> maxConnections;
+ AtomicWord<int> maxConnecting;
+
+ AtomicWord<int> hostTimeoutMS;
+ AtomicWord<int> pendingTimeoutMS;
+ AtomicWord<int> toRefreshTimeoutMS;
+
+ synchronized_value<std::string> matchingStrategyString;
+ AtomicWord<MatchingStrategy> matchingStrategy;
+ };
+
+ static inline Parameters gParameters;
+
+ /**
+ * Validate that hostTimeoutMS is greater than the sum of pendingTimeoutMS and
+ * toRefreshTimeoutMS
+ */
+ static Status validateHostTimeout(const int& hostTimeoutMS);
+
+ /**
+ * Validate that pendingTimeoutMS is less than toRefreshTimeoutMS
+ */
+ static Status validatePendingTimeout(const int& pendingTimeoutMS);
+
+ /**
+ * Matches the matching strategy string against a set of literals
+ * and either sets gParameters.matchingStrategy or returns a non-OK Status.
+ */
+ static Status onUpdateMatchingStrategy(const std::string& str);
+
+ ShardingTaskExecutorPoolController() = default;
+ ShardingTaskExecutorPoolController& operator=(ShardingTaskExecutorPoolController&&) = delete;
+
+ void init(ConnectionPool* parent) override;
+
+ HostGroup updateHost(const SpecificPool* pool,
+ const HostAndPort& host,
+ const HostState& stats) override;
+ void removeHost(const SpecificPool* pool) override;
+
+ ConnectionControls getControls(const SpecificPool* pool) override;
+
+ Milliseconds hostTimeout() const override;
+ Milliseconds pendingTimeout() const override;
+ Milliseconds toRefreshTimeout() const override;
+
+ StringData name() const override {
+ return "ShardingTaskExecutorPoolController"_sd;
+ }
+
+private:
+ void _addGroup(WithLock, const ReplicaSetChangeNotifier::State& state);
+ void _removeGroup(WithLock, const std::string& key);
+
+ /**
+ * HostGroup is a shared state for a set of hosts (a replica set).
+ *
+ * When the ReplicaSetChangeListener is informed of a change to a replica set,
+ * it creates a new HostGroup and fills it into _hostGroups[setName] and
+ * _hostGroupsByHost[memberHost]. This does not immediately affect the results of getControls.
+ *
+ * When a SpecificPool calls updateHost, it checks _hostGroupsByHost to see if it belongs to
+ * any group and inserts itself into that group's pools. It will then update the group's target
+ * according to the MatchingStrategy. It also reports shouldShutdown as true only if every pool
+ * in the group is able to shut down.
+ *
+ * Note that a HostData can find itself orphaned from its HostGroup during a reconfig.
+ */
+ struct HostGroupData {
+ // The ReplicaSet state for this set
+ ReplicaSetChangeNotifier::State state;
+
+ // Pointer index for each pool in the set
+ stdx::unordered_set<const SpecificPool*> pools;
+
+ // The number of connections that all hosts in the group should maintain
+ size_t target = 0;
+ };
+
+ /**
+ * HostData represents the current state for a specific HostAndPort/SpecificPool.
+ *
+ * It is mutated by updateHost/removeHost and used along with Parameters to form Controls
+ * for getControls.
+ */
+ struct HostData {
+ // The HostGroup associated with this pool.
+ // Note that this will be invalid if there was a replica set change
+ std::weak_ptr<HostGroupData> hostGroup;
+
+ // The number of connections the host should maintain
+ size_t target = 0;
+
+ // This host is able to shutdown
+ bool isAbleToShutdown = false;
+ };
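+
+ // For example, after a reconfig replaces a set's HostGroupData, an existing pool's hostGroup
+ // weak_ptr no longer locks; getControls() then falls back to that pool's own target until its
+ // next updateHost call re-associates it with a group (if any).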
+
+ ReplicaSetChangeListenerHandle _listener;
+
+ stdx::mutex _mutex;
+
+ stdx::unordered_map<const SpecificPool*, HostData> _poolData;
+ stdx::unordered_map<std::string, std::shared_ptr<HostGroupData>> _hostGroups;
+ stdx::unordered_map<HostAndPort, std::shared_ptr<HostGroupData>> _hostGroupsByHost;
+};
+} // namespace mongo