From f75fb8f5eee2b91599334681ab2b1ebcf20b491c Mon Sep 17 00:00:00 2001 From: Jason Carey Date: Tue, 16 May 2017 15:08:59 -0400 Subject: SERVER-29237 add maxConnecting in asio connpool A new connection pool option which controls that rate at which we add new connections. It changes behavior so that only N connections can be in the processing state at any one time (in setup/refresh). More connections will be added, if needed, as each new connection completes, fails or times out. available in mongos via ShardingTaskExecutorPoolMaxConnecting, defaults to unlimited. --- src/mongo/executor/connection_pool.cpp | 11 +- src/mongo/executor/connection_pool.h | 8 + src/mongo/executor/connection_pool_test.cpp | 196 +++++++++++++++++++++ .../executor/connection_pool_test_fixture.cpp | 34 +++- src/mongo/executor/connection_pool_test_fixture.h | 2 + src/mongo/s/sharding_initialization.cpp | 4 + 6 files changed, 249 insertions(+), 6 deletions(-) diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 78665f7f11e..475f3405fb6 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -189,6 +189,7 @@ private: constexpr Milliseconds ConnectionPool::kDefaultHostTimeout; size_t const ConnectionPool::kDefaultMaxConns = std::numeric_limits::max(); size_t const ConnectionPool::kDefaultMinConns = 1; +size_t const ConnectionPool::kDefaultMaxConnecting = std::numeric_limits::max(); constexpr Milliseconds ConnectionPool::kDefaultRefreshRequirement; constexpr Milliseconds ConnectionPool::kDefaultRefreshTimeout; @@ -376,8 +377,10 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr auto conn = takeFromProcessingPool(connPtr); // If the host and port were dropped, let this lapse - if (conn->getGeneration() != _generation) + if (conn->getGeneration() != _generation) { + spawnConnections(lk); return; + } // If we're in shutdown, we don't need refreshed connections if (_state == State::kInShutdown) @@ -387,6 +390,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr // pool if (status.isOK()) { addToReady(lk, std::move(conn)); + spawnConnections(lk); return; } @@ -568,7 +572,8 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock_options.maxConnecting)) { std::unique_ptr handle; try { // make a new connection and put it in processing @@ -596,8 +601,10 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lockgetGeneration() != _generation) { // If the host and port was dropped, let the // connection lapse + spawnConnections(lk); } else if (status.isOK()) { addToReady(lk, std::move(conn)); + spawnConnections(lk); } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) { // If we've exceeded the time limit, restart the connect, rather than // failing all operations. We do this because the various callers diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index b644c0ae148..b3f3839d7b7 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -71,6 +71,7 @@ public: static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000); // 5mins static const size_t kDefaultMaxConns; static const size_t kDefaultMinConns; + static const size_t kDefaultMaxConnecting; static constexpr Milliseconds kDefaultRefreshRequirement = Milliseconds(60000); // 1min static constexpr Milliseconds kDefaultRefreshTimeout = Milliseconds(20000); // 20secs @@ -92,6 +93,13 @@ public: */ size_t maxConnections = kDefaultMaxConns; + /** + * The maximum number of processing connections for a host. This includes pending + * connections in setup/refresh. It's designed to rate limit connection storms rather than + * steady state processing (as maxConnections does). + */ + size_t maxConnecting = kDefaultMaxConnecting; + /** * Amount of time to wait before timing out a refresh attempt */ diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 22d7f5d93f3..93233e9b966 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -611,6 +611,202 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { doneWith(conn3); } +/** + * Verify that we respect maxConnecting + */ +TEST_F(ConnectionPoolTest, maxConnectingRespected) { + ConnectionPool::Options options; + options.minConnections = 1; + options.maxConnecting = 2; + ConnectionPool pool(stdx::make_unique(), "test pool", options); + + ConnectionPool::ConnectionHandle conn1; + ConnectionPool::ConnectionHandle conn2; + ConnectionPool::ConnectionHandle conn3; + + // Make 3 requests, each which keep their connection (don't return it to + // the pool) + pool.get(HostAndPort(), + Milliseconds(3000), + [&](StatusWith swConn) { + ASSERT(swConn.isOK()); + + conn3 = std::move(swConn.getValue()); + }); + pool.get(HostAndPort(), + Milliseconds(2000), + [&](StatusWith swConn) { + ASSERT(swConn.isOK()); + + conn2 = std::move(swConn.getValue()); + }); + pool.get(HostAndPort(), + Milliseconds(1000), + [&](StatusWith swConn) { + ASSERT(swConn.isOK()); + + conn1 = std::move(swConn.getValue()); + }); + + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + + ASSERT(conn1); + ASSERT(conn2); + ASSERT(conn3); + + ASSERT_NE(conn1.get(), conn2.get()); + ASSERT_NE(conn2.get(), conn3.get()); + ASSERT_NE(conn1.get(), conn3.get()); + + doneWith(conn1); + doneWith(conn2); + doneWith(conn3); +} + +/** + * Verify that refresh callbacks block new connections, then trigger new connection spawns after + * they return + */ +TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { + ConnectionPool::Options options; + options.maxConnecting = 1; + options.refreshRequirement = Milliseconds(1000); + ConnectionPool pool(stdx::make_unique(), "test pool", options); + + auto now = Date_t::now(); + + PoolImpl::setNow(now); + + // Get a connection + ConnectionImpl::pushSetup(Status::OK()); + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); + + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); + + // After 1 second, one refresh has queued + PoolImpl::setNow(now + Milliseconds(1000)); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 1u); + + bool reachedA = false; + + // Try to get another connection + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + reachedA = true; + }); + + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + ASSERT(!reachedA); + ConnectionImpl::pushRefresh(Status::OK()); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); + ASSERT(reachedA); +} + +/** + * Verify that refreshes block new connects, but don't themselves respect maxConnecting + */ +TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { + ConnectionPool::Options options; + options.maxConnecting = 2; + options.minConnections = 3; + options.refreshRequirement = Milliseconds(1000); + ConnectionPool pool(stdx::make_unique(), "test pool", options); + + auto now = Date_t::now(); + + PoolImpl::setNow(now); + + // Get us spun up to 3 connections in the pool + pool.get(HostAndPort(), + Milliseconds(5000), + [&](StatusWith swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + + // Force more than two connections into refresh + PoolImpl::setNow(now + Milliseconds(1500)); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 3u); + + std::array conns; + + // Start 5 new requests + for (size_t i = 0; i < conns.size(); ++i) { + pool.get(HostAndPort(), + Milliseconds(static_cast(1000 + i)), + [&conns, i](StatusWith swConn) { + ASSERT(swConn.isOK()); + conns[i] = std::move(swConn.getValue()); + }); + } + + auto firstNBound = [&](size_t n) { + for (size_t i = 0; i < n; ++i) { + ASSERT(conns[i]); + } + for (size_t i = n; i < conns.size(); ++i) { + ASSERT_FALSE(conns[i]); + } + }; + + // None have started connecting + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 3u); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + firstNBound(0); + + // After one refresh, one refreshed connection gets handed out + ConnectionImpl::pushRefresh(Status::OK()); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 2u); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + firstNBound(1); + + // After two refresh, one enters the setup queue, one refreshed connection gets handed out + ConnectionImpl::pushRefresh(Status::OK()); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 1u); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); + firstNBound(2); + + // After three refresh, we're done refreshing. Two queued in setup + ConnectionImpl::pushRefresh(Status::OK()); + ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + firstNBound(3); + + // now pushing setup gets us a new connection + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); + firstNBound(4); + + // and we're done + ConnectionImpl::pushSetup(Status::OK()); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + firstNBound(5); + + for (auto& conn : conns) { + doneWith(conn); + } +} + /** * Verify that minConnections is respected */ diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index 8c193f39e06..a2e3215393f 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -116,9 +116,13 @@ void ConnectionImpl::pushSetup(PushSetupCallback status) { _pushSetupQueue.push_back(status); if (_setupQueue.size()) { - _setupQueue.front()->_setupCallback(_setupQueue.front(), _pushSetupQueue.front()()); + auto connPtr = _setupQueue.front(); + auto callback = _pushSetupQueue.front(); _setupQueue.pop_front(); _pushSetupQueue.pop_front(); + + auto cb = connPtr->_setupCallback; + cb(connPtr, callback()); } } @@ -126,13 +130,22 @@ void ConnectionImpl::pushSetup(Status status) { pushSetup([status]() { return status; }); } +size_t ConnectionImpl::setupQueueDepth() { + return _setupQueue.size(); +} + void ConnectionImpl::pushRefresh(PushRefreshCallback status) { _pushRefreshQueue.push_back(status); if (_refreshQueue.size()) { - _refreshQueue.front()->_refreshCallback(_refreshQueue.front(), _pushRefreshQueue.front()()); + auto connPtr = _refreshQueue.front(); + auto callback = _pushRefreshQueue.front(); + _refreshQueue.pop_front(); _pushRefreshQueue.pop_front(); + + auto cb = connPtr->_refreshCallback; + cb(connPtr, callback()); } } @@ -140,6 +153,10 @@ void ConnectionImpl::pushRefresh(Status status) { pushRefresh([status]() { return status; }); } +size_t ConnectionImpl::refreshQueueDepth() { + return _refreshQueue.size(); +} + Date_t ConnectionImpl::getLastUsed() const { return _lastUsed; } @@ -166,9 +183,13 @@ void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) { _setupQueue.push_back(this); if (_pushSetupQueue.size()) { - _setupQueue.front()->_setupCallback(_setupQueue.front(), _pushSetupQueue.front()()); + auto connPtr = _setupQueue.front(); + auto callback = _pushSetupQueue.front(); _setupQueue.pop_front(); _pushSetupQueue.pop_front(); + + auto cb = connPtr->_setupCallback; + cb(connPtr, callback()); } } @@ -182,9 +203,14 @@ void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) { _refreshQueue.push_back(this); if (_pushRefreshQueue.size()) { - _refreshQueue.front()->_refreshCallback(_refreshQueue.front(), _pushRefreshQueue.front()()); + auto connPtr = _refreshQueue.front(); + auto callback = _pushRefreshQueue.front(); + _refreshQueue.pop_front(); _pushRefreshQueue.pop_front(); + + auto cb = connPtr->_refreshCallback; + cb(connPtr, callback()); } } diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h index 74946ba5cf4..8509e03171c 100644 --- a/src/mongo/executor/connection_pool_test_fixture.h +++ b/src/mongo/executor/connection_pool_test_fixture.h @@ -95,10 +95,12 @@ public: // Push either a callback that returns the status for a setup, or just the Status static void pushSetup(PushSetupCallback status); static void pushSetup(Status status); + static size_t setupQueueDepth(); // Push either a callback that returns the status for a refresh, or just the Status static void pushRefresh(PushRefreshCallback status); static void pushRefresh(Status status); + static size_t refreshQueueDepth(); private: void indicateUsed() override; diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 68b5538aef8..d74350dca78 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -79,6 +79,7 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolHostTimeoutMS, int, ConnectionPool::kDefaultHostTimeout.count()); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxSize, int, -1); +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMaxConnecting, int, -1); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(ShardingTaskExecutorPoolMinSize, int, static_cast(ConnectionPool::kDefaultMinConns)); @@ -179,6 +180,9 @@ Status initializeGlobalShardingState(OperationContext* opCtx, connPoolOptions.maxConnections = (ShardingTaskExecutorPoolMaxSize != -1) ? ShardingTaskExecutorPoolMaxSize : ConnectionPool::kDefaultMaxConns; + connPoolOptions.maxConnecting = (ShardingTaskExecutorPoolMaxConnecting != -1) + ? ShardingTaskExecutorPoolMaxConnecting + : ConnectionPool::kDefaultMaxConnecting; connPoolOptions.minConnections = ShardingTaskExecutorPoolMinSize; connPoolOptions.refreshRequirement = Milliseconds(ShardingTaskExecutorPoolRefreshRequirementMS); connPoolOptions.refreshTimeout = Milliseconds(ShardingTaskExecutorPoolRefreshTimeoutMS); -- cgit v1.2.1