-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 202
-rw-r--r-- | src/mongo/executor/connection_pool.h | 11
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 174
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.h | 15
-rw-r--r-- | src/mongo/executor/connection_pool_tl.h | 3
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 52
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 14
7 files changed, 225 insertions(+), 246 deletions(-)
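The diff below removes ConnectionPool::tryGet() entirely: every caller now goes through the Future-returning get(), and the pool hands its internal follow-up work (spawnConnections, fulfillRequests) to an OutOfLineExecutor supplied by the DependentTypeFactoryInterface. As an illustration only — the `pool` variable and `host` are placeholders, not part of this patch — the caller-side migration looks roughly like this:

    // Before: poll for an idle connection and hand-roll the async fallback.
    //     auto conn = pool->tryGet(host, transport::kGlobalSSLMode);
    //
    // After: always request a future; if a pooled connection is available the
    // future is already resolved, otherwise the callback fires once connection
    // setup or refresh completes.
    pool->get(host, transport::kGlobalSSLMode, Milliseconds{5000})
        .getAsync([](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
            if (!swConn.isOK()) {
                return;  // acquisition failed or timed out; the Status says why
            }
            auto conn = std::move(swConn.getValue());
            // ... use the connection, then report its health so the handle's
            // deleter can return it to the pool for refresh or reuse ...
            conn->indicateSuccess();
        });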
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 41e4e602c65..4cbc42dc2ec 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -140,12 +140,6 @@ public:
     Future<ConnectionHandle> getConnection(Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk);
 
     /**
-     * Gets a connection from the specific pool if a connection is available and there are no
-     * outstanding requests.
-     */
-    boost::optional<ConnectionHandle> tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
-
-    /**
      * Triggers the shutdown procedure. This function marks the state as kInShutdown
      * and calls processFailure below with the status provided. This may not immediately
      * delist or destruct this pool. However, both will happen eventually as ConnectionHandles
@@ -228,15 +222,21 @@ private:
         }
     };
 
+    ConnectionHandle makeHandle(ConnectionInterface* connection);
+
+    void finishRefresh(stdx::unique_lock<stdx::mutex> lk,
+                       ConnectionInterface* connPtr,
+                       Status status);
+
     void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);
 
     void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);
 
     void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);
 
-    // This internal helper is used both by tryGet and by fulfillRequests and differs in that it
+    // This internal helper is used both by get and by fulfillRequests and differs in that it
     // skips some bookkeeping that the other callers do on their own
-    boost::optional<ConnectionHandle> tryGetInternal(const stdx::unique_lock<stdx::mutex>& lk);
+    ConnectionHandle tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
 
     template <typename OwnershipPoolType>
     typename OwnershipPoolType::mapped_type takeFromPool(
@@ -398,23 +398,6 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
     return get(hostAndPort, transport::kGlobalSSLMode, timeout).getAsync(std::move(cb));
 }
 
-boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::tryGet(
-    const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode) {
-    stdx::unique_lock<stdx::mutex> lk(_mutex);
-
-    auto iter = _pools.find(hostAndPort);
-
-    if (iter == _pools.end()) {
-        return boost::none;
-    }
-
-    const auto& pool = iter->second;
-    invariant(pool);
-    pool->fassertSSLModeIs(sslMode);
-
-    return pool->tryGetConnection(lk);
-}
-
 Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
                                                              transport::ConnectSSLMode sslMode,
                                                              Milliseconds timeout) {
@@ -523,6 +506,14 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
     Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
     invariant(_state != State::kInShutdown);
 
+    auto conn = tryGetConnection(lk);
+
+    updateStateInLock();
+
+    if (conn) {
+        return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn));
+    }
+
     if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
         timeout = _parent->_options.refreshTimeout;
     }
@@ -535,28 +526,25 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
 
     updateStateInLock();
 
-    spawnConnections(lk);
-    fulfillRequests(lk);
+    lk.unlock();
+    _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+        fassert(20000, schedStatus);
+
+        spawnConnections(lk);
+    }));
 
     return std::move(pf.future);
 }
 
-boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetConnection(
-    const stdx::unique_lock<stdx::mutex>& lk) {
-    invariant(_state != State::kInShutdown);
-
-    if (_requests.size()) {
-        return boost::none;
-    }
+auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
+    auto fun = guardCallback(
+        [this](auto lk, auto connection) { returnConnection(connection, std::move(lk)); });
 
-    auto conn = tryGetInternal(lk);
-
-    updateStateInLock();
-
-    return conn;
+    auto handle = ConnectionHandle(connection, fun);
+    return handle;
 }
 
-boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetInternal(
+ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
     const stdx::unique_lock<stdx::mutex>&) {
 
     while (_readyPool.size()) {
@@ -582,14 +570,55 @@ boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::
         // pass it to the user
         connPtr->resetToUnknown();
 
-        return ConnectionHandle(connPtr,
-                                guardCallback([this](stdx::unique_lock<stdx::mutex> localLk,
-                                                     ConnectionPool::ConnectionInterface* conn) {
-                                    returnConnection(conn, std::move(localLk));
-                                }));
+        auto handle = makeHandle(connPtr);
+        return handle;
     }
 
-    return boost::none;
+    return {};
+}
+
+void ConnectionPool::SpecificPool::finishRefresh(stdx::unique_lock<stdx::mutex> lk,
+                                                 ConnectionInterface* connPtr,
+                                                 Status status) {
+    auto conn = takeFromProcessingPool(connPtr);
+
+    // If we're in shutdown, we don't need refreshed connections
+    if (_state == State::kInShutdown)
+        return;
+
+    // If we've exceeded the time limit, start a new connect,
+    // rather than failing all operations. We do this because the
+    // various callers have their own time limit which is unrelated
+    // to our internal one.
+    if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+        LOG(0) << "Pending connection to host " << _hostAndPort
+               << " did not complete within the connection timeout,"
+               << " retrying with a new connection;" << openConnections(lk)
+               << " connections to that host remain open";
+        spawnConnections(lk);
+        return;
+    }
+
+    // Pass a failure on through
+    if (!status.isOK()) {
+        processFailure(status, std::move(lk));
+        return;
+    }
+
+    // If the host and port were dropped, let this lapse and spawn new connections
+    if (conn->getGeneration() != _generation) {
+        spawnConnections(lk);
+        return;
+    }
+
+    // If the connection refreshed successfully, throw it back in the ready pool
+    addToReady(lk, std::move(conn));
+
+    lk.unlock();
+    _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+        fassert(20003, schedStatus);
+        fulfillRequests(lk);
+    }));
 }
 
 void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
@@ -631,49 +660,19 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
         // Unlock in case refresh can occur immediately
         lk.unlock();
         connPtr->refresh(_parent->_options.refreshTimeout,
-                         guardCallback([this](stdx::unique_lock<stdx::mutex> lk,
-                                              ConnectionInterface* connPtr,
-                                              Status status) {
-                             auto conn = takeFromProcessingPool(connPtr);
-
-                             // If we're in shutdown, we don't need refreshed connections
-                             if (_state == State::kInShutdown)
-                                 return;
-
-                             // If the connection refreshed successfully, throw it back in
-                             // the ready pool
-                             if (status.isOK()) {
-                                 // If the host and port were dropped, let this lapse
-                                 if (conn->getGeneration() == _generation) {
-                                     addToReady(lk, std::move(conn));
-                                     fulfillRequests(lk);
-                                 }
-
-                                 return;
-                             }
-
-                             // If we've exceeded the time limit, start a new connect,
-                             // rather than failing all operations. We do this because the
-                             // various callers have their own time limit which is unrelated
-                             // to our internal one.
-                             if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
-                                 log() << "Pending connection to host " << _hostAndPort
-                                       << " did not complete within the connection timeout,"
-                                       << " retrying with a new connection;" << openConnections(lk)
-                                       << " connections to that host remain open";
-                                 spawnConnections(lk);
-                                 return;
-                             }
-
-                             // Otherwise pass the failure on through
-                             processFailure(status, std::move(lk));
+                         guardCallback([this](auto lk, auto conn, auto status) {
+                             finishRefresh(std::move(lk), conn, status);
                          }));
         lk.lock();
     } else {
         // If it's fine as it is, just put it in the ready queue
         addToReady(lk, std::move(conn));
-        // TODO This should be scheduled on an executor once we have executor-aware pooling
-        fulfillRequests(lk);
+
+        lk.unlock();
+        _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+            fassert(20004, schedStatus);
+            fulfillRequests(lk);
+        }));
     }
 
     updateStateInLock();
@@ -784,7 +783,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
         // deadlock).
         //
         // None of the heap manipulation code throws, but it's something to keep in mind.
-        auto conn = tryGetInternal(lk);
+        auto conn = tryGetConnection(lk);
 
         if (!conn) {
             break;
@@ -796,7 +795,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
         _requests.pop_back();
 
         lk.unlock();
-        promise.emplaceValue(std::move(*conn));
+        promise.emplaceValue(std::move(conn));
         lk.lock();
 
         updateStateInLock();
@@ -847,32 +846,11 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
         // Run the setup callback
         lk.unlock();
-        handle->setup(
-            _parent->_options.refreshTimeout,
-            guardCallback([this](
-                stdx::unique_lock<stdx::mutex> lk, ConnectionInterface* connPtr, Status status) {
-                auto conn = takeFromProcessingPool(connPtr);
-
-                // If we're in shutdown, we don't need this conn
-                if (_state == State::kInShutdown)
-                    return;
-
-                if (status.isOK()) {
-                    // If the host and port was dropped, let the connection lapse
-                    if (conn->getGeneration() == _generation) {
-                        addToReady(lk, std::move(conn));
-                        fulfillRequests(lk);
-                    }
-                } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
-                    // If we've exceeded the time limit, restart the connect, rather than
-                    // failing all operations. We do this because the various callers
-                    // have their own time limit which is unrelated to our internal one.
-                    spawnConnections(lk);
-                } else {
-                    // If the setup failed, cascade the failure edge
-                    processFailure(status, std::move(lk));
-                }
-            }));
+        handle->setup(_parent->_options.refreshTimeout,
+                      guardCallback([this](auto lk, auto conn, auto status) {
+                          finishRefresh(std::move(lk), conn, status);
+                      }));
+
         // Note that this assumes that the refreshTimeout is sound for the
         // setupTimeout
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 5903d1b0bda..7066177e438 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -42,6 +42,7 @@
 #include "mongo/transport/transport_layer.h"
 #include "mongo/util/future.h"
 #include "mongo/util/net/hostandport.h"
+#include "mongo/util/out_of_line_executor.h"
 #include "mongo/util/time_support.h"
 
 namespace mongo {
@@ -72,7 +73,7 @@ public:
     using ConnectionHandleDeleter = stdx::function<void(ConnectionInterface* connection)>;
     using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
 
-    using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>;
+    using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>;
 
     static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000);  // 5mins
     static const size_t kDefaultMaxConns;
@@ -156,9 +157,6 @@ public:
                      Milliseconds timeout,
                      GetConnectionCallback cb);
 
-    boost::optional<ConnectionHandle> tryGet(const HostAndPort& hostAndPort,
-                                             transport::ConnectSSLMode sslMode);
-
     void appendConnectionStats(ConnectionPoolStats* stats) const;
 
     size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
@@ -342,6 +340,11 @@ public:
                                                                size_t generation) = 0;
 
     /**
+     * Return the executor for use with this factory
+     */
+    virtual OutOfLineExecutor& getExecutor() = 0;
+
+    /**
      * Makes a new timer
      */
     virtual std::shared_ptr<TimerInterface> makeTimer() = 0;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 0e689456afa..b36cf8cf749 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -32,6 +32,7 @@
 #include <algorithm>
 #include <random>
 #include <stack>
+#include <tuple>
 
 #include "mongo/executor/connection_pool_test_fixture.h"
 
@@ -58,15 +59,17 @@ protected:
 private:
 };
 
-void doneWith(const ConnectionPool::ConnectionHandle& swConn) {
-    static_cast<ConnectionImpl*>(swConn.get())->indicateSuccess();
+void doneWith(const ConnectionPool::ConnectionHandle& conn) {
+    dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess();
 }
 
-#define CONN2ID(swConn)                                                     \
-    [](StatusWith<ConnectionPool::ConnectionHandle>& swConn) {              \
-        ASSERT(swConn.isOK());                                              \
-        return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \
-    }(swConn)
+using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>;
+
+auto verifyAndGetId(StatusWithConn& swConn) {
+    ASSERT(swConn.isOK());
+    auto& conn = swConn.getValue();
+    return dynamic_cast<ConnectionImpl*>(conn.get())->id();
+}
 
 /**
  * Verify that we get the same connection if we grab one, return it and grab
@@ -81,7 +84,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = CONN2ID(swConn);
+                         conn1Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -91,7 +94,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = CONN2ID(swConn);
+                         conn2Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -159,7 +162,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
                          Milliseconds(5000),
                          [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
                              ASSERT(swConn.isOK());
-                             const auto id = CONN2ID(swConn);
+                             const auto id = verifyAndGetId(swConn);
                              connections.push_back(std::move(swConn.getValue()));
                              ASSERT(id == ids.top());
                              ids.pop();
@@ -211,7 +214,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
                          Milliseconds(5000),
                          [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
                              ASSERT(swConn.isOK());
-                             original_ids.insert(CONN2ID(swConn));
+                             original_ids.insert(verifyAndGetId(swConn));
                              connections.push_back(std::move(swConn.getValue()));
                          });
     }
@@ -243,7 +246,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
                          Milliseconds(5000),
                          [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
                              ASSERT(swConn.isOK());
-                             reacquired_ids.insert(CONN2ID(swConn));
+                             reacquired_ids.insert(verifyAndGetId(swConn));
                              connections.push_back(std::move(swConn.getValue()));
                          });
     }
@@ -282,7 +285,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = CONN2ID(swConn);
+                         conn1Id = verifyAndGetId(swConn);
                          swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error"));
                      });
 
@@ -292,7 +295,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = CONN2ID(swConn);
+                         conn2Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -315,7 +318,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
     pool.get_forTest(HostAndPort("localhost:30000"),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = CONN2ID(swConn);
+                         conn1Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -325,7 +328,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
     pool.get_forTest(HostAndPort("localhost:30001"),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = CONN2ID(swConn);
+                         conn2Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -460,7 +463,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = CONN2ID(swConn);
+                         conn1Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -471,7 +474,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = CONN2ID(swConn);
+                         conn2Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
     ASSERT_EQ(conn1Id, conn2Id);
@@ -503,7 +506,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(1000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(CONN2ID(swConn), conn1Id);
+                         ASSERT_NE(verifyAndGetId(swConn), conn1Id);
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -942,7 +945,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         connId = CONN2ID(swConn);
+                         connId = verifyAndGetId(swConn);
                          reachedA = true;
                          doneWith(swConn.getValue());
                      });
@@ -959,7 +962,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(connId, CONN2ID(swConn));
+                         ASSERT_NE(connId, verifyAndGetId(swConn));
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -992,7 +995,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         connId = CONN2ID(swConn);
+                         connId = verifyAndGetId(swConn);
                          reachedA = true;
                          doneWith(swConn.getValue());
                      });
@@ -1006,7 +1009,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_EQ(connId, CONN2ID(swConn));
+                         ASSERT_EQ(connId, verifyAndGetId(swConn));
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -1019,7 +1022,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_EQ(connId, CONN2ID(swConn));
+                         ASSERT_EQ(connId, verifyAndGetId(swConn));
                          reachedB2 = true;
                          doneWith(swConn.getValue());
                      });
@@ -1034,7 +1037,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
    pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(connId, CONN2ID(swConn));
+                         ASSERT_NE(connId, verifyAndGetId(swConn));
                          reachedC = true;
                          doneWith(swConn.getValue());
                      });
@@ -1067,7 +1070,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = CONN2ID(swConn);
+                         conn1Id = verifyAndGetId(swConn);
                          conn1 = std::move(swConn.getValue());
                      });
 
@@ -1078,7 +1081,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = CONN2ID(swConn);
+                         conn2Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -1093,7 +1096,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_EQ(conn2Id, CONN2ID(swConn));
+                         ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
                          reachedA = true;
                          doneWith(swConn.getValue());
                      });
@@ -1114,8 +1117,8 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(conn1Id, CONN2ID(swConn));
-                         ASSERT_NE(conn2Id, CONN2ID(swConn));
+                         ASSERT_NE(conn1Id, verifyAndGetId(swConn));
+                         ASSERT_NE(conn2Id, verifyAndGetId(swConn));
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -1143,7 +1146,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = CONN2ID(swConn);
+                         conn1Id = verifyAndGetId(swConn);
                          doneWith(swConn.getValue());
                      });
     ASSERT(conn1Id);
@@ -1153,7 +1156,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_EQ(CONN2ID(swConn), conn1Id);
+                         ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
                          handle = std::move(swConn.getValue());
                      });
 
@@ -1187,7 +1190,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = CONN2ID(swConn);
+                         conn2Id = verifyAndGetId(swConn);
                          ASSERT_NE(conn2Id, conn1Id);
                          doneWith(swConn.getValue());
                      });
@@ -1209,7 +1212,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(CONN2ID(swConn), conn2Id);
+                         ASSERT_NE(verifyAndGetId(swConn), conn2Id);
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -1276,7 +1279,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
     ConnectionImpl::pushSetup(Status::OK());
     pool.get_forTest(
         HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-            conn1Id = CONN2ID(swConn);
+            conn1Id = verifyAndGetId(swConn);
            doneWith(swConn.getValue());
         });
     ASSERT(conn1Id);
@@ -1398,7 +1401,7 @@ TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
     dropConnectionsTest(pool, manager);
 }
 
-TEST_F(ConnectionPoolTest, TryGetWorks) {
+TEST_F(ConnectionPoolTest, AsyncGet) {
     ConnectionPool::Options options;
     options.maxConnections = 1;
     ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
@@ -1406,60 +1409,77 @@
     auto now = Date_t::now();
     PoolImpl::setNow(now);
 
-    // no connections in the pool, tryGet should fail
-    ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
+    // Make our initial connection, use and return it
+    {
+        size_t connId = 0;
 
-    // Successfully get a new connection
-    size_t conn1Id = 0;
-    ConnectionImpl::pushSetup(Status::OK());
-    pool.get_forTest(
-        HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-            conn1Id = CONN2ID(swConn);
+        // no connections in the pool, our future is not satisfied
+        auto connFuture = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+        ASSERT_FALSE(connFuture.isReady());
+
+        // Successfully get a new connection
+        ConnectionImpl::pushSetup(Status::OK());
+
+        // Future should be ready now
+        ASSERT_TRUE(connFuture.isReady());
+        std::move(connFuture).getAsync([&](StatusWithConn swConn) mutable {
+            connId = verifyAndGetId(swConn);
             doneWith(swConn.getValue());
         });
-    ASSERT(conn1Id);
-
-    // 1 connection in the pool, tryGet should succeed
-    auto tryGetConn = pool.tryGet(HostAndPort(), transport::kGlobalSSLMode);
-    ASSERT(tryGetConn);
+        ASSERT(connId);
+    }
 
-    // No connection available, this waits in the request queue
-    size_t conn3Id = 0;
-    pool.get_forTest(
-        HostAndPort(), Seconds(2), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-            conn3Id = CONN2ID(swConn);
+    // There is one connection in the pool:
+    // * The first get should resolve immediately
+    // * The second get should be queued
+    // * The eventual third should be queued before the second
+    {
+        size_t connId1 = 0;
+        size_t connId2 = 0;
+        size_t connId3 = 0;
+
+        auto connFuture1 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+        auto connFuture2 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
+
+        // Queue up the second future to resolve as soon as it is ready
+        std::move(connFuture2).getAsync([&](StatusWithConn swConn) mutable {
+            connId2 = verifyAndGetId(swConn);
             doneWith(swConn.getValue());
         });
-    ASSERT_EQ(conn3Id, 0ul);
 
-    // We want to wait if there are any outstanding requests (to provide fair access to the pool),
-    // so we need to call tryGet while fulfilling requests. This triggers that race by actually
-    // calling tryGet from within a callback (which works, because we drop locks). Not the cleanest
-    // way to do it, but gets us at least the code coverage we need.
-    //
-    // We run before the previous get because our deadline is 1 sec instead of 2
-    size_t conn2Id = 0;
-    pool.get_forTest(
-        HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-            conn2Id = CONN2ID(swConn);
+        // The first future should be immediately ready. The second should be in the queue.
+        ASSERT_TRUE(connFuture1.isReady());
+        ASSERT_FALSE(connFuture2.isReady());
+
+        // Resolve the first future to return the connection and continue on to the second.
+        decltype(connFuture1) connFuture3;
+        std::move(connFuture1).getAsync([&](StatusWithConn swConn) mutable {
+            // Grab our third future while our first one is being fulfilled
+            connFuture3 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+
+            connId1 = verifyAndGetId(swConn);
             doneWith(swConn.getValue());
-            swConn.getValue().reset();
+        });
+        ASSERT(connId1);
+        ASSERT_FALSE(connId2);
 
-            // we do have one connection
-            ASSERT_EQUALS(pool.getNumConnectionsPerHost(HostAndPort()), 1ul);
+        // Since the third future has a smaller timeout than the second,
+        // it should take priority over the second
+        ASSERT_TRUE(connFuture3.isReady());
+        ASSERT_FALSE(connFuture2.isReady());
 
-            // we fail because there's an outstanding request, even though we do have a good
-            // connection
-            // available.
-            ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
-        });
+        // Resolve the third future. This should trigger the second future
+        std::move(connFuture3).getAsync([&](StatusWithConn swConn) mutable {
+            // We run before the second future
+            ASSERT_FALSE(connId2);
 
-    doneWith(*tryGetConn);
-    tryGetConn.reset();
+            connId3 = verifyAndGetId(swConn);
+            doneWith(swConn.getValue());
+        });
 
-    ASSERT(conn2Id);
-    ASSERT(conn3Id);
+        ASSERT_EQ(connId1, connId2);
+        ASSERT_EQ(connId2, connId3);
+    }
 }
 
 }  // namespace connection_pool_test_details
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index f41de316950..2ecb6ca5279 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -134,6 +134,16 @@ private:
 };
 
 /**
+ * An "OutOfLineExecutor" that actually runs on the same thread of execution
+ */
+class InlineOutOfLineExecutor : public OutOfLineExecutor {
+public:
+    void schedule(Task task) override {
+        std::move(task)(Status::OK());
+    }
+};
+
+/**
  * Mock for the pool implementation
  */
 class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
@@ -147,6 +157,11 @@ public:
 
     std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
 
+    OutOfLineExecutor& getExecutor() override {
+        static InlineOutOfLineExecutor _executor;
+        return _executor;
+    }
+
     Date_t now() override;
 
     void shutdown() override {
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
index 17f000ffaeb..d614436c49d 100644
--- a/src/mongo/executor/connection_pool_tl.h
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -56,6 +56,9 @@ public:
                                                          transport::ConnectSSLMode sslMode,
                                                          size_t generation) override;
     std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
+    OutOfLineExecutor& getExecutor() override {
+        return *_reactor;
+    }
 
     Date_t now() override;
 
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index a9529d1c825..1ad164f5c44 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -213,48 +213,18 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         return Status::OK();
     }
 
-    // Interacting with the connection pool can involve more work than just getting a connection
-    // out. In particular, we can end up having to spin up new connections, and fulfilling promises
-    // for other requesters. Returning connections has the same issue.
-    //
-    // To work around it, we make sure to hop onto the reactor thread before getting a connection,
-    // then making sure to get back to the client thread to do the work (if on a baton). And we
-    // hook up a connection returning unique_ptr that ensures that however we exit, we always do the
-    // return on the reactor thread.
-    //
-    // TODO: get rid of this cruft once we have a connection pool that's executor aware.
-
-    auto connFuture = [&] {
-        auto conn = _pool->tryGet(request.target, request.sslMode);
-
-        if (conn) {
-            return Future<ConnectionPool::ConnectionHandle>(std::move(*conn));
-        }
-
-        return _reactor
-            ->execute([this, state, request, baton] {
-                return makeReadyFutureWith([this, request] {
-                    return _pool->get(request.target, request.sslMode, request.timeout);
-                });
-            })
-            .tapError([state](Status error) {
-                LOG(2) << "Failed to get connection from pool for request " << state->request.id
-                       << ": " << error;
-            });
-    }().then([this, baton](ConnectionPool::ConnectionHandle conn) {
-        auto deleter = conn.get_deleter();
-
-        // TODO: drop out this shared_ptr once we have a unique_function capable future
-        return std::make_shared<CommandState::ConnHandle>(conn.release(),
-                                                          CommandState::Deleter{deleter, _reactor});
-    });
+    auto connFuture = _pool->get(request.target, request.sslMode, request.timeout)
+                          .tapError([state](Status error) {
+                              LOG(2) << "Failed to get connection from pool for request "
+                                     << state->request.id << ": " << error;
+                          });
 
     auto remainingWork = [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
-        StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+        StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
         makeReadyFutureWith([&] {
-            auto connHandle = uassertStatusOK(std::move(swConn));
-            return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
+            auto conn = uassertStatusOK(std::move(swConn));
+            return _onAcquireConn(state, std::move(future), std::move(conn), baton);
         })
             .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
                 // The TransportLayer has, for historical reasons returned SocketException for
@@ -283,7 +253,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         // connection
         std::move(connFuture)
            .getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ](
-                StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+                StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
                 baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ](OperationContext * opCtx) mutable {
                     if (opCtx) {
@@ -298,7 +268,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         // otherwise we're happy to run inline
         std::move(connFuture)
             .getAsync([rw = std::move(remainingWork)](
-                StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+                StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
                 std::move(rw)(std::move(swConn));
             });
     }
@@ -311,7 +281,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
 Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
     std::shared_ptr<CommandState> state,
     Future<RemoteCommandResponse> future,
-    CommandState::ConnHandle conn,
+    ConnectionPool::ConnectionHandle conn,
     const BatonHandle& baton) {
     if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
         conn->indicateSuccess();
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 2ab75f49aaa..d6f50032111 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -97,17 +97,7 @@ private:
         Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
         Date_t start;
 
-        struct Deleter {
-            ConnectionPool::ConnectionHandleDeleter returner;
-            transport::ReactorHandle reactor;
-
-            void operator()(ConnectionPool::ConnectionInterface* ptr) const {
-                reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
-            }
-        };
-        using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
-
-        ConnHandle conn;
+        ConnectionPool::ConnectionHandle conn;
 
         std::unique_ptr<transport::ReactorTimer> timer;
 
         AtomicWord<bool> done;
@@ -138,7 +128,7 @@ private:
     void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
     Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
                                                  Future<RemoteCommandResponse> future,
-                                                 CommandState::ConnHandle conn,
+                                                 ConnectionPool::ConnectionHandle conn,
                                                  const BatonHandle& baton);
 
     std::string _instanceName;
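For factory implementations outside this patch, the only new obligation is getExecutor(). connection_pool_tl.h satisfies it by returning the transport reactor; the test fixture runs tasks inline. A minimal conforming sketch, assuming nothing about OutOfLineExecutor beyond what the hunks above show (schedule() consumes a Task that is invoked with a Status):

    #include "mongo/util/out_of_line_executor.h"

    namespace mongo {

    // Hypothetical executor that runs each task immediately on the calling
    // thread, mirroring the test fixture's InlineOutOfLineExecutor. The pool
    // fasserts on the Status it is handed (e.g. fassert(20000, schedStatus)),
    // so an inline executor must report success.
    class CallingThreadExecutor final : public OutOfLineExecutor {
    public:
        void schedule(Task task) override {
            std::move(task)(Status::OK());
        }
    };

    }  // namespace mongo

Returning the reactor, as connection_pool_tl.h does, keeps pool bookkeeping off client threads; an inline executor preserves the old synchronous behavior and is mainly useful for tests.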