diff options
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 82 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 778 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.h | 28 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_tl.cpp | 10 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_tl.h | 10 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 2 | ||||
-rw-r--r-- | src/mongo/util/out_of_line_executor.h | 3 |
10 files changed, 469 insertions, 456 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 8d934581ca2..ac43bbaed6f 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -36,7 +36,6 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/remote_command_request.h" -#include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" @@ -94,15 +93,8 @@ class ConnectionPool::SpecificPool final : public std::enable_shared_from_this<ConnectionPool::SpecificPool> { public: /** - * Whenever a function enters a specific pool, the function needs to be guarded. - * The presence of one of these guards will bump a counter on the specific pool - * which will prevent the pool from removing itself from the map of pools. - * - * The complexity comes from the need to hold a lock when writing to the - * _activeClients param on the specific pool. Because the code beneath the client needs to lock - * and unlock the parent mutex (and can leave unlocked), we want to start the client with the - * lock acquired, move it into the client, then re-acquire to decrement the counter on the way - * out. + * Whenever a function enters a specific pool, + * the function needs to be guarded by the pool lock. * * This callback also (perhaps overly aggressively) binds a shared pointer to the guard. * It is *always* safe to reference the original specific pool in the guarded function object. @@ -116,19 +108,12 @@ public: template <typename Callback> auto guardCallback(Callback&& cb) { return [ cb = std::forward<Callback>(cb), anchor = shared_from_this() ](auto&&... args) { - stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex); - ++(anchor->_activeClients); - - ON_BLOCK_EXIT([anchor]() { - stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex); - --(anchor->_activeClients); - }); - - return cb(std::move(lk), std::forward<decltype(args)>(args)...); + return cb(stdx::unique_lock(anchor->_parent->_mutex), + std::forward<decltype(args)>(args)...); }; } - SpecificPool(ConnectionPool* parent, + SpecificPool(std::shared_ptr<ConnectionPool> parent, const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode); ~SpecificPool(); @@ -247,7 +232,7 @@ private: void updateStateInLock(); private: - ConnectionPool* const _parent; + const std::shared_ptr<ConnectionPool> _parent; const transport::ConnectSSLMode _sslMode; const HostAndPort _hostAndPort; @@ -261,7 +246,6 @@ private: std::shared_ptr<TimerInterface> _requestTimer; Date_t _requestTimerExpiration; - size_t _activeClients; size_t _generation; bool _inFulfillRequests; bool _inSpawnConnections; @@ -408,7 +392,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& auto iter = _pools.find(hostAndPort); if (iter == _pools.end()) { - pool = stdx::make_unique<SpecificPool>(this, hostAndPort, sslMode); + pool = std::make_shared<SpecificPool>(shared_from_this(), hostAndPort, sslMode); _pools[hostAndPort] = pool; } else { pool = iter->second; @@ -445,33 +429,21 @@ size_t ConnectionPool::getNumConnectionsPerHost(const HostAndPort& hostAndPort) return 0; } -void ConnectionPool::returnConnection(ConnectionInterface* conn) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - auto iter = _pools.find(conn->getHostAndPort()); - - invariant(iter != _pools.end(), - str::stream() << "Tried to return connection but no pool found for " - << conn->getHostAndPort()); - - auto pool = iter->second; - pool->returnConnection(conn, std::move(lk)); -} - -ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, +ConnectionPool::SpecificPool::SpecificPool(std::shared_ptr<ConnectionPool> parent, const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode) - : _parent(parent), + : _parent(std::move(parent)), _sslMode(sslMode), _hostAndPort(hostAndPort), _readyPool(std::numeric_limits<size_t>::max()), - _requestTimer(parent->_factory->makeTimer()), - _activeClients(0), _generation(0), _inFulfillRequests(false), _inSpawnConnections(false), _created(0), - _state(State::kRunning) {} + _state(State::kRunning) { + invariant(_parent); + _requestTimer = _parent->_factory->makeTimer(); +} ConnectionPool::SpecificPool::~SpecificPool() { DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();) @@ -527,7 +499,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec updateStateInLock(); lk.unlock(); - _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) { + _parent->_factory->getExecutor()->schedule(guardCallback([this](auto lk, auto schedStatus) { fassert(51169, schedStatus); spawnConnections(lk); @@ -615,7 +587,7 @@ void ConnectionPool::SpecificPool::finishRefresh(stdx::unique_lock<stdx::mutex> addToReady(lk, std::move(conn)); lk.unlock(); - _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) { + _parent->_factory->getExecutor()->schedule(guardCallback([this](auto lk, auto schedStatus) { fassert(51170, schedStatus); fulfillRequests(lk); })); @@ -669,7 +641,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr addToReady(lk, std::move(conn)); lk.unlock(); - _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) { + _parent->_factory->getExecutor()->schedule(guardCallback([this](auto lk, auto schedStatus) { fassert(51171, schedStatus); fulfillRequests(lk); })); @@ -886,7 +858,7 @@ ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::take void ConnectionPool::SpecificPool::updateStateInLock() { if (_state == State::kInShutdown) { // If we're in shutdown, there is nothing to update. Our clients are all gone. - if (_processingPool.empty() && !_activeClients) { + if (_processingPool.empty()) { // If we have no more clients that require access to us, delist from the parent pool LOG(2) << "Delisting connection pool for " << _hostAndPort; _parent->_pools.erase(_hostAndPort); @@ -958,16 +930,16 @@ void ConnectionPool::SpecificPool::updateStateInLock() { auto timeout = _parent->_options.hostTimeout; // Set the shutdown timer, this gets reset on any request - _requestTimer->setTimeout(timeout, [ this, anchor = shared_from_this() ]() { - stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex); - if (_state != State::kIdle) - return; - - triggerShutdown( - Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, - "Connection pool has been idle for longer than the host timeout"), - std::move(lk)); - }); + _requestTimer->setTimeout( + timeout, guardCallback([this](auto lk) { + if (_state != State::kIdle) + return; + + triggerShutdown( + Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, + "Connection pool has been idle for longer than the host timeout"), + std::move(lk)); + })); } } diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index f3bfb691ff5..5944e7a527e 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -62,7 +62,7 @@ struct ConnectionPoolStats; * The overall workflow here is to manage separate pools for each unique * HostAndPort. See comments on the various Options for how the pool operates. */ -class ConnectionPool : public EgressTagCloser { +class ConnectionPool : public EgressTagCloser, public std::enable_shared_from_this<ConnectionPool> { class SpecificPool; public: @@ -167,8 +167,6 @@ public: size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const; private: - void returnConnection(ConnectionInterface* connection); - std::string _name; // Options are set at startup and never changed at run time, so these are @@ -347,7 +345,7 @@ public: /** * Return the executor for use with this factory */ - virtual OutOfLineExecutor& getExecutor() = 0; + virtual const std::shared_ptr<OutOfLineExecutor>& getExecutor() = 0; /** * Makes a new timer diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index b36cf8cf749..0bb27501fcf 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -52,11 +52,22 @@ protected: void setUp() override {} void tearDown() override { + if (_pool) { + _pool->shutdown(); + } + ConnectionImpl::clear(); TimerImpl::clear(); } + auto makePool(ConnectionPool::Options options = {}) { + _pool = + std::make_shared<ConnectionPool>(std::make_shared<PoolImpl>(), "test pool", options); + return _pool; + } + private: + std::shared_ptr<ConnectionPool> _pool; }; void doneWith(const ConnectionPool::ConnectionHandle& conn) { @@ -76,27 +87,27 @@ auto verifyAndGetId(StatusWithConn& swConn) { * another. */ TEST_F(ConnectionPoolTest, SameConn) { - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool"); + auto pool = makePool(); // Grab and stash an id for the first request size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); // Grab and stash an id for the second request size_t conn2Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); // Verify that we hit them, and that they're the same ASSERT(conn1Id); @@ -108,7 +119,7 @@ TEST_F(ConnectionPoolTest, SameConn) { * Verify that connections are obtained in MRU order. */ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool"); + auto pool = makePool(); // Obtain a set of connections constexpr size_t kSize = 100; @@ -132,13 +143,14 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { for (size_t i = 0; i != kSize; ++i) { ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - connections.push_back(std::move(swConn.getValue())); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + connections.push_back(std::move(swConn.getValue())); + }); } + ASSERT_EQ(connections.size(), kSize); // Shuffle them into a random order std::random_device rd; @@ -153,21 +165,23 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { ids.push(static_cast<ConnectionImpl*>(conn.get())->id()); conn->indicateSuccess(); } + ASSERT_EQ(ids.size(), kSize); // Re-obtain the connections. They should come back in the same order // as the IDs in the stack, since the pool returns them in MRU order. for (size_t i = 0; i != kSize; ++i) { ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - const auto id = verifyAndGetId(swConn); - connections.push_back(std::move(swConn.getValue())); - ASSERT(id == ids.top()); - ids.pop(); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + const auto id = verifyAndGetId(swConn); + connections.push_back(std::move(swConn.getValue())); + ASSERT_EQ(id, ids.top()); + ids.pop(); + }); } + ASSERT(ids.empty()); } /** @@ -179,9 +193,9 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { options.refreshRequirement = Milliseconds(1000); options.refreshTimeout = Milliseconds(5000); options.hostTimeout = Minutes(1); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); - ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), 0U); + ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), 0U); // Obtain a set of connections constexpr size_t kSize = 100; @@ -210,17 +224,17 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { std::set<size_t> original_ids; for (size_t i = 0; i != kSize; ++i) { ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - original_ids.insert(verifyAndGetId(swConn)); - connections.push_back(std::move(swConn.getValue())); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + original_ids.insert(verifyAndGetId(swConn)); + connections.push_back(std::move(swConn.getValue())); + }); } ASSERT_EQ(original_ids.size(), kSize); - ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize); + ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize); // Shuffle them into a random order std::random_device rd; @@ -236,25 +250,25 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { // Advance the time, but not enough to age out connections. We should still have them all. PoolImpl::setNow(now + Milliseconds(500)); - ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize); + ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize); // Re-obtain a quarter of the connections, and record their IDs in a set. std::set<size_t> reacquired_ids; for (size_t i = 0; i < kSize / 4; ++i) { ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - reacquired_ids.insert(verifyAndGetId(swConn)); - connections.push_back(std::move(swConn.getValue())); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + reacquired_ids.insert(verifyAndGetId(swConn)); + connections.push_back(std::move(swConn.getValue())); + }); } ASSERT_EQ(reacquired_ids.size(), kSize / 4); ASSERT(std::includes( original_ids.begin(), original_ids.end(), reacquired_ids.begin(), reacquired_ids.end())); - ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize); + ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize); // Put them right back in. while (!connections.empty()) { @@ -264,40 +278,40 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { } // We should still have all of them in the pool - ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize); + ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize); // Advance across the host timeout for the 75 connections we // didn't use. Afterwards, the pool should contain only those // kSize/4 connections we used above. PoolImpl::setNow(now + Milliseconds(1000)); - ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize / 4); + ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize / 4); } /** * Verify that a failed connection isn't returned to the pool */ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool"); + auto pool = makePool(); // Grab the first connection and indicate that it failed size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error")); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = verifyAndGetId(swConn); + swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error")); + }); // Grab the second id size_t conn2Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); // Verify that we hit them, and that they're different ASSERT(conn1Id); @@ -310,27 +324,27 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { * connections. */ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) { - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool"); + auto pool = makePool(); // Conn 1 from port 30000 size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort("localhost:30000"), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort("localhost:30000"), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); // Conn 2 from port 30001 size_t conn2Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort("localhost:30001"), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort("localhost:30001"), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); // Hit them and not the same ASSERT(conn1Id); @@ -342,27 +356,27 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) { * Verify that not returning handle's to the pool spins up new connections. */ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) { - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool"); + auto pool = makePool(); // Get the first connection, move it out rather than letting it return ConnectionPool::ConnectionHandle conn1; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - conn1 = std::move(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + conn1 = std::move(swConn.getValue()); + }); // Get the second connection, move it out rather than letting it return ConnectionPool::ConnectionHandle conn2; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - conn2 = std::move(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + conn2 = std::move(swConn.getValue()); + }); // Verify that the two connections are different ASSERT_NE(conn1.get(), conn2.get()); @@ -378,7 +392,7 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) { * Note that the lack of pushSetup() calls delays the get. */ TEST_F(ConnectionPoolTest, TimeoutOnSetup) { - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool"); + auto pool = makePool(); bool notOk = false; @@ -386,7 +400,7 @@ TEST_F(ConnectionPoolTest, TimeoutOnSetup) { PoolImpl::setNow(now); - pool.get_forTest( + pool->get_forTest( HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { notOk = !swConn.isOK(); }); @@ -414,7 +428,7 @@ TEST_F(ConnectionPoolTest, refreshHappens) { ConnectionPool::Options options; options.refreshRequirement = Milliseconds(1000); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); @@ -422,12 +436,12 @@ TEST_F(ConnectionPoolTest, refreshHappens) { // Get a connection ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); // After 1 second, one refresh has occurred PoolImpl::setNow(now + Milliseconds(1000)); @@ -450,7 +464,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { ConnectionPool::Options options; options.refreshRequirement = Milliseconds(1000); options.refreshTimeout = Milliseconds(2000); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); @@ -460,23 +474,23 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { // Grab a connection and verify it's good ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); PoolImpl::setNow(now + Milliseconds(500)); size_t conn2Id = 0; // Make sure we still get the first one - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); ASSERT_EQ(conn1Id, conn2Id); // This should trigger a refresh, but not time it out. So now we have one @@ -487,13 +501,13 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { // This will wait because we have a refreshing connection, so it'll wait to // see if that pans out. In this case, we'll get a failure on timeout. ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(!swConn.isOK()); + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(!swConn.isOK()); - reachedA = true; - }); + reachedA = true; + }); ASSERT(!reachedA); PoolImpl::setNow(now + Milliseconds(3000)); @@ -503,13 +517,13 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { bool reachedB = false; // Make sure we can get a new connection - pool.get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(verifyAndGetId(swConn), conn1Id); - reachedB = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_NE(verifyAndGetId(swConn), conn1Id); + reachedB = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedA); ASSERT(reachedB); @@ -519,31 +533,31 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { * Verify that requests are served in expiration order, not insertion order */ TEST_F(ConnectionPoolTest, requestsServedByUrgency) { - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool"); + auto pool = makePool(); bool reachedA = false; bool reachedB = false; ConnectionPool::ConnectionHandle conn; - pool.get_forTest(HostAndPort(), - Milliseconds(2000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + pool->get_forTest(HostAndPort(), + Milliseconds(2000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); - reachedA = true; - doneWith(swConn.getValue()); - }); + reachedA = true; + doneWith(swConn.getValue()); + }); - pool.get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); - reachedB = true; + reachedB = true; - conn = std::move(swConn.getValue()); - }); + conn = std::move(swConn.getValue()); + }); ConnectionImpl::pushSetup(Status::OK()); @@ -566,7 +580,7 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { ConnectionPool::Options options; options.minConnections = 1; options.maxConnections = 2; - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); ConnectionPool::ConnectionHandle conn1; ConnectionPool::ConnectionHandle conn2; @@ -574,27 +588,27 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { // Make 3 requests, each which keep their connection (don't return it to // the pool) - pool.get_forTest(HostAndPort(), - Milliseconds(3000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - - conn3 = std::move(swConn.getValue()); - }); - pool.get_forTest(HostAndPort(), - Milliseconds(2000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - - conn2 = std::move(swConn.getValue()); - }); - pool.get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - - conn1 = std::move(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(3000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn3 = std::move(swConn.getValue()); + }); + pool->get_forTest(HostAndPort(), + Milliseconds(2000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn2 = std::move(swConn.getValue()); + }); + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn1 = std::move(swConn.getValue()); + }); ConnectionImpl::pushSetup(Status::OK()); ConnectionImpl::pushSetup(Status::OK()); @@ -624,7 +638,7 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) { ConnectionPool::Options options; options.minConnections = 1; options.maxConnecting = 2; - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); ConnectionPool::ConnectionHandle conn1; ConnectionPool::ConnectionHandle conn2; @@ -632,27 +646,27 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) { // Make 3 requests, each which keep their connection (don't return it to // the pool) - pool.get_forTest(HostAndPort(), - Milliseconds(3000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - - conn3 = std::move(swConn.getValue()); - }); - pool.get_forTest(HostAndPort(), - Milliseconds(2000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - - conn2 = std::move(swConn.getValue()); - }); - pool.get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - - conn1 = std::move(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(3000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn3 = std::move(swConn.getValue()); + }); + pool->get_forTest(HostAndPort(), + Milliseconds(2000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn2 = std::move(swConn.getValue()); + }); + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + + conn1 = std::move(swConn.getValue()); + }); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); ConnectionImpl::pushSetup(Status::OK()); @@ -683,7 +697,7 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { ConnectionPool::Options options; options.maxConnecting = 1; options.refreshRequirement = Milliseconds(1000); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); @@ -691,12 +705,12 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { // Get a connection ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); @@ -707,13 +721,13 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { bool reachedA = false; // Try to get another connection - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - doneWith(swConn.getValue()); - reachedA = true; - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + reachedA = true; + }); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); ASSERT(!reachedA); @@ -730,19 +744,19 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { options.maxConnecting = 2; options.minConnections = 3; options.refreshRequirement = Milliseconds(1000); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); PoolImpl::setNow(now); // Get us spun up to 3 connections in the pool - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); ConnectionImpl::pushSetup(Status::OK()); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); @@ -758,12 +772,12 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { // Start 5 new requests for (size_t i = 0; i < conns.size(); ++i) { - pool.get_forTest(HostAndPort(), - Milliseconds(static_cast<int>(1000 + i)), - [&conns, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - conns[i] = std::move(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(static_cast<int>(1000 + i)), + [&conns, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + conns[i] = std::move(swConn.getValue()); + }); } auto firstNBound = [&](size_t n) { @@ -822,7 +836,7 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { options.maxConnections = 3; options.refreshRequirement = Milliseconds(1000); options.refreshTimeout = Milliseconds(2000); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); @@ -833,13 +847,13 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { ConnectionPool::ConnectionHandle conn3; // Grab one connection without returning it - pool.get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); - conn1 = std::move(swConn.getValue()); - }); + conn1 = std::move(swConn.getValue()); + }); bool reachedA = false; bool reachedB = false; @@ -865,20 +879,20 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { ASSERT(!reachedC); // Two more get's without returns - pool.get_forTest(HostAndPort(), - Milliseconds(2000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + pool->get_forTest(HostAndPort(), + Milliseconds(2000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); - conn2 = std::move(swConn.getValue()); - }); - pool.get_forTest(HostAndPort(), - Milliseconds(3000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + conn2 = std::move(swConn.getValue()); + }); + pool->get_forTest(HostAndPort(), + Milliseconds(3000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); - conn3 = std::move(swConn.getValue()); - }); + conn3 = std::move(swConn.getValue()); + }); ASSERT(conn2); ASSERT(conn3); @@ -931,7 +945,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { options.refreshRequirement = Milliseconds(5000); options.refreshTimeout = Milliseconds(5000); options.hostTimeout = Milliseconds(1000); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); @@ -942,13 +956,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { bool reachedA = false; // Grab 1 connection and return it ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - connId = verifyAndGetId(swConn); - reachedA = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + connId = verifyAndGetId(swConn); + reachedA = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedA); @@ -959,13 +973,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { // Verify that a new connection was spawned ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(connId, verifyAndGetId(swConn)); - reachedB = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_NE(connId, verifyAndGetId(swConn)); + reachedB = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedB); } @@ -980,7 +994,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { options.refreshRequirement = Milliseconds(5000); options.refreshTimeout = Milliseconds(5000); options.hostTimeout = Milliseconds(1000); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); @@ -992,13 +1006,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { // Grab and return ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - connId = verifyAndGetId(swConn); - reachedA = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + connId = verifyAndGetId(swConn); + reachedA = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedA); // Jump almost up to the hostTimeout @@ -1006,26 +1020,26 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { bool reachedB = false; // Same connection - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(connId, verifyAndGetId(swConn)); - reachedB = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_EQ(connId, verifyAndGetId(swConn)); + reachedB = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedB); // Now our timeout should be 1999 ms from 'now' instead of 1000 ms // if we do another 'get' we should still get the original connection PoolImpl::setNow(now + Milliseconds(1500)); bool reachedB2 = false; - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(connId, verifyAndGetId(swConn)); - reachedB2 = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_EQ(connId, verifyAndGetId(swConn)); + reachedB2 = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedB2); // We should time out when we get to 'now' + 2500 ms @@ -1034,13 +1048,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { bool reachedC = false; // Different id ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(connId, verifyAndGetId(swConn)); - reachedC = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_NE(connId, verifyAndGetId(swConn)); + reachedC = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedC); } @@ -1055,7 +1069,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { options.refreshRequirement = Milliseconds(5000); options.refreshTimeout = Milliseconds(5000); options.hostTimeout = Milliseconds(1000); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); @@ -1067,23 +1081,23 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { // save 1 connection ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - conn1 = std::move(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = verifyAndGetId(swConn); + conn1 = std::move(swConn.getValue()); + }); ASSERT(conn1Id); // return the second ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); ASSERT(conn2Id); @@ -1093,13 +1107,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { bool reachedA = false; // conn 2 is still there - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(conn2Id, verifyAndGetId(swConn)); - reachedA = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_EQ(conn2Id, verifyAndGetId(swConn)); + reachedA = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedA); @@ -1114,14 +1128,14 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { // make sure that this is a new id ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(conn1Id, verifyAndGetId(swConn)); - ASSERT_NE(conn2Id, verifyAndGetId(swConn)); - reachedB = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_NE(conn1Id, verifyAndGetId(swConn)); + ASSERT_NE(conn2Id, verifyAndGetId(swConn)); + reachedB = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedB); } @@ -1135,7 +1149,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { options.maxConnections = 1; options.refreshRequirement = Seconds(1); options.refreshTimeout = Seconds(2); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); PoolImpl::setNow(now); @@ -1143,22 +1157,22 @@ TEST_F(ConnectionPoolTest, dropConnections) { // Grab the first connection id size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn1Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); ASSERT(conn1Id); // Grab it and this time keep it out of the pool ConnectionPool::ConnectionHandle handle; - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(verifyAndGetId(swConn), conn1Id); - handle = std::move(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_EQ(verifyAndGetId(swConn), conn1Id); + handle = std::move(swConn.getValue()); + }); ASSERT(handle); @@ -1166,16 +1180,16 @@ TEST_F(ConnectionPoolTest, dropConnections) { // Queue up a request. This won't fire until we drop connections, then it // will fail. - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(!swConn.isOK()); - reachedA = true; - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(!swConn.isOK()); + reachedA = true; + }); ASSERT(!reachedA); // fails the previous get - pool.dropConnections(HostAndPort()); + pool->dropConnections(HostAndPort()); ASSERT(reachedA); @@ -1187,20 +1201,20 @@ TEST_F(ConnectionPoolTest, dropConnections) { // connection size_t conn2Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - ASSERT_NE(conn2Id, conn1Id); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + conn2Id = verifyAndGetId(swConn); + ASSERT_NE(conn2Id, conn1Id); + doneWith(swConn.getValue()); + }); ASSERT(conn2Id); // Push conn2 into refresh PoolImpl::setNow(now + Milliseconds(1500)); // drop the connections - pool.dropConnections(HostAndPort()); + pool->dropConnections(HostAndPort()); // refresh still pending PoolImpl::setNow(now + Milliseconds(2500)); @@ -1209,13 +1223,13 @@ TEST_F(ConnectionPoolTest, dropConnections) { // being pending bool reachedB = false; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(verifyAndGetId(swConn), conn2Id); - reachedB = true; - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT_NE(verifyAndGetId(swConn), conn2Id); + reachedB = true; + doneWith(swConn.getValue()); + }); ASSERT(reachedB); } @@ -1228,13 +1242,13 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) { options.maxConnections = 1; options.refreshTimeout = Seconds(2); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); PoolImpl::setNow(now); boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn1; - pool.get_forTest( + pool->get_forTest( HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1 = std::move(swConn); }); @@ -1248,7 +1262,7 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) { ASSERT(!conn1); // Get conn2 (which should have an extra second before the timeout) - pool.get_forTest( + pool->get_forTest( HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress); }); @@ -1269,7 +1283,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { options.maxConnections = 1; options.refreshTimeout = Seconds(2); options.refreshRequirement = Seconds(3); - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); PoolImpl::setNow(now); @@ -1277,7 +1291,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { // Successfully get a new connection size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest( + pool->get_forTest( HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -1288,7 +1302,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { PoolImpl::setNow(now + Seconds(3)); boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn1; - pool.get_forTest( + pool->get_forTest( HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1 = std::move(swConn); }); @@ -1301,7 +1315,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { ASSERT(!conn1); // Get conn2 (which should have an extra second before the timeout) - pool.get_forTest( + pool->get_forTest( HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress); }); @@ -1313,8 +1327,8 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { ASSERT(conn1->getStatus().code() == ErrorCodes::NetworkInterfaceExceededTimeLimit); } -template <typename T> -void dropConnectionsTest(ConnectionPool& pool, T& t) { +template <typename Ptr> +void dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t) { auto now = Date_t::now(); PoolImpl::setNow(now); @@ -1325,69 +1339,70 @@ void dropConnectionsTest(ConnectionPool& pool, T& t) { // Successfully get connections to two hosts ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(hap1, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + pool->get_forTest(hap1, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { doneWith(swConn.getValue()); }); ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(hap2, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + pool->get_forTest(hap2, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { doneWith(swConn.getValue()); }); ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(hap3, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + pool->get_forTest(hap3, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { doneWith(swConn.getValue()); }); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap1)); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap2)); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap3)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap2)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap3)); - t.dropConnections(transport::Session::kPending); + t->dropConnections(transport::Session::kPending); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap1)); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap2)); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap3)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap2)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap3)); - t.mutateTags(hap1, [](transport::Session::TagMask tags) { + t->mutateTags(hap1, [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen | transport::Session::kInternalClient; }); - t.mutateTags(hap2, - [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; }); + t->mutateTags(hap2, + [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; }); - t.mutateTags( + t->mutateTags( hap3, [](transport::Session::TagMask tags) { return transport::Session::kEmptyTagMask; }); - t.dropConnections(transport::Session::kKeepOpen); + t->dropConnections(transport::Session::kKeepOpen); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap1)); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap2)); - ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap3)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap2)); + ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap3)); - t.dropConnections(transport::Session::kInternalClient); + t->dropConnections(transport::Session::kInternalClient); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap1)); - ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap2)); - ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap3)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1)); + ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap2)); + ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap3)); ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest(hap4, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + pool->get_forTest(hap4, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { doneWith(swConn.getValue()); }); // drop connections by hostAndPort - t.dropConnections(hap1); + t->dropConnections(hap1); - ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap1)); - ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap2)); - ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap3)); - ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap4)); + ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap1)); + ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap2)); + ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap3)); + ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap4)); } TEST_F(ConnectionPoolTest, DropConnections) { ConnectionPool::Options options; - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + options.minConnections = 0; + auto pool = makePool(options); dropConnectionsTest(pool, pool); } @@ -1395,16 +1410,17 @@ TEST_F(ConnectionPoolTest, DropConnections) { TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) { EgressTagCloserManager manager; ConnectionPool::Options options; + options.minConnections = 0; options.egressTagCloserManager = &manager; - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); - dropConnectionsTest(pool, manager); + dropConnectionsTest(pool, &manager); } TEST_F(ConnectionPoolTest, AsyncGet) { ConnectionPool::Options options; options.maxConnections = 1; - ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); + auto pool = makePool(options); auto now = Date_t::now(); PoolImpl::setNow(now); @@ -1414,7 +1430,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) { size_t connId = 0; // no connections in the pool, our future is not satisfied - auto connFuture = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); ASSERT_FALSE(connFuture.isReady()); // Successfully get a new connection @@ -1438,8 +1454,8 @@ TEST_F(ConnectionPoolTest, AsyncGet) { size_t connId2 = 0; size_t connId3 = 0; - auto connFuture1 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); - auto connFuture2 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10}); + auto connFuture1 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + auto connFuture2 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10}); // Queue up the second future to resolve as soon as it is ready std::move(connFuture2).getAsync([&](StatusWithConn swConn) mutable { @@ -1455,7 +1471,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) { decltype(connFuture1) connFuture3; std::move(connFuture1).getAsync([&](StatusWithConn swConn) mutable { // Grab our third future while our first one is being fulfilled - connFuture3 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + connFuture3 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); connId1 = verifyAndGetId(swConn); doneWith(swConn.getValue()); diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index 86adce43faa..6c0321fa619 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -226,6 +226,10 @@ std::shared_ptr<ConnectionPool::TimerInterface> PoolImpl::makeTimer() { return stdx::make_unique<TimerImpl>(this); } +const std::shared_ptr<OutOfLineExecutor>& PoolImpl::getExecutor() { + return _executor; +} + Date_t PoolImpl::now() { return _now.get_value_or(Date_t::now()); } diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h index 2ecb6ca5279..cbd6657a81a 100644 --- a/src/mongo/executor/connection_pool_test_fixture.h +++ b/src/mongo/executor/connection_pool_test_fixture.h @@ -139,8 +139,27 @@ private: class InlineOutOfLineExecutor : public OutOfLineExecutor { public: void schedule(Task task) override { - std::move(task)(Status::OK()); + // Add the task to our queue + _taskQueue.emplace_back(std::move(task)); + + // Make sure we're not already inline executing + if (std::exchange(_inSchedule, true)) { + return; + } + + // Clear out our queue + while (!_taskQueue.empty()) { + auto task = std::move(_taskQueue.front()); + std::move(task)(Status::OK()); + _taskQueue.pop_front(); + } + + // Admit we're not working on the queue anymore + _inSchedule = false; } + + bool _inSchedule; + std::deque<Task> _taskQueue; }; /** @@ -150,6 +169,7 @@ class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface { friend class ConnectionImpl; public: + PoolImpl() = default; std::shared_ptr<ConnectionPool::ConnectionInterface> makeConnection( const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, @@ -157,10 +177,7 @@ public: std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override; - OutOfLineExecutor& getExecutor() override { - static InlineOutOfLineExecutor _executor; - return _executor; - } + const std::shared_ptr<OutOfLineExecutor>& getExecutor() override; Date_t now() override; @@ -175,6 +192,7 @@ public: private: ConnectionPool* _pool = nullptr; + std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>(); static boost::optional<Date_t> _now; }; diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp index e4a0d9e35ae..44ca5e60f79 100644 --- a/src/mongo/executor/connection_pool_tl.cpp +++ b/src/mongo/executor/connection_pool_tl.cpp @@ -327,10 +327,14 @@ void TLConnection::cancelAsync() { _client->cancel(); } +auto TLTypeFactory::reactor() { + return checked_pointer_cast<transport::Reactor>(_executor); +} + std::shared_ptr<ConnectionPool::ConnectionInterface> TLTypeFactory::makeConnection( const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, size_t generation) { auto conn = std::make_shared<TLConnection>(shared_from_this(), - _reactor, + reactor(), getGlobalServiceContext(), hostAndPort, sslMode, @@ -342,13 +346,13 @@ std::shared_ptr<ConnectionPool::ConnectionInterface> TLTypeFactory::makeConnecti } std::shared_ptr<ConnectionPool::TimerInterface> TLTypeFactory::makeTimer() { - auto timer = std::make_shared<TLTimer>(shared_from_this(), _reactor); + auto timer = std::make_shared<TLTimer>(shared_from_this(), reactor()); fasten(timer.get()); return timer; } Date_t TLTypeFactory::now() { - return _reactor->now(); + return checked_cast<transport::Reactor*>(_executor.get())->now(); } } // namespace connection_pool_tl diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h index 31317c280ef..7a138589055 100644 --- a/src/mongo/executor/connection_pool_tl.h +++ b/src/mongo/executor/connection_pool_tl.h @@ -50,7 +50,7 @@ public: transport::TransportLayer* tl, std::unique_ptr<NetworkConnectionHook> onConnectHook, const ConnectionPool::Options& connPoolOptions) - : _reactor(std::move(reactor)), + : _executor(std::move(reactor)), _tl(tl), _onConnectHook(std::move(onConnectHook)), _connPoolOptions(connPoolOptions) {} @@ -60,8 +60,8 @@ public: transport::ConnectSSLMode sslMode, size_t generation) override; std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override; - OutOfLineExecutor& getExecutor() override { - return *_reactor; + const std::shared_ptr<OutOfLineExecutor>& getExecutor() override { + return _executor; } Date_t now() override; @@ -72,7 +72,9 @@ public: void release(Type* type); private: - transport::ReactorHandle _reactor; + auto reactor(); + + std::shared_ptr<OutOfLineExecutor> _executor; // This is always a transport::Reactor transport::TransportLayer* _tl; std::unique_ptr<NetworkConnectionHook> _onConnectHook; const ConnectionPool::Options _connPoolOptions; diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index a760e6f120d..643cce32ae9 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -98,7 +98,7 @@ void NetworkInterfaceTL::startup() { _reactor = _tl->getReactor(transport::TransportLayer::kNewReactor); auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>( _reactor, _tl, std::move(_onConnectHook), _connPoolOpts); - _pool = std::make_unique<ConnectionPool>( + _pool = std::make_shared<ConnectionPool>( std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts); _ioThread = stdx::thread([this] { setThreadName(_instanceName); diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index d6f50032111..1906220a96f 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -141,7 +141,7 @@ private: mutable stdx::mutex _mutex; ConnectionPool::Options _connPoolOpts; std::unique_ptr<NetworkConnectionHook> _onConnectHook; - std::unique_ptr<ConnectionPool> _pool; + std::shared_ptr<ConnectionPool> _pool; Counters _counters; std::unique_ptr<rpc::EgressMetadataHook> _metadataHook; diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h index 8371d878e3f..ff1f4a511a1 100644 --- a/src/mongo/util/out_of_line_executor.h +++ b/src/mongo/util/out_of_line_executor.h @@ -70,8 +70,7 @@ public: */ virtual void schedule(Task func) = 0; -protected: - ~OutOfLineExecutor() noexcept {} + virtual ~OutOfLineExecutor() = default; }; using ExecutorPtr = std::shared_ptr<OutOfLineExecutor>; |