diff options
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 11 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 8 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 196 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.cpp | 34 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.h | 2 | ||||
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 4 |
6 files changed, 249 insertions, 6 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 7de24928f8f..cf8aadd4d3a 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -184,6 +184,7 @@ private: constexpr Milliseconds ConnectionPool::kDefaultHostTimeout; size_t const ConnectionPool::kDefaultMaxConns = std::numeric_limits<size_t>::max(); size_t const ConnectionPool::kDefaultMinConns = 1; +size_t const ConnectionPool::kDefaultMaxConnecting = std::numeric_limits<size_t>::max(); constexpr Milliseconds ConnectionPool::kDefaultRefreshRequirement; constexpr Milliseconds ConnectionPool::kDefaultRefreshTimeout; @@ -370,8 +371,10 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr auto conn = takeFromProcessingPool(connPtr); // If the host and port were dropped, let this lapse - if (conn->getGeneration() != _generation) + if (conn->getGeneration() != _generation) { + spawnConnections(lk); return; + } // If we're in shutdown, we don't need refreshed connections if (_state == State::kInShutdown) @@ -381,6 +384,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr // pool if (status.isOK()) { addToReady(lk, std::move(conn)); + spawnConnections(lk); return; } @@ -560,7 +564,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute }; // While all of our inflight connections are less than our target - while (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) { + while ((_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) && + (_processingPool.size() < _parent->_options.maxConnecting)) { std::unique_ptr<ConnectionPool::ConnectionInterface> handle; try { // make a new connection and put it in processing @@ -588,8 +593,10 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute if (conn->getGeneration() != _generation) { // If the host and port was dropped, let the // connection lapse + spawnConnections(lk); } else if (status.isOK()) { addToReady(lk, std::move(conn)); + spawnConnections(lk); } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) { // If we've exceeded the time limit, restart the connect, rather than // failing all operations. We do this because the various callers diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index b644c0ae148..b3f3839d7b7 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -71,6 +71,7 @@ public: static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000); // 5mins static const size_t kDefaultMaxConns; static const size_t kDefaultMinConns; + static const size_t kDefaultMaxConnecting; static constexpr Milliseconds kDefaultRefreshRequirement = Milliseconds(60000); // 1min static constexpr Milliseconds kDefaultRefreshTimeout = Milliseconds(20000); // 20secs @@ -93,6 +94,13 @@ public: size_t maxConnections = kDefaultMaxConns; /** + * The maximum number of processing connections for a host. This includes pending + * connections in setup/refresh. It's designed to rate limit connection storms rather than + * steady state processing (as maxConnections does). + */ + size_t maxConnecting = kDefaultMaxConnecting; + + /** * Amount of time to wait before timing out a refresh attempt */ Milliseconds refreshTimeout = kDefaultRefreshTimeout; diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 6a857602ca6..2f968bd196f 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -438,6 +438,202 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { } /** + * Verify that we respect maxConnecting + */ +TEST_F(ConnectionPoolTest, maxConnectingRespected) { + ConnectionPool::Options options; + options.minConnections = 1; + options.maxConnecting = 2; + ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + + ConnectionPool::ConnectionHandle conn1; + ConnectionPool::ConnectionHandle conn2; + ConnectionPool::ConnectionHandle conn3; + + // Make 3 requests, each which keep their connection (don't return it to + // the pool) + pool.get(HostAndPort(), + Milliseconds(3000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn3 = std::move(swConn.getValue()); + }); + pool.get(HostAndPort(), + Milliseconds(2000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn2 = std::move(swConn.getValue()); + }); + pool.get(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn1 = std::move(swConn.getValue()); + }); + + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + + ASSERT(conn1); + ASSERT(conn2); + ASSERT(conn3); + + ASSERT_NE(conn1.get(), conn2.get()); + ASSERT_NE(conn2.get(), conn3.get()); + ASSERT_NE(conn1.get(), conn3.get()); + + doneWith(conn1); + doneWith(conn2); + doneWith(conn3); +} + +/** + * Verify that refresh callbacks block new connections, then trigger new connection spawns after + * they return + */ +TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { + ConnectionPool::Options options; + options.maxConnecting = 1; + options.refreshRequirement = Milliseconds(1000); + ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + + auto now = Date_t::now(); + + PoolImpl::setNow(now); + + // Get a connection + ConnectionImpl::pushSetup(Status::OK()); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); + + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); + + // After 1 second, one refresh has queued + PoolImpl::setNow(now + Milliseconds(1000)); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 1u); + + bool reachedA = false; + + // Try to get another connection + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + reachedA = true; + }); + + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + ASSERT(!reachedA); + ConnectionImpl::pushRefresh(Status::OK()); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); + ASSERT(reachedA); +} + +/** + * Verify that refreshes block new connects, but don't themselves respect maxConnecting + */ +TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { + ConnectionPool::Options options; + options.maxConnecting = 2; + options.minConnections = 3; + options.refreshRequirement = Milliseconds(1000); + ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + + auto now = Date_t::now(); + + PoolImpl::setNow(now); + + // Get us spun up to 3 connections in the pool + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + + // Force more than two connections into refresh + PoolImpl::setNow(now + Milliseconds(1500)); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 3u); + + std::array<ConnectionPool::ConnectionHandle, 5> conns; + + // Start 5 new requests + for (size_t i = 0; i < conns.size(); ++i) { + pool.get(HostAndPort(), + Milliseconds(static_cast<int>(1000 + i)), + [&conns, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + conns[i] = std::move(swConn.getValue()); + }); + } + + auto firstNBound = [&](size_t n) { + for (size_t i = 0; i < n; ++i) { + ASSERT(conns[i]); + } + for (size_t i = n; i < conns.size(); ++i) { + ASSERT_FALSE(conns[i]); + } + }; + + // None have started connecting + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 3u); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + firstNBound(0); + + // After one refresh, one refreshed connection gets handed out + ConnectionImpl::pushRefresh(Status::OK()); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 2u); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + firstNBound(1); + + // After two refresh, one enters the setup queue, one refreshed connection gets handed out + ConnectionImpl::pushRefresh(Status::OK()); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 1u); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); + firstNBound(2); + + // After three refresh, we're done refreshing. Two queued in setup + ConnectionImpl::pushRefresh(Status::OK()); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + firstNBound(3); + + // now pushing setup gets us a new connection + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); + firstNBound(4); + + // and we're done + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + firstNBound(5); + + for (auto& conn : conns) { + doneWith(conn); + } +} + +/** * Verify that minConnections is respected */ TEST_F(ConnectionPoolTest, minPoolRespected) { diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index 8c193f39e06..a2e3215393f 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -116,9 +116,13 @@ void ConnectionImpl::pushSetup(PushSetupCallback status) { _pushSetupQueue.push_back(status); if (_setupQueue.size()) { - _setupQueue.front()->_setupCallback(_setupQueue.front(), _pushSetupQueue.front()()); + auto connPtr = _setupQueue.front(); + auto callback = _pushSetupQueue.front(); _setupQueue.pop_front(); _pushSetupQueue.pop_front(); + + auto cb = connPtr->_setupCallback; + cb(connPtr, callback()); } } @@ -126,13 +130,22 @@ void ConnectionImpl::pushSetup(Status status) { pushSetup([status]() { return status; }); } +size_t ConnectionImpl::setupQueueDepth() { + return _setupQueue.size(); +} + void ConnectionImpl::pushRefresh(PushRefreshCallback status) { _pushRefreshQueue.push_back(status); if (_refreshQueue.size()) { - _refreshQueue.front()->_refreshCallback(_refreshQueue.front(), _pushRefreshQueue.front()()); + auto connPtr = _refreshQueue.front(); + auto callback = _pushRefreshQueue.front(); + _refreshQueue.pop_front(); _pushRefreshQueue.pop_front(); + + auto cb = connPtr->_refreshCallback; + cb(connPtr, callback()); } } @@ -140,6 +153,10 @@ void ConnectionImpl::pushRefresh(Status status) { pushRefresh([status]() { return status; }); } +size_t ConnectionImpl::refreshQueueDepth() { + return _refreshQueue.size(); +} + Date_t ConnectionImpl::getLastUsed() const { return _lastUsed; } @@ -166,9 +183,13 @@ void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) { _setupQueue.push_back(this); if (_pushSetupQueue.size()) { - _setupQueue.front()->_setupCallback(_setupQueue.front(), _pushSetupQueue.front()()); + auto connPtr = _setupQueue.front(); + auto callback = _pushSetupQueue.front(); _setupQueue.pop_front(); _pushSetupQueue.pop_front(); + + auto cb = connPtr->_setupCallback; + cb(connPtr, callback()); } } @@ -182,9 +203,14 @@ void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) { _refreshQueue.push_back(this); if (_pushRefreshQueue.size()) { - _refreshQueue.front()->_refreshCallback(_refreshQueue.front(), _pushRefreshQueue.front()()); + auto connPtr = _refreshQueue.front(); + auto callback = _pushRefreshQueue.front(); + _refreshQueue.pop_front(); _pushRefreshQueue.pop_front(); + + auto cb = connPtr->_refreshCallback; + cb(connPtr, callback()); } } diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h index 74946ba5cf4..8509e03171c 100644 --- a/src/mongo/executor/connection_pool_test_fixture.h +++ b/src/mongo/executor/connection_pool_test_fixture.h @@ -95,10 +95,12 @@ public: // Push either a callback that returns the status for a setup, or just the Status static void pushSetup(PushSetupCallback status); static void pushSetup(Status status); + static size_t setupQueueDepth(); // Push either a callback that returns the status for a refresh, or just the Status static void pushRefresh(PushRefreshCallback status); static void pushRefresh(Status status); + static size_t refreshQueueDepth(); private: void indicateUsed() override; diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 47803747f3a..2bf84b02d18 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -74,6 +74,7 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolHostTimeoutMS, int, ConnectionPool::kDefaultHostTimeout.count()); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxSize, int, -1); +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxConnecting, int, -1); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMinSize, int, static_cast<int>(ConnectionPool::kDefaultMinConns)); @@ -174,6 +175,9 @@ Status initializeGlobalShardingState(OperationContext* txn, connPoolOptions.maxConnections = (ShardingTaskExecutorPoolMaxSize != -1) ? ShardingTaskExecutorPoolMaxSize : ConnectionPool::kDefaultMaxConns; + connPoolOptions.maxConnecting = (ShardingTaskExecutorPoolMaxConnecting != -1) + ? ShardingTaskExecutorPoolMaxConnecting + : ConnectionPool::kDefaultMaxConnecting; connPoolOptions.minConnections = ShardingTaskExecutorPoolMinSize; connPoolOptions.refreshRequirement = Milliseconds(ShardingTaskExecutorPoolRefreshRequirementMS); connPoolOptions.refreshTimeout = Milliseconds(ShardingTaskExecutorPoolRefreshTimeoutMS); |