diff options
-rw-r--r-- | jstests/noPassthrough/conn_pool_csrs.js | 126 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.h | 2 | ||||
-rw-r--r-- | src/mongo/s/grid.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/grid.h | 4 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool.idl | 19 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool_controller.cpp | 32 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool_controller.h | 14 |
9 files changed, 198 insertions, 11 deletions
diff --git a/jstests/noPassthrough/conn_pool_csrs.js b/jstests/noPassthrough/conn_pool_csrs.js new file mode 100644 index 00000000000..f517a6c75d2 --- /dev/null +++ b/jstests/noPassthrough/conn_pool_csrs.js @@ -0,0 +1,126 @@ +load("jstests/libs/parallelTester.js"); + +/** + * @tags: [requires_replication, requires_sharding] + * + * Test that servers can be configured with distinct size limits for + * connection pools that are connected to a config server. + * + * Heavily modified from jstests/noPassthrough/set_step_params.js + */ + +(function() { +"use strict"; + +const poolSizeLogId = 6265600; + +const rsMin = 10; +const rsMax = 20; + +const baselineParameters = { + ShardingTaskExecutorPoolMinSize: rsMin, + ShardingTaskExecutorPoolMaxSize: rsMax, + ShardingTaskExecutorPoolMinSizeForConfigServers: 4, + ShardingTaskExecutorPoolMaxSizeForConfigServers: 6, +}; + +const mongosParameters = Object.assign( + {logComponentVerbosity: tojson({network: {connectionPool: 5}})}, baselineParameters); + +const st = new ShardingTest({ + config: {nodes: 1}, + shards: 1, + rs0: {nodes: 1}, + mongos: [{setParameter: mongosParameters}], +}); +const mongos = st.s0; +const configServer = st.c0; +const mongod = st.rs0.getPrimary(); + +const adminCmd = req => assert.commandWorked(mongos.adminCommand(req)); + +const populateTestDb = () => { + const db = mongos.getDB('test'); + const coll = db.test; + assert.commandWorked(coll.insert({x: 1})); + assert.commandWorked(coll.insert({x: 2})); + assert.commandWorked(coll.insert({x: 3})); +}; + +const setCSPoolBounds = (min, max) => { + adminCmd(Object.assign({"setParameter": 1}, { + ShardingTaskExecutorPoolMinSizeForConfigServers: min, + ShardingTaskExecutorPoolMaxSizeForConfigServers: max, + })); +}; + +// Make mongos open a connection to the config server. This is done by issuing a +// query that mongos will connect to the config server to execute. 
+const connectConfigServer = () => { + jsTestLog('Cause Config Server connection'); + assert.commandWorked(mongos.getDB('config').runCommand( + {find: "databases", limit: 1, "$readPreference": {mode: 'primary'}})); +}; + +// The (only) mongod has a 'test' database. +const connectMongodServer = () => { + jsTestLog('Cause Mongod Server connection'); + assert.commandWorked(mongos.getDB('test').runCommand( + {find: "test", limit: 1, "$readPreference": {mode: 'primary'}})); +}; + +// Waits until mongos emits a log line indicating that the conn pool +// `targetHost` is being resized. Returns all such lines. +const awaitUpdateHost = targetHost => { + let hits; + assert.soon(() => { + let log = checkLog.getGlobalLog(st.s); + // jsTestLog(`Fetched log: """${log}"""`); + // Emulate parsing an equivalent LOGV2 message, extracting attrs. + const re = /Update connection pool: host=(\S*), minConns=(\d*), maxConns=(\d*)/; + hits = checkLog.getGlobalLog(mongos) + .map(line => line.match(re)) + .filter(m => m) + .map(m => ({ + attr: { + host: m[1], + minConns: parseInt(m[2]), + maxConns: parseInt(m[3]), + }, + })) + .filter(o => o.attr.host == targetHost); + return hits.length > 0; + }, `log lines for target ${targetHost}`, 10 * 1000, 1 * 1000, {runHangAnalyzer: false}); + return hits; +}; + +populateTestDb(); + +// Try a few {min,max} pairs. +for (const [min, max] of [[4, 6], [10, 20], [2, 4], [-1, -1]]) { + jsTestLog(`Try ConfigServer pool bounds [${min},${max}]`); + setCSPoolBounds(min, max); + + adminCmd({dropConnections: 1, hostAndPort: [configServer.host]}); + adminCmd({clearLog: 'global'}); + connectConfigServer(); + for (let o of awaitUpdateHost(configServer.host)) { + const cascade = (x, fallback) => x >= 0 ? x : fallback; + assert.eq(o.attr.minConns, cascade(min, rsMin)); + assert.eq(o.attr.maxConns, cascade(max, rsMax)); + } + + // Make sure the setting doesn't affect non-ConfigServer pools. 
+ adminCmd({dropConnections: 1, hostAndPort: [mongod.host]}); + adminCmd({clearLog: 'global'}); + connectMongodServer(); + for (let o of awaitUpdateHost(mongod.host)) { + assert.eq(o.attr.minConns, rsMin); + assert.eq(o.attr.maxConns, rsMax); + } + + adminCmd(Object.assign({"setParameter": 1}, baselineParameters)); +} + +st.stop(); +})(); diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 6d25d012860..e4e157506f6 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -149,7 +149,7 @@ shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) { return _data.findByShardId(shardId); } -shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { +shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) const { return _data.findByHostAndPort(host); } diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index e755e2cff0c..14693e6a0c6 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -224,7 +224,7 @@ public: * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not * refresh the shard registry or otherwise perform any network traffic. */ - std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost); + std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost) const; /** * Returns shared pointer to the shard object representing the config servers. 
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index bda2bc6e929..73a30a4a6b8 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -62,7 +62,7 @@ Grid* Grid::get(OperationContext* operationContext) { void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient, std::unique_ptr<CatalogCache> catalogCache, - std::unique_ptr<ShardRegistry> shardRegistry, + std::shared_ptr<ShardRegistry> shardRegistry, std::unique_ptr<ClusterCursorManager> cursorManager, std::unique_ptr<BalancerConfiguration> balancerConfig, std::unique_ptr<executor::TaskExecutorPool> executorPool, diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 2564b781734..6ea99db0a0c 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -76,7 +76,7 @@ public: */ void init(std::unique_ptr<ShardingCatalogClient> catalogClient, std::unique_ptr<CatalogCache> catalogCache, - std::unique_ptr<ShardRegistry> shardRegistry, + std::shared_ptr<ShardRegistry> shardRegistry, std::unique_ptr<ClusterCursorManager> cursorManager, std::unique_ptr<BalancerConfiguration> balancerConfig, std::unique_ptr<executor::TaskExecutorPool> executorPool, @@ -175,7 +175,7 @@ public: private: std::unique_ptr<ShardingCatalogClient> _catalogClient; std::unique_ptr<CatalogCache> _catalogCache; - std::unique_ptr<ShardRegistry> _shardRegistry; + std::shared_ptr<ShardRegistry> _shardRegistry; std::unique_ptr<ClusterCursorManager> _cursorManager; std::unique_ptr<BalancerConfiguration> _balancerConfig; diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index e9add81153c..b4aa839396d 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -194,8 +194,10 @@ Status initializeGlobalShardingState(OperationContext* opCtx, rpc::ShardingEgressMetadataHookBuilder hookBuilder, boost::optional<size_t> taskExecutorPoolSize) { ConnectionPool::Options connPoolOptions; - connPoolOptions.controllerFactory = []() noexcept { - return 
std::make_shared<ShardingTaskExecutorPoolController>(); + std::shared_ptr<ShardRegistry> srsp(std::move(shardRegistry)); + std::weak_ptr<ShardRegistry> srwp{srsp}; + connPoolOptions.controllerFactory = [srwp] { + return std::make_shared<ShardingTaskExecutorPoolController>(srwp); }; auto network = executor::makeNetworkInterface( @@ -213,7 +215,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, grid->init(makeCatalogClient(service, distLockProcessId), std::move(catalogCache), - std::move(shardRegistry), + std::move(srsp), stdx::make_unique<ClusterCursorManager>(service->getPreciseClockSource()), stdx::make_unique<BalancerConfiguration>(), std::move(executorPool), diff --git a/src/mongo/s/sharding_task_executor_pool.idl b/src/mongo/s/sharding_task_executor_pool.idl index 361ad6606ff..b635eae936d 100644 --- a/src/mongo/s/sharding_task_executor_pool.idl +++ b/src/mongo/s/sharding_task_executor_pool.idl @@ -91,3 +91,22 @@ server_parameters: cpp_varname: "ShardingTaskExecutorPoolController::gParameters.matchingStrategyString" on_update: "ShardingTaskExecutorPoolController::onUpdateMatchingStrategy" default: "automatic" # matchPrimaryNode on mongos; disabled on mongod + + ShardingTaskExecutorPoolMinSizeForConfigServers: + description: <- + Overrides ShardingTaskExecutorPoolMinSize for pools targeting config servers. + Has no effect if set to -1 (the default). + set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.minConnectionsForConfigServers" + validator: + gte: -1 + default: -1 + ShardingTaskExecutorPoolMaxSizeForConfigServers: + description: <- + Overrides ShardingTaskExecutorPoolMaxSize for pools targeting config servers. + Has no effect if set to -1 (the default). 
+ set_at: [ startup, runtime ] + cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnectionsForConfigServers" + validator: + gte: -1 + default: -1 diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp index 8853f87b7df..f0cd378335d 100644 --- a/src/mongo/s/sharding_task_executor_pool_controller.cpp +++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp @@ -33,6 +33,7 @@ #include "mongo/client/replica_set_monitor.h" #include "mongo/executor/connection_pool_stats.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/is_mongos.h" #include "mongo/s/sharding_task_executor_pool_controller.h" #include "mongo/util/log.h" @@ -55,6 +56,15 @@ void emplaceOrInvariant(Map&& map, Args&&... args) noexcept { invariant(ret.second, "Element already existed in map/set"); } +bool isConfigServer(const ShardRegistry* sr, const HostAndPort& peer) { + if (!sr) + return false; + auto shard = sr->getShardForHostNoReload(peer); + if (!shard) + return false; + return shard->isConfig(); +} + } // namespace Status ShardingTaskExecutorPoolController::validateHostTimeout(const int& hostTimeoutMS) { @@ -198,6 +208,7 @@ void ShardingTaskExecutorPoolController::addHost(PoolId id, const HostAndPort& h PoolData poolData; poolData.host = host; + poolData.isConfigServer = isConfigServer(_shardRegistry.lock().get(), host); // Set up the GroupAndId auto& groupAndId = _groupAndIds[host]; @@ -221,8 +232,25 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState& auto& poolData = getOrInvariant(_poolDatas, id); - const size_t minConns = gParameters.minConnections.load(); - const size_t maxConns = gParameters.maxConnections.load(); + size_t minConns, maxConns; + std::tie(minConns, maxConns) = [&] { + size_t lo = gParameters.minConnections.load(); + size_t hi = gParameters.maxConnections.load(); + if (poolData.isConfigServer) { + auto maybeOverride = [](size_t& t, int val) { + if 
(val >= 0) + t = val; + }; + maybeOverride(lo, gParameters.minConnectionsForConfigServers.load()); + maybeOverride(hi, gParameters.maxConnectionsForConfigServers.load()); + } + return std::tuple(lo, hi); + }(); + // conn_pool_csrs.js looks for this message in the log. + MONGO_LOG(5) << "Update connection pool" // + << ": host=" << poolData.host // + << ", minConns=" << minConns // + << ", maxConns=" << maxConns; // Update the target for just the pool first poolData.target = stats.requests + stats.active; diff --git a/src/mongo/s/sharding_task_executor_pool_controller.h b/src/mongo/s/sharding_task_executor_pool_controller.h index 4cee10bd9f6..9ef7c6bfd6c 100644 --- a/src/mongo/s/sharding_task_executor_pool_controller.h +++ b/src/mongo/s/sharding_task_executor_pool_controller.h @@ -40,6 +40,8 @@ namespace mongo { +class ShardRegistry; + /** * A special Controller for the sharding ConnectionPool * @@ -102,6 +104,9 @@ public: synchronized_value<std::string> matchingStrategyString; AtomicWord<MatchingStrategy> matchingStrategy; + + AtomicWord<int> minConnectionsForConfigServers; + AtomicWord<int> maxConnectionsForConfigServers; }; static inline Parameters gParameters; @@ -123,7 +128,8 @@ public: */ static Status onUpdateMatchingStrategy(const std::string& str); - ShardingTaskExecutorPoolController() = default; + explicit ShardingTaskExecutorPoolController(std::weak_ptr<ShardRegistry> shardRegistry) + : _shardRegistry(std::move(shardRegistry)) {} ShardingTaskExecutorPoolController& operator=(ShardingTaskExecutorPoolController&&) = delete; void init(ConnectionPool* parent) override; @@ -185,6 +191,9 @@ private: // The host associated with this pool HostAndPort host; + // A pool connected to a config server gets special treatment + bool isConfigServer = false; + // The GroupData associated with this pool. 
// Note that this will be invalid if there was a replica set change std::weak_ptr<GroupData> groupData; @@ -208,6 +217,9 @@ private: boost::optional<PoolId> maybeId; }; + /** Needed by isConfigServer */ + std::weak_ptr<ShardRegistry> const _shardRegistry; + std::shared_ptr<ReplicaSetChangeNotifier::Listener> _listener; Mutex _mutex = MONGO_MAKE_LATCH("ShardingTaskExecutorPoolController::_mutex"); |