author | Jason Carey <jcarey@argv.me> | 2017-10-05 16:33:58 -0400
---|---|---
committer | Jason Carey <jcarey@argv.me> | 2017-11-07 17:13:20 -0500
commit | c3e174cab7b8e4a19772746942c7e68daa53bc5e (patch) |
tree | d4e7efa9d51f2c07b4ca60f9472f3a4c43f25a77 /src/mongo/executor |
parent | 5476e222a170d704cc97d1ed9e14c40ae2ec9d91 (diff) |
download | mongo-c3e174cab7b8e4a19772746942c7e68daa53bc5e.tar.gz |
SERVER-31440 Fix Connpool HostTimeout race
The executor connection pool's host timeout races with other code that
has unlocked the parent mutex in order to run callbacks. Although we
already guard against background threads with active requests and
against connections undergoing refresh, once those tasks complete we
still race with their callbacks on how quickly they return. When we
lose that race, we destroy the specific pool out from under those
callbacks.

Fix this by adding an ActiveClient wrapper that holds a refcount on the
specific pool for the lifetime of those calls, so the pool cannot be
destroyed while a callback is still running inside it.
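To make the shape of the fix concrete, here is a minimal standalone sketch of the refcount-guard idea, not the MongoDB implementation: the active-client counter is incremented under the parent mutex before the callback runs and decremented, after re-acquiring the mutex, once it returns, so a shutdown path that sees a nonzero count keeps waiting instead of deleting the pool. The names PoolSketch, _parentMutex, and safeToShutDown are invented for illustration; the actual patch does this with SpecificPool::runWithActiveClient, stdx locks, and a MakeGuard scope guard so the decrement also runs on early exits.

```cpp
#include <cstddef>
#include <iostream>
#include <mutex>

// Hypothetical, simplified analogue of the patch's refcount guard. A real
// SpecificPool shares its parent ConnectionPool's mutex and uses stdx types;
// the names here are illustrative only.
class PoolSketch {
public:
    // Bump the active-client count under the parent mutex, hand the owned
    // lock to the callback, then re-acquire the mutex to decrement the count.
    // The callback may unlock the mutex (and leave it unlocked) while it runs.
    template <typename Callback>
    void runWithActiveClient(Callback&& cb) {
        std::unique_lock<std::mutex> lk(_parentMutex);
        ++_activeClients;

        cb(std::move(lk));  // lock is released by the callback or by its lock's destructor

        std::lock_guard<std::mutex> relock(_parentMutex);
        --_activeClients;
    }

    // A shutdown task would check this and re-arm its timer while it is false,
    // instead of deleting the pool out from under a live callback.
    bool safeToShutDown() {
        std::lock_guard<std::mutex> lk(_parentMutex);
        return _activeClients == 0;
    }

private:
    std::mutex _parentMutex;        // stands in for ConnectionPool::_mutex
    std::size_t _activeClients = 0;
};

int main() {
    PoolSketch pool;
    pool.runWithActiveClient([&](std::unique_lock<std::mutex> lk) {
        lk.unlock();  // callbacks drop the parent mutex while they run user code
        std::cout << "safe to shut down mid-callback? " << pool.safeToShutDown() << "\n";  // 0
    });
    std::cout << "safe to shut down afterwards?   " << pool.safeToShutDown() << "\n";  // 1
}
```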
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 247 |
1 file changed, 148 insertions, 99 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 475f3405fb6..880edda2879 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -61,6 +61,45 @@ namespace executor {
  */
 class ConnectionPool::SpecificPool {
 public:
+    /**
+     * These active client methods must be used whenever entering a specific pool outside of the
+     * shutdown background task. The presence of an active client will bump a counter on the
+     * specific pool which will prevent the shutdown thread from deleting it.
+     *
+     * The complexity comes from the need to hold a lock when writing to the
+     * _activeClients param on the specific pool. Because the code beneath the client needs to lock
+     * and unlock the parent mutex (and can leave unlocked), we want to start the client with the
+     * lock acquired, move it into the client, then re-acquire to decrement the counter on the way
+     * out.
+     *
+     * It's used like:
+     *
+     * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
+     */
+    template <typename Callback>
+    void runWithActiveClient(Callback&& cb) {
+        runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
+                            std::forward<Callback>(cb));
+    }
+
+    template <typename Callback>
+    void runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
+        invariant(lk.owns_lock());
+
+        _activeClients++;
+
+        const auto guard = MakeGuard([&] {
+            invariant(!lk.owns_lock());
+            stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
+            _activeClients--;
+        });
+
+        {
+            decltype(lk) localLk(std::move(lk));
+            cb(std::move(localLk));
+        }
+    }
+
     SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
     ~SpecificPool();

@@ -154,6 +193,7 @@ private:
     std::unique_ptr<TimerInterface> _requestTimer;
     Date_t _requestTimerExpiration;

+    size_t _activeClients;
     size_t _generation;
     bool _inFulfillRequests;
     bool _inSpawnConnections;
@@ -211,8 +251,11 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
     if (iter == _pools.end())
         return;

-    iter->second.get()->processFailure(
-        Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk));
+    iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
+        iter->second->processFailure(
+            Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
+            std::move(lk));
+    });
 }

 void ConnectionPool::get(const HostAndPort& hostAndPort,
@@ -234,7 +277,9 @@ void ConnectionPool::get(const HostAndPort& hostAndPort,

     invariant(pool);

-    pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
+    pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
+        pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
+    });
 }

 void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
@@ -269,7 +314,9 @@ void ConnectionPool::returnConnection(ConnectionInterface* conn) {

     invariant(iter != _pools.end());

-    iter->second.get()->returnConnection(conn, std::move(lk));
+    iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
+        iter->second->returnConnection(conn, std::move(lk));
+    });
 }

 ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
@@ -277,6 +324,7 @@ ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAnd
       _hostAndPort(hostAndPort),
       _readyPool(std::numeric_limits<size_t>::max()),
       _requestTimer(parent->_factory->makeTimer()),
+      _activeClients(0),
       _generation(0),
       _inFulfillRequests(false),
       _inSpawnConnections(false),
@@ -368,47 +416,48 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
         // Unlock in case refresh can occur immediately
         lk.unlock();

-        connPtr->refresh(_parent->_options.refreshTimeout,
-                         [this](ConnectionInterface* connPtr, Status status) {
-                             connPtr->indicateUsed();
-
-                             stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
-                             auto conn = takeFromProcessingPool(connPtr);
-
-                             // If the host and port were dropped, let this lapse
-                             if (conn->getGeneration() != _generation) {
-                                 spawnConnections(lk);
-                                 return;
-                             }
-
-                             // If we're in shutdown, we don't need refreshed connections
-                             if (_state == State::kInShutdown)
-                                 return;
-
-                             // If the connection refreshed successfully, throw it back in the ready
-                             // pool
-                             if (status.isOK()) {
-                                 addToReady(lk, std::move(conn));
-                                 spawnConnections(lk);
-                                 return;
-                             }
-
-                             // If we've exceeded the time limit, start a new connect, rather than
-                             // failing all operations. We do this because the various callers have
-                             // their own time limit which is unrelated to our internal one.
-                             if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
-                                 log() << "Pending connection to host " << _hostAndPort
-                                       << " did not complete within the connection timeout,"
-                                       << " retrying with a new connection;" << openConnections(lk)
-                                       << " connections to that host remain open";
-                                 spawnConnections(lk);
-                                 return;
-                             }
-
-                             // Otherwise pass the failure on through
-                             processFailure(status, std::move(lk));
-                         });
+        connPtr->refresh(
+            _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
+                connPtr->indicateUsed();
+
+                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+                    auto conn = takeFromProcessingPool(connPtr);
+
+                    // If the host and port were dropped, let this lapse
+                    if (conn->getGeneration() != _generation) {
+                        spawnConnections(lk);
+                        return;
+                    }
+
+                    // If we're in shutdown, we don't need refreshed connections
+                    if (_state == State::kInShutdown)
+                        return;
+
+                    // If the connection refreshed successfully, throw it back in
+                    // the ready pool
+                    if (status.isOK()) {
+                        addToReady(lk, std::move(conn));
+                        spawnConnections(lk);
+                        return;
+                    }
+
+                    // If we've exceeded the time limit, start a new connect,
+                    // rather than failing all operations. We do this because the
+                    // various callers have their own time limit which is unrelated
+                    // to our internal one.
+                    if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+                        log() << "Pending connection to host " << _hostAndPort
+                              << " did not complete within the connection timeout,"
+                              << " retrying with a new connection;" << openConnections(lk)
+                              << " connections to that host remain open";
+                        spawnConnections(lk);
+                        return;
+                    }
+
+                    // Otherwise pass the failure on through
+                    processFailure(status, std::move(lk));
+                });
+            });

         lk.lock();
     } else {
         // If it's fine as it is, just put it in the ready queue
@@ -432,25 +481,25 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
     connPtr->setTimeout(_parent->_options.refreshRequirement, [this, connPtr]() {
         OwnedConnection conn;

-        stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
-        if (!_readyPool.count(connPtr)) {
-            // We've already been checked out. We don't need to refresh
-            // ourselves.
-            return;
-        }
+        runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+            if (!_readyPool.count(connPtr)) {
+                // We've already been checked out. We don't need to refresh
+                // ourselves.
+                return;
+            }

-        conn = takeFromPool(_readyPool, connPtr);
+            conn = takeFromPool(_readyPool, connPtr);

-        // If we're in shutdown, we don't need to refresh connections
-        if (_state == State::kInShutdown)
-            return;
+            // If we're in shutdown, we don't need to refresh connections
+            if (_state == State::kInShutdown)
+                return;

-        _checkedOutPool[connPtr] = std::move(conn);
+            _checkedOutPool[connPtr] = std::move(conn);

-        connPtr->indicateSuccess();
+            connPtr->indicateSuccess();

-        returnConnection(connPtr, std::move(lk));
+            returnConnection(connPtr, std::move(lk));
+        });
     });

     fulfillRequests(lk);
@@ -594,26 +643,26 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
             _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
                 connPtr->indicateUsed();

-                stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
-                auto conn = takeFromProcessingPool(connPtr);
-
-                if (conn->getGeneration() != _generation) {
-                    // If the host and port was dropped, let the
-                    // connection lapse
-                    spawnConnections(lk);
-                } else if (status.isOK()) {
-                    addToReady(lk, std::move(conn));
-                    spawnConnections(lk);
-                } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
-                    // If we've exceeded the time limit, restart the connect, rather than
-                    // failing all operations. We do this because the various callers
-                    // have their own time limit which is unrelated to our internal one.
-                    spawnConnections(lk);
-                } else {
-                    // If the setup failed, cascade the failure edge
-                    processFailure(status, std::move(lk));
-                }
+                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+                    auto conn = takeFromProcessingPool(connPtr);
+
+                    if (conn->getGeneration() != _generation) {
+                        // If the host and port was dropped, let the
+                        // connection lapse
+                        spawnConnections(lk);
+                    } else if (status.isOK()) {
+                        addToReady(lk, std::move(conn));
+                        spawnConnections(lk);
+                    } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+                        // If we've exceeded the time limit, restart the connect, rather than
+                        // failing all operations. We do this because the various callers
+                        // have their own time limit which is unrelated to our internal one.
+                        spawnConnections(lk);
+                    } else {
+                        // If the setup failed, cascade the failure edge
+                        processFailure(status, std::move(lk));
+                    }
+                });
             });

         // Note that this assumes that the refreshTimeout is sound for the
         // setupTimeout
@@ -649,7 +698,7 @@ void ConnectionPool::SpecificPool::shutdown() {

     // If we have processing connections, wait for them to finish or timeout
     // before shutdown
-    if (_processingPool.size() || _droppedProcessingPool.size()) {
+    if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) {
         _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });

         return;
@@ -702,27 +751,27 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
         // We set a timer for the most recent request, then invoke each timed
         // out request we couldn't service
         _requestTimer->setTimeout(timeout, [this]() {
-            stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
-            auto now = _parent->_factory->now();
-
-            while (_requests.size()) {
-                auto& x = _requests.top();
-
-                if (x.first <= now) {
-                    auto cb = std::move(x.second);
-                    _requests.pop();
-
-                    lk.unlock();
-                    cb(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
-                              "Couldn't get a connection within the time limit"));
-                    lk.lock();
-                } else {
-                    break;
+            runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+                auto now = _parent->_factory->now();
+
+                while (_requests.size()) {
+                    auto& x = _requests.top();
+
+                    if (x.first <= now) {
+                        auto cb = std::move(x.second);
+                        _requests.pop();
+
+                        lk.unlock();
+                        cb(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+                                  "Couldn't get a connection within the time limit"));
+                        lk.lock();
+                    } else {
+                        break;
+                    }
                 }
-            }

-            updateStateInLock();
+                updateStateInLock();
+            });
         });
     } else if (_checkedOutPool.size()) {
         // If we have no requests, but someone's using a connection, we just