summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthrough/conn_pool_csrs.js126
-rw-r--r--src/mongo/s/client/shard_registry.cpp2
-rw-r--r--src/mongo/s/client/shard_registry.h2
-rw-r--r--src/mongo/s/grid.cpp2
-rw-r--r--src/mongo/s/grid.h4
-rw-r--r--src/mongo/s/sharding_initialization.cpp8
-rw-r--r--src/mongo/s/sharding_task_executor_pool.idl19
-rw-r--r--src/mongo/s/sharding_task_executor_pool_controller.cpp32
-rw-r--r--src/mongo/s/sharding_task_executor_pool_controller.h14
9 files changed, 198 insertions, 11 deletions
diff --git a/jstests/noPassthrough/conn_pool_csrs.js b/jstests/noPassthrough/conn_pool_csrs.js
new file mode 100644
index 00000000000..f517a6c75d2
--- /dev/null
+++ b/jstests/noPassthrough/conn_pool_csrs.js
@@ -0,0 +1,126 @@
+load("jstests/libs/parallelTester.js");
+
+/**
+ * @tags: [requires_replication, requires_sharding]
+ *
+ * Test that servers can be configured with distinct size limits for
+ * connection pools that are connected to a config server.
+ *
+ * Heavily modified from jstest/noPassthrough/set_step_params.js
+ */
+
+(function() {
+"use strict";
+
+const poolSizeLogId = 6265600;
+
+const rsMin = 10;
+const rsMax = 20;
+
+const baselineParameters = {
+ ShardingTaskExecutorPoolMinSize: rsMin,
+ ShardingTaskExecutorPoolMaxSize: rsMax,
+ ShardingTaskExecutorPoolMinSizeForConfigServers: 4,
+ ShardingTaskExecutorPoolMaxSizeForConfigServers: 6,
+};
+
+const mongosParameters = Object.assign(
+ {logComponentVerbosity: tojson({network: {connectionPool: 5}})}, baselineParameters);
+
+const st = new ShardingTest({
+ config: {nodes: 1},
+ shards: 1,
+ rs0: {nodes: 1},
+ mongos: [{setParameter: mongosParameters}],
+});
+const mongos = st.s0;
+const configServer = st.c0;
+const mongod = st.rs0.getPrimary();
+
+const adminCmd = req => assert.commandWorked(mongos.adminCommand(req));
+
+const populateTestDb = () => {
+ const db = mongos.getDB('test');
+ const coll = db.test;
+ assert.commandWorked(coll.insert({x: 1}));
+ assert.commandWorked(coll.insert({x: 2}));
+ assert.commandWorked(coll.insert({x: 3}));
+};
+
+const setCSPoolBounds = (min, max) => {
+ adminCmd(Object.assign({"setParameter": 1}, {
+ ShardingTaskExecutorPoolMinSizeForConfigServers: min,
+ ShardingTaskExecutorPoolMaxSizeForConfigServers: max,
+ }));
+};
+
+// Make mongos open a connection to the config server. This is done by issuing a
+// query that mongos will connect to the config server to execute.
+const connectConfigServer = () => {
+ jsTestLog('Cause Config Server connection');
+ assert.commandWorked(mongos.getDB('config').runCommand(
+ {find: "databases", limit: 1, "$readPreference": {mode: 'primary'}}));
+};
+
+// The (only) mongod has a 'test' database.
+const connectMongodServer = () => {
+ jsTestLog('Cause Mongod Server connection');
+ assert.commandWorked(mongos.getDB('test').runCommand(
+ {find: "test", limit: 1, "$readPreference": {mode: 'primary'}}));
+};
+
+// Waits until mongos emits a log line indicating that the conn pool
+// `targetHost` is being resized. Returns all such lines.
+const awaitUpdateHost = targetHost => {
+ let hits;
+ assert.soon(() => {
+ let log = checkLog.getGlobalLog(st.s);
+ // jsTestLog(`Fetched log: """${log}"""`);
+ // Emulate parsing an equivalent LOGV2 message, extracting attrs.
+ const re = /Update connection pool: host=(\S*), minConns=(\d*), maxConns=(\d*)/;
+ hits = checkLog.getGlobalLog(mongos)
+ .map(line => line.match(re))
+ .filter(m => m)
+ .map(m => ({
+ attr: {
+ host: m[1],
+ minConns: parseInt(m[2]),
+ maxConns: parseInt(m[3]),
+ },
+ }))
+ .filter(o => o.attr.host == targetHost);
+ return hits.length > 0;
+ }, `log lines for target ${targetHost}`, 10 * 1000, 1 * 1000, {runHangAnalyzer: false});
+ return hits;
+};
+
+populateTestDb();
+
+// Try a few {min,max} pairs.
+for (const [min, max] of [[4, 6], [10, 20], [2, 4], [-1, -1]]) {
+ jsTestLog(`Try ConfigServer pool bounds [${min},${max}]`);
+ setCSPoolBounds(min, max);
+
+ adminCmd({dropConnections: 1, hostAndPort: [configServer.host]});
+ adminCmd({clearLog: 'global'});
+ connectConfigServer();
+ for (let o of awaitUpdateHost(configServer.host)) {
+ const cascade = (x, fallback) => x >= 0 ? x : fallback;
+ assert.eq(o.attr.minConns, cascade(min, rsMin));
+ assert.eq(o.attr.maxConns, cascade(max, rsMax));
+ }
+
+ // Make sure the setting doesn't affect non-ConfigServer pools.
+ adminCmd({dropConnections: 1, hostAndPort: [mongod.host]});
+ adminCmd({clearLog: 'global'});
+ connectMongodServer();
+ for (let o of awaitUpdateHost(mongod.host)) {
+ assert.eq(o.attr.minConns, rsMin);
+ assert.eq(o.attr.maxConns, rsMax);
+ }
+
+ adminCmd(Object.assign({"setParameter": 1}, baselineParameters));
+}
+
+st.stop();
+})();
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 6d25d012860..e4e157506f6 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -149,7 +149,7 @@ shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) {
return _data.findByShardId(shardId);
}
-shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) {
+shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) const {
return _data.findByHostAndPort(host);
}
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index e755e2cff0c..14693e6a0c6 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -224,7 +224,7 @@ public:
* Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not
* refresh the shard registry or otherwise perform any network traffic.
*/
- std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost);
+ std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost) const;
/**
* Returns shared pointer to the shard object representing the config servers.
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index bda2bc6e929..73a30a4a6b8 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -62,7 +62,7 @@ Grid* Grid::get(OperationContext* operationContext) {
void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient,
std::unique_ptr<CatalogCache> catalogCache,
- std::unique_ptr<ShardRegistry> shardRegistry,
+ std::shared_ptr<ShardRegistry> shardRegistry,
std::unique_ptr<ClusterCursorManager> cursorManager,
std::unique_ptr<BalancerConfiguration> balancerConfig,
std::unique_ptr<executor::TaskExecutorPool> executorPool,
diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h
index 2564b781734..6ea99db0a0c 100644
--- a/src/mongo/s/grid.h
+++ b/src/mongo/s/grid.h
@@ -76,7 +76,7 @@ public:
*/
void init(std::unique_ptr<ShardingCatalogClient> catalogClient,
std::unique_ptr<CatalogCache> catalogCache,
- std::unique_ptr<ShardRegistry> shardRegistry,
+ std::shared_ptr<ShardRegistry> shardRegistry,
std::unique_ptr<ClusterCursorManager> cursorManager,
std::unique_ptr<BalancerConfiguration> balancerConfig,
std::unique_ptr<executor::TaskExecutorPool> executorPool,
@@ -175,7 +175,7 @@ public:
private:
std::unique_ptr<ShardingCatalogClient> _catalogClient;
std::unique_ptr<CatalogCache> _catalogCache;
- std::unique_ptr<ShardRegistry> _shardRegistry;
+ std::shared_ptr<ShardRegistry> _shardRegistry;
std::unique_ptr<ClusterCursorManager> _cursorManager;
std::unique_ptr<BalancerConfiguration> _balancerConfig;
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index e9add81153c..b4aa839396d 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -194,8 +194,10 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
rpc::ShardingEgressMetadataHookBuilder hookBuilder,
boost::optional<size_t> taskExecutorPoolSize) {
ConnectionPool::Options connPoolOptions;
- connPoolOptions.controllerFactory = []() noexcept {
- return std::make_shared<ShardingTaskExecutorPoolController>();
+ std::shared_ptr<ShardRegistry> srsp(std::move(shardRegistry));
+ std::weak_ptr<ShardRegistry> srwp{srsp};
+ connPoolOptions.controllerFactory = [srwp] {
+ return std::make_shared<ShardingTaskExecutorPoolController>(srwp);
};
auto network = executor::makeNetworkInterface(
@@ -213,7 +215,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
grid->init(makeCatalogClient(service, distLockProcessId),
std::move(catalogCache),
- std::move(shardRegistry),
+ std::move(srsp),
stdx::make_unique<ClusterCursorManager>(service->getPreciseClockSource()),
stdx::make_unique<BalancerConfiguration>(),
std::move(executorPool),
diff --git a/src/mongo/s/sharding_task_executor_pool.idl b/src/mongo/s/sharding_task_executor_pool.idl
index 361ad6606ff..b635eae936d 100644
--- a/src/mongo/s/sharding_task_executor_pool.idl
+++ b/src/mongo/s/sharding_task_executor_pool.idl
@@ -91,3 +91,22 @@ server_parameters:
cpp_varname: "ShardingTaskExecutorPoolController::gParameters.matchingStrategyString"
on_update: "ShardingTaskExecutorPoolController::onUpdateMatchingStrategy"
default: "automatic" # matchPrimaryNode on mongos; disabled on mongod
+
+ ShardingTaskExecutorPoolMinSizeForConfigServers:
+ description: <-
+ Overrides ShardingTaskExecutorPoolMinSize for pools targeting config servers.
+ Has no effect if set to -1 (the default).
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.minConnectionsForConfigServers"
+ validator:
+ gte: -1
+ default: -1
+ ShardingTaskExecutorPoolMaxSizeForConfigServers:
+ description: <-
+ Overrides ShardingTaskExecutorPoolMaxSize for pools targeting config servers.
+ Has no effect if set to -1 (the default).
+ set_at: [ startup, runtime ]
+ cpp_varname: "ShardingTaskExecutorPoolController::gParameters.maxConnectionsForConfigServers"
+ validator:
+ gte: -1
+ default: -1
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
index 8853f87b7df..f0cd378335d 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.cpp
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -33,6 +33,7 @@
#include "mongo/client/replica_set_monitor.h"
#include "mongo/executor/connection_pool_stats.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/is_mongos.h"
#include "mongo/s/sharding_task_executor_pool_controller.h"
#include "mongo/util/log.h"
@@ -55,6 +56,15 @@ void emplaceOrInvariant(Map&& map, Args&&... args) noexcept {
invariant(ret.second, "Element already existed in map/set");
}
+bool isConfigServer(const ShardRegistry* sr, const HostAndPort& peer) {
+ if (!sr)
+ return false;
+ auto shard = sr->getShardForHostNoReload(peer);
+ if (!shard)
+ return false;
+ return shard->isConfig();
+}
+
} // namespace
Status ShardingTaskExecutorPoolController::validateHostTimeout(const int& hostTimeoutMS) {
@@ -198,6 +208,7 @@ void ShardingTaskExecutorPoolController::addHost(PoolId id, const HostAndPort& h
PoolData poolData;
poolData.host = host;
+ poolData.isConfigServer = isConfigServer(_shardRegistry.lock().get(), host);
// Set up the GroupAndId
auto& groupAndId = _groupAndIds[host];
@@ -221,8 +232,25 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
auto& poolData = getOrInvariant(_poolDatas, id);
- const size_t minConns = gParameters.minConnections.load();
- const size_t maxConns = gParameters.maxConnections.load();
+ size_t minConns, maxConns;
+ std::tie(minConns, maxConns) = [&] {
+ size_t lo = gParameters.minConnections.load();
+ size_t hi = gParameters.maxConnections.load();
+ if (poolData.isConfigServer) {
+ auto maybeOverride = [](size_t& t, int val) {
+ if (val >= 0)
+ t = val;
+ };
+ maybeOverride(lo, gParameters.minConnectionsForConfigServers.load());
+ maybeOverride(hi, gParameters.maxConnectionsForConfigServers.load());
+ }
+ return std::tuple(lo, hi);
+ }();
+ // conn_pool_csrs.js looks for this message in the log.
+ MONGO_LOG(5) << "Update connection pool" //
+ << ": host=" << poolData.host //
+ << ", minConns=" << minConns //
+ << ", maxConns=" << maxConns;
// Update the target for just the pool first
poolData.target = stats.requests + stats.active;
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.h b/src/mongo/s/sharding_task_executor_pool_controller.h
index 4cee10bd9f6..9ef7c6bfd6c 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.h
+++ b/src/mongo/s/sharding_task_executor_pool_controller.h
@@ -40,6 +40,8 @@
namespace mongo {
+class ShardRegistry;
+
/**
* A special Controller for the sharding ConnectionPool
*
@@ -102,6 +104,9 @@ public:
synchronized_value<std::string> matchingStrategyString;
AtomicWord<MatchingStrategy> matchingStrategy;
+
+ AtomicWord<int> minConnectionsForConfigServers;
+ AtomicWord<int> maxConnectionsForConfigServers;
};
static inline Parameters gParameters;
@@ -123,7 +128,8 @@ public:
*/
static Status onUpdateMatchingStrategy(const std::string& str);
- ShardingTaskExecutorPoolController() = default;
+ explicit ShardingTaskExecutorPoolController(std::weak_ptr<ShardRegistry> shardRegistry)
+ : _shardRegistry(std::move(shardRegistry)) {}
ShardingTaskExecutorPoolController& operator=(ShardingTaskExecutorPoolController&&) = delete;
void init(ConnectionPool* parent) override;
@@ -185,6 +191,9 @@ private:
// The host associated with this pool
HostAndPort host;
+ // A pool connected to a config server gets special treatment
+ bool isConfigServer = false;
+
// The GroupData associated with this pool.
// Note that this will be invalid if there was a replica set change
std::weak_ptr<GroupData> groupData;
@@ -208,6 +217,9 @@ private:
boost::optional<PoolId> maybeId;
};
+ /** Needed by isConfigServer */
+ std::weak_ptr<ShardRegistry> const _shardRegistry;
+
std::shared_ptr<ReplicaSetChangeNotifier::Listener> _listener;
Mutex _mutex = MONGO_MAKE_LATCH("ShardingTaskExecutorPoolController::_mutex");