From 14476629f4af6e786550bf842277282d6cad31eb Mon Sep 17 00:00:00 2001 From: Billy Donahue Date: Tue, 29 Mar 2022 01:01:28 +0000 Subject: SERVER-62656 optional minmax for CSRS pools (cherry picked from commit b0b3890bdfc47bfd5a3cbc6c3f4d9b53c87441c4) --- jstests/noPassthrough/conn_pool_csrs.js | 119 +++++++++++++++++++++ src/mongo/s/client/shard_registry.cpp | 2 +- src/mongo/s/client/shard_registry.h | 2 +- src/mongo/s/grid.cpp | 2 +- src/mongo/s/grid.h | 4 +- src/mongo/s/sharding_initialization.cpp | 7 +- src/mongo/s/sharding_task_executor_pool.idl | 19 ++++ .../s/sharding_task_executor_pool_controller.cpp | 34 +++++- .../s/sharding_task_executor_pool_controller.h | 14 ++- 9 files changed, 192 insertions(+), 11 deletions(-) create mode 100644 jstests/noPassthrough/conn_pool_csrs.js diff --git a/jstests/noPassthrough/conn_pool_csrs.js b/jstests/noPassthrough/conn_pool_csrs.js new file mode 100644 index 00000000000..2f6f7aa32de --- /dev/null +++ b/jstests/noPassthrough/conn_pool_csrs.js @@ -0,0 +1,119 @@ +load("jstests/libs/parallelTester.js"); + +/** + * @tags: [requires_replication, requires_sharding] + * + * Test that servers can be configured with distinct size limits for + * connection pools that are connected to a config server. + * + * Heavily modified from jstest/noPassthrough/set_step_params.js + */ + +(function() { +"use strict"; + +const poolSizeLogId = 6265600; + +const rsMin = 10; +const rsMax = 20; + +const baselineParameters = { + ShardingTaskExecutorPoolMinSize: rsMin, + ShardingTaskExecutorPoolMaxSize: rsMax, + ShardingTaskExecutorPoolMinSizeForConfigServers: 4, + ShardingTaskExecutorPoolMaxSizeForConfigServers: 6, +}; + +const mongosParameters = Object.assign( + {logComponentVerbosity: tojson({network: {connectionPool: 5}})}, baselineParameters); + +const st = new ShardingTest({ + config: {nodes: 1}, + shards: 1, + rs0: {nodes: 1}, + mongos: [{setParameter: mongosParameters}], +}); +const mongos = st.s0; +const configServer = st.c0; +const mongod = st.rs0.getPrimary(); + +const adminCmd = req => assert.commandWorked(mongos.adminCommand(req)); + +const populateTestDb = () => { + const db = mongos.getDB('test'); + const coll = db.test; + assert.commandWorked(coll.insert({x: 1})); + assert.commandWorked(coll.insert({x: 2})); + assert.commandWorked(coll.insert({x: 3})); +}; + +const setCSPoolBounds = (min, max) => { + adminCmd(Object.assign({"setParameter": 1}, { + ShardingTaskExecutorPoolMinSizeForConfigServers: min, + ShardingTaskExecutorPoolMaxSizeForConfigServers: max, + })); +}; + +// Make mongos open a connection to the config server. This is done by issuing a +// query that mongos will connect to the config server to execute. +const connectConfigServer = () => { + jsTestLog('Cause Config Server connection'); + assert.commandWorked(mongos.getDB('config').runCommand( + {find: "databases", limit: 1, "$readPreference": {mode: 'primary'}})); +}; + +// The (only) mongod has a 'test' database. +const connectMongodServer = () => { + jsTestLog('Cause Mongod Server connection'); + assert.commandWorked(mongos.getDB('test').runCommand( + {find: "test", limit: 1, "$readPreference": {mode: 'primary'}})); +}; + +// Waits until mongos emits a log line indicating that the conn pool +// `targetHost` is being resized. Returns all such lines. +const awaitUpdateHost = targetHost => { + let hits; + assert.soon(() => { + let log = checkLog.getGlobalLog(st.s); + // jsTestLog(`Fetched log: """${log}"""`); + hits = checkLog.getGlobalLog(mongos) + .map(line => JSON.parse(line)) + .filter(o => o.id == poolSizeLogId) + .filter(o => o.attr.host == targetHost); + return hits.length > 0; + }, `log lines id:${poolSizeLogId} for target ${targetHost}`, 10 * 1000, 1 * 1000, { + runHangAnalyzer: false + }); + return hits; +}; + +populateTestDb(); + +// Try a few {min,max} pairs. +for (const [min, max] of [[4, 6], [10, 20], [2, 4], [-1, -1]]) { + jsTestLog(`Try ConfigServer pool bounds [${min},${max}]`); + setCSPoolBounds(min, max); + + adminCmd({dropConnections: 1, hostAndPort: [configServer.host]}); + adminCmd({clearLog: 'global'}); + connectConfigServer(); + for (let o of awaitUpdateHost(configServer.host)) { + const cascade = (x, fallback) => x >= 0 ? x : fallback; + assert.eq(o.attr.minConns, cascade(min, rsMin)); + assert.eq(o.attr.maxConns, cascade(max, rsMax)); + } + + // Make sure the setting doesn't affect non-ConfigServer pools. + adminCmd({dropConnections: 1, hostAndPort: [mongod.host]}); + adminCmd({clearLog: 'global'}); + connectMongodServer(); + for (let o of awaitUpdateHost(mongod.host)) { + assert.eq(o.attr.minConns, rsMin); + assert.eq(o.attr.maxConns, rsMax); + } + + adminCmd(Object.assign({"setParameter": 1}, baselineParameters)); +} + +st.stop(); +})(); diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 5d18387f63f..51c10ac451c 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -151,7 +151,7 @@ shared_ptr ShardRegistry::getShardNoReload(const ShardId& shardId) { return _data.findShard(shardId); } -shared_ptr ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { +shared_ptr ShardRegistry::getShardForHostNoReload(const HostAndPort& host) const { return _data.findByHostAndPort(host); } diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 5ed94c5d143..1ad563e34c7 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -254,7 +254,7 @@ public: * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not * refresh the shard registry or otherwise perform any network traffic. */ - std::shared_ptr getShardForHostNoReload(const HostAndPort& shardHost); + std::shared_ptr getShardForHostNoReload(const HostAndPort& shardHost) const; /** * Returns shared pointer to the shard object representing the config servers. diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index ad83cfc0a1c..8d43e8856e3 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -62,7 +62,7 @@ Grid* Grid::get(OperationContext* operationContext) { void Grid::init(std::unique_ptr catalogClient, std::unique_ptr catalogCache, - std::unique_ptr shardRegistry, + std::shared_ptr shardRegistry, std::unique_ptr cursorManager, std::unique_ptr balancerConfig, std::unique_ptr executorPool, diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 0bd14b9c5cf..32abf601abe 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -77,7 +77,7 @@ public: */ void init(std::unique_ptr catalogClient, std::unique_ptr catalogCache, - std::unique_ptr shardRegistry, + std::shared_ptr shardRegistry, std::unique_ptr cursorManager, std::unique_ptr balancerConfig, std::unique_ptr executorPool, @@ -176,7 +176,7 @@ public: private: std::unique_ptr _catalogClient; std::unique_ptr _catalogCache; - std::unique_ptr _shardRegistry; + std::shared_ptr _shardRegistry; std::unique_ptr _cursorManager; std::unique_ptr _balancerConfig; diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index ffc7da9a0e9..d61614e29aa 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -192,8 +192,9 @@ Status initializeGlobalShardingState(OperationContext* opCtx, rpc::ShardingEgressMetadataHookBuilder hookBuilder, boost::optional taskExecutorPoolSize) { ConnectionPool::Options connPoolOptions; - connPoolOptions.controllerFactory = []() noexcept { - return std::make_shared(); + std::shared_ptr srsp(std::move(shardRegistry)); + connPoolOptions.controllerFactory = [srwp = std::weak_ptr(srsp)] { + return std::make_shared(srwp); }; auto network = executor::makeNetworkInterface( @@ -211,7 +212,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, grid->init(makeCatalogClient(service, distLockProcessId), std::move(catalogCache), - std::move(shardRegistry), + std::move(srsp), std::make_unique(service->getPreciseClockSource()), std::make_unique(), std::move(executorPool), diff --git a/src/mongo/s/sharding_task_executor_pool.idl b/src/mongo/s/sharding_task_executor_pool.idl index 361ad6606ff..b635eae936d 100644 --- a/src/mongo/s/sharding_task_executor_pool.idl +++ b/src/mongo/s/sharding_task_executor_pool.idl @@ -91,3 +91,22 @@ server_parameters: cpp_varname: "ShardingTaskExecutorPoolController::gParameters.matchingStrategyString" on_update: "ShardingTaskExecutorPoolController::onUpdateMatchingStrategy" default: "automatic" # matchPrimaryNode on mongos; disabled on mongod + + ShardingTaskExecutorPoolMinSizeForConfigServers: + description: <- + Overrides ShardingTaskExecutorPoolMinSize for pools targeting config servers. + Has no effect if set to -1 (the default). + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.minConnectionsForConfigServers" + validator: + gte: -1 + default: -1 + ShardingTaskExecutorPoolMaxSizeForConfigServers: + description: <- + Overrides ShardingTaskExecutorPoolMaxSize for pools targeting config servers. + Has no effect if set to -1 (the default). + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnectionsForConfigServers" + validator: + gte: -1 + default: -1 diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp index b4d7b50a171..581682bd317 100644 --- a/src/mongo/s/sharding_task_executor_pool_controller.cpp +++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp @@ -33,6 +33,8 @@ #include "mongo/client/replica_set_monitor.h" #include "mongo/executor/connection_pool_stats.h" +#include "mongo/logv2/log.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/is_mongos.h" #include "mongo/s/sharding_task_executor_pool_controller.h" @@ -54,6 +56,15 @@ void emplaceOrInvariant(Map&& map, Args&&... args) noexcept { invariant(ret.second, "Element already existed in map/set"); } +bool isConfigServer(const ShardRegistry* sr, const HostAndPort& peer) { + if (!sr) + return false; + auto shard = sr->getShardForHostNoReload(peer); + if (!shard) + return false; + return shard->isConfig(); +} + } // namespace Status ShardingTaskExecutorPoolController::validateHostTimeout(const int& hostTimeoutMS) { @@ -197,6 +208,7 @@ void ShardingTaskExecutorPoolController::addHost(PoolId id, const HostAndPort& h PoolData poolData; poolData.host = host; + poolData.isConfigServer = isConfigServer(_shardRegistry.lock().get(), host); // Set up the GroupAndId auto& groupAndId = _groupAndIds[host]; @@ -220,8 +232,26 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState& auto& poolData = getOrInvariant(_poolDatas, id); - const size_t minConns = gParameters.minConnections.load(); - const size_t maxConns = gParameters.maxConnections.load(); + const auto [minConns, maxConns] = [&] { + size_t lo = gParameters.minConnections.load(); + size_t hi = gParameters.maxConnections.load(); + if (poolData.isConfigServer) { + auto maybeOverride = [](size_t& t, int val) { + if (val >= 0) + t = val; + }; + maybeOverride(lo, gParameters.minConnectionsForConfigServers.load()); + maybeOverride(hi, gParameters.maxConnectionsForConfigServers.load()); + } + return std::tuple(lo, hi); + }(); + // conn_pool_csrs.js looks for this message in the log. + LOGV2_DEBUG(6265600, + 5, + "Update connection pool", + "host"_attr = poolData.host, + "minConns"_attr = minConns, + "maxConns"_attr = maxConns); // Update the target for just the pool first poolData.target = stats.requests + stats.active; diff --git a/src/mongo/s/sharding_task_executor_pool_controller.h b/src/mongo/s/sharding_task_executor_pool_controller.h index 4cee10bd9f6..9ef7c6bfd6c 100644 --- a/src/mongo/s/sharding_task_executor_pool_controller.h +++ b/src/mongo/s/sharding_task_executor_pool_controller.h @@ -40,6 +40,8 @@ namespace mongo { +class ShardRegistry; + /** * A special Controller for the sharding ConnectionPool * @@ -102,6 +104,9 @@ public: synchronized_value matchingStrategyString; AtomicWord matchingStrategy; + + AtomicWord minConnectionsForConfigServers; + AtomicWord maxConnectionsForConfigServers; }; static inline Parameters gParameters; @@ -123,7 +128,8 @@ public: */ static Status onUpdateMatchingStrategy(const std::string& str); - ShardingTaskExecutorPoolController() = default; + explicit ShardingTaskExecutorPoolController(std::weak_ptr shardRegistry) + : _shardRegistry(std::move(shardRegistry)) {} ShardingTaskExecutorPoolController& operator=(ShardingTaskExecutorPoolController&&) = delete; void init(ConnectionPool* parent) override; @@ -185,6 +191,9 @@ private: // The host associated with this pool HostAndPort host; + // A pool connected to a config server gets special treatment + bool isConfigServer = false; + // The GroupData associated with this pool. // Note that this will be invalid if there was a replica set change std::weak_ptr groupData; @@ -208,6 +217,9 @@ private: boost::optional maybeId; }; + /** Needed by isConfigServer */ + std::weak_ptr const _shardRegistry; + std::shared_ptr _listener; Mutex _mutex = MONGO_MAKE_LATCH("ShardingTaskExecutorPoolController::_mutex"); -- cgit v1.2.1