diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2019-07-18 17:32:39 -0400 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2019-07-24 15:10:59 -0400 |
commit | 8656f757f809200b094658d3424e4454e0571ff2 (patch) | |
tree | 7df75ec00a306322e649dcb75d819fcf39b6234e | |
parent | f8ea0937ec194347a4dcaacadc80d2608e137e1e (diff) | |
download | mongo-8656f757f809200b094658d3424e4454e0571ff2.tar.gz |
SERVER-42286 Return connections inline, spawn connections out-of-line
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 55 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 56 |
2 files changed, 50 insertions, 61 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 6a3390b9e27..2381f2bceec 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -325,18 +325,6 @@ public: } } - template <typename CallableT> - void runOnExecutor(CallableT&& cb) { - ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) // - .getAsync([ this, anchor = shared_from_this(), cb = std::forward<CallableT>(cb) ]( - Status && status) mutable { - invariant(status); - - stdx::lock_guard lk(_parent->_mutex); - cb(); - }); - } - private: using OwnedConnection = std::shared_ptr<ConnectionInterface>; using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>; @@ -407,8 +395,11 @@ private: // it will be discarded on return/refresh. size_t _generation = 0; - bool _inFulfillRequests = false; - bool _inControlLoop = false; + // When the pool needs to potentially die or spawn connections, updateController() is scheduled + // onto the executor and this flag is set. When updateController() finishes running, this flag + // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once + // it is closer to steady state. + bool _updateScheduled = false; size_t _created = 0; @@ -657,12 +648,10 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle { auto deleter = [ this, anchor = shared_from_this() ](ConnectionInterface * connection) { - runOnExecutor([this, connection]() { - returnConnection(connection); - - _lastActiveTime = _parent->_factory->now(); - updateState(); - }); + stdx::lock_guard lk(_parent->_mutex); + returnConnection(connection); + _lastActiveTime = _parent->_factory->now(); + updateState(); }; return ConnectionHandle(connection, std::move(deleter)); } @@ -890,13 +879,6 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status) { // fulfills as many outstanding requests as possible void ConnectionPool::SpecificPool::fulfillRequests() { - // If some other thread (possibly this thread) is fulfilling requests, - // don't keep padding the callstack. - if (_inFulfillRequests) - return; - - _inFulfillRequests = true; - auto guard = makeGuard([&] { _inFulfillRequests = false; }); while (_requests.size()) { // Marking this as our newest active time _lastActiveTime = _parent->_factory->now(); @@ -1065,10 +1047,9 @@ void ConnectionPool::SpecificPool::updateEventTimer() { } void ConnectionPool::SpecificPool::updateController() { - if (std::exchange(_inControlLoop, true)) { + if (_health.isShutdown) { return; } - const auto guard = makeGuard([&] { _inControlLoop = false; }); auto& controller = *_parent->_controller; @@ -1116,7 +1097,7 @@ void ConnectionPool::SpecificPool::updateController() { } } - runOnExecutor([this]() { spawnConnections(); }); + spawnConnections(); } // Updates our state and manages the request timer @@ -1129,7 +1110,19 @@ void ConnectionPool::SpecificPool::updateState() { updateEventTimer(); updateHealth(); - updateController(); + + if (std::exchange(_updateScheduled, true)) { + return; + } + + ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) // + .getAsync([ this, anchor = shared_from_this() ](Status && status) mutable { + invariant(status); + + stdx::lock_guard lk(_parent->_mutex); + _updateScheduled = false; + updateController(); + }); } } // namespace executor diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index cf616c7cb67..c47641441ef 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -81,26 +81,31 @@ protected: .semi(); } + void doneWith(ConnectionPool::ConnectionHandle& conn) { + dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess(); + + ExecutorFuture(_executor).getAsync([conn = std::move(conn)](auto){}); + } + + using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>; + + auto getId(const ConnectionPool::ConnectionHandle& conn) { + return dynamic_cast<ConnectionImpl*>(conn.get())->id(); + } + auto verifyAndGetId(StatusWithConn& swConn) { + ASSERT(swConn.isOK()); + auto& conn = swConn.getValue(); + return getId(conn); + } + + template <typename Ptr> + void dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t); + private: std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>(); std::shared_ptr<ConnectionPool> _pool; }; -void doneWith(const ConnectionPool::ConnectionHandle& conn) { - dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess(); -} - -using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>; - -auto getId(const ConnectionPool::ConnectionHandle& conn) { - return dynamic_cast<ConnectionImpl*>(conn.get())->id(); -} -auto verifyAndGetId(StatusWithConn& swConn) { - ASSERT(swConn.isOK()); - auto& conn = swConn.getValue(); - return getId(conn); -} - /** * Verify that we get the same connection if we grab one, return it and grab * another. @@ -154,7 +159,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { try { ConnectionPool::ConnectionHandle conn = std::move(connections.back()); connections.pop_back(); - conn->indicateSuccess(); + doneWith(conn); } catch (...) { } } @@ -182,7 +187,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { ConnectionPool::ConnectionHandle conn = std::move(connections.back()); connections.pop_back(); ids.push(static_cast<ConnectionImpl*>(conn.get())->id()); - conn->indicateSuccess(); + doneWith(conn); } ASSERT_EQ(ids.size(), kSize); @@ -230,7 +235,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { try { ConnectionPool::ConnectionHandle conn = std::move(connections.back()); connections.pop_back(); - conn->indicateSuccess(); + doneWith(conn); } catch (...) { } } @@ -264,7 +269,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { while (!connections.empty()) { ConnectionPool::ConnectionHandle conn = std::move(connections.back()); connections.pop_back(); - conn->indicateSuccess(); + doneWith(conn); } // Advance the time, but not enough to age out connections. We should still have them all. @@ -293,7 +298,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { while (!connections.empty()) { ConnectionPool::ConnectionHandle conn = std::move(connections.back()); connections.pop_back(); - conn->indicateSuccess(); + doneWith(conn); } // We should still have all of them in the pool @@ -585,7 +590,6 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) { ASSERT(!reachedA); doneWith(conn); - conn.reset(); // Now that we've returned the connection, we see the second has been // called @@ -641,7 +645,6 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { // Return 1 ConnectionPool::ConnectionInterface* conn1Ptr = conn1.get(); doneWith(conn1); - conn1.reset(); // Verify that it's the one that pops out for request 3 ASSERT_EQ(conn1Ptr, conn3.get()); @@ -936,15 +939,12 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { // Return each connection over 1, 2 and 3 ms PoolImpl::setNow(now + Milliseconds(1)); doneWith(conn1); - conn1.reset(); PoolImpl::setNow(now + Milliseconds(2)); doneWith(conn2); - conn2.reset(); PoolImpl::setNow(now + Milliseconds(3)); doneWith(conn3); - conn3.reset(); // Jump 5 seconds and verify that refreshes only two refreshes occurred PoolImpl::setNow(now + Milliseconds(5000)); @@ -1138,7 +1138,6 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { // return conn 1 doneWith(conn1); - conn1.reset(); // expire the pool PoolImpl::setNow(now + Milliseconds(2000)); @@ -1214,7 +1213,6 @@ TEST_F(ConnectionPoolTest, dropConnections) { // return the connection doneWith(handle); - handle.reset(); // Make sure that a new connection request properly disposed of the gen1 // connection @@ -1347,7 +1345,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { } template <typename Ptr> -void dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t) { +void ConnectionPoolTest::dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t) { auto now = Date_t::now(); PoolImpl::setNow(now); @@ -1488,7 +1486,6 @@ TEST_F(ConnectionPoolTest, AsyncGet) { connId1 = getId(conn1); doneWith(conn1); - conn1.reset(); ASSERT(connId1); // Since the third future has a smaller timeout than the second, @@ -1504,7 +1501,6 @@ TEST_F(ConnectionPoolTest, AsyncGet) { connId3 = getId(conn3); doneWith(conn3); - conn3.reset(); // The second future is now finally ready ASSERT_TRUE(connFuture2.isReady()); |