author     Ben Caimano <ben.caimano@10gen.com>   2019-04-25 17:33:34 -0400
committer  Ben Caimano <ben.caimano@10gen.com>   2019-06-19 14:40:45 -0400
commit     f2b968173b5ac64ba55cf85703e7eb3fefea9c44 (patch)
tree       72674f8c00b93330d453fd55ba1ebe4dc807e963
parent     629f276dbe0a6a65dc51ad237cef31a7e0c516d8 (diff)
download   mongo-f2b968173b5ac64ba55cf85703e7eb3fefea9c44.tar.gz
SERVER-39817 SERVER-39819 SERVER-38920 Add ConnectionPoolControllers
See the JIRA tickets listed above for the individual commits that landed on v4.3.
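This change replaces the pool's fixed sizing options with a pluggable ControllerInterface: each SpecificPool reports a HostState snapshot, and the controller answers with ConnectionControls that spawnConnections() then satisfies. The default LimitController (added in connection_pool.cpp below) reduces to a single sizing rule; as a minimal standalone sketch (the free function here is illustrative, not part of the patch):

    #include <algorithm>
    #include <cstddef>

    // One connection per pending request plus each connection already checked
    // out, clamped to the configured bounds -- the rule LimitController's
    // updateHost() applies for each host.
    std::size_t targetConnections(std::size_t requests,
                                  std::size_t active,
                                  std::size_t minConns,
                                  std::size_t maxConns) {
        return std::clamp(requests + active, minConns, maxConns);
    }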
-rw-r--r--  jstests/noPassthrough/drop_connections_sharded.js       |   9
-rw-r--r--  jstests/noPassthrough/predictive_connpool.js            | 159
-rw-r--r--  jstests/noPassthrough/set_step_params.js                | 270
-rw-r--r--  src/mongo/db/repl/SConscript                            |   1
-rw-r--r--  src/mongo/db/repl/replication_info.cpp                  |   8
-rw-r--r--  src/mongo/executor/connection_pool.cpp                  | 603
-rw-r--r--  src/mongo/executor/connection_pool.h                    | 154
-rw-r--r--  src/mongo/s/SConscript                                  |   2
-rw-r--r--  src/mongo/s/sharding_initialization.cpp                 |  49
-rw-r--r--  src/mongo/s/sharding_task_executor_pool.idl             |  55
-rw-r--r--  src/mongo/s/sharding_task_executor_pool_controller.cpp  | 249
-rw-r--r--  src/mongo/s/sharding_task_executor_pool_controller.h    | 186
12 files changed, 1468 insertions, 277 deletions
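The new jstests below drive these parameters at runtime through setParameter and verify the pool's reaction via connPoolStats. As a minimal shell sketch of that workflow (values are examples; the parameter names come from sharding_task_executor_pool.idl in this patch):

    // Run against a mongos; with this patch, the pool parameters are
    // runtime-settable and may be combined in one setParameter command.
    const admin = db.getSiblingDB("admin");
    assert.commandWorked(admin.runCommand({
        setParameter: 1,
        ShardingTaskExecutorPoolMinSize: 4,
        ShardingTaskExecutorPoolReplicaSetMatching: "matchPrimaryNode",
    }));
    // Inspect the resulting per-host connection counts.
    assert.commandWorked(admin.runCommand({connPoolStats: 1}));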
diff --git a/jstests/noPassthrough/drop_connections_sharded.js b/jstests/noPassthrough/drop_connections_sharded.js index 7ac8e2e8d4d..dd90f587b42 100644 --- a/jstests/noPassthrough/drop_connections_sharded.js +++ b/jstests/noPassthrough/drop_connections_sharded.js @@ -6,7 +6,14 @@ (function() { "use strict"; - const st = new ShardingTest({shards: 1, rs0: {nodes: 3}, mongos: 1}); + // TODO ShardingTaskExecutorPoolMinSize is set to 0 so that we can clearly observe + // dropConnections. This will no longer be necessary with SERVER-41460 + const st = new ShardingTest({ + config: {nodes: 1}, + shards: 1, + rs0: {nodes: 3}, + mongos: [{setParameter: {ShardingTaskExecutorPoolMinSize: 0}}] + }); const mongos = st.s0; const rst = st.rs0; const primary = rst.getPrimary(); diff --git a/jstests/noPassthrough/predictive_connpool.js b/jstests/noPassthrough/predictive_connpool.js new file mode 100644 index 00000000000..c38d01601e2 --- /dev/null +++ b/jstests/noPassthrough/predictive_connpool.js @@ -0,0 +1,159 @@ +load("jstests/libs/parallelTester.js"); + +/** + * @tags: [requires_sharding] + */ + +(function() { + "use strict"; + + const st = new ShardingTest({mongos: 1, shards: 1, rs: {nodes: 2, protocolVersion: 1}}); + const kDbName = 'test'; + const mongosClient = st.s; + const mongos = mongosClient.getDB(kDbName); + const rst = st.rs0; + const primary = rst.getPrimary(); + const secondary = rst.getSecondaries()[0]; + + const cfg = primary.getDB('local').system.replset.findOne(); + const allHosts = cfg.members.map(x => x.host); + const primaryOnly = [primary.name]; + const secondaryOnly = [secondary.name]; + + function configureReplSetFailpoint(name, modeValue) { + st.rs0.nodes.forEach(function(node) { + assert.commandWorked(node.getDB("admin").runCommand({ + configureFailPoint: name, + mode: modeValue, + data: {shouldCheckForInterrupt: true}, + })); + }); + } + + var threads = []; + + function launchFinds({times, readPref, shouldFail}) { + jsTestLog("Starting " + times + " connections"); + for (var i = 0; i < times; i++) { + var thread = new Thread(function(connStr, readPref, dbName, shouldFail) { + var client = new Mongo(connStr); + const ret = client.getDB(dbName).runCommand( + {find: "test", limit: 1, "$readPreference": {mode: readPref}}); + + if (shouldFail) { + assert.commandFailed(ret); + } else { + assert.commandWorked(ret); + } + }, st.s.host, readPref, kDbName, shouldFail); + thread.start(); + threads.push(thread); + } + } + + function updateSetParameters(params) { + var cmd = Object.assign({"setParameter": 1}, params); + assert.commandWorked(mongos.adminCommand(cmd)); + } + + function dropConnections() { + assert.commandWorked(mongos.adminCommand({dropConnections: 1, hostAndPort: allHosts})); + } + + var currentCheckNum = 0; + function hasConnPoolStats(args) { + const checkNum = currentCheckNum++; + jsTestLog("Check #" + checkNum + ": " + tojson(args)); + var {ready, pending, active, hosts, isAbsent} = args; + + ready = ready ? ready : 0; + pending = pending ? pending : 0; + active = active ? active : 0; + hosts = hosts ? 
hosts : allHosts; + + function checkStats(res, host) { + var stats = res.hosts[host]; + if (!stats) { + jsTestLog("Connection stats for " + host + " are absent"); + return isAbsent; + } + + jsTestLog("Connection stats for " + host + ": " + tojson(stats)); + return stats.available == ready && stats.refreshing == pending && stats.inUse == active; + } + + function checkAllStats() { + var res = mongos.adminCommand({connPoolStats: 1}); + return hosts.map(host => checkStats(res, host)).every(x => x); + } + + assert.soon(checkAllStats, "Check #" + checkNum + " failed", 10000); + + jsTestLog("Check #" + checkNum + " successful"); + } + + function checkConnPoolStats() { + const ret = mongos.runCommand({"connPoolStats": 1}); + const poolStats = ret["pools"]["NetworkInterfaceTL-TaskExecutorPool-0"]; + jsTestLog(poolStats); + } + + function walkThroughBehavior({primaryFollows, secondaryFollows}) { + // Start pooling with a ping + mongos.adminCommand({multicast: {ping: 0}}); + checkConnPoolStats(); + + // Block connections from finishing + configureReplSetFailpoint("waitInFindBeforeMakingBatch", "alwaysOn"); + + // Launch a bunch of primary finds + launchFinds({times: 10, readPref: "primary"}); + + // Confirm we follow + hasConnPoolStats({active: 10, hosts: primaryOnly}); + if (secondaryFollows) { + hasConnPoolStats({ready: 10, hosts: secondaryOnly}); + } + checkConnPoolStats(); + + // Launch a bunch of secondary finds + launchFinds({times: 20, readPref: "secondary"}); + + // Confirm we follow + hasConnPoolStats({active: 20, hosts: secondaryOnly}); + if (primaryFollows) { + hasConnPoolStats({ready: 10, active: 10, hosts: primaryOnly}); + } + checkConnPoolStats(); + + configureReplSetFailpoint("waitInFindBeforeMakingBatch", "off"); + + dropConnections(); + } + + assert.writeOK(mongos.test.insert({x: 1})); + assert.writeOK(mongos.test.insert({x: 2})); + assert.writeOK(mongos.test.insert({x: 3})); + st.rs0.awaitReplication(); + + jsTestLog("Following disabled"); + walkThroughBehavior({primaryFollows: false, secondaryFollows: false}); + + jsTestLog("Following primary node"); + updateSetParameters({ShardingTaskExecutorPoolReplicaSetMatching: "matchPrimaryNode"}); + walkThroughBehavior({primaryFollows: false, secondaryFollows: true}); + + // jsTestLog("Following busiest node"); + // updateSetParameters({ShardingTaskExecutorPoolReplicaSetMatching: "matchBusiestNode"}); + // walkThroughBehavior({primaryFollows: true, secondaryFollows: true}); + + jsTestLog("Reseting to disabled"); + updateSetParameters({ShardingTaskExecutorPoolReplicaSetMatching: "disabled"}); + walkThroughBehavior({primaryFollows: false, secondaryFollows: false}); + + threads.forEach(function(thread) { + thread.join(); + }); + + st.stop(); +})(); diff --git a/jstests/noPassthrough/set_step_params.js b/jstests/noPassthrough/set_step_params.js new file mode 100644 index 00000000000..948ec5c117e --- /dev/null +++ b/jstests/noPassthrough/set_step_params.js @@ -0,0 +1,270 @@ +load("jstests/libs/parallelTester.js"); + +/** + * @tags: [requires_replication, requires_sharding] + */ + +(function() { + "use strict"; + + const kDbName = 'test'; + + const minConns = 4; + var stepParams = { + ShardingTaskExecutorPoolMinSize: minConns, + ShardingTaskExecutorPoolMaxSize: 10, + ShardingTaskExecutorPoolMaxConnecting: 5, + ShardingTaskExecutorPoolHostTimeoutMS: 300000, + ShardingTaskExecutorPoolRefreshRequirementMS: 60000, + ShardingTaskExecutorPoolRefreshTimeoutMS: 20000, + ShardingTaskExecutorPoolReplicaSetMatching: "disabled", + }; + + const st = new 
ShardingTest({ + config: {nodes: 1}, + shards: 1, + rs0: {nodes: 1}, + mongos: [{setParameter: stepParams}], + }); + const mongos = st.s0; + const rst = st.rs0; + const primary = rst.getPrimary(); + + const cfg = primary.getDB('local').system.replset.findOne(); + const allHosts = cfg.members.map(x => x.host); + const mongosDB = mongos.getDB(kDbName); + const primaryOnly = [primary.name]; + + function configureReplSetFailpoint(name, modeValue) { + st.rs0.nodes.forEach(function(node) { + assert.commandWorked(node.getDB("admin").runCommand({ + configureFailPoint: name, + mode: modeValue, + data: {shouldCheckForInterrupt: true}, + })); + }); + } + + var threads = []; + function launchFinds({times, readPref, shouldFail}) { + jsTestLog("Starting " + times + " connections"); + for (var i = 0; i < times; i++) { + var thread = new Thread(function(connStr, readPref, dbName, shouldFail) { + var client = new Mongo(connStr); + const ret = client.getDB(dbName).runCommand( + {find: "test", limit: 1, "$readPreference": {mode: readPref}}); + + if (shouldFail) { + assert.commandFailed(ret); + } else { + assert.commandWorked(ret); + } + }, st.s.host, readPref, kDbName, shouldFail); + thread.start(); + threads.push(thread); + } + } + + var currentCheckNum = 0; + function hasConnPoolStats(args) { + const checkNum = currentCheckNum++; + jsTestLog("Check #" + checkNum + ": " + tojson(args)); + var {ready, pending, active, hosts, isAbsent} = args; + + ready = ready ? ready : 0; + pending = pending ? pending : 0; + active = active ? active : 0; + hosts = hosts ? hosts : allHosts; + + function checkStats(res, host) { + var stats = res.hosts[host]; + if (!stats) { + jsTestLog("Connection stats for " + host + " are absent"); + return isAbsent; + } + + jsTestLog("Connection stats for " + host + ": " + tojson(stats)); + return stats.available == ready && stats.refreshing == pending && stats.inUse == active; + } + + function checkAllStats() { + var res = mongos.adminCommand({connPoolStats: 1}); + return hosts.map(host => checkStats(res, host)).every(x => x); + } + + assert.soon(checkAllStats, "Check #" + checkNum + " failed", 10000); + + jsTestLog("Check #" + checkNum + " successful"); + } + + function updateSetParameters(params) { + var cmd = Object.assign({"setParameter": 1}, params); + assert.commandWorked(mongos.adminCommand(cmd)); + } + + function dropConnections() { + assert.commandWorked(mongos.adminCommand({dropConnections: 1, hostAndPort: allHosts})); + } + + function resetPools() { + dropConnections(); + mongos.adminCommand({multicast: {ping: 0}}); + hasConnPoolStats({ready: 4}); + dropConnections(); + hasConnPoolStats({}); + } + + function runSubTest(name, fun) { + jsTestLog("Running test for " + name); + + resetPools(); + + fun(); + + updateSetParameters(stepParams); + } + + assert.writeOK(mongosDB.test.insert({x: 1})); + assert.writeOK(mongosDB.test.insert({x: 2})); + assert.writeOK(mongosDB.test.insert({x: 3})); + st.rs0.awaitReplication(); + + runSubTest("MinSize", function() { + // Launch an initial find to trigger to min + launchFinds({times: 1, readPref: "primary"}); + hasConnPoolStats({ready: minConns}); + + // Increase by one + updateSetParameters({ShardingTaskExecutorPoolMinSize: 5}); + hasConnPoolStats({ready: 5}); + + // Increase to MaxSize + updateSetParameters({ShardingTaskExecutorPoolMinSize: 10}); + hasConnPoolStats({ready: 10}); + + // Decrease to zero + updateSetParameters({ShardingTaskExecutorPoolMinSize: 0}); + }); + + runSubTest("MaxSize", function() { + 
configureReplSetFailpoint("waitInFindBeforeMakingBatch", "alwaysOn"); + + // Launch 10 blocked finds + launchFinds({times: 10, readPref: "primary"}); + hasConnPoolStats({active: 10, hosts: primaryOnly}); + + // Increase by 5 and Launch another 4 blocked finds + updateSetParameters({ShardingTaskExecutorPoolMaxSize: 15}); + launchFinds({times: 4, readPref: "primary"}); + hasConnPoolStats({active: 14, hosts: primaryOnly}); + + // Launch yet another 2, these should add only 1 connection + launchFinds({times: 2, readPref: "primary"}); + hasConnPoolStats({active: 15, hosts: primaryOnly}); + + configureReplSetFailpoint("waitInFindBeforeMakingBatch", "off"); + hasConnPoolStats({ready: 15, pending: 0, hosts: primaryOnly}); + }); + + // Test maxConnecting + runSubTest("MaxConnecting", function() { + const maxPending1 = 2; + const maxPending2 = 4; + const conns = 6; + + updateSetParameters({ + ShardingTaskExecutorPoolMaxSize: 100, + ShardingTaskExecutorPoolMaxConnecting: maxPending1, + }); + + configureReplSetFailpoint("waitInIsMaster", "alwaysOn"); + configureReplSetFailpoint("waitInFindBeforeMakingBatch", "alwaysOn"); + + // Go to the limit of maxConnecting, so we're stuck here + launchFinds({times: maxPending1, readPref: "primary"}); + hasConnPoolStats({pending: maxPending1}); + + // More won't run right now + launchFinds({times: conns - maxPending1, readPref: "primary"}); + hasConnPoolStats({pending: maxPending1}); + + // If we increase our limit, it should fill in some of the connections + updateSetParameters({ShardingTaskExecutorPoolMaxConnecting: maxPending2}); + hasConnPoolStats({pending: maxPending2}); + + // Dropping the limit doesn't cause us to drop pending + updateSetParameters({ShardingTaskExecutorPoolMaxConnecting: maxPending1}); + hasConnPoolStats({pending: maxPending2}); + + // Release our pending and walk away + configureReplSetFailpoint("waitInIsMaster", "off"); + hasConnPoolStats({active: conns}); + configureReplSetFailpoint("waitInFindBeforeMakingBatch", "off"); + }); + + runSubTest("Timeouts", function() { + const conns = minConns; + const pendingTimeoutMS = 5000; + const toRefreshTimeoutMS = 1000; + const idleTimeoutMS1 = 20000; + const idleTimeoutMS2 = 15500; + + // Updating separately since the validation depends on existing params + updateSetParameters({ + ShardingTaskExecutorPoolRefreshTimeoutMS: pendingTimeoutMS, + }); + updateSetParameters({ + ShardingTaskExecutorPoolRefreshRequirementMS: toRefreshTimeoutMS, + }); + updateSetParameters({ + ShardingTaskExecutorPoolHostTimeoutMS: idleTimeoutMS1, + }); + + // Make ready connections + configureReplSetFailpoint("waitInFindBeforeMakingBatch", "alwaysOn"); + launchFinds({times: conns, readPref: "primary"}); + configureReplSetFailpoint("waitInFindBeforeMakingBatch", "off"); + hasConnPoolStats({ready: conns}); + + // Block refreshes and wait for the toRefresh timeout + configureReplSetFailpoint("waitInIsMaster", "alwaysOn"); + sleep(toRefreshTimeoutMS); + + // Confirm that we're in pending for all of our conns + hasConnPoolStats({pending: conns}); + + // Set our min conns to 0 to make sure we don't refresh after pending timeout + updateSetParameters({ + ShardingTaskExecutorPoolMinSize: 0, + }); + + // Wait for our pending timeout + sleep(pendingTimeoutMS); + hasConnPoolStats({}); + + configureReplSetFailpoint("waitInIsMaster", "off"); + + // Reset the min conns to make sure normal refresh doesn't extend the timeout + updateSetParameters({ + ShardingTaskExecutorPoolMinSize: minConns, + }); + + // Wait for our host timeout and 
confirm the pool drops + sleep(idleTimeoutMS1); + hasConnPoolStats({isAbsent: true}); + + // Reset the pool + resetPools(); + + // Sleep for a shorter timeout and then update so we're already expired + sleep(idleTimeoutMS2); + updateSetParameters({ShardingTaskExecutorPoolHostTimeoutMS: idleTimeoutMS2}); + hasConnPoolStats({isAbsent: true}); + }); + + threads.forEach(function(thread) { + thread.join(); + }); + + st.stop(); +})(); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index a9531a25cac..eda9c1c6315 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1693,6 +1693,7 @@ env.Library( '$BUILD_DIR/mongo/db/auth/saslauth', '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/query_exec', + "$BUILD_DIR/mongo/util/fail_point", 'oplog', 'oplogreader', 'repl_coordinator_interface', diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index 855356c8c9e..e3f26e73513 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -58,10 +58,14 @@ #include "mongo/executor/network_interface.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" #include "mongo/util/map_util.h" namespace mongo { +MONGO_FAIL_POINT_DEFINE(waitInIsMaster); + using std::unique_ptr; using std::list; using std::string; @@ -236,6 +240,10 @@ public: const BSONObj& cmdObj, BSONObjBuilder& result) final { CommandHelpers::handleMarkKillOnClientDisconnect(opCtx); + + // TODO Unwind after SERVER-41070 + MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx, waitInIsMaster); + /* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not authenticated. */ diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 3c4454110c4..034c369afd4 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -33,6 +33,8 @@ #include "mongo/executor/connection_pool.h" +#include <fmt/format.h> + #include "mongo/bson/bsonobjbuilder.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/remote_command_request.h" @@ -42,6 +44,8 @@ #include "mongo/util/lru_cache.h" #include "mongo/util/scopeguard.h" +using namespace fmt::literals; + // One interesting implementation note herein concerns how setup() and // refresh() are invoked outside of the global lock, but setTimeout is not. // This implementation detail simplifies mocks, allowing them to return @@ -82,6 +86,84 @@ size_t ConnectionPool::ConnectionInterface::getGeneration() const { return _generation; } +void ConnectionPool::ControllerInterface::init(ConnectionPool* pool) { + invariant(pool); + + LOG(2) << "Controller for " << pool->_name << " is " << name(); + _pool = pool; +} + +std::string ConnectionPool::ConnectionControls::toString() const { + return "{{ maxPending: {}, target: {}, }}"_format(maxPendingConnections, targetConnections); +} + +std::string ConnectionPool::HostState::toString() const { + return "{{ requests: {}, ready: {}, pending: {}, active: {}, isExpired: {} }}"_format( + requests, ready, pending, active, health.isExpired); +} + +/** + * Standard controller for the ConnectionPool + * + * This class uses the Options struct in the ConnectionPool to determine its controls. 
+ */ +class ConnectionPool::LimitController final : public ConnectionPool::ControllerInterface { +public: + HostGroup updateHost(const SpecificPool* pool, + const HostAndPort& host, + const HostState& stats) override { + stdx::lock_guard lk(_mutex); + auto& data = _poolData[pool]; + + const auto minConns = getPool()->_options.minConnections; + const auto maxConns = getPool()->_options.maxConnections; + + data.target = stats.requests + stats.active; + if (data.target < minConns) { + data.target = minConns; + } else if (data.target > maxConns) { + data.target = maxConns; + } + + return {{host}, stats.health.isExpired}; + } + void removeHost(const SpecificPool* pool) override { + stdx::lock_guard lk(_mutex); + _poolData.erase(pool); + } + + ConnectionControls getControls(const SpecificPool* pool) override { + stdx::lock_guard lk(_mutex); + const auto& data = _poolData[pool]; + + return { + getPool()->_options.maxConnecting, data.target, + }; + } + + Milliseconds hostTimeout() const override { + return getPool()->_options.hostTimeout; + } + Milliseconds pendingTimeout() const override { + return getPool()->_options.refreshTimeout; + } + Milliseconds toRefreshTimeout() const override { + return getPool()->_options.refreshRequirement; + } + + StringData name() const override { + return "LimitController"_sd; + } + +protected: + struct Data { + size_t target = 0; + }; + + stdx::mutex _mutex; + stdx::unordered_map<const SpecificPool*, Data> _poolData; +}; + /** * A pool for a specific HostAndPort * @@ -91,6 +173,8 @@ size_t ConnectionPool::ConnectionInterface::getGeneration() const { */ class ConnectionPool::SpecificPool final : public std::enable_shared_from_this<ConnectionPool::SpecificPool> { + static constexpr int kDiagnosticLogLevel = 3; + public: /** * Whenever a function enters a specific pool, the function needs to be guarded by the lock. @@ -120,15 +204,30 @@ public: ~SpecificPool(); /** + * Create and initialize a SpecificPool + */ + static auto make(std::shared_ptr<ConnectionPool> parent, + const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode); + + /** + * Triggers a controller update, potentially changes the request timer, + * and maybe delists from pool + * + * This should only be called by the ConnectionPool or StateLock + */ + void updateState(); + + /** * Gets a connection from the specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex */ Future<ConnectionHandle> getConnection(Milliseconds timeout); /** - * Triggers the shutdown procedure. This function marks the state as kInShutdown - * and calls processFailure below with the status provided. This may not immediately - * delist or destruct this pool. However, both will happen eventually as ConnectionHandles + * Triggers the shutdown procedure. This function sets isShutdown to true + * and calls processFailure below with the status provided. This immediately removes this pool + * from the ConnectionPool. The actual destruction will happen eventually as ConnectionHandles * are deleted. */ void triggerShutdown(const Status& status); @@ -141,37 +240,43 @@ public: void processFailure(const Status& status); /** - * Returns a connection to a specific pool. Sinks a unique_lock from the - * parent to preserve the lock on _mutex - */ - void returnConnection(ConnectionInterface* connection); - - /** * Returns the number of connections currently checked out of the pool. */ - size_t inUseConnections(); + size_t inUseConnections() const; /** * Returns the number of available connections in the pool. 
*/ - size_t availableConnections(); + size_t availableConnections() const; /** * Returns the number of in progress connections in the pool. */ - size_t refreshingConnections(); + size_t refreshingConnections() const; /** * Returns the total number of connections ever created in this pool. */ - size_t createdConnections(); + size_t createdConnections() const; /** * Returns the total number of connections currently open that belong to * this pool. This is the sum of refreshingConnections, availableConnections, * and inUseConnections. */ - size_t openConnections(); + size_t openConnections() const; + + /** + * Returns the number of unfulfilled requests pending. + */ + size_t requestsPending() const; + + /** + * Returns the HostAndPort for this pool. + */ + const HostAndPort& host() const { + return _hostAndPort; + } /** * Return true if the tags on the specific pool match the passed in tags @@ -195,8 +300,6 @@ public: } } - void spawnConnections(); - template <typename CallableT> void runOnExecutor(CallableT&& cb) { ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) // @@ -207,8 +310,6 @@ public: }); } - void updateState(); - private: using OwnedConnection = std::shared_ptr<ConnectionInterface>; using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>; @@ -222,13 +323,20 @@ private: ConnectionHandle makeHandle(ConnectionInterface* connection); + /** + * Establishes connections until the ControllerInterface's target is met. + */ + void spawnConnections(); + void finishRefresh(ConnectionInterface* connPtr, Status status); void addToReady(OwnedConnection conn); void fulfillRequests(); - // This internal helper is used both by get and by fulfillRequests and differs in that it + void returnConnection(ConnectionInterface* connPtr); + + // This internal helper is used both by get and by _fulfillRequests and differs in that it // skips some bookkeeping that the other callers do on their own ConnectionHandle tryGetConnection(); @@ -238,6 +346,15 @@ private: OwnedConnection takeFromProcessingPool(ConnectionInterface* connection); + // Update the health struct and related variables + void updateHealth(); + + // Update the event timer for this host pool + void updateEventTimer(); + + // Update the controller and potentially change the controls + void updateController(); + private: const std::shared_ptr<ConnectionPool> _parent; @@ -250,43 +367,36 @@ private: OwnershipPool _checkedOutPool; std::vector<Request> _requests; + Date_t _lastActiveTime; - std::shared_ptr<TimerInterface> _requestTimer; - Date_t _requestTimerExpiration; - size_t _generation; - bool _inFulfillRequests; - bool _inSpawnConnections; + std::shared_ptr<TimerInterface> _eventTimer; + Date_t _eventTimerExpiration; + Date_t _hostExpiration; - size_t _created; + // The _generation is the set of connection objects we believe are healthy. + // It increases when we process a failure. If a connection is from a previous generation, + // it will be discarded on return/refresh. + size_t _generation = 0; - transport::Session::TagMask _tags = transport::Session::kPending; - - /** - * The current state of the pool - * - * The pool begins in a running state. Moves to idle when no requests - * are pending and no connections are checked out. It finally enters - * shutdown after hostTimeout has passed (and waits there for current - * refreshes to process out). - * - * At any point a new request sets the state back to running and - * restarts all timers. 
- */ - enum class State { - // The pool is active - kRunning, + bool _inFulfillRequests = false; + bool _inControlLoop = false; - // No current activity, waiting for hostTimeout to pass - kIdle, + size_t _created = 0; - // hostTimeout is passed, we're waiting for any processing - // connections to finish before shutting down - kInShutdown, - }; + transport::Session::TagMask _tags = transport::Session::kPending; - State _state; + HostHealth _health; }; +auto ConnectionPool::SpecificPool::make(std::shared_ptr<ConnectionPool> parent, + const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode) { + auto pool = std::make_shared<SpecificPool>(std::move(parent), hostAndPort, sslMode); + pool->updateEventTimer(); + pool->updateHealth(); + return pool; +} + const Status ConnectionPool::kConnectionStateUnknown = Status(ErrorCodes::InternalError, "Connection is in an unknown state"); @@ -294,12 +404,18 @@ ConnectionPool::ConnectionPool(std::shared_ptr<DependentTypeFactoryInterface> im std::string name, Options options) : _name(std::move(name)), - _options(std::move(options)), _factory(std::move(impl)), + _options(std::move(options)), + _controller(std::move(_options.controller)), _manager(options.egressTagCloserManager) { if (_manager) { _manager->add(this); } + + if (!_controller) { + _controller = std::make_shared<LimitController>(); + } + _controller->init(this); } ConnectionPool::~ConnectionPool() { @@ -393,9 +509,10 @@ SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPo transport::ConnectSSLMode sslMode, Milliseconds timeout) { stdx::lock_guard lk(_mutex); + auto& pool = _pools[hostAndPort]; if (!pool) { - pool = std::make_shared<SpecificPool>(shared_from_this(), hostAndPort, sslMode); + pool = SpecificPool::make(shared_from_this(), hostAndPort, sslMode); } else { pool->fassertSSLModeIs(sslMode); } @@ -439,65 +556,73 @@ ConnectionPool::SpecificPool::SpecificPool(std::shared_ptr<ConnectionPool> paren : _parent(std::move(parent)), _sslMode(sslMode), _hostAndPort(hostAndPort), - _readyPool(std::numeric_limits<size_t>::max()), - _generation(0), - _inFulfillRequests(false), - _inSpawnConnections(false), - _created(0), - _state(State::kRunning) { + _readyPool(std::numeric_limits<size_t>::max()) { invariant(_parent); - _requestTimer = _parent->_factory->makeTimer(); + _eventTimer = _parent->_factory->makeTimer(); } ConnectionPool::SpecificPool::~SpecificPool() { - DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();) + DESTRUCTOR_GUARD(_eventTimer->cancelTimeout();) invariant(_requests.empty()); invariant(_checkedOutPool.empty()); } -size_t ConnectionPool::SpecificPool::inUseConnections() { +size_t ConnectionPool::SpecificPool::inUseConnections() const { return _checkedOutPool.size(); } -size_t ConnectionPool::SpecificPool::availableConnections() { +size_t ConnectionPool::SpecificPool::availableConnections() const { return _readyPool.size(); } -size_t ConnectionPool::SpecificPool::refreshingConnections() { +size_t ConnectionPool::SpecificPool::refreshingConnections() const { return _processingPool.size(); } -size_t ConnectionPool::SpecificPool::createdConnections() { +size_t ConnectionPool::SpecificPool::createdConnections() const { return _created; } -size_t ConnectionPool::SpecificPool::openConnections() { +size_t ConnectionPool::SpecificPool::openConnections() const { return _checkedOutPool.size() + _readyPool.size() + _processingPool.size(); } +size_t ConnectionPool::SpecificPool::requestsPending() const { + return _requests.size(); +} + 
Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection( Milliseconds timeout) { - invariant(_state != State::kInShutdown); - auto conn = tryGetConnection(); + // Reset our activity timestamp + auto now = _parent->_factory->now(); + _lastActiveTime = now; - if (conn) { - return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn)); + // If we do not have requests, then we can fulfill immediately + if (_requests.size() == 0) { + auto conn = tryGetConnection(); + + if (conn) { + LOG(kDiagnosticLogLevel) << "Requesting new connection to " << _hostAndPort + << "--using existing idle connection"; + return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn)); + } } - if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) { - timeout = _parent->_options.refreshTimeout; + auto pendingTimeout = _parent->_controller->pendingTimeout(); + if (timeout < Milliseconds(0) || timeout > pendingTimeout) { + timeout = pendingTimeout; } + LOG(kDiagnosticLogLevel) << "Requesting new connection to " << _hostAndPort << " with timeout " + << timeout; - const auto expiration = _parent->_factory->now() + timeout; + const auto expiration = now + timeout; auto pf = makePromiseFuture<ConnectionHandle>(); _requests.push_back(make_pair(expiration, std::move(pf.promise))); std::push_heap(begin(_requests), end(_requests), RequestComparator{}); - runOnExecutor([ this, anchor = shared_from_this() ]() { spawnConnections(); }); - return std::move(pf.future); } @@ -505,7 +630,10 @@ auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) - auto deleter = [ this, anchor = shared_from_this() ](ConnectionInterface * connection) { runOnExecutor([this, connection]() { stdx::lock_guard lk(_parent->_mutex); + returnConnection(connection); + + _lastActiveTime = _parent->_factory->now(); updateState(); }); }; @@ -547,34 +675,38 @@ void ConnectionPool::SpecificPool::finishRefresh(ConnectionInterface* connPtr, S auto conn = takeFromProcessingPool(connPtr); // If we're in shutdown, we don't need refreshed connections - if (_state == State::kInShutdown) + if (_health.isShutdown) { return; + } // If we've exceeded the time limit, start a new connect, // rather than failing all operations. We do this because the // various callers have their own time limit which is unrelated // to our internal one. 
if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) { - LOG(0) << "Pending connection to host " << _hostAndPort - << " did not complete within the connection timeout," - << " retrying with a new connection;" << openConnections() - << " connections to that host remain open"; - spawnConnections(); + LOG(kDiagnosticLogLevel) << "Pending connection to host " << _hostAndPort + << " did not complete within the connection timeout," + << " retrying with a new connection;" << openConnections() + << " connections to that host remain open"; return; } // Pass a failure on through if (!status.isOK()) { + LOG(kDiagnosticLogLevel) << "Connection failed to " << _hostAndPort << " due to " + << redact(status); processFailure(status); return; } // If the host and port were dropped, let this lapse and spawn new connections - if (conn->getGeneration() != _generation) { - spawnConnections(); + if (!conn || conn->getGeneration() != _generation) { + LOG(kDiagnosticLogLevel) << "Dropping late refreshed connection to " << _hostAndPort; return; } + LOG(kDiagnosticLogLevel) << "Finishing connection refresh for " << _hostAndPort; + // If the connection refreshed successfully, throw it back in the ready pool addToReady(std::move(conn)); @@ -582,7 +714,7 @@ void ConnectionPool::SpecificPool::finishRefresh(ConnectionInterface* connPtr, S } void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr) { - auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement; + auto needsRefreshTP = connPtr->getLastUsed() + _parent->_controller->toRefreshTimeout(); auto conn = takeFromPool(_checkedOutPool, connPtr); invariant(conn); @@ -592,10 +724,11 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr return; } - if (!conn->getStatus().isOK()) { + if (auto status = conn->getStatus(); !status.isOK()) { // TODO: alert via some callback if the host is bad - log() << "Ending connection to host " << _hostAndPort << " due to bad connection status; " - << openConnections() << " connections to that host remain open"; + log() << "Ending connection to host " << _hostAndPort + << " due to bad connection status: " << redact(status) << "; " << openConnections() + << " connections to that host remain open"; return; } @@ -603,8 +736,9 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr if (needsRefreshTP <= now) { // If we need to refresh this connection + auto controls = _parent->_controller->getControls(this); if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >= - _parent->_options.minConnections) { + controls.targetConnections) { // If we already have minConnections, just let the connection lapse log() << "Ending idle connection to host " << _hostAndPort << " because the pool meets constraints; " << openConnections() @@ -614,7 +748,8 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr _processingPool[connPtr] = std::move(conn); - connPtr->refresh(_parent->_options.refreshTimeout, + LOG(kDiagnosticLogLevel) << "Refreshing connection to " << _hostAndPort; + connPtr->refresh(_parent->_controller->pendingTimeout(), guardCallback([this](auto conn, auto status) { finishRefresh(std::move(conn), std::move(status)); })); @@ -623,6 +758,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr } // If it's fine as it is, just put it in the ready queue + LOG(kDiagnosticLogLevel) << "Returning ready connection to " << _hostAndPort; 
addToReady(std::move(conn)); fulfillRequests(); @@ -638,37 +774,44 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) { // Our strategy for refreshing connections is to check them out and // immediately check them back in (which kicks off the refresh logic in // returnConnection - connPtr->setTimeout(_parent->_options.refreshRequirement, guardCallback([this, connPtr]() { - auto conn = takeFromPool(_readyPool, connPtr); + auto returnConnectionFunc = guardCallback([this, connPtr]() { + LOG(kDiagnosticLogLevel) << "Triggered refresh timeout for " << _hostAndPort; + auto conn = takeFromPool(_readyPool, connPtr); - // We've already been checked out. We don't need to refresh - // ourselves. - if (!conn) - return; + // We've already been checked out. We don't need to refresh ourselves. + if (!conn) + return; - // If we're in shutdown, we don't need to refresh connections - if (_state == State::kInShutdown) - return; + // If we're in shutdown, we don't need to refresh connections + if (_health.isShutdown) + return; - _checkedOutPool[connPtr] = std::move(conn); + _checkedOutPool[connPtr] = std::move(conn); - connPtr->indicateSuccess(); + connPtr->indicateSuccess(); - returnConnection(connPtr); - })); + returnConnection(connPtr); + }); + connPtr->setTimeout(_parent->_controller->toRefreshTimeout(), std::move(returnConnectionFunc)); } // Sets state to shutdown and kicks off the failure protocol to tank existing connections void ConnectionPool::SpecificPool::triggerShutdown(const Status& status) { - _state = State::kInShutdown; - _droppedProcessingPool.clear(); + _health.isShutdown = true; + + LOG(2) << "Delisting connection pool for " << _hostAndPort; + _parent->_controller->removeHost(this); + _parent->_pools.erase(_hostAndPort); + processFailure(status); + + _droppedProcessingPool.clear(); + _eventTimer->cancelTimeout(); } // Drop connections and fail all requests void ConnectionPool::SpecificPool::processFailure(const Status& status) { - // Bump the generation so we don't reuse any pending or checked out - // connections + // Bump the generation so we don't reuse any pending or checked out connections _generation++; if (!_readyPool.empty() || !_processingPool.empty()) { @@ -686,24 +829,24 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status) { // Migrate processing connections to the dropped pool for (auto&& x : _processingPool) { - if (_state != State::kInShutdown) { - // If we're just dropping the pool, we can reuse them later - _droppedProcessingPool[x.first] = std::move(x.second); - } + // If we're just dropping the pool, we can reuse them later + _droppedProcessingPool[x.first] = std::move(x.second); } _processingPool.clear(); - // Move the requests out so they aren't visible - // in other threads - decltype(_requests) requestsToFail; - { - using std::swap; - swap(requestsToFail, _requests); + // Mark ourselves as failed so we don't immediately respawn + _health.isFailed = true; + + if (_requests.empty()) { + return; } - for (auto& request : requestsToFail) { + for (auto& request : _requests) { request.second.setError(status); } + + LOG(kDiagnosticLogLevel) << "Failing requests to " << _hostAndPort; + _requests.clear(); } // fulfills as many outstanding requests as possible @@ -715,8 +858,10 @@ void ConnectionPool::SpecificPool::fulfillRequests() { _inFulfillRequests = true; auto guard = makeGuard([&] { _inFulfillRequests = false; }); - while (_requests.size()) { + // Marking this as our newest active time + _lastActiveTime = _parent->_factory->now(); 
+ // Caution: If this returns with a value, it's important that we not throw until we've // emplaced the promise (as returning a connection would attempt to take the lock and would // deadlock). @@ -735,37 +880,38 @@ void ConnectionPool::SpecificPool::fulfillRequests() { promise.emplaceValue(std::move(conn)); } - - spawnConnections(); } -// spawn enough connections to satisfy open requests and minpool, while -// honoring maxpool +// spawn enough connections to satisfy open requests and minpool, while honoring maxpool void ConnectionPool::SpecificPool::spawnConnections() { - // If some other thread (possibly this thread) is spawning connections, - // don't keep padding the callstack. - if (_inSpawnConnections) + if (_health.isFailed) { + LOG(kDiagnosticLogLevel) + << "Pool for " << _hostAndPort + << " has failed recently. Postponing any attempts to spawn connections"; return; + } - _inSpawnConnections = true; - auto guard = makeGuard([&] { _inSpawnConnections = false; }); + auto controls = _parent->_controller->getControls(this); + LOG(kDiagnosticLogLevel) << "Comparing connection state for " << _hostAndPort + << " to Controls: " << controls; - // We want minConnections <= outstanding requests <= maxConnections - auto target = [&] { - return std::max( - _parent->_options.minConnections, - std::min(_requests.size() + _checkedOutPool.size(), _parent->_options.maxConnections)); - }; + auto pendingConnections = refreshingConnections(); + if (pendingConnections >= controls.maxPendingConnections) { + return; + } - // While all of our inflight connections are less than our target - while ((_state != State::kInShutdown) && - (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) && - (_processingPool.size() < _parent->_options.maxConnecting)) { - if (_readyPool.empty() && _processingPool.empty()) { - auto severity = MONGO_GET_LIMITED_SEVERITY(_hostAndPort, Seconds{1}, 0, 2); - LOG(severity) << "Connecting to " << _hostAndPort; - } + auto totalConnections = openConnections(); + if (totalConnections >= controls.targetConnections) { + return; + } + + auto severity = MONGO_GET_LIMITED_SEVERITY(_hostAndPort, Seconds{1}, 0, 2); + LOG(severity) << "Connecting to " << _hostAndPort; + auto allowance = std::min(controls.targetConnections - totalConnections, + controls.maxPendingConnections - pendingConnections); + LOG(kDiagnosticLogLevel) << "Spawning " << allowance << " connections to " << _hostAndPort; + for (decltype(allowance) i = 0; i < allowance; ++i) { OwnedConnection handle; try { // make a new connection and put it in processing @@ -776,17 +922,13 @@ void ConnectionPool::SpecificPool::spawnConnections() { } _processingPool[handle.get()] = handle; - ++_created; // Run the setup callback - handle->setup(_parent->_options.refreshTimeout, + handle->setup(_parent->_controller->pendingTimeout(), guardCallback([this](auto conn, auto status) { finishRefresh(std::move(conn), std::move(status)); })); - - // Note that this assumes that the refreshTimeout is sound for the - // setupTimeout } } @@ -806,96 +948,145 @@ ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::take ConnectionInterface* connPtr) { auto conn = takeFromPool(_processingPool, connPtr); if (conn) { - invariant(_state != State::kInShutdown); + invariant(!_health.isShutdown); return conn; } return takeFromPool(_droppedProcessingPool, connPtr); } +void ConnectionPool::SpecificPool::updateHealth() { + const auto now = _parent->_factory->now(); -// Updates our state and manages the request timer 
-void ConnectionPool::SpecificPool::updateState() { - if (_state == State::kInShutdown) { - // If we're in shutdown, there is nothing to update. Our clients are all gone. - if (_processingPool.empty()) { - // If we have no more clients that require access to us, delist from the parent pool - LOG(2) << "Delisting connection pool for " << _hostAndPort; - _parent->_pools.erase(_hostAndPort); - } - return; + // We're expired if we have no sign of connection use and are past our expiry + _health.isExpired = _requests.empty() && _checkedOutPool.empty() && (_hostExpiration <= now); + + // We're failed until we get new requests or our timer triggers + if (_health.isFailed) { + _health.isFailed = _requests.empty(); } +} - if (_requests.size()) { - // We have some outstanding requests, we're live +void ConnectionPool::SpecificPool::updateEventTimer() { + const auto now = _parent->_factory->now(); - // If we were already running and the timer is the same as it was - // before, nothing to do - if (_state == State::kRunning && _requestTimerExpiration == _requests.front().first) - return; + // If our pending event has triggered, then schedule a retry as the next event + auto nextEventTime = _eventTimerExpiration; + if (nextEventTime <= now) { + nextEventTime = now + kHostRetryTimeout; + } - _state = State::kRunning; + // If our expiration comes before our next event, then it is the next event + if (_requests.empty() && _checkedOutPool.empty()) { + _hostExpiration = _lastActiveTime + _parent->_controller->hostTimeout(); + if ((_hostExpiration > now) && (_hostExpiration < nextEventTime)) { + nextEventTime = _hostExpiration; + } + } - _requestTimer->cancelTimeout(); + // If a request would timeout before the next event, then it is the next event + if (_requests.size() && (_requests.front().first < nextEventTime)) { + nextEventTime = _requests.front().first; + } - _requestTimerExpiration = _requests.front().first; + // If our timer is already set to the next event, then we're done + if (nextEventTime == _eventTimerExpiration) { + return; + } - auto timeout = _requests.front().first - _parent->_factory->now(); + _eventTimerExpiration = nextEventTime; + // TODO Our timeout can be a Date_t after SERVER-41459 + const auto timeout = _eventTimerExpiration - now; - // We set a timer for the most recent request, then invoke each timed - // out request we couldn't service - _requestTimer->setTimeout( - timeout, guardCallback([this]() { - auto now = _parent->_factory->now(); + _eventTimer->cancelTimeout(); - while (_requests.size()) { - auto& x = _requests.front(); + // Set our event timer to timeout requests, refresh the state, and potentially expire this pool + auto deferredStateUpdateFunc = guardCallback([this]() { + auto now = _parent->_factory->now(); - if (x.first <= now) { - auto promise = std::move(x.second); - std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); - _requests.pop_back(); + _health.isFailed = false; - promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, - "Couldn't get a connection within the time limit")); - } else { - break; - } - } - })); - } else if (_checkedOutPool.size()) { - // If we have no requests, but someone's using a connection, we just - // hang around until the next request or a return + while (_requests.size() && (_requests.front().first <= now)) { + std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); - _requestTimer->cancelTimeout(); - _state = State::kRunning; - _requestTimerExpiration = _requestTimerExpiration.max(); - } 
else { - // If we don't have any live requests and no one has checked out connections + auto& request = _requests.back(); + request.second.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, + "Couldn't get a connection within the time limit")); + _requests.pop_back(); - // If we used to be idle, just bail - if (_state == State::kIdle) - return; + // Since we've failed a request, we've interacted with external users + _lastActiveTime = now; + } + }); + _eventTimer->setTimeout(timeout, std::move(deferredStateUpdateFunc)); +} - _state = State::kIdle; +void ConnectionPool::SpecificPool::updateController() { + if (std::exchange(_inControlLoop, true)) { + return; + } + const auto guard = makeGuard([&] { _inControlLoop = false; }); - _requestTimer->cancelTimeout(); + auto& controller = *_parent->_controller; - _requestTimerExpiration = _parent->_factory->now() + _parent->_options.hostTimeout; + // Update our own state + HostState state{ + _health, + requestsPending(), + refreshingConnections(), + availableConnections(), + inUseConnections(), + }; + LOG(kDiagnosticLogLevel) << "Updating controller for " << _hostAndPort + << " with State: " << state; + auto hostGroup = controller.updateHost(this, _hostAndPort, std::move(state)); + + // If we can shutdown, then do so + if (hostGroup.canShutdown) { + for (const auto& host : hostGroup.hosts) { + auto it = _parent->_pools.find(host); + if (it == _parent->_pools.end()) { + continue; + } + + auto& pool = it->second; + + // At the moment, controllers will never mark for shutdown a pool with active + // connections or pending requests. isExpired is never true if these invariants are + // false. That's not to say that it's a terrible idea, but if this happens then we + // should review what it means to be expired. + + invariant(pool->_checkedOutPool.empty()); + invariant(pool->_requests.empty()); + + pool->triggerShutdown(Status(ErrorCodes::ShutdownInProgress, + str::stream() << "Pool for " << host << " has expired.")); + } + return; + } - auto timeout = _parent->_options.hostTimeout; - // Set the shutdown timer, this gets reset on any request - _requestTimer->setTimeout( - timeout, guardCallback([this]() { - if (_state != State::kIdle) - return; + // Make sure all related hosts exist + for (const auto& host : hostGroup.hosts) { + if (auto& pool = _parent->_pools[host]; !pool) { + pool = SpecificPool::make(_parent, host, _sslMode); + } + } - triggerShutdown( - Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, - "Connection pool has been idle for longer than the host timeout")); - })); + runOnExecutor([ this, anchor = shared_from_this() ]() { spawnConnections(); }); +} + +// Updates our state and manages the request timer +void ConnectionPool::SpecificPool::updateState() { + if (_health.isShutdown) { + // If we're in shutdown, there is nothing to update. Our clients are all gone. + LOG(kDiagnosticLogLevel) << _hostAndPort << " is dead"; + return; } + + updateEventTimer(); + updateHealth(); + updateController(); } } // namespace executor diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index 9fb22e2c71e..6eca977ca1e 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -63,24 +63,28 @@ struct ConnectionPoolStats; * HostAndPort. See comments on the various Options for how the pool operates. 
*/ class ConnectionPool : public EgressTagCloser, public std::enable_shared_from_this<ConnectionPool> { - class SpecificPool; + class LimitController; public: + class SpecificPool; + class ConnectionInterface; class DependentTypeFactoryInterface; class TimerInterface; + class ControllerInterface; using ConnectionHandleDeleter = stdx::function<void(ConnectionInterface* connection)>; using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>; using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>; - static constexpr Milliseconds kDefaultHostTimeout = Minutes(5); static constexpr size_t kDefaultMaxConns = std::numeric_limits<size_t>::max(); static constexpr size_t kDefaultMinConns = 1; static constexpr size_t kDefaultMaxConnecting = 2; + static constexpr Milliseconds kDefaultHostTimeout = Minutes(5); static constexpr Milliseconds kDefaultRefreshRequirement = Minutes(1); static constexpr Milliseconds kDefaultRefreshTimeout = Seconds(20); + static constexpr Milliseconds kHostRetryTimeout = Seconds(1); static const Status kConnectionStateUnknown; @@ -137,6 +141,80 @@ public: * Connections created through this connection pool will not attempt to authenticate. */ bool skipAuthentication = false; + + std::shared_ptr<ControllerInterface> controller; + }; + + /** + * A set of flags describing the health of a host pool + */ + struct HostHealth { + /** + * The pool is expired and can be shutdown by updateController + * + * This flag is set to true when there have been no connection requests or in use + * connections for ControllerInterface::hostTimeout(). + * + * This flag is set to false whenever a connection is requested. + */ + bool isExpired = false; + + /** + * The pool has processed a failure and will not spawn new connections until requested + * + * This flag is set to true by processFailure(), and thus also triggerShutdown(). + * + * This flag is set to false whenever a connection is requested. + * + * As a further note, this prevents us from spamming a failed host with connection + * attempts. If an external user believes a host should be available, they can request + * again. + */ + bool isFailed = false; + + /** + * The pool is shutdown and will never be called by the ConnectionPool again. + * + * This flag is set to true by triggerShutdown() or updateController(). It is never unset. + */ + bool isShutdown = false; + }; + + /** + * The state of connection pooling for a single host + * + * This should only be constructed by the SpecificPool. 
+ */ + struct HostState { + HostHealth health; + size_t requests = 0; + size_t pending = 0; + size_t ready = 0; + size_t active = 0; + + std::string toString() const; + }; + + /** + * A simple set of controls to direct a single host + * + * This should only be constructed by a ControllerInterface + */ + struct ConnectionControls { + size_t maxPendingConnections = kDefaultMaxConnecting; + size_t targetConnections = 0; + + std::string toString() const; + }; + + /** + * A set of hosts and a flag canShutdown for if the group can shutdown + * + * This should only be constructed by a ControllerInterface + */ + struct HostGroup { + std::vector<HostAndPort> hosts; + bool canShutdown = false; }; explicit ConnectionPool(std::shared_ptr<DependentTypeFactoryInterface> impl, @@ -169,11 +247,10 @@ public: private: std::string _name; - // Options are set at startup and never changed at run time, so these are - // accessed outside the lock - const Options _options; - const std::shared_ptr<DependentTypeFactoryInterface> _factory; + Options _options; + + std::shared_ptr<ControllerInterface> _controller; // The global mutex for specific pool access and the generation counter mutable stdx::mutex _mutex; @@ -321,6 +398,71 @@ private: }; /** + * An implementation of ControllerInterface directs the behavior of a SpecificPool + * + * Generally speaking, a Controller will be given HostState via updateState and then return Controls + * via getControls. A Controller is expected to not directly mutate its SpecificPool, including via + * its ConnectionPool pointer. A Controller is expected to be given to only one ConnectionPool. + */ +class ConnectionPool::ControllerInterface { +public: + using SpecificPool = typename ConnectionPool::SpecificPool; + using HostState = typename ConnectionPool::HostState; + using ConnectionControls = typename ConnectionPool::ConnectionControls; + using HostGroup = typename ConnectionPool::HostGroup; + + virtual ~ControllerInterface() = default; + + /** + * Initialize this ControllerInterface using the given ConnectionPool + * + * ConnectionPools provide access to Executors and other DTF-provided objects. + */ + virtual void init(ConnectionPool* parent); + + /** + * Inform this Controller of a new State for a pool/host + * + * This function returns information about all hosts tied to this one. This function is also + * expected to handle never-before-seen pools. 
+ */ + virtual HostGroup updateHost(const SpecificPool* pool, + const HostAndPort& host, + const HostState& stats) = 0; + + /** + * Inform this Controller that a pool is no longer tracked + */ + virtual void removeHost(const SpecificPool* pool) = 0; + + /** + * Get controls for the given pool + */ + virtual ConnectionControls getControls(const SpecificPool* pool) = 0; + + /** + * Get the various timeouts that this controller suggests + */ + virtual Milliseconds hostTimeout() const = 0; + virtual Milliseconds pendingTimeout() const = 0; + virtual Milliseconds toRefreshTimeout() const = 0; + + /** + * Get the name for this controller + * + * This function is intended to provide increased visibility into which controller is in use + */ + virtual StringData name() const = 0; + + const ConnectionPool* getPool() const { + return _pool; + } + +protected: + ConnectionPool* _pool = nullptr; +}; + +/** * Implementation interface for the connection pool * * This factory provides generators for connections, timers and a clock for the diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 6a7aaf77606..00a42bcc683 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -104,6 +104,7 @@ env.Library( target='sharding_initialization', source=[ 'sharding_initialization.cpp', + 'sharding_task_executor_pool_controller.cpp', env.Idlc('sharding_task_executor_pool.idl')[0], 'client/sharding_connection_hook.cpp', 'client/sharding_network_connection_hook.cpp', @@ -118,6 +119,7 @@ env.Library( LIBDEPS_PRIVATE=[ "$BUILD_DIR/mongo/idl/server_parameter", '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + '$BUILD_DIR/mongo/executor/connection_pool_executor', 'coreshard', 'sharding_task_executor', ], diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 6982c7a4687..0b542f920dc 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -64,6 +64,7 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/sharding_task_executor.h" +#include "mongo/s/sharding_task_executor_pool_controller.h" #include "mongo/s/sharding_task_executor_pool_gen.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/thread_pool.h" @@ -171,53 +172,11 @@ Status initializeGlobalShardingState(OperationContext* opCtx, return {ErrorCodes::BadValue, "Unrecognized connection string."}; } - // We don't set the ConnectionPool's static const variables to be the default value in - // MONGO_EXPORT_STARTUP_SERVER_PARAMETER because it's not guaranteed to be initialized. - // The following code is a workaround. ConnectionPool::Options connPoolOptions; + connPoolOptions.controller = std::make_shared<ShardingTaskExecutorPoolController>(); - connPoolOptions.minConnections = gShardingTaskExecutorPoolMinConnections; - connPoolOptions.maxConnections = (gShardingTaskExecutorPoolMaxConnections >= 0) - ? gShardingTaskExecutorPoolMaxConnections - : ConnectionPool::kDefaultMaxConns; - connPoolOptions.maxConnecting = (gShardingTaskExecutorPoolMaxConnecting >= 0) - ? 
gShardingTaskExecutorPoolMaxConnecting - : ConnectionPool::kDefaultMaxConnecting; - - connPoolOptions.hostTimeout = Milliseconds(gShardingTaskExecutorPoolHostTimeoutMS); - connPoolOptions.refreshRequirement = - Milliseconds(gShardingTaskExecutorPoolRefreshRequirementMS); - connPoolOptions.refreshTimeout = Milliseconds(gShardingTaskExecutorPoolRefreshTimeoutMS); - - if (connPoolOptions.refreshRequirement <= connPoolOptions.refreshTimeout) { - auto newRefreshTimeout = connPoolOptions.refreshRequirement - Milliseconds(1); - warning() << "ShardingTaskExecutorPoolRefreshRequirementMS (" - << connPoolOptions.refreshRequirement - << ") set below ShardingTaskExecutorPoolRefreshTimeoutMS (" - << connPoolOptions.refreshTimeout - << "). Adjusting ShardingTaskExecutorPoolRefreshTimeoutMS to " - << newRefreshTimeout; - connPoolOptions.refreshTimeout = newRefreshTimeout; - } - - if (connPoolOptions.hostTimeout <= - connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout) { - auto newHostTimeout = - connPoolOptions.refreshRequirement + connPoolOptions.refreshTimeout + Milliseconds(1); - warning() << "ShardingTaskExecutorPoolHostTimeoutMS (" << connPoolOptions.hostTimeout - << ") set below ShardingTaskExecutorPoolRefreshRequirementMS (" - << connPoolOptions.refreshRequirement - << ") + ShardingTaskExecutorPoolRefreshTimeoutMS (" - << connPoolOptions.refreshTimeout - << "). Adjusting ShardingTaskExecutorPoolHostTimeoutMS to " << newHostTimeout; - connPoolOptions.hostTimeout = newHostTimeout; - } - - auto network = - executor::makeNetworkInterface("ShardRegistry", - stdx::make_unique<ShardingNetworkConnectionHook>(), - hookBuilder(), - connPoolOptions); + auto network = executor::makeNetworkInterface( + "ShardRegistry", std::make_unique<ShardingNetworkConnectionHook>(), hookBuilder()); auto networkPtr = network.get(); auto executorPool = makeShardingTaskExecutorPool( std::move(network), hookBuilder, connPoolOptions, taskExecutorPoolSize); diff --git a/src/mongo/s/sharding_task_executor_pool.idl b/src/mongo/s/sharding_task_executor_pool.idl index 390dbc5a17b..dbced832b1b 100644 --- a/src/mongo/s/sharding_task_executor_pool.idl +++ b/src/mongo/s/sharding_task_executor_pool.idl @@ -28,49 +28,66 @@ global: cpp_namespace: "mongo" + cpp_includes: + - "mongo/s/sharding_task_executor_pool_controller.h" server_parameters: ShardingTaskExecutorPoolMinSize: description: <- The minimum number of connections for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolMinConnections" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.minConnections" + validator: + gte: 0 default: 1 ShardingTaskExecutorPoolMaxSize: description: <- The maximum number of connections for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolMaxConnections" - default: -1 + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnections" + validator: + gte: 1 + default: 32767 ShardingTaskExecutorPoolMaxConnecting: description: <- The maximum number of in-flight connections for each executor in the pool for the sharding grid. 
- set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolMaxConnecting" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnecting" + validator: + gte: 1 default: 2 ShardingTaskExecutorPoolHostTimeoutMS: description: <- The timeout for dropping a host for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolHostTimeoutMS" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.hostTimeoutMS" + validator: + callback: "ShardingTaskExecutorPoolController::validateHostTimeout" + gte: 1 default: 300000 # 5mins ShardingTaskExecutorPoolRefreshRequirementMS: description: <- The timeout before a connection needs to be refreshed for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolRefreshRequirementMS" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.toRefreshTimeoutMS" + validator: + gte: 1 default: 60000 # 1min ShardingTaskExecutorPoolRefreshTimeoutMS: description: <- The timeout for refreshing a connection for each executor in the pool for the sharding grid. - set_at: [ startup ] - cpp_vartype: "int" - cpp_varname: "gShardingTaskExecutorPoolRefreshTimeoutMS" + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.pendingTimeoutMS" + validator: + callback: "ShardingTaskExecutorPoolController::validatePendingTimeout" + gte: 1 default: 20000 # 20secs + ShardingTaskExecutorPoolReplicaSetMatching: + description: <- + Enables ReplicaSet member connection matching. + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.matchingStrategyString" + on_update: "ShardingTaskExecutorPoolController::onUpdateMatchingStrategy" + default: "disabled" diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp new file mode 100644 index 00000000000..77d694e2f16 --- /dev/null +++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp @@ -0,0 +1,249 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. 
If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kConnectionPool
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/replica_set_monitor.h"
+#include "mongo/s/sharding_task_executor_pool_controller.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+Status ShardingTaskExecutorPoolController::validateHostTimeout(const int& hostTimeoutMS) {
+ auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load();
+ auto pendingTimeoutMS = gParameters.pendingTimeoutMS.load();
+ if (hostTimeoutMS >= (toRefreshTimeoutMS + pendingTimeoutMS)) {
+ return Status::OK();
+ }
+
+ std::string msg = str::stream()
+ << "ShardingTaskExecutorPoolHostTimeoutMS (" << hostTimeoutMS
+ << ") set below ShardingTaskExecutorPoolRefreshRequirementMS (" << toRefreshTimeoutMS
+ << ") + ShardingTaskExecutorPoolRefreshTimeoutMS (" << pendingTimeoutMS << ").";
+ return Status(ErrorCodes::BadValue, msg);
+}
+
+Status ShardingTaskExecutorPoolController::validatePendingTimeout(const int& pendingTimeoutMS) {
+ auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load();
+ if (pendingTimeoutMS < toRefreshTimeoutMS) {
+ return Status::OK();
+ }
+
+ std::string msg = str::stream()
+ << "ShardingTaskExecutorPoolRefreshRequirementMS (" << toRefreshTimeoutMS
+ << ") set below ShardingTaskExecutorPoolRefreshTimeoutMS (" << pendingTimeoutMS << ").";
+ return Status(ErrorCodes::BadValue, msg);
+}
+
+Status ShardingTaskExecutorPoolController::onUpdateMatchingStrategy(const std::string& str) {
+ // TODO Fix up after SERVER-40224
+ if (str == "disabled") {
+ gParameters.matchingStrategy.store(MatchingStrategy::kDisabled);
+ } else if (str == "matchPrimaryNode") {
+ gParameters.matchingStrategy.store(MatchingStrategy::kMatchPrimaryNode);
+ // TODO Reactivate once the prediction pattern is fixed in SERVER-41602
+ //} else if (str == "matchBusiestNode") {
+ // gParameters.matchingStrategy.store(MatchingStrategy::kMatchBusiestNode);
+ } else {
+ return Status{ErrorCodes::BadValue,
+ str::stream() << "Unrecognized matching strategy '" << str << "'"};
+ }
+
+ return Status::OK();
+}
+
+void ShardingTaskExecutorPoolController::_addGroup(WithLock,
+ const ReplicaSetChangeNotifier::State& state) {
+ // Replace any existing group for this set name
+ auto& group = _hostGroups[state.connStr.getSetName()];
+ group = std::make_shared<HostGroupData>();
+ group->state = state;
+
+ // Mark each host with this group
+ for (auto& host : state.connStr.getServers()) {
+ _hostGroupsByHost[host] = group;
+ }
+}
+
+void ShardingTaskExecutorPoolController::_removeGroup(WithLock, const std::string& name) {
+ auto it = _hostGroups.find(name);
+ if (it == _hostGroups.end()) {
+ return;
+ }
+
+ auto& hostGroup = it->second;
+ for (auto& host : hostGroup->state.connStr.getServers()) {
+ _hostGroupsByHost.erase(host);
+ }
+ _hostGroups.erase(it);
+}
+
+class ShardingTaskExecutorPoolController::ReplicaSetChangeListener final
+ : public ReplicaSetChangeNotifier::Listener {
+public:
+ explicit ReplicaSetChangeListener(ShardingTaskExecutorPoolController* controller)
+ : _controller(controller) {}
+
+ void onFoundSet(const Key& key) override {
+ // Do nothing
+ }
+
+ void onConfirmedSet(const State& state) override {
+ stdx::lock_guard lk(_controller->_mutex);
+
+ _controller->_removeGroup(lk, state.connStr.getSetName());
+ _controller->_addGroup(lk, state);
+ }
+
+ void onPossibleSet(const State& state) override {
+ // Do nothing
+ }
+
+ void onDroppedSet(const Key& key) override {
+ stdx::lock_guard lk(_controller->_mutex);
+
+ _controller->_removeGroup(lk, key);
+ }
+
+private:
+ ShardingTaskExecutorPoolController* const _controller;
+};
+
+void ShardingTaskExecutorPoolController::init(ConnectionPool* parent) {
+ ControllerInterface::init(parent);
+ _listener = ReplicaSetMonitor::getNotifier().makeListener<ReplicaSetChangeListener>(this);
+}
+
+auto ShardingTaskExecutorPoolController::updateHost(const SpecificPool* pool,
+ const HostAndPort& host,
+ const HostState& stats) -> HostGroup {
+ stdx::lock_guard lk(_mutex);
+
+ auto& data = _poolData[pool];
+
+ const size_t minConns = gParameters.minConnections.load();
+ const size_t maxConns = gParameters.maxConnections.load();
+
+ // Update the target for just the pool first
+ data.target = stats.requests + stats.active;
+
+ if (data.target < minConns) {
+ data.target = minConns;
+ } else if (data.target > maxConns) {
+ data.target = maxConns;
+ }
+
+ data.isAbleToShutdown = stats.health.isExpired;
+
+ // If the pool isn't in a group, we can return now
+ auto it = _hostGroupsByHost.find(host);
+ if (it == _hostGroupsByHost.end()) {
+ return {{host}, data.isAbleToShutdown};
+ }
+
+ // If the pool has a group, then update the group
+ auto& hostGroup = it->second;
+ data.hostGroup = hostGroup;
+
+ // Make sure we're part of the group
+ hostGroup->pools.insert(pool);
+
+ switch (gParameters.matchingStrategy.load()) {
+ case MatchingStrategy::kMatchPrimaryNode: {
+ if (hostGroup->state.primary == host) {
+ hostGroup->target = data.target;
+ }
+ } break;
+ case MatchingStrategy::kMatchBusiestNode: {
+ hostGroup->target = std::max(hostGroup->target, data.target);
+ } break;
+ case MatchingStrategy::kDisabled: {
+ // Nothing
+ } break;
+ }
+
+ if (hostGroup->target < minConns) {
+ hostGroup->target = minConns;
+ } else if (hostGroup->target > maxConns) {
+ hostGroup->target = maxConns;
+ }
+
+ auto shouldShutdown = data.isAbleToShutdown &&
+ std::all_of(hostGroup->pools.begin(), hostGroup->pools.end(), [&](auto otherPool) {
+ return _poolData[otherPool].isAbleToShutdown;
+ });
+ return {hostGroup->state.connStr.getServers(), shouldShutdown};
+}
+
+void ShardingTaskExecutorPoolController::removeHost(const SpecificPool* pool) {
+ stdx::lock_guard lk(_mutex);
+ auto it = _poolData.find(pool);
+ if (it == _poolData.end()) {
+ // It's possible for a pool to be removed before it has ever updated its state
+ return;
+ }
+
+ auto& data = it->second;
+ if (auto hostGroup = data.hostGroup.lock()) {
+ hostGroup->pools.erase(pool);
+ }
+ _poolData.erase(it);
+}
+
+auto ShardingTaskExecutorPoolController::getControls(const SpecificPool* pool)
+ -> ConnectionControls {
+ stdx::lock_guard lk(_mutex);
+ auto& data = _poolData[pool];
+
+ const size_t maxPending = gParameters.maxConnecting.load();
+
+ auto hostGroup = data.hostGroup.lock();
+ if (!hostGroup || gParameters.matchingStrategy.load() == MatchingStrategy::kDisabled) {
+ return {maxPending, data.target};
+ }
+
+ auto target = std::max(data.target, hostGroup->target);
+ return {maxPending, target};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::hostTimeout() const {
+ return Milliseconds{gParameters.hostTimeoutMS.load()};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::pendingTimeout() const {
+ return Milliseconds{gParameters.pendingTimeoutMS.load()};
+}
+
+Milliseconds ShardingTaskExecutorPoolController::toRefreshTimeout() const {
+
return Milliseconds{gParameters.toRefreshTimeoutMS.load()}; +} + +} // namespace mongo diff --git a/src/mongo/s/sharding_task_executor_pool_controller.h b/src/mongo/s/sharding_task_executor_pool_controller.h new file mode 100644 index 00000000000..d61f707de73 --- /dev/null +++ b/src/mongo/s/sharding_task_executor_pool_controller.h @@ -0,0 +1,186 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" +#include "mongo/client/replica_set_change_notifier.h" +#include "mongo/executor/connection_pool.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/unordered_map.h" + +namespace mongo { + +/** + * A special Controller for the sharding ConnectionPool + * + * This class has two special members: + * * A global set of synchronized Parameters for the ShardingTaskExecutorPool server parameters + * * A ReplicaSetChangeListener to inform it of changes to replica set membership + * + * When the MatchingStrategy from its Parameters is kDisabled, this class operates much like the + * LimitController but with its limits allowed to shift at runtime (via Parameters). + * + * When the MatchingStrategy is kMatchPrimaryNode, the limits are obeyed but, when the pool for a + * primary member calls updateHost, it can increase the targetConnections for the pool of each other + * member of its replica set. Note that this will, at time of writing, follow the "hosts" field + * from the primary isMaster combined with the seed list for the replica set. If the seed list were + * to include arbiters or hidden members, then they would also be subject to these constraints. + * + * When the MatchingStrategy is kMatchBusiestNode, it operates like kMatchPrimaryNode, but any pool + * can be responsible for increasing the targetConnections of each member of its set. 
+ *
+ * Note that, in essence, there are three outside elements that can mutate the state of this class:
+ * * The ReplicaSetChangeNotifier can notify the listener, which updates the host groups
+ * * The ServerParameters can update the Parameters, which will be used in the next update
+ * * The SpecificPools for its ConnectionPool can updateHost with their individual States
+ */
+class ShardingTaskExecutorPoolController final
+ : public executor::ConnectionPool::ControllerInterface {
+ class ReplicaSetChangeListener;
+
+public:
+ using ConnectionPool = executor::ConnectionPool;
+
+ enum class MatchingStrategy {
+ kDisabled,
+ kMatchPrimaryNode,
+ kMatchBusiestNode,
+ };
+
+ class Parameters {
+ public:
+ AtomicWord<int> minConnections;
+ AtomicWord<int> maxConnections;
+ AtomicWord<int> maxConnecting;
+
+ AtomicWord<int> hostTimeoutMS;
+ AtomicWord<int> pendingTimeoutMS;
+ AtomicWord<int> toRefreshTimeoutMS;
+
+ synchronized_value<std::string> matchingStrategyString;
+ AtomicWord<MatchingStrategy> matchingStrategy;
+ };
+
+ static inline Parameters gParameters;
+
+ /**
+ * Validate that hostTimeoutMS is at least the sum of pendingTimeoutMS and
+ * toRefreshTimeoutMS
+ */
+ static Status validateHostTimeout(const int& hostTimeoutMS);
+
+ /**
+ * Validate that pendingTimeoutMS is less than toRefreshTimeoutMS
+ */
+ static Status validatePendingTimeout(const int& pendingTimeoutMS);
+
+ /**
+ * Matches the matching strategy string against a set of literals
+ * and either sets gParameters.matchingStrategy or returns a non-OK Status.
+ */
+ static Status onUpdateMatchingStrategy(const std::string& str);
+
+ ShardingTaskExecutorPoolController() = default;
+ ShardingTaskExecutorPoolController& operator=(ShardingTaskExecutorPoolController&&) = delete;
+
+ void init(ConnectionPool* parent) override;
+
+ HostGroup updateHost(const SpecificPool* pool,
+ const HostAndPort& host,
+ const HostState& stats) override;
+ void removeHost(const SpecificPool* pool) override;
+
+ ConnectionControls getControls(const SpecificPool* pool) override;
+
+ Milliseconds hostTimeout() const override;
+ Milliseconds pendingTimeout() const override;
+ Milliseconds toRefreshTimeout() const override;
+
+ StringData name() const override {
+ return "ShardingTaskExecutorPoolController"_sd;
+ }
+
+private:
+ void _addGroup(WithLock, const ReplicaSetChangeNotifier::State& state);
+ void _removeGroup(WithLock, const std::string& key);
+
+ /**
+ * HostGroupData is the shared state for a set of hosts (a replica set).
+ *
+ * When the ReplicaSetChangeListener is informed of a change to a replica set,
+ * it creates a new HostGroupData and fills it into _hostGroups[setName] and
+ * _hostGroupsByHost[memberHost]. This does not immediately affect the results of getControls.
+ *
+ * When a SpecificPool calls updateHost, it checks _hostGroupsByHost to see if it belongs to
+ * any group and adds itself to that group's pools. It will then update the group's target
+ * according to the MatchingStrategy. The group reports that it can shut down only when every
+ * pool in the group is able to shut down.
+ *
+ * Note that a HostData can find itself orphaned from its HostGroupData during a reconfig.
+ */
+ struct HostGroupData {
+ // The ReplicaSet state for this set
+ ReplicaSetChangeNotifier::State state;
+
+ // The pools that currently belong to this group
+ stdx::unordered_set<const SpecificPool*> pools;
+
+ // The number of connections that all hosts in the group should maintain
+ size_t target = 0;
+ };
+
+ /**
+ * HostData represents the current state for a specific HostAndPort/SpecificPool.
+ *
+ * It is mutated by updateHost/removeHost and used along with Parameters to form Controls
+ * for getControls.
+ */
+ struct HostData {
+ // The HostGroupData associated with this pool.
+ // Note that this will be invalid if there was a replica set change
+ std::weak_ptr<HostGroupData> hostGroup;
+
+ // The number of connections the host should maintain
+ size_t target = 0;
+
+ // Whether this host is able to shut down
+ bool isAbleToShutdown = false;
+ };
+
+ ReplicaSetChangeListenerHandle _listener;
+
+ stdx::mutex _mutex;
+
+ stdx::unordered_map<const SpecificPool*, HostData> _poolData;
+ stdx::unordered_map<std::string, std::shared_ptr<HostGroupData>> _hostGroups;
+ stdx::unordered_map<HostAndPort, std::shared_ptr<HostGroupData>> _hostGroupsByHost;
+};
+} // namespace mongo
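To illustrate the ControllerInterface contract introduced above, here is a minimal sketch of a standalone controller. It is not part of this patch: the FixedLimitController name, its clamp bounds, and its fixed timeouts are invented for illustration, and only the ControllerInterface members it overrides come from connection_pool.h.

#include <algorithm>

#include "mongo/executor/connection_pool.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"

namespace mongo {
namespace executor {

// Hypothetical example, not in this commit: a controller that sizes each pool
// to its own demand, with no cross-host grouping and no runtime parameters.
class FixedLimitController final : public ConnectionPool::ControllerInterface {
public:
    HostGroup updateHost(const SpecificPool* pool,
                         const HostAndPort& host,
                         const HostState& stats) override {
        stdx::lock_guard lk(_mutex);

        // Size the pool to its demand, clamped to [kMinTarget, kMaxTarget]
        auto& target = _targets[pool];
        target = std::min(std::max(stats.requests + stats.active, kMinTarget), kMaxTarget);

        // No grouping: each host stands alone and may shut down once expired
        return {{host}, stats.health.isExpired};
    }

    void removeHost(const SpecificPool* pool) override {
        stdx::lock_guard lk(_mutex);
        _targets.erase(pool);
    }

    ConnectionControls getControls(const SpecificPool* pool) override {
        stdx::lock_guard lk(_mutex);
        return {ConnectionPool::kDefaultMaxConnecting, _targets[pool]};
    }

    // Fixed timeouts mirroring the IDL defaults above (invented choice)
    Milliseconds hostTimeout() const override {
        return Milliseconds{300000};
    }
    Milliseconds pendingTimeout() const override {
        return Milliseconds{20000};
    }
    Milliseconds toRefreshTimeout() const override {
        return Milliseconds{60000};
    }

    StringData name() const override {
        return "FixedLimitController"_sd;
    }

private:
    static constexpr size_t kMinTarget = 1;
    static constexpr size_t kMaxTarget = 100;

    stdx::mutex _mutex;
    stdx::unordered_map<const SpecificPool*, size_t> _targets;
};

}  // namespace executor
}  // namespace mongo

A controller built this way would be installed the same way this patch installs ShardingTaskExecutorPoolController in sharding_initialization.cpp: construct one and assign it to ConnectionPool::Options::controller before the options are handed to the executor pool.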
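To make the matching strategies concrete, consider a hypothetical three-member set {A (primary), B, C} where A's pool reports requests + active = 20 and B and C each report 2 (numbers invented; clamping is elided since 1 <= 2 <= 20 <= 32767 under the defaults above). With the strategy disabled, getControls yields targetConnections of 20, 2, and 2. With matchPrimaryNode, A's updateHost raises the group target to 20, so getControls for B and C returns max(2, 20) = 20 and the secondaries keep enough connections ready to absorb a failover. With matchBusiestNode the group target is the maximum over all members, which gives the same result here, but it would also track a secondary that became the busiest node, e.g. under secondary read preference.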