summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/executor/connection_pool.cpp11
-rw-r--r--src/mongo/executor/connection_pool.h8
-rw-r--r--src/mongo/executor/connection_pool_test.cpp196
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp34
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.h2
-rw-r--r--src/mongo/s/sharding_initialization.cpp4
6 files changed, 249 insertions, 6 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 7de24928f8f..cf8aadd4d3a 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -184,6 +184,7 @@ private:
constexpr Milliseconds ConnectionPool::kDefaultHostTimeout;
size_t const ConnectionPool::kDefaultMaxConns = std::numeric_limits<size_t>::max();
size_t const ConnectionPool::kDefaultMinConns = 1;
+size_t const ConnectionPool::kDefaultMaxConnecting = std::numeric_limits<size_t>::max();
constexpr Milliseconds ConnectionPool::kDefaultRefreshRequirement;
constexpr Milliseconds ConnectionPool::kDefaultRefreshTimeout;
@@ -370,8 +371,10 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
auto conn = takeFromProcessingPool(connPtr);
// If the host and port were dropped, let this lapse
- if (conn->getGeneration() != _generation)
+ if (conn->getGeneration() != _generation) {
+ spawnConnections(lk);
return;
+ }
// If we're in shutdown, we don't need refreshed connections
if (_state == State::kInShutdown)
@@ -381,6 +384,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// pool
if (status.isOK()) {
addToReady(lk, std::move(conn));
+ spawnConnections(lk);
return;
}
@@ -560,7 +564,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
};
// While all of our inflight connections are less than our target
- while (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) {
+ while ((_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) &&
+ (_processingPool.size() < _parent->_options.maxConnecting)) {
std::unique_ptr<ConnectionPool::ConnectionInterface> handle;
try {
// make a new connection and put it in processing
@@ -588,8 +593,10 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
if (conn->getGeneration() != _generation) {
// If the host and port was dropped, let the
// connection lapse
+ spawnConnections(lk);
} else if (status.isOK()) {
addToReady(lk, std::move(conn));
+ spawnConnections(lk);
} else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
// If we've exceeded the time limit, restart the connect, rather than
// failing all operations. We do this because the various callers
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index b644c0ae148..b3f3839d7b7 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -71,6 +71,7 @@ public:
static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000); // 5mins
static const size_t kDefaultMaxConns;
static const size_t kDefaultMinConns;
+ static const size_t kDefaultMaxConnecting;
static constexpr Milliseconds kDefaultRefreshRequirement = Milliseconds(60000); // 1min
static constexpr Milliseconds kDefaultRefreshTimeout = Milliseconds(20000); // 20secs
@@ -93,6 +94,13 @@ public:
size_t maxConnections = kDefaultMaxConns;
/**
+ * The maximum number of processing connections for a host. This includes pending
+ * connections in setup/refresh. It's designed to rate limit connection storms rather than
+ * steady state processing (as maxConnections does).
+ */
+ size_t maxConnecting = kDefaultMaxConnecting;
+
+ /**
* Amount of time to wait before timing out a refresh attempt
*/
Milliseconds refreshTimeout = kDefaultRefreshTimeout;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 6a857602ca6..2f968bd196f 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -438,6 +438,202 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) {
}
/**
+ * Verify that we respect maxConnecting
+ */
+TEST_F(ConnectionPoolTest, maxConnectingRespected) {
+ ConnectionPool::Options options;
+ options.minConnections = 1;
+ options.maxConnecting = 2;
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+
+ ConnectionPool::ConnectionHandle conn1;
+ ConnectionPool::ConnectionHandle conn2;
+ ConnectionPool::ConnectionHandle conn3;
+
+ // Make 3 requests, each which keep their connection (don't return it to
+ // the pool)
+ pool.get(HostAndPort(),
+ Milliseconds(3000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn3 = std::move(swConn.getValue());
+ });
+ pool.get(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn2 = std::move(swConn.getValue());
+ });
+ pool.get(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn1 = std::move(swConn.getValue());
+ });
+
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
+ ConnectionImpl::pushSetup(Status::OK());
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
+ ConnectionImpl::pushSetup(Status::OK());
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u);
+ ConnectionImpl::pushSetup(Status::OK());
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
+
+ ASSERT(conn1);
+ ASSERT(conn2);
+ ASSERT(conn3);
+
+ ASSERT_NE(conn1.get(), conn2.get());
+ ASSERT_NE(conn2.get(), conn3.get());
+ ASSERT_NE(conn1.get(), conn3.get());
+
+ doneWith(conn1);
+ doneWith(conn2);
+ doneWith(conn3);
+}
+
+/**
+ * Verify that refresh callbacks block new connections, then trigger new connection spawns after
+ * they return
+ */
+TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) {
+ ConnectionPool::Options options;
+ options.maxConnecting = 1;
+ options.refreshRequirement = Milliseconds(1000);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ // Get a connection
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ });
+
+ ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u);
+
+ // After 1 second, one refresh has queued
+ PoolImpl::setNow(now + Milliseconds(1000));
+ ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 1u);
+
+ bool reachedA = false;
+
+ // Try to get another connection
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ reachedA = true;
+ });
+
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
+ ASSERT(!reachedA);
+ ConnectionImpl::pushRefresh(Status::OK());
+ ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u);
+ ASSERT(reachedA);
+}
+
+/**
+ * Verify that refreshes block new connects, but don't themselves respect maxConnecting
+ */
+TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) {
+ ConnectionPool::Options options;
+ options.maxConnecting = 2;
+ options.minConnections = 3;
+ options.refreshRequirement = Milliseconds(1000);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ // Get us spun up to 3 connections in the pool
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ });
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
+ ConnectionImpl::pushSetup(Status::OK());
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
+ ConnectionImpl::pushSetup(Status::OK());
+ ConnectionImpl::pushSetup(Status::OK());
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
+
+ // Force more than two connections into refresh
+ PoolImpl::setNow(now + Milliseconds(1500));
+ ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 3u);
+
+ std::array<ConnectionPool::ConnectionHandle, 5> conns;
+
+ // Start 5 new requests
+ for (size_t i = 0; i < conns.size(); ++i) {
+ pool.get(HostAndPort(),
+ Milliseconds(static_cast<int>(1000 + i)),
+ [&conns, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ conns[i] = std::move(swConn.getValue());
+ });
+ }
+
+ auto firstNBound = [&](size_t n) {
+ for (size_t i = 0; i < n; ++i) {
+ ASSERT(conns[i]);
+ }
+ for (size_t i = n; i < conns.size(); ++i) {
+ ASSERT_FALSE(conns[i]);
+ }
+ };
+
+ // None have started connecting
+ ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 3u);
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
+ firstNBound(0);
+
+ // After one refresh, one refreshed connection gets handed out
+ ConnectionImpl::pushRefresh(Status::OK());
+ ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 2u);
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
+ firstNBound(1);
+
+ // After two refresh, one enters the setup queue, one refreshed connection gets handed out
+ ConnectionImpl::pushRefresh(Status::OK());
+ ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 1u);
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u);
+ firstNBound(2);
+
+ // After three refresh, we're done refreshing. Two queued in setup
+ ConnectionImpl::pushRefresh(Status::OK());
+ ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u);
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
+ firstNBound(3);
+
+ // now pushing setup gets us a new connection
+ ConnectionImpl::pushSetup(Status::OK());
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u);
+ firstNBound(4);
+
+ // and we're done
+ ConnectionImpl::pushSetup(Status::OK());
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
+ firstNBound(5);
+
+ for (auto& conn : conns) {
+ doneWith(conn);
+ }
+}
+
+/**
* Verify that minConnections is respected
*/
TEST_F(ConnectionPoolTest, minPoolRespected) {
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index 8c193f39e06..a2e3215393f 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -116,9 +116,13 @@ void ConnectionImpl::pushSetup(PushSetupCallback status) {
_pushSetupQueue.push_back(status);
if (_setupQueue.size()) {
- _setupQueue.front()->_setupCallback(_setupQueue.front(), _pushSetupQueue.front()());
+ auto connPtr = _setupQueue.front();
+ auto callback = _pushSetupQueue.front();
_setupQueue.pop_front();
_pushSetupQueue.pop_front();
+
+ auto cb = connPtr->_setupCallback;
+ cb(connPtr, callback());
}
}
@@ -126,13 +130,22 @@ void ConnectionImpl::pushSetup(Status status) {
pushSetup([status]() { return status; });
}
+size_t ConnectionImpl::setupQueueDepth() {
+ return _setupQueue.size();
+}
+
void ConnectionImpl::pushRefresh(PushRefreshCallback status) {
_pushRefreshQueue.push_back(status);
if (_refreshQueue.size()) {
- _refreshQueue.front()->_refreshCallback(_refreshQueue.front(), _pushRefreshQueue.front()());
+ auto connPtr = _refreshQueue.front();
+ auto callback = _pushRefreshQueue.front();
+
_refreshQueue.pop_front();
_pushRefreshQueue.pop_front();
+
+ auto cb = connPtr->_refreshCallback;
+ cb(connPtr, callback());
}
}
@@ -140,6 +153,10 @@ void ConnectionImpl::pushRefresh(Status status) {
pushRefresh([status]() { return status; });
}
+size_t ConnectionImpl::refreshQueueDepth() {
+ return _refreshQueue.size();
+}
+
Date_t ConnectionImpl::getLastUsed() const {
return _lastUsed;
}
@@ -166,9 +183,13 @@ void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) {
_setupQueue.push_back(this);
if (_pushSetupQueue.size()) {
- _setupQueue.front()->_setupCallback(_setupQueue.front(), _pushSetupQueue.front()());
+ auto connPtr = _setupQueue.front();
+ auto callback = _pushSetupQueue.front();
_setupQueue.pop_front();
_pushSetupQueue.pop_front();
+
+ auto cb = connPtr->_setupCallback;
+ cb(connPtr, callback());
}
}
@@ -182,9 +203,14 @@ void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) {
_refreshQueue.push_back(this);
if (_pushRefreshQueue.size()) {
- _refreshQueue.front()->_refreshCallback(_refreshQueue.front(), _pushRefreshQueue.front()());
+ auto connPtr = _refreshQueue.front();
+ auto callback = _pushRefreshQueue.front();
+
_refreshQueue.pop_front();
_pushRefreshQueue.pop_front();
+
+ auto cb = connPtr->_refreshCallback;
+ cb(connPtr, callback());
}
}
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index 74946ba5cf4..8509e03171c 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -95,10 +95,12 @@ public:
// Push either a callback that returns the status for a setup, or just the Status
static void pushSetup(PushSetupCallback status);
static void pushSetup(Status status);
+ static size_t setupQueueDepth();
// Push either a callback that returns the status for a refresh, or just the Status
static void pushRefresh(PushRefreshCallback status);
static void pushRefresh(Status status);
+ static size_t refreshQueueDepth();
private:
void indicateUsed() override;
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 47803747f3a..2bf84b02d18 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -74,6 +74,7 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolHostTimeoutMS,
int,
ConnectionPool::kDefaultHostTimeout.count());
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxSize, int, -1);
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxConnecting, int, -1);
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMinSize,
int,
static_cast<int>(ConnectionPool::kDefaultMinConns));
@@ -174,6 +175,9 @@ Status initializeGlobalShardingState(OperationContext* txn,
connPoolOptions.maxConnections = (ShardingTaskExecutorPoolMaxSize != -1)
? ShardingTaskExecutorPoolMaxSize
: ConnectionPool::kDefaultMaxConns;
+ connPoolOptions.maxConnecting = (ShardingTaskExecutorPoolMaxConnecting != -1)
+ ? ShardingTaskExecutorPoolMaxConnecting
+ : ConnectionPool::kDefaultMaxConnecting;
connPoolOptions.minConnections = ShardingTaskExecutorPoolMinSize;
connPoolOptions.refreshRequirement = Milliseconds(ShardingTaskExecutorPoolRefreshRequirementMS);
connPoolOptions.refreshTimeout = Milliseconds(ShardingTaskExecutorPoolRefreshTimeoutMS);