Diffstat (limited to 'src/mongo/executor')
-rw-r--r--   src/mongo/executor/connection_pool.cpp              | 202
-rw-r--r--   src/mongo/executor/connection_pool.h                |  11
-rw-r--r--   src/mongo/executor/connection_pool_test.cpp         | 174
-rw-r--r--   src/mongo/executor/connection_pool_test_fixture.h   |  15
-rw-r--r--   src/mongo/executor/connection_pool_tl.h             |   3
-rw-r--r--   src/mongo/executor/network_interface_tl.cpp         |  52
-rw-r--r--   src/mongo/executor/network_interface_tl.h           |  14
7 files changed, 246 insertions, 225 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 4cbc42dc2ec..41e4e602c65 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -140,6 +140,12 @@ public:
     Future<ConnectionHandle> getConnection(Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk);
 
     /**
+     * Gets a connection from the specific pool if a connection is available and there are no
+     * outstanding requests.
+     */
+    boost::optional<ConnectionHandle> tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
+
+    /**
      * Triggers the shutdown procedure. This function marks the state as kInShutdown
      * and calls processFailure below with the status provided. This may not immediately
      * delist or destruct this pool. However, both will happen eventually as ConnectionHandles
@@ -222,21 +228,15 @@ private:
         }
     };
 
-    ConnectionHandle makeHandle(ConnectionInterface* connection);
-
-    void finishRefresh(stdx::unique_lock<stdx::mutex> lk,
-                       ConnectionInterface* connPtr,
-                       Status status);
-
     void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);
 
     void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);
 
     void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);
 
-    // This internal helper is used both by get and by fulfillRequests and differs in that it
+    // This internal helper is used both by tryGet and by fulfillRequests and differs in that it
     // skips some bookkeeping that the other callers do on their own
-    ConnectionHandle tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
+    boost::optional<ConnectionHandle> tryGetInternal(const stdx::unique_lock<stdx::mutex>& lk);
 
     template <typename OwnershipPoolType>
     typename OwnershipPoolType::mapped_type takeFromPool(
@@ -398,6 +398,23 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
     return get(hostAndPort, transport::kGlobalSSLMode, timeout).getAsync(std::move(cb));
 }
 
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::tryGet(
+    const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode) {
+    stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+    auto iter = _pools.find(hostAndPort);
+
+    if (iter == _pools.end()) {
+        return boost::none;
+    }
+
+    const auto& pool = iter->second;
+    invariant(pool);
+    pool->fassertSSLModeIs(sslMode);
+
+    return pool->tryGetConnection(lk);
+}
+
 Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
                                                              transport::ConnectSSLMode sslMode,
                                                              Milliseconds timeout) {
@@ -506,14 +523,6 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
     Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
     invariant(_state != State::kInShutdown);
 
-    auto conn = tryGetConnection(lk);
-
-    updateStateInLock();
-
-    if (conn) {
-        return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn));
-    }
-
     if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
         timeout = _parent->_options.refreshTimeout;
     }
@@ -526,25 +535,28 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
 
     updateStateInLock();
 
-    lk.unlock();
-    _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
-        fassert(20000, schedStatus);
-
-        spawnConnections(lk);
-    }));
+    spawnConnections(lk);
+    fulfillRequests(lk);
 
     return std::move(pf.future);
 }
 
-auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
-    auto fun = guardCallback(
-        [this](auto lk, auto connection) { returnConnection(connection, std::move(lk)); });
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetConnection(
+    const stdx::unique_lock<stdx::mutex>& lk) {
+    invariant(_state != State::kInShutdown);
+
+    if (_requests.size()) {
+        return boost::none;
+    }
 
-    auto handle = ConnectionHandle(connection, fun);
-    return handle;
+    auto conn = tryGetInternal(lk);
+
+    updateStateInLock();
+
+    return conn;
 }
 
-ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetInternal(
     const stdx::unique_lock<stdx::mutex>&) {
 
     while (_readyPool.size()) {
@@ -570,55 +582,14 @@ ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
         // pass it to the user
        connPtr->resetToUnknown();
 
-        auto handle = makeHandle(connPtr);
-        return handle;
+        return ConnectionHandle(connPtr,
+                                guardCallback([this](stdx::unique_lock<stdx::mutex> localLk,
+                                                     ConnectionPool::ConnectionInterface* conn) {
+                                    returnConnection(conn, std::move(localLk));
+                                }));
     }
 
-    return {};
-}
-
-void ConnectionPool::SpecificPool::finishRefresh(stdx::unique_lock<stdx::mutex> lk,
-                                                 ConnectionInterface* connPtr,
-                                                 Status status) {
-    auto conn = takeFromProcessingPool(connPtr);
-
-    // If we're in shutdown, we don't need refreshed connections
-    if (_state == State::kInShutdown)
-        return;
-
-    // If we've exceeded the time limit, start a new connect,
-    // rather than failing all operations. We do this because the
-    // various callers have their own time limit which is unrelated
-    // to our internal one.
-    if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
-        LOG(0) << "Pending connection to host " << _hostAndPort
-               << " did not complete within the connection timeout,"
-               << " retrying with a new connection;" << openConnections(lk)
-               << " connections to that host remain open";
-        spawnConnections(lk);
-        return;
-    }
-
-    // Pass a failure on through
-    if (!status.isOK()) {
-        processFailure(status, std::move(lk));
-        return;
-    }
-
-    // If the host and port were dropped, let this lapse and spawn new connections
-    if (conn->getGeneration() != _generation) {
-        spawnConnections(lk);
-        return;
-    }
-
-    // If the connection refreshed successfully, throw it back in the ready pool
-    addToReady(lk, std::move(conn));
-
-    lk.unlock();
-    _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
-        fassert(20003, schedStatus);
-        fulfillRequests(lk);
-    }));
+    return boost::none;
 }
 
 void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
@@ -660,19 +631,49 @@
         // Unlock in case refresh can occur immediately
         lk.unlock();
         connPtr->refresh(_parent->_options.refreshTimeout,
-                         guardCallback([this](auto lk, auto conn, auto status) {
-                             finishRefresh(std::move(lk), conn, status);
+                         guardCallback([this](stdx::unique_lock<stdx::mutex> lk,
+                                              ConnectionInterface* connPtr,
+                                              Status status) {
+                             auto conn = takeFromProcessingPool(connPtr);
+
+                             // If we're in shutdown, we don't need refreshed connections
+                             if (_state == State::kInShutdown)
+                                 return;
+
+                             // If the connection refreshed successfully, throw it back in
+                             // the ready pool
+                             if (status.isOK()) {
+                                 // If the host and port were dropped, let this lapse
+                                 if (conn->getGeneration() == _generation) {
+                                     addToReady(lk, std::move(conn));
+                                     fulfillRequests(lk);
+                                 }
+
+                                 return;
+                             }
+
+                             // If we've exceeded the time limit, start a new connect,
+                             // rather than failing all operations. We do this because the
+                             // various callers have their own time limit which is unrelated
+                             // to our internal one.
+                             if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+                                 log() << "Pending connection to host " << _hostAndPort
+                                       << " did not complete within the connection timeout,"
+                                       << " retrying with a new connection;" << openConnections(lk)
+                                       << " connections to that host remain open";
+                                 spawnConnections(lk);
+                                 return;
+                             }
+
+                             // Otherwise pass the failure on through
+                             processFailure(status, std::move(lk));
                          }));
         lk.lock();
     } else {
         // If it's fine as it is, just put it in the ready queue
         addToReady(lk, std::move(conn));
-
-        lk.unlock();
-        _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
-            fassert(20004, schedStatus);
-            fulfillRequests(lk);
-        }));
+        // TODO This should be scheduled on an executor once we have executor-aware pooling
+        fulfillRequests(lk);
     }
 
     updateStateInLock();
@@ -783,7 +784,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
         // deadlock).
         //
        // None of the heap manipulation code throws, but it's something to keep in mind.
-        auto conn = tryGetConnection(lk);
+        auto conn = tryGetInternal(lk);
 
         if (!conn) {
             break;
@@ -795,7 +796,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
         _requests.pop_back();
 
         lk.unlock();
-        promise.emplaceValue(std::move(conn));
+        promise.emplaceValue(std::move(*conn));
         lk.lock();
 
         updateStateInLock();
@@ -846,11 +847,32 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
 
         // Run the setup callback
         lk.unlock();
-        handle->setup(_parent->_options.refreshTimeout,
-                      guardCallback([this](auto lk, auto conn, auto status) {
-                          finishRefresh(std::move(lk), conn, status);
-                      }));
-
+        handle->setup(
+            _parent->_options.refreshTimeout,
+            guardCallback([this](
+                stdx::unique_lock<stdx::mutex> lk, ConnectionInterface* connPtr, Status status) {
+                auto conn = takeFromProcessingPool(connPtr);
+
+                // If we're in shutdown, we don't need this conn
+                if (_state == State::kInShutdown)
+                    return;
+
+                if (status.isOK()) {
+                    // If the host and port was dropped, let the connection lapse
+                    if (conn->getGeneration() == _generation) {
+                        addToReady(lk, std::move(conn));
+                        fulfillRequests(lk);
+                    }
+                } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+                    // If we've exceeded the time limit, restart the connect, rather than
+                    // failing all operations. We do this because the various callers
+                    // have their own time limit which is unrelated to our internal one.
+                    spawnConnections(lk);
+                } else {
+                    // If the setup failed, cascade the failure edge
+                    processFailure(status, std::move(lk));
+                }
+            }));
 
         // Note that this assumes that the refreshTimeout is sound for the
         // setupTimeout
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 7066177e438..5903d1b0bda 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -42,7 +42,6 @@
 #include "mongo/transport/transport_layer.h"
 #include "mongo/util/future.h"
 #include "mongo/util/net/hostandport.h"
-#include "mongo/util/out_of_line_executor.h"
 #include "mongo/util/time_support.h"
 
 namespace mongo {
@@ -73,7 +72,7 @@ public:
     using ConnectionHandleDeleter = stdx::function<void(ConnectionInterface* connection)>;
     using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
 
-    using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>;
+    using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>;
 
     static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000);  // 5mins
     static const size_t kDefaultMaxConns;
@@ -157,6 +156,9 @@ public:
                      Milliseconds timeout,
                      GetConnectionCallback cb);
 
+    boost::optional<ConnectionHandle> tryGet(const HostAndPort& hostAndPort,
+                                             transport::ConnectSSLMode sslMode);
+
     void appendConnectionStats(ConnectionPoolStats* stats) const;
 
     size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
@@ -340,11 +342,6 @@ public:
                                                        size_t generation) = 0;
 
     /**
-     * Return the executor for use with this factory
-     */
-    virtual OutOfLineExecutor& getExecutor() = 0;
-
-    /**
      * Makes a new timer
      */
     virtual std::shared_ptr<TimerInterface> makeTimer() = 0;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index b36cf8cf749..0e689456afa 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -32,7 +32,6 @@
 #include <algorithm>
 #include <random>
 #include <stack>
-#include <tuple>
 
 #include "mongo/executor/connection_pool_test_fixture.h"
 
@@ -59,17 +58,15 @@ protected:
 private:
 };
 
-void doneWith(const ConnectionPool::ConnectionHandle& conn) {
-    dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess();
+void doneWith(const ConnectionPool::ConnectionHandle& swConn) {
+    static_cast<ConnectionImpl*>(swConn.get())->indicateSuccess();
 }
 
-using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>;
-
-auto verifyAndGetId(StatusWithConn& swConn) {
-    ASSERT(swConn.isOK());
-    auto& conn = swConn.getValue();
-    return dynamic_cast<ConnectionImpl*>(conn.get())->id();
-}
+#define CONN2ID(swConn)                                                     \
+    [](StatusWith<ConnectionPool::ConnectionHandle>& swConn) {              \
+        ASSERT(swConn.isOK());                                              \
+        return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \
+    }(swConn)
 
 /**
  * Verify that we get the same connection if we grab one, return it and grab
@@ -84,7 +81,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = verifyAndGetId(swConn);
+                         conn1Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -94,7 +91,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = verifyAndGetId(swConn);
+                         conn2Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -162,7 +159,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
                          Milliseconds(5000),
                          [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
                              ASSERT(swConn.isOK());
-                             const auto id = verifyAndGetId(swConn);
+                             const auto id = CONN2ID(swConn);
                              connections.push_back(std::move(swConn.getValue()));
                              ASSERT(id == ids.top());
                              ids.pop();
@@ -214,7 +211,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
                          Milliseconds(5000),
                          [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
                              ASSERT(swConn.isOK());
-                             original_ids.insert(verifyAndGetId(swConn));
+                             original_ids.insert(CONN2ID(swConn));
                              connections.push_back(std::move(swConn.getValue()));
                          });
     }
@@ -246,7 +243,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
                          Milliseconds(5000),
                          [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
                              ASSERT(swConn.isOK());
-                             reacquired_ids.insert(verifyAndGetId(swConn));
+                             reacquired_ids.insert(CONN2ID(swConn));
                              connections.push_back(std::move(swConn.getValue()));
                          });
     }
@@ -285,7 +282,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = verifyAndGetId(swConn);
+                         conn1Id = CONN2ID(swConn);
                          swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error"));
                      });
 
@@ -295,7 +292,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = verifyAndGetId(swConn);
+                         conn2Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -318,7 +315,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
    pool.get_forTest(HostAndPort("localhost:30000"),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = verifyAndGetId(swConn);
+                         conn1Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -328,7 +325,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
     pool.get_forTest(HostAndPort("localhost:30001"),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = verifyAndGetId(swConn);
+                         conn2Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -463,7 +460,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = verifyAndGetId(swConn);
+                         conn1Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -474,7 +471,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = verifyAndGetId(swConn);
+                         conn2Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
     ASSERT_EQ(conn1Id, conn2Id);
@@ -506,7 +503,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(1000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(verifyAndGetId(swConn), conn1Id);
+                         ASSERT_NE(CONN2ID(swConn), conn1Id);
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -945,7 +942,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         connId = verifyAndGetId(swConn);
+                         connId = CONN2ID(swConn);
                          reachedA = true;
                          doneWith(swConn.getValue());
                      });
@@ -962,7 +959,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(connId, verifyAndGetId(swConn));
+                         ASSERT_NE(connId, CONN2ID(swConn));
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -995,7 +992,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         connId = verifyAndGetId(swConn);
+                         connId = CONN2ID(swConn);
                          reachedA = true;
                          doneWith(swConn.getValue());
                      });
@@ -1009,7 +1006,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_EQ(connId, verifyAndGetId(swConn));
+                         ASSERT_EQ(connId, CONN2ID(swConn));
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -1022,7 +1019,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_EQ(connId, verifyAndGetId(swConn));
+                         ASSERT_EQ(connId, CONN2ID(swConn));
                          reachedB2 = true;
                          doneWith(swConn.getValue());
                      });
@@ -1037,7 +1034,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(connId, verifyAndGetId(swConn));
+                         ASSERT_NE(connId, CONN2ID(swConn));
                          reachedC = true;
                          doneWith(swConn.getValue());
                      });
@@ -1070,7 +1067,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = verifyAndGetId(swConn);
+                         conn1Id = CONN2ID(swConn);
                          conn1 = std::move(swConn.getValue());
                      });
 
@@ -1081,7 +1078,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = verifyAndGetId(swConn);
+                         conn2Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
 
@@ -1096,7 +1093,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
+                         ASSERT_EQ(conn2Id, CONN2ID(swConn));
                          reachedA = true;
                          doneWith(swConn.getValue());
                      });
@@ -1117,8 +1114,8 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(conn1Id, verifyAndGetId(swConn));
-                         ASSERT_NE(conn2Id, verifyAndGetId(swConn));
+                         ASSERT_NE(conn1Id, CONN2ID(swConn));
+                         ASSERT_NE(conn2Id, CONN2ID(swConn));
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -1146,7 +1143,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn1Id = verifyAndGetId(swConn);
+                         conn1Id = CONN2ID(swConn);
                          doneWith(swConn.getValue());
                      });
     ASSERT(conn1Id);
@@ -1156,7 +1153,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
+                         ASSERT_EQ(CONN2ID(swConn), conn1Id);
                          handle = std::move(swConn.getValue());
                      });
 
@@ -1190,7 +1187,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         conn2Id = verifyAndGetId(swConn);
+                         conn2Id = CONN2ID(swConn);
                          ASSERT_NE(conn2Id, conn1Id);
                          doneWith(swConn.getValue());
                      });
@@ -1212,7 +1209,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
     pool.get_forTest(HostAndPort(),
                      Milliseconds(5000),
                      [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-                         ASSERT_NE(verifyAndGetId(swConn), conn2Id);
+                         ASSERT_NE(CONN2ID(swConn), conn2Id);
                          reachedB = true;
                          doneWith(swConn.getValue());
                      });
@@ -1279,7 +1276,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
     ConnectionImpl::pushSetup(Status::OK());
     pool.get_forTest(
         HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
-            conn1Id = verifyAndGetId(swConn);
+            conn1Id = CONN2ID(swConn);
             doneWith(swConn.getValue());
         });
     ASSERT(conn1Id);
@@ -1401,7 +1398,7 @@ TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
     dropConnectionsTest(pool, manager);
 }
 
-TEST_F(ConnectionPoolTest, AsyncGet) {
+TEST_F(ConnectionPoolTest, TryGetWorks) {
     ConnectionPool::Options options;
     options.maxConnections = 1;
     ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
@@ -1409,77 +1406,60 @@
     auto now = Date_t::now();
     PoolImpl::setNow(now);
 
-    // Make our initial connection, use and return it
-    {
-        size_t connId = 0;
-
-        // no connections in the pool, our future is not satisfied
-        auto connFuture = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
-        ASSERT_FALSE(connFuture.isReady());
+    // no connections in the pool, tryGet should fail
+    ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
 
-        // Successfully get a new connection
-        ConnectionImpl::pushSetup(Status::OK());
-
-        // Future should be ready now
-        ASSERT_TRUE(connFuture.isReady());
-        std::move(connFuture).getAsync([&](StatusWithConn swConn) mutable {
-            connId = verifyAndGetId(swConn);
+    // Successfully get a new connection
+    size_t conn1Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get_forTest(
+        HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+            conn1Id = CONN2ID(swConn);
             doneWith(swConn.getValue());
         });
-        ASSERT(connId);
-    }
+    ASSERT(conn1Id);
 
-    // There is one connection in the pool:
-    //   * The first get should resolve immediately
-    //   * The second get should should be queued
-    //   * The eventual third should be queued before the second
-    {
-        size_t connId1 = 0;
-        size_t connId2 = 0;
-        size_t connId3 = 0;
-
-        auto connFuture1 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
-        auto connFuture2 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
-
-        // Queue up the second future to resolve as soon as it is ready
-        std::move(connFuture2).getAsync([&](StatusWithConn swConn) mutable {
-            connId2 = verifyAndGetId(swConn);
+    // 1 connection in the pool, tryGet should succeed
+    auto tryGetConn = pool.tryGet(HostAndPort(), transport::kGlobalSSLMode);
+    ASSERT(tryGetConn);
+
+    // No connection available, this waits in the request queue
+    size_t conn3Id = 0;
+    pool.get_forTest(
+        HostAndPort(), Seconds(2), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+            conn3Id = CONN2ID(swConn);
             doneWith(swConn.getValue());
         });
 
-        // The first future should be immediately ready. The second should be in the queue.
-        ASSERT_TRUE(connFuture1.isReady());
-        ASSERT_FALSE(connFuture2.isReady());
-
-        // Resolve the first future to return the connection and continue on to the second.
-        decltype(connFuture1) connFuture3;
-        std::move(connFuture1).getAsync([&](StatusWithConn swConn) mutable {
-            // Grab our third future while our first one is being fulfilled
-            connFuture3 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+    ASSERT_EQ(conn3Id, 0ul);
 
-            connId1 = verifyAndGetId(swConn);
+    // We want to wait if there are any outstanding requests (to provide fair access to the pool),
+    // so we need to call tryGet while fulfilling requests. This triggers that race by actually
+    // calling tryGet from within a callback (which works, because we drop locks). Not the cleanest
+    // way to do it, but gets us at least the code coverage we need.
+    //
+    // We run before the previous get because our deadline is 1 sec instead of 2
+    size_t conn2Id = 0;
+    pool.get_forTest(
+        HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+            conn2Id = CONN2ID(swConn);
             doneWith(swConn.getValue());
-        });
-        ASSERT(connId1);
-        ASSERT_FALSE(connId2);
-
-        // Since the third future has a smaller timeout than the second,
-        // it should take priority over the second
-        ASSERT_TRUE(connFuture3.isReady());
-        ASSERT_FALSE(connFuture2.isReady());
+            swConn.getValue().reset();
 
-        // Resolve the third future. This should trigger the second future
-        std::move(connFuture3).getAsync([&](StatusWithConn swConn) mutable {
-            // We run before the second future
-            ASSERT_FALSE(connId2);
+            // we do have one connection
+            ASSERT_EQUALS(pool.getNumConnectionsPerHost(HostAndPort()), 1ul);
 
-            connId3 = verifyAndGetId(swConn);
-            doneWith(swConn.getValue());
+            // we fail because there's an outstanding request, even though we do have a good
+            // connection available.
+            ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
         });
 
-        ASSERT_EQ(connId1, connId2);
-        ASSERT_EQ(connId2, connId3);
-    }
+    doneWith(*tryGetConn);
+    tryGetConn.reset();
+
+    ASSERT(conn2Id);
+    ASSERT(conn3Id);
 }
 
 }  // namespace connection_pool_test_details
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index 2ecb6ca5279..f41de316950 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -134,16 +134,6 @@ private:
 };
 
 /**
- * An "OutOfLineExecutor" that actually runs on the same thread of execution
- */
-class InlineOutOfLineExecutor : public OutOfLineExecutor {
-public:
-    void schedule(Task task) override {
-        std::move(task)(Status::OK());
-    }
-};
-
-/**
  * Mock for the pool implementation
  */
 class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
@@ -157,11 +147,6 @@ public:
 
     std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
 
-    OutOfLineExecutor& getExecutor() override {
-        static InlineOutOfLineExecutor _executor;
-        return _executor;
-    }
-
     Date_t now() override;
 
     void shutdown() override {
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
index d614436c49d..17f000ffaeb 100644
--- a/src/mongo/executor/connection_pool_tl.h
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -56,9 +56,6 @@ public:
                                                        transport::ConnectSSLMode sslMode,
                                                        size_t generation) override;
     std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
-    OutOfLineExecutor& getExecutor() override {
-        return *_reactor;
-    }
 
     Date_t now() override;
 
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 1ad164f5c44..a9529d1c825 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -213,18 +213,48 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         return Status::OK();
     }
 
-    auto connFuture = _pool->get(request.target, request.sslMode, request.timeout)
-                          .tapError([state](Status error) {
-                              LOG(2) << "Failed to get connection from pool for request "
-                                     << state->request.id << ": " << error;
-                          });
+    // Interacting with the connection pool can involve more work than just getting a connection
+    // out. In particular, we can end up having to spin up new connections, and fulfilling promises
+    // for other requesters. Returning connections has the same issue.
+    //
+    // To work around it, we make sure to hop onto the reactor thread before getting a connection,
+    // then making sure to get back to the client thread to do the work (if on a baton). And we
+    // hook up a connection returning unique_ptr that ensures that however we exit, we always do the
+    // return on the reactor thread.
+    //
+    // TODO: get rid of this cruft once we have a connection pool that's executor aware.
+
+    auto connFuture = [&] {
+        auto conn = _pool->tryGet(request.target, request.sslMode);
+
+        if (conn) {
+            return Future<ConnectionPool::ConnectionHandle>(std::move(*conn));
+        }
+
+        return _reactor
+            ->execute([this, state, request, baton] {
+                return makeReadyFutureWith([this, request] {
+                    return _pool->get(request.target, request.sslMode, request.timeout);
+                });
+            })
+            .tapError([state](Status error) {
+                LOG(2) << "Failed to get connection from pool for request " << state->request.id
+                       << ": " << error;
+            });
+    }().then([this, baton](ConnectionPool::ConnectionHandle conn) {
+        auto deleter = conn.get_deleter();
+
+        // TODO: drop out this shared_ptr once we have a unique_function capable future
+        return std::make_shared<CommandState::ConnHandle>(conn.release(),
+                                                          CommandState::Deleter{deleter, _reactor});
+    });
 
     auto remainingWork =
         [ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
-            StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
+            StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
             makeReadyFutureWith([&] {
-                auto conn = uassertStatusOK(std::move(swConn));
-                return _onAcquireConn(state, std::move(future), std::move(conn), baton);
+                auto connHandle = uassertStatusOK(std::move(swConn));
+                return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
             })
                 .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
                     // The TransportLayer has, for historical reasons returned SocketException for
@@ -253,7 +283,7 @@
         // connection
         std::move(connFuture)
             .getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ](
-                StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
+                StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
                 baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ](
                     OperationContext * opCtx) mutable {
                     if (opCtx) {
@@ -268,7 +298,7 @@
        // otherwise we're happy to run inline
        std::move(connFuture)
            .getAsync([rw = std::move(remainingWork)](
-                StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
+                StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
                std::move(rw)(std::move(swConn));
            });
     }
@@ -281,7 +311,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
 Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
     std::shared_ptr<CommandState> state,
     Future<RemoteCommandResponse> future,
-    ConnectionPool::ConnectionHandle conn,
+    CommandState::ConnHandle conn,
     const BatonHandle& baton) {
     if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
         conn->indicateSuccess();
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index d6f50032111..2ab75f49aaa 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -97,7 +97,17 @@ private:
         Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
         Date_t start;
 
-        ConnectionPool::ConnectionHandle conn;
+        struct Deleter {
+            ConnectionPool::ConnectionHandleDeleter returner;
+            transport::ReactorHandle reactor;
+
+            void operator()(ConnectionPool::ConnectionInterface* ptr) const {
+                reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
+            }
+        };
+        using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
+
+        ConnHandle conn;
         std::unique_ptr<transport::ReactorTimer> timer;
 
         AtomicWord<bool> done;
@@ -128,7 +138,7 @@ private:
     void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
     Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
                                                  Future<RemoteCommandResponse> future,
-                                                 ConnectionPool::ConnectionHandle conn,
+                                                 CommandState::ConnHandle conn,
                                                  const BatonHandle& baton);
 
     std::string _instanceName;
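
Note on the fairness rule this commit introduces: even when an idle connection is sitting in the ready pool, ConnectionPool::tryGet refuses to hand it out while any queued request is outstanding, so opportunistic callers cannot starve waiters (this is exactly what the TryGetWorks test pins down). The following is a minimal standalone sketch of that rule only; it is not MongoDB code, the names ToyPool and Conn are invented, and the real SpecificPool does this under a mutex with timeouts, generations, and refresh logic omitted here.

    // Sketch of the try-get fairness rule (C++17). Invented names: ToyPool, Conn.
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <optional>

    struct Conn { int id; };

    class ToyPool {
    public:
        // Opportunistic path: only succeeds when a connection is idle AND no
        // queued request is waiting -- mirrors SpecificPool::tryGetConnection.
        std::optional<Conn> tryGet() {
            if (!_requests.empty() || _ready.empty())
                return std::nullopt;
            Conn c = _ready.front();
            _ready.pop_front();
            return c;
        }

        // Queued path: takes a ready connection immediately if one exists,
        // otherwise parks the callback -- a stand-in for getConnection.
        void get(std::function<void(Conn)> cb) {
            if (!_ready.empty()) {
                Conn c = _ready.front();
                _ready.pop_front();
                cb(c);
            } else {
                _requests.push_back(std::move(cb));
            }
        }

        // Returning a connection fulfills the oldest queued request first.
        void release(Conn c) {
            if (!_requests.empty()) {
                auto cb = std::move(_requests.front());
                _requests.pop_front();
                cb(c);
            } else {
                _ready.push_back(c);
            }
        }

    private:
        std::deque<Conn> _ready;
        std::deque<std::function<void(Conn)>> _requests;
    };

    int main() {
        ToyPool pool;
        pool.release(Conn{1});  // seed one idle connection

        auto c = pool.tryGet();  // succeeds: idle conn, empty request queue
        std::cout << "tryGet #1: " << (c ? "hit" : "miss") << '\n';

        pool.get([](Conn conn) { std::cout << "queued get got conn " << conn.id << '\n'; });

        // A request is now queued, so an opportunistic tryGet must miss even
        // though release() below will make a connection available again.
        std::cout << "tryGet #2: " << (pool.tryGet() ? "hit" : "miss") << '\n';

        pool.release(*c);  // goes straight to the queued waiter, not the ready list
    }

Running it prints a hit for the first tryGet and a miss for the second, which is the same outcome the in-callback ASSERT_FALSE(pool.tryGet(...)) checks in the rewritten test.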
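The other subtle piece is CommandState::ConnHandle in network_interface_tl.h: a unique_ptr whose custom deleter trampolines the connection return onto the reactor thread, so a handle dropped on any thread is still returned where the pool expects. The sketch below models just that idiom; it is not MongoDB code, and ToyReactor, ReactorDeleter, and the returner callback are invented stand-ins for transport::Reactor, CommandState::Deleter, and ConnectionPool::ConnectionHandleDeleter.

    // Sketch of a deleter that defers cleanup to a designated executor thread.
    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <thread>

    // Toy single-consumer task queue standing in for transport::Reactor.
    class ToyReactor {
    public:
        void dispatch(std::function<void()> task) {
            {
                std::lock_guard<std::mutex> lk(_m);
                _q.push(std::move(task));
            }
            _cv.notify_one();
        }
        void runOne() {
            std::unique_lock<std::mutex> lk(_m);
            _cv.wait(lk, [&] { return !_q.empty(); });
            auto task = std::move(_q.front());
            _q.pop();
            lk.unlock();
            task();
        }

    private:
        std::mutex _m;
        std::condition_variable _cv;
        std::queue<std::function<void()>> _q;
    };

    struct Conn { int id; };

    // Mirrors CommandState::Deleter: captures the real "returner" plus the
    // reactor, and forwards the pointer through the reactor on destruction.
    struct ReactorDeleter {
        std::function<void(Conn*)> returner;
        ToyReactor* reactor;
        void operator()(Conn* ptr) const {
            reactor->dispatch([ret = returner, ptr] { ret(ptr); });
        }
    };
    using ConnHandle = std::unique_ptr<Conn, ReactorDeleter>;

    int main() {
        ToyReactor reactor;
        auto returner = [](Conn* c) {
            std::cout << "conn " << c->id << " returned on thread "
                      << std::this_thread::get_id() << '\n';
            delete c;
        };

        std::thread worker([&] {
            ConnHandle handle(new Conn{42}, ReactorDeleter{returner, &reactor});
            // The handle dies here, on the worker thread, but the return
            // itself runs later on whichever thread drains the reactor.
        });
        worker.join();

        std::cout << "draining reactor on thread " << std::this_thread::get_id() << '\n';
        reactor.runOne();  // executes the dispatched return
    }

The join/runOne ordering makes the point visible: the handle is dropped on the worker thread, yet the "returned on thread" line prints from the thread that drains the reactor, which is the guarantee the real deleter provides to the pool.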