Diffstat (limited to 'src/mongo/executor/connection_pool.cpp')
-rw-r--r--  src/mongo/executor/connection_pool.cpp  247
1 file changed, 152 insertions(+), 95 deletions(-)
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 79461a9a5c1..5a2e90f9178 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -32,8 +32,8 @@
#include "mongo/executor/connection_pool.h"
-#include "mongo/executor/connection_pool_stats.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
@@ -59,6 +59,45 @@ namespace executor {
*/
class ConnectionPool::SpecificPool {
public:
+ /**
+     * These active client methods must be used whenever entering a specific pool outside of the
+     * shutdown background task. Entering as an active client bumps a counter on the specific
+     * pool, which prevents the shutdown thread from deleting it.
+     *
+     * The complexity comes from the need to hold a lock when writing to the
+     * _activeClients member on the specific pool. Because the code beneath the client needs to
+     * lock and unlock the parent mutex (and can leave it unlocked), we want to start the client
+     * with the lock acquired, move it into the client, then re-acquire the lock to decrement the
+     * counter on the way out.
+ *
+ * It's used like:
+ *
+ * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
+ */
+ template <typename Callback>
+ void runWithActiveClient(Callback&& cb) {
+ runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
+ std::forward<Callback>(cb));
+ }
+
+ template <typename Callback>
+ void runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
+ invariant(lk.owns_lock());
+
+ _activeClients++;
+
+ const auto guard = MakeGuard([&] {
+ invariant(!lk.owns_lock());
+ stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
+ _activeClients--;
+ });
+
+ {
+ decltype(lk) localLk(std::move(lk));
+ cb(std::move(localLk));
+ }
+ }
+
SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
~SpecificPool();
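To make the locking contract above concrete, here is a minimal standalone sketch of the same active-client counting pattern. It uses plain std:: types in place of mongo's stdx:: wrappers and MakeGuard, and the class and member names are hypothetical, not the file's actual API:

    #include <cstddef>
    #include <mutex>
    #include <utility>

    // Hypothetical sketch of the active-client pattern: bump a counter under
    // the mutex on entry, and use a scope guard to re-acquire the mutex and
    // decrement on exit, even if the callback throws.
    class PoolSketch {
    public:
        template <typename Callback>
        void runWithActiveClient(std::unique_lock<std::mutex> lk, Callback&& cb) {
            ++_activeClients;  // safe: the caller entered with the lock held

            struct Decrement {  // stands in for MakeGuard in the real code
                PoolSketch* pool;
                ~Decrement() {
                    std::lock_guard<std::mutex> relock(pool->_mutex);
                    --pool->_activeClients;
                }
            } guard{this};

            // The callback takes ownership of the lock and may unlock and
            // re-lock it; its unique_lock parameter releases the mutex when
            // the callback returns, so the guard can safely re-acquire it.
            cb(std::move(lk));
        }

        std::mutex _mutex;
        std::size_t _activeClients = 0;
    };

A caller constructs the std::unique_lock on the pool's mutex and hands it in, exactly as the convenience overload above does with _parent->_mutex before delegating.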
@@ -148,6 +187,7 @@ private:
std::unique_ptr<TimerInterface> _requestTimer;
Date_t _requestTimerExpiration;
+ size_t _activeClients;
size_t _generation;
bool _inFulfillRequests;
bool _inSpawnConnections;
@@ -205,8 +245,13 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
if (iter == _pools.end())
return;
- iter->second.get()->processFailure(
- Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk));
+ iter->second->runWithActiveClient(
+ std::move(lk),
+ [&](decltype(lk) lk) {
+ iter->second->processFailure(
+ Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
+ std::move(lk));
+ });
}
void ConnectionPool::get(const HostAndPort& hostAndPort,
@@ -228,7 +273,11 @@ void ConnectionPool::get(const HostAndPort& hostAndPort,
invariant(pool);
- pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
+ pool->runWithActiveClient(std::move(lk),
+ [&](decltype(lk) lk) {
+ pool->getConnection(
+ hostAndPort, timeout, std::move(lk), std::move(cb));
+ });
}
void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
@@ -263,13 +312,16 @@ void ConnectionPool::returnConnection(ConnectionInterface* conn) {
invariant(iter != _pools.end());
- iter->second.get()->returnConnection(conn, std::move(lk));
+ iter->second->runWithActiveClient(
+ std::move(lk),
+ [&](decltype(lk) lk) { iter->second->returnConnection(conn, std::move(lk)); });
}
ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
: _parent(parent),
_hostAndPort(hostAndPort),
_requestTimer(parent->_factory->makeTimer()),
+ _activeClients(0),
_generation(0),
_inFulfillRequests(false),
_inSpawnConnections(false),
@@ -365,42 +417,44 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
[this](ConnectionInterface* connPtr, Status status) {
connPtr->indicateUsed();
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
- auto conn = takeFromProcessingPool(connPtr);
-
- // If the host and port were dropped, let this lapse
- if (conn->getGeneration() != _generation) {
- spawnConnections(lk);
- return;
- }
-
- // If we're in shutdown, we don't need refreshed connections
- if (_state == State::kInShutdown)
- return;
-
- // If the connection refreshed successfully, throw it back in the ready
- // pool
- if (status.isOK()) {
- addToReady(lk, std::move(conn));
- spawnConnections(lk);
- return;
- }
-
- // If we've exceeded the time limit, start a new connect, rather than
- // failing all operations. We do this because the various callers have
- // their own time limit which is unrelated to our internal one.
- if (status.code() == ErrorCodes::ExceededTimeLimit) {
- log() << "Pending connection to host " << _hostAndPort
- << " did not complete within the connection timeout,"
- << " retrying with a new connection;" << openConnections(lk)
- << " connections to that host remain open";
- spawnConnections(lk);
- return;
- }
-
- // Otherwise pass the failure on through
- processFailure(status, std::move(lk));
+ runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If the host and port were dropped, let this lapse
+ if (conn->getGeneration() != _generation) {
+ spawnConnections(lk);
+ return;
+ }
+
+ // If we're in shutdown, we don't need refreshed connections
+ if (_state == State::kInShutdown)
+ return;
+
+ // If the connection refreshed successfully, throw it back in
+ // the ready pool
+ if (status.isOK()) {
+ addToReady(lk, std::move(conn));
+ spawnConnections(lk);
+ return;
+ }
+
+ // If we've exceeded the time limit, start a new connect,
+ // rather than failing all operations. We do this because the
+ // various callers have their own time limit which is unrelated
+ // to our internal one.
+ if (status.code() == ErrorCodes::ExceededTimeLimit) {
+ log() << "Pending connection to host " << _hostAndPort
+ << " did not complete within the connection timeout,"
+                          << " retrying with a new connection; "
+ << openConnections(lk)
+ << " connections to that host remain open";
+ spawnConnections(lk);
+ return;
+ }
+
+ // Otherwise pass the failure on through
+ processFailure(status, std::move(lk));
+ });
});
lk.lock();
} else {
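The "let this lapse" branch in the callback above relies on generation stamping: every connection records the pool's generation at creation, and a drop bumps the generation so that refreshes of older connections are discarded instead of re-entering the ready pool. A hedged sketch of that invalidation scheme, with hypothetical types rather than the file's actual classes:

    #include <cstddef>
    #include <map>
    #include <memory>

    // Hypothetical illustration of generation-stamped connections.
    struct ConnSketch {
        std::size_t generation;
    };

    struct GenerationPoolSketch {
        std::size_t _generation = 0;
        std::map<ConnSketch*, std::unique_ptr<ConnSketch>> _readyPool;

        std::unique_ptr<ConnSketch> makeConnection() {
            // Stamp the connection with the current generation.
            return std::unique_ptr<ConnSketch>(new ConnSketch{_generation});
        }

        void dropConnections() {
            ++_generation;      // in-flight refreshes now fail the check below
            _readyPool.clear();
        }

        void onRefreshed(std::unique_ptr<ConnSketch> conn) {
            if (conn->generation != _generation)
                return;  // stale generation: let the connection lapse
            _readyPool[conn.get()] = std::move(conn);
        }
    };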
@@ -425,25 +479,25 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
[this, connPtr]() {
OwnedConnection conn;
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+ runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ if (!_readyPool.count(connPtr)) {
+ // We've already been checked out. We don't need to refresh
+ // ourselves.
+ return;
+ }
- if (!_readyPool.count(connPtr)) {
- // We've already been checked out. We don't need to refresh
- // ourselves.
- return;
- }
+ conn = takeFromPool(_readyPool, connPtr);
- conn = takeFromPool(_readyPool, connPtr);
+ // If we're in shutdown, we don't need to refresh connections
+ if (_state == State::kInShutdown)
+ return;
- // If we're in shutdown, we don't need to refresh connections
- if (_state == State::kInShutdown)
- return;
+ _checkedOutPool[connPtr] = std::move(conn);
- _checkedOutPool[connPtr] = std::move(conn);
+ connPtr->indicateSuccess();
- connPtr->indicateSuccess();
-
- returnConnection(connPtr, std::move(lk));
+ returnConnection(connPtr, std::move(lk));
+ });
});
fulfillRequests(lk);
@@ -586,26 +640,29 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
[this](ConnectionInterface* connPtr, Status status) {
connPtr->indicateUsed();
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
- auto conn = takeFromProcessingPool(connPtr);
-
- if (conn->getGeneration() != _generation) {
- // If the host and port was dropped, let the
- // connection lapse
- spawnConnections(lk);
- } else if (status.isOK()) {
- addToReady(lk, std::move(conn));
- spawnConnections(lk);
- } else if (status.code() == ErrorCodes::ExceededTimeLimit) {
- // If we've exceeded the time limit, restart the connect, rather than
- // failing all operations. We do this because the various callers
- // have their own time limit which is unrelated to our internal one.
- spawnConnections(lk);
- } else {
- // If the setup failed, cascade the failure edge
- processFailure(status, std::move(lk));
- }
+ runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ if (conn->getGeneration() != _generation) {
+ // If the host and port was dropped, let the
+ // connection lapse
+ spawnConnections(lk);
+ } else if (status.isOK()) {
+ addToReady(lk, std::move(conn));
+ spawnConnections(lk);
+ } else if (status.code() == ErrorCodes::ExceededTimeLimit) {
+                    // If we've exceeded the time limit, restart the connect,
+                    // rather than failing all operations. We do this because
+                    // the various callers have their own time limit which is
+                    // unrelated to our internal one.
+ spawnConnections(lk);
+ } else {
+ // If the setup failed, cascade the failure edge
+ processFailure(status, std::move(lk));
+ }
+ });
});
// Note that this assumes that the refreshTimeout is sound for the
// setupTimeout
@@ -641,7 +698,7 @@ void ConnectionPool::SpecificPool::shutdown() {
// If we have processing connections, wait for them to finish or timeout
// before shutdown
- if (_processingPool.size() || _droppedProcessingPool.size()) {
+ if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) {
_requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });
return;
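Since a pool with in-flight refreshes or active clients cannot be torn down yet, shutdown defers itself instead of blocking: it re-arms a one-second timer that re-runs the whole check. A condensed sketch of that deferred-shutdown loop, with a hypothetical timer type standing in for the pool's TimerInterface:

    #include <chrono>
    #include <cstddef>
    #include <functional>

    // Hypothetical stand-in for the pool's TimerInterface: a real timer
    // would fire the callback after the delay on a background thread; this
    // one just records it so a test driver could fire it by hand.
    struct TimerSketch {
        std::function<void()> pending;
        void setTimeout(std::chrono::seconds, std::function<void()> cb) {
            pending = std::move(cb);
        }
    };

    struct ShutdownSketch {
        TimerSketch _requestTimer;
        std::size_t _processingConnections = 0;
        std::size_t _activeClients = 0;

        void shutdown() {
            // Can't tear down while work is in flight or a client is inside
            // the pool; check again in a second instead of blocking a thread.
            if (_processingConnections || _activeClients) {
                _requestTimer.setTimeout(std::chrono::seconds(1),
                                         [this]() { shutdown(); });
                return;
            }
            // ... safe to release the pool's resources here ...
        }
    };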
@@ -695,27 +752,27 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
_requestTimer->setTimeout(
timeout,
[this]() {
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
- auto now = _parent->_factory->now();
-
- while (_requests.size()) {
- auto& x = _requests.top();
-
- if (x.first <= now) {
- auto cb = std::move(x.second);
- _requests.pop();
-
- lk.unlock();
- cb(Status(ErrorCodes::ExceededTimeLimit,
- "Couldn't get a connection within the time limit"));
- lk.lock();
- } else {
- break;
+ runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ auto now = _parent->_factory->now();
+
+ while (_requests.size()) {
+ auto& x = _requests.top();
+
+ if (x.first <= now) {
+ auto cb = std::move(x.second);
+ _requests.pop();
+
+ lk.unlock();
+ cb(Status(ErrorCodes::ExceededTimeLimit,
+ "Couldn't get a connection within the time limit"));
+ lk.lock();
+ } else {
+ break;
+ }
}
- }
- updateStateInLock();
+ updateStateInLock();
+ });
});
} else if (_checkedOutPool.size()) {
// If we have no requests, but someone's using a connection, we just
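Note the discipline in the timeout handler above: the pool mutex is released before each request callback runs and re-acquired afterwards, so a callback can re-enter the pool (for instance, to request another connection) without deadlocking on a non-recursive mutex. A minimal sketch of that unlock-around-callback pattern, using a hypothetical request queue:

    #include <functional>
    #include <mutex>
    #include <queue>
    #include <string>

    // Hypothetical sketch: drain queued request callbacks without holding
    // the pool mutex across user code, so a callback may re-enter the pool.
    struct RequestQueueSketch {
        std::mutex _mutex;
        std::queue<std::function<void(const std::string&)>> _requests;

        void failAll(const std::string& reason) {
            std::unique_lock<std::mutex> lk(_mutex);
            while (!_requests.empty()) {
                auto cb = std::move(_requests.front());
                _requests.pop();

                lk.unlock();  // never run user code with the pool lock held
                cb(reason);
                lk.lock();    // re-acquire before touching _requests again
            }
        }
    };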