Diffstat (limited to 'src/mongo')
-rw-r--r--   src/mongo/executor/connection_pool.cpp | 247
1 file changed, 152 insertions(+), 95 deletions(-)
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 79461a9a5c1..5a2e90f9178 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -32,8 +32,8 @@

 #include "mongo/executor/connection_pool.h"

-#include "mongo/executor/connection_pool_stats.h"
 #include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/executor/connection_pool_stats.h"
 #include "mongo/executor/remote_command_request.h"
 #include "mongo/stdx/memory.h"
 #include "mongo/util/assert_util.h"
@@ -59,6 +59,45 @@ namespace executor {
  */
 class ConnectionPool::SpecificPool {
 public:
+    /**
+     * These active client methods must be used whenever entering a specific pool outside of the
+     * shutdown background task. The presence of an active client will bump a counter on the
+     * specific pool which will prevent the shutdown thread from deleting it.
+     *
+     * The complexity comes from the need to hold a lock when writing to the _activeClients
+     * member on the specific pool. Because the code beneath the client needs to lock and
+     * unlock the parent mutex (and can leave it unlocked), we want to start the client with
+     * the lock acquired, move it into the client, then re-acquire to decrement the counter
+     * on the way out.
+     *
+     * It's used like:
+     *
+     * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
+     */
+    template <typename Callback>
+    void runWithActiveClient(Callback&& cb) {
+        runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
+                            std::forward<Callback>(cb));
+    }
+
+    template <typename Callback>
+    void runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
+        invariant(lk.owns_lock());
+
+        _activeClients++;
+
+        const auto guard = MakeGuard([&] {
+            invariant(!lk.owns_lock());
+            stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
+            _activeClients--;
+        });
+
+        {
+            decltype(lk) localLk(std::move(lk));
+            cb(std::move(localLk));
+        }
+    }
+
     SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
     ~SpecificPool();

@@ -148,6 +187,7 @@ private:
     std::unique_ptr<TimerInterface> _requestTimer;
     Date_t _requestTimerExpiration;

+    size_t _activeClients;
     size_t _generation;
     bool _inFulfillRequests;
     bool _inSpawnConnections;
@@ -205,8 +245,13 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
     if (iter == _pools.end())
         return;

-    iter->second.get()->processFailure(
-        Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk));
+    iter->second->runWithActiveClient(
+        std::move(lk),
+        [&](decltype(lk) lk) {
+            iter->second->processFailure(
+                Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
+                std::move(lk));
+        });
 }

 void ConnectionPool::get(const HostAndPort& hostAndPort,
@@ -228,7 +273,11 @@ void ConnectionPool::get(const HostAndPort& hostAndPort,

     invariant(pool);

-    pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
+    pool->runWithActiveClient(std::move(lk),
+                              [&](decltype(lk) lk) {
+                                  pool->getConnection(
+                                      hostAndPort, timeout, std::move(lk), std::move(cb));
+                              });
 }

 void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
@@ -263,13 +312,16 @@ void ConnectionPool::returnConnection(ConnectionInterface* conn) {

     invariant(iter != _pools.end());

-    iter->second.get()->returnConnection(conn, std::move(lk));
+    iter->second->runWithActiveClient(
+        std::move(lk),
+        [&](decltype(lk) lk) { iter->second->returnConnection(conn, std::move(lk)); });
 }

 ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
     : _parent(parent),
       _hostAndPort(hostAndPort),
       _requestTimer(parent->_factory->makeTimer()),
+      _activeClients(0),
       _generation(0),
       _inFulfillRequests(false),
       _inSpawnConnections(false),
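The heart of the patch is the runWithActiveClient pair above: bump _activeClients while holding the parent mutex, hand the lock off to the callback, and re-acquire on the way out to decrement. Below is a minimal standalone sketch of that shape, where plain std:: primitives and a hand-rolled scope guard stand in for mongo's stdx wrappers and MakeGuard; PoolSketch is an invented type for illustration, not MongoDB's:

    #include <cassert>
    #include <cstddef>
    #include <mutex>
    #include <utility>

    class PoolSketch {
    public:
        template <typename Callback>
        void runWithActiveClient(Callback&& cb) {
            std::unique_lock<std::mutex> lk(_mutex);
            ++_activeClients;  // pool must not be deleted while this is non-zero

            // Stand-in for MakeGuard: the callback consumes the lock, so the
            // guard re-acquires the mutex on the way out to decrement.
            struct Decrement {
                PoolSketch* pool;
                std::unique_lock<std::mutex>* lk;
                ~Decrement() {
                    assert(!lk->owns_lock());
                    std::lock_guard<std::mutex> relock(pool->_mutex);
                    --pool->_activeClients;
                }
            } guard{this, &lk};

            // Ownership of the lock moves into the callback; `lk` is left
            // disengaged, which is what the guard's assertion checks.
            std::forward<Callback>(cb)(std::move(lk));
        }

    private:
        std::mutex _mutex;
        std::size_t _activeClients = 0;
    };

Note why the decrement re-locks rather than reusing the original lock: once moved in, the callback owns the lock and may legitimately release it (for instance while running user callbacks), so the only safe way to touch the counter afterwards is a fresh acquisition of the parent mutex.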
@@ -365,42 +417,44 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
             [this](ConnectionInterface* connPtr, Status status) {
                 connPtr->indicateUsed();

-                stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
-                auto conn = takeFromProcessingPool(connPtr);
-
-                // If the host and port were dropped, let this lapse
-                if (conn->getGeneration() != _generation) {
-                    spawnConnections(lk);
-                    return;
-                }
-
-                // If we're in shutdown, we don't need refreshed connections
-                if (_state == State::kInShutdown)
-                    return;
-
-                // If the connection refreshed successfully, throw it back in the ready
-                // pool
-                if (status.isOK()) {
-                    addToReady(lk, std::move(conn));
-                    spawnConnections(lk);
-                    return;
-                }
-
-                // If we've exceeded the time limit, start a new connect, rather than
-                // failing all operations. We do this because the various callers have
-                // their own time limit which is unrelated to our internal one.
-                if (status.code() == ErrorCodes::ExceededTimeLimit) {
-                    log() << "Pending connection to host " << _hostAndPort
-                          << " did not complete within the connection timeout,"
-                          << " retrying with a new connection;" << openConnections(lk)
-                          << " connections to that host remain open";
-                    spawnConnections(lk);
-                    return;
-                }
-
-                // Otherwise pass the failure on through
-                processFailure(status, std::move(lk));
+                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+                    auto conn = takeFromProcessingPool(connPtr);
+
+                    // If the host and port were dropped, let this lapse
+                    if (conn->getGeneration() != _generation) {
+                        spawnConnections(lk);
+                        return;
+                    }
+
+                    // If we're in shutdown, we don't need refreshed connections
+                    if (_state == State::kInShutdown)
+                        return;
+
+                    // If the connection refreshed successfully, throw it back in
+                    // the ready pool
+                    if (status.isOK()) {
+                        addToReady(lk, std::move(conn));
+                        spawnConnections(lk);
+                        return;
+                    }
+
+                    // If we've exceeded the time limit, start a new connect,
+                    // rather than failing all operations. We do this because the
+                    // various callers have their own time limit which is unrelated
+                    // to our internal one.
+                    if (status.code() == ErrorCodes::ExceededTimeLimit) {
+                        log() << "Pending connection to host " << _hostAndPort
+                              << " did not complete within the connection timeout,"
+                              << " retrying with a new connection;"
+                              << openConnections(lk)
+                              << " connections to that host remain open";
+                        spawnConnections(lk);
+                        return;
+                    }
+
+                    // Otherwise pass the failure on through
+                    processFailure(status, std::move(lk));
+                });
             });
         lk.lock();
     } else {
@@ -425,25 +479,25 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
        [this, connPtr]() {
            OwnedConnection conn;

-            stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+            runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+                if (!_readyPool.count(connPtr)) {
+                    // We've already been checked out. We don't need to refresh
+                    // ourselves.
+                    return;
+                }

-            if (!_readyPool.count(connPtr)) {
-                // We've already been checked out. We don't need to refresh
-                // ourselves.
-                return;
-            }
+                conn = takeFromPool(_readyPool, connPtr);

-            conn = takeFromPool(_readyPool, connPtr);
+                // If we're in shutdown, we don't need to refresh connections
+                if (_state == State::kInShutdown)
+                    return;

-            // If we're in shutdown, we don't need to refresh connections
-            if (_state == State::kInShutdown)
-                return;
+                _checkedOutPool[connPtr] = std::move(conn);

-            _checkedOutPool[connPtr] = std::move(conn);
+                connPtr->indicateSuccess();

-            connPtr->indicateSuccess();
-
-            returnConnection(connPtr, std::move(lk));
+                returnConnection(connPtr, std::move(lk));
+            });
        });

    fulfillRequests(lk);
@@ -586,26 +640,29 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
            [this](ConnectionInterface* connPtr, Status status) {
                connPtr->indicateUsed();

-                stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
-                auto conn = takeFromProcessingPool(connPtr);
-
-                if (conn->getGeneration() != _generation) {
-                    // If the host and port was dropped, let the
-                    // connection lapse
-                    spawnConnections(lk);
-                } else if (status.isOK()) {
-                    addToReady(lk, std::move(conn));
-                    spawnConnections(lk);
-                } else if (status.code() == ErrorCodes::ExceededTimeLimit) {
-                    // If we've exceeded the time limit, restart the connect, rather than
-                    // failing all operations. We do this because the various callers
-                    // have their own time limit which is unrelated to our internal one.
-                    spawnConnections(lk);
-                } else {
-                    // If the setup failed, cascade the failure edge
-                    processFailure(status, std::move(lk));
-                }
+                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+                    auto conn = takeFromProcessingPool(connPtr);
+
+                    if (conn->getGeneration() != _generation) {
+                        // If the host and port was dropped, let the
+                        // connection lapse
+                        spawnConnections(lk);
+                    } else if (status.isOK()) {
+                        addToReady(lk, std::move(conn));
+                        spawnConnections(lk);
+                    } else if (status.code() == ErrorCodes::ExceededTimeLimit) {
+                        // If we've exceeded the time limit, restart the connect, rather
+                        // than
+                        // failing all operations. We do this because the various
+                        // callers
+                        // have their own time limit which is unrelated to our internal
+                        // one.
+                        spawnConnections(lk);
+                    } else {
+                        // If the setup failed, cascade the failure edge
+                        processFailure(status, std::move(lk));
+                    }
+                });
            });
        // Note that this assumes that the refreshTimeout is sound for the
        // setupTimeout
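A second idiom worth noting in the hunks above: the callbacks finish by handing the unique_lock into processFailure or returnConnection by value via std::move. Passing the lock by value transfers ownership, so the callee decides when the mutex is released (typically before it calls out to user code) and the caller's lock is simply left disengaged. A small self-contained illustration; failAllRequests is a made-up name, not the real processFailure:

    #include <iostream>
    #include <mutex>
    #include <utility>

    // Taking std::unique_lock by value: the callee now owns the mutex and can
    // drop it before invoking user callbacks, as processFailure does above.
    void failAllRequests(std::unique_lock<std::mutex> lk) {
        // ... mutate pool state while still holding the mutex ...
        lk.unlock();  // release before calling out to user code
        std::cout << "request callbacks run without the pool lock held\n";
    }

    int main() {
        std::mutex poolMutex;
        std::unique_lock<std::mutex> lk(poolMutex);
        failAllRequests(std::move(lk));
        // lk.owns_lock() is now false; the caller must not assume it still
        // holds the mutex after the call.
    }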
@@ -641,7 +698,7 @@ void ConnectionPool::SpecificPool::shutdown() {

    // If we have processing connections, wait for them to finish or timeout
    // before shutdown
-    if (_processingPool.size() || _droppedProcessingPool.size()) {
+    if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) {
        _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });

        return;
@@ -695,27 +752,27 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
        _requestTimer->setTimeout(
            timeout,
            [this]() {
-                stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
-                auto now = _parent->_factory->now();
-
-                while (_requests.size()) {
-                    auto& x = _requests.top();
-
-                    if (x.first <= now) {
-                        auto cb = std::move(x.second);
-                        _requests.pop();
-
-                        lk.unlock();
-                        cb(Status(ErrorCodes::ExceededTimeLimit,
-                                  "Couldn't get a connection within the time limit"));
-                        lk.lock();
-                    } else {
-                        break;
-                    }
-                }
-
-                updateStateInLock();
+                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+                    auto now = _parent->_factory->now();
+
+                    while (_requests.size()) {
+                        auto& x = _requests.top();
+
+                        if (x.first <= now) {
+                            auto cb = std::move(x.second);
+                            _requests.pop();
+
+                            lk.unlock();
+                            cb(Status(ErrorCodes::ExceededTimeLimit,
+                                      "Couldn't get a connection within the time limit"));
+                            lk.lock();
+                        } else {
+                            break;
+                        }
+                    }
+
+                    updateStateInLock();
+                });
            });
    } else if (_checkedOutPool.size()) {
        // If we have no requests, but someone's using a connection, we just
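The shutdown hunk is where the counter pays off: _activeClients joins the processing pools in the "not yet safe to tear down" check, so the background shutdown task re-arms its one-second timer and polls until the last client has left the pool. A minimal sketch of that deferral, with a stub timer and simplified names standing in for the real TimerInterface and SpecificPool:

    #include <chrono>
    #include <cstddef>
    #include <functional>
    #include <mutex>

    // Stub: a real timer would run `cb` after the delay on an executor thread.
    struct TimerStub {
        void setTimeout(std::chrono::seconds, std::function<void()> /*cb*/) {}
    };

    struct ShutdownSketch {
        std::mutex mutex;
        TimerStub requestTimer;
        std::size_t processing = 0;     // connections still being set up or refreshed
        std::size_t activeClients = 0;  // clients currently inside pool methods

        void shutdown() {
            std::lock_guard<std::mutex> lk(mutex);

            // Mirrors the `|| _activeClients` added above: deleting the pool
            // while a client is inside one of its methods would pull the state
            // out from under that client, so re-arm and try again later.
            if (processing || activeClients) {
                requestTimer.setTimeout(std::chrono::seconds(1), [this] { shutdown(); });
                return;
            }
            // ... safe to deregister the pool and free its resources ...
        }
    };

Per the patch's own doc comment, this is exactly the race being closed: without the counter, the shutdown thread could delete a SpecificPool while some caller was still executing inside one of its methods.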