author     Ben Caimano <ben.caimano@10gen.com>    2019-07-18 17:32:39 -0400
committer  Ben Caimano <ben.caimano@10gen.com>    2019-07-24 15:10:59 -0400
commit     8656f757f809200b094658d3424e4454e0571ff2 (patch)
tree       7df75ec00a306322e649dcb75d819fcf39b6234e
parent     f8ea0937ec194347a4dcaacadc80d2608e137e1e (diff)
download   mongo-8656f757f809200b094658d3424e4454e0571ff2.tar.gz
SERVER-42286 Return connections inline, spawn connections out-of-line
-rw-r--r--  src/mongo/executor/connection_pool.cpp        55
-rw-r--r--  src/mongo/executor/connection_pool_test.cpp    56
2 files changed, 50 insertions(+), 61 deletions(-)
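
The heart of the change is makeHandle() in the diff below: the returned handle's deleter now takes the pool mutex and hands the connection back inline, instead of bouncing through the removed runOnExecutor() helper. A minimal, self-contained sketch of that custom-deleter pattern (Connection, Pool, and Handle here are illustrative stand-ins, not the mongo types):

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <vector>

    struct Connection { int id; };

    class Pool : public std::enable_shared_from_this<Pool> {
    public:
        using Handle = std::unique_ptr<Connection, std::function<void(Connection*)>>;

        Handle makeHandle(Connection* conn) {
            // The deleter keeps the pool alive (anchor) and returns the
            // connection inline, under the pool mutex, when the handle dies.
            auto deleter = [this, anchor = shared_from_this()](Connection* c) {
                std::lock_guard lk(_mutex);
                returnConnection(c);
            };
            return Handle(conn, std::move(deleter));
        }

    private:
        void returnConnection(Connection* c) {
            std::cout << "returned connection " << c->id << " inline\n";
            _ready.push_back(c);
        }

        std::mutex _mutex;
        std::vector<Connection*> _ready;
    };

    int main() {
        auto pool = std::make_shared<Pool>();
        Connection conn{1};
        {
            auto handle = pool->makeHandle(&conn);
        }  // handle destroyed here: the connection is returned immediately
    }
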
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 6a3390b9e27..2381f2bceec 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -325,18 +325,6 @@ public:
}
}
- template <typename CallableT>
- void runOnExecutor(CallableT&& cb) {
- ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) //
- .getAsync([ this, anchor = shared_from_this(), cb = std::forward<CallableT>(cb) ](
- Status && status) mutable {
- invariant(status);
-
- stdx::lock_guard lk(_parent->_mutex);
- cb();
- });
- }
-
private:
using OwnedConnection = std::shared_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
@@ -407,8 +395,11 @@ private:
// it will be discarded on return/refresh.
size_t _generation = 0;
- bool _inFulfillRequests = false;
- bool _inControlLoop = false;
+ // When the pool needs to potentially die or spawn connections, updateController() is scheduled
+ // onto the executor and this flag is set. When updateController() finishes running, this flag
+ // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once
+ // it is closer to steady state.
+ bool _updateScheduled = false;
size_t _created = 0;
@@ -657,12 +648,10 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
auto deleter = [ this, anchor = shared_from_this() ](ConnectionInterface * connection) {
- runOnExecutor([this, connection]() {
- returnConnection(connection);
-
- _lastActiveTime = _parent->_factory->now();
- updateState();
- });
+ stdx::lock_guard lk(_parent->_mutex);
+ returnConnection(connection);
+ _lastActiveTime = _parent->_factory->now();
+ updateState();
};
return ConnectionHandle(connection, std::move(deleter));
}
@@ -890,13 +879,6 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status) {
// fulfills as many outstanding requests as possible
void ConnectionPool::SpecificPool::fulfillRequests() {
- // If some other thread (possibly this thread) is fulfilling requests,
- // don't keep padding the callstack.
- if (_inFulfillRequests)
- return;
-
- _inFulfillRequests = true;
- auto guard = makeGuard([&] { _inFulfillRequests = false; });
while (_requests.size()) {
// Marking this as our newest active time
_lastActiveTime = _parent->_factory->now();
@@ -1065,10 +1047,9 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
}
void ConnectionPool::SpecificPool::updateController() {
- if (std::exchange(_inControlLoop, true)) {
+ if (_health.isShutdown) {
return;
}
- const auto guard = makeGuard([&] { _inControlLoop = false; });
auto& controller = *_parent->_controller;
@@ -1116,7 +1097,7 @@ void ConnectionPool::SpecificPool::updateController() {
}
}
- runOnExecutor([this]() { spawnConnections(); });
+ spawnConnections();
}
// Updates our state and manages the request timer
@@ -1129,7 +1110,19 @@ void ConnectionPool::SpecificPool::updateState() {
updateEventTimer();
updateHealth();
- updateController();
+
+ if (std::exchange(_updateScheduled, true)) {
+ return;
+ }
+
+ ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) //
+ .getAsync([ this, anchor = shared_from_this() ](Status && status) mutable {
+ invariant(status);
+
+ stdx::lock_guard lk(_parent->_mutex);
+ _updateScheduled = false;
+ updateController();
+ });
}
} // namespace executor
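
updateState() above now queues at most one updateController() pass onto the pool's executor, using the _updateScheduled flag to coalesce repeated state changes into a single out-of-line run. A rough sketch of that schedule-once idiom, with a plain task queue standing in for OutOfLineExecutor/ExecutorFuture (Controller and the queue are illustrative, not the mongo types):

    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <utility>

    class Controller {
    public:
        explicit Controller(std::deque<std::function<void()>>& executor) : _executor(executor) {}

        // Called on every state change; the expensive work is batched out-of-line.
        void updateState() {
            std::lock_guard lk(_mutex);
            if (std::exchange(_updateScheduled, true))
                return;  // a controller pass is already queued
            _executor.push_back([this] {
                std::lock_guard lk(_mutex);
                _updateScheduled = false;
                updateController();
            });
        }

    private:
        void updateController() {
            std::cout << "one controller pass covers many state updates\n";
        }

        std::mutex _mutex;
        std::deque<std::function<void()>>& _executor;
        bool _updateScheduled = false;
    };

    int main() {
        std::deque<std::function<void()>> executor;
        Controller c(executor);
        c.updateState();
        c.updateState();  // coalesced: no second task is queued
        while (!executor.empty()) {
            auto task = std::move(executor.front());
            executor.pop_front();
            task();  // runs the single deferred controller pass
        }
    }
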
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index cf616c7cb67..c47641441ef 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -81,26 +81,31 @@ protected:
.semi();
}
+ void doneWith(ConnectionPool::ConnectionHandle& conn) {
+ dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess();
+
+ ExecutorFuture(_executor).getAsync([conn = std::move(conn)](auto){});
+ }
+
+ using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>;
+
+ auto getId(const ConnectionPool::ConnectionHandle& conn) {
+ return dynamic_cast<ConnectionImpl*>(conn.get())->id();
+ }
+ auto verifyAndGetId(StatusWithConn& swConn) {
+ ASSERT(swConn.isOK());
+ auto& conn = swConn.getValue();
+ return getId(conn);
+ }
+
+ template <typename Ptr>
+ void dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t);
+
private:
std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>();
std::shared_ptr<ConnectionPool> _pool;
};
-void doneWith(const ConnectionPool::ConnectionHandle& conn) {
- dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess();
-}
-
-using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>;
-
-auto getId(const ConnectionPool::ConnectionHandle& conn) {
- return dynamic_cast<ConnectionImpl*>(conn.get())->id();
-}
-auto verifyAndGetId(StatusWithConn& swConn) {
- ASSERT(swConn.isOK());
- auto& conn = swConn.getValue();
- return getId(conn);
-}
-
/**
* Verify that we get the same connection if we grab one, return it and grab
* another.
@@ -154,7 +159,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
try {
ConnectionPool::ConnectionHandle conn = std::move(connections.back());
connections.pop_back();
- conn->indicateSuccess();
+ doneWith(conn);
} catch (...) {
}
}
@@ -182,7 +187,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
ConnectionPool::ConnectionHandle conn = std::move(connections.back());
connections.pop_back();
ids.push(static_cast<ConnectionImpl*>(conn.get())->id());
- conn->indicateSuccess();
+ doneWith(conn);
}
ASSERT_EQ(ids.size(), kSize);
@@ -230,7 +235,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
try {
ConnectionPool::ConnectionHandle conn = std::move(connections.back());
connections.pop_back();
- conn->indicateSuccess();
+ doneWith(conn);
} catch (...) {
}
}
@@ -264,7 +269,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
while (!connections.empty()) {
ConnectionPool::ConnectionHandle conn = std::move(connections.back());
connections.pop_back();
- conn->indicateSuccess();
+ doneWith(conn);
}
// Advance the time, but not enough to age out connections. We should still have them all.
@@ -293,7 +298,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
while (!connections.empty()) {
ConnectionPool::ConnectionHandle conn = std::move(connections.back());
connections.pop_back();
- conn->indicateSuccess();
+ doneWith(conn);
}
// We should still have all of them in the pool
@@ -585,7 +590,6 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) {
ASSERT(!reachedA);
doneWith(conn);
- conn.reset();
// Now that we've returned the connection, we see the second has been
// called
@@ -641,7 +645,6 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) {
// Return 1
ConnectionPool::ConnectionInterface* conn1Ptr = conn1.get();
doneWith(conn1);
- conn1.reset();
// Verify that it's the one that pops out for request 3
ASSERT_EQ(conn1Ptr, conn3.get());
@@ -936,15 +939,12 @@ TEST_F(ConnectionPoolTest, minPoolRespected) {
// Return each connection over 1, 2 and 3 ms
PoolImpl::setNow(now + Milliseconds(1));
doneWith(conn1);
- conn1.reset();
PoolImpl::setNow(now + Milliseconds(2));
doneWith(conn2);
- conn2.reset();
PoolImpl::setNow(now + Milliseconds(3));
doneWith(conn3);
- conn3.reset();
// Jump 5 seconds and verify that only two refreshes occurred
PoolImpl::setNow(now + Milliseconds(5000));
@@ -1138,7 +1138,6 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
// return conn 1
doneWith(conn1);
- conn1.reset();
// expire the pool
PoolImpl::setNow(now + Milliseconds(2000));
@@ -1214,7 +1213,6 @@ TEST_F(ConnectionPoolTest, dropConnections) {
// return the connection
doneWith(handle);
- handle.reset();
// Make sure that a new connection request properly disposed of the gen1
// connection
@@ -1347,7 +1345,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
}
template <typename Ptr>
-void dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t) {
+void ConnectionPoolTest::dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t) {
auto now = Date_t::now();
PoolImpl::setNow(now);
@@ -1488,7 +1486,6 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
connId1 = getId(conn1);
doneWith(conn1);
- conn1.reset();
ASSERT(connId1);
// Since the third future has a smaller timeout than the second,
@@ -1504,7 +1501,6 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
connId3 = getId(conn3);
doneWith(conn3);
- conn3.reset();
// The second future is now finally ready
ASSERT_TRUE(connFuture2.isReady());
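
In the tests, doneWith() now marks the connection successful and then moves the handle into an executor continuation, so the handle's deleter (and with it the return to the pool) runs when the executor drops the task rather than at the call site. A small sketch of that hand-off, using a copyable shared_ptr handle and a trivial inline executor as stand-ins for ConnectionHandle and InlineOutOfLineExecutor:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <utility>

    using Task = std::function<void()>;

    struct InlineExecutor {
        void schedule(Task task) { task(); }  // runs (and then drops) the task immediately
    };

    using Handle = std::shared_ptr<int>;

    void doneWith(InlineExecutor& executor, Handle& conn) {
        std::cout << "indicateSuccess on connection " << *conn << "\n";
        // Move the handle into the scheduled task; its destructor (and thus
        // the pool's return path) runs when the executor drops the task.
        executor.schedule([conn = std::move(conn)] {});
    }

    int main() {
        InlineExecutor executor;
        Handle conn(new int(1), [](int* p) {
            std::cout << "handle destroyed on the executor; connection returned\n";
            delete p;
        });
        doneWith(executor, conn);  // conn is now empty; with an inline executor the deleter has already run
    }
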