summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2017-11-16 14:00:07 -0500
committerJason Carey <jcarey@argv.me>2017-11-16 14:06:25 -0500
commite8438a65d647004e94021e9b25087c7f1aac59f8 (patch)
tree87a0b710dd608bdebd2e746138b75a91f79b0cc3
parent1bfe7521da868638a8e88a47b3625e6ffc2336d6 (diff)
downloadmongo-e8438a65d647004e94021e9b25087c7f1aac59f8.tar.gz
SERVER-31440 Fix Connpool HostTimeout race
The executor connection pool host timeout is racy with respect to other code that's unlocked the parent mutex to allow for callback execution. While effort was spent to protect against background threads with active requests and those participating in refresh, after those tasks have been executed we race with callbacks in how quickly they can return. When we lose that race, we destroy the specific pool out from under those callbacks. Fix that by adding an ActiveClient wrapper that ensures a refcount on the specific pool is increased for the lifetime of those calls. (cherry picked from commit c3e174cab7b8e4a19772746942c7e68daa53bc5e)
-rw-r--r--src/mongo/executor/connection_pool.cpp247
1 files changed, 152 insertions, 95 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 79461a9a5c1..5a2e90f9178 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -32,8 +32,8 @@
#include "mongo/executor/connection_pool.h"
-#include "mongo/executor/connection_pool_stats.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
@@ -59,6 +59,45 @@ namespace executor {
*/
class ConnectionPool::SpecificPool {
public:
+ /**
+ * These active client methods must be used whenever entering a specific pool outside of the
+ * shutdown background task. The presence of an active client will bump a counter on the
+ * specific pool which will prevent the shutdown thread from deleting it.
+ *
+ * The complexity comes from the need to hold a lock when writing to the
+ * _activeClients param on the specific pool. Because the code beneath the client needs to lock
+ * and unlock the parent mutex (and can leave unlocked), we want to start the client with the
+ * lock acquired, move it into the client, then re-acquire to decrement the counter on the way
+ * out.
+ *
+ * It's used like:
+ *
+ * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
+ */
+ template <typename Callback>
+ void runWithActiveClient(Callback&& cb) {
+ runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
+ std::forward<Callback>(cb));
+ }
+
+ template <typename Callback>
+ void runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
+ invariant(lk.owns_lock());
+
+ _activeClients++;
+
+ const auto guard = MakeGuard([&] {
+ invariant(!lk.owns_lock());
+ stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
+ _activeClients--;
+ });
+
+ {
+ decltype(lk) localLk(std::move(lk));
+ cb(std::move(localLk));
+ }
+ }
+
SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
~SpecificPool();
@@ -148,6 +187,7 @@ private:
std::unique_ptr<TimerInterface> _requestTimer;
Date_t _requestTimerExpiration;
+ size_t _activeClients;
size_t _generation;
bool _inFulfillRequests;
bool _inSpawnConnections;
@@ -205,8 +245,13 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
if (iter == _pools.end())
return;
- iter->second.get()->processFailure(
- Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk));
+ iter->second->runWithActiveClient(
+ std::move(lk),
+ [&](decltype(lk) lk) {
+ iter->second->processFailure(
+ Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
+ std::move(lk));
+ });
}
void ConnectionPool::get(const HostAndPort& hostAndPort,
@@ -228,7 +273,11 @@ void ConnectionPool::get(const HostAndPort& hostAndPort,
invariant(pool);
- pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
+ pool->runWithActiveClient(std::move(lk),
+ [&](decltype(lk) lk) {
+ pool->getConnection(
+ hostAndPort, timeout, std::move(lk), std::move(cb));
+ });
}
void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
@@ -263,13 +312,16 @@ void ConnectionPool::returnConnection(ConnectionInterface* conn) {
invariant(iter != _pools.end());
- iter->second.get()->returnConnection(conn, std::move(lk));
+ iter->second->runWithActiveClient(
+ std::move(lk),
+ [&](decltype(lk) lk) { iter->second->returnConnection(conn, std::move(lk)); });
}
ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
: _parent(parent),
_hostAndPort(hostAndPort),
_requestTimer(parent->_factory->makeTimer()),
+ _activeClients(0),
_generation(0),
_inFulfillRequests(false),
_inSpawnConnections(false),
@@ -365,42 +417,44 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
[this](ConnectionInterface* connPtr, Status status) {
connPtr->indicateUsed();
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
- auto conn = takeFromProcessingPool(connPtr);
-
- // If the host and port were dropped, let this lapse
- if (conn->getGeneration() != _generation) {
- spawnConnections(lk);
- return;
- }
-
- // If we're in shutdown, we don't need refreshed connections
- if (_state == State::kInShutdown)
- return;
-
- // If the connection refreshed successfully, throw it back in the ready
- // pool
- if (status.isOK()) {
- addToReady(lk, std::move(conn));
- spawnConnections(lk);
- return;
- }
-
- // If we've exceeded the time limit, start a new connect, rather than
- // failing all operations. We do this because the various callers have
- // their own time limit which is unrelated to our internal one.
- if (status.code() == ErrorCodes::ExceededTimeLimit) {
- log() << "Pending connection to host " << _hostAndPort
- << " did not complete within the connection timeout,"
- << " retrying with a new connection;" << openConnections(lk)
- << " connections to that host remain open";
- spawnConnections(lk);
- return;
- }
-
- // Otherwise pass the failure on through
- processFailure(status, std::move(lk));
+ runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If the host and port were dropped, let this lapse
+ if (conn->getGeneration() != _generation) {
+ spawnConnections(lk);
+ return;
+ }
+
+ // If we're in shutdown, we don't need refreshed connections
+ if (_state == State::kInShutdown)
+ return;
+
+ // If the connection refreshed successfully, throw it back in
+ // the ready pool
+ if (status.isOK()) {
+ addToReady(lk, std::move(conn));
+ spawnConnections(lk);
+ return;
+ }
+
+ // If we've exceeded the time limit, start a new connect,
+ // rather than failing all operations. We do this because the
+ // various callers have their own time limit which is unrelated
+ // to our internal one.
+ if (status.code() == ErrorCodes::ExceededTimeLimit) {
+ log() << "Pending connection to host " << _hostAndPort
+ << " did not complete within the connection timeout,"
+ << " retrying with a new connection;"
+ << openConnections(lk)
+ << " connections to that host remain open";
+ spawnConnections(lk);
+ return;
+ }
+
+ // Otherwise pass the failure on through
+ processFailure(status, std::move(lk));
+ });
});
lk.lock();
} else {
@@ -425,25 +479,25 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
[this, connPtr]() {
OwnedConnection conn;
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+ runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ if (!_readyPool.count(connPtr)) {
+ // We've already been checked out. We don't need to refresh
+ // ourselves.
+ return;
+ }
- if (!_readyPool.count(connPtr)) {
- // We've already been checked out. We don't need to refresh
- // ourselves.
- return;
- }
+ conn = takeFromPool(_readyPool, connPtr);
- conn = takeFromPool(_readyPool, connPtr);
+ // If we're in shutdown, we don't need to refresh connections
+ if (_state == State::kInShutdown)
+ return;
- // If we're in shutdown, we don't need to refresh connections
- if (_state == State::kInShutdown)
- return;
+ _checkedOutPool[connPtr] = std::move(conn);
- _checkedOutPool[connPtr] = std::move(conn);
+ connPtr->indicateSuccess();
- connPtr->indicateSuccess();
-
- returnConnection(connPtr, std::move(lk));
+ returnConnection(connPtr, std::move(lk));
+ });
});
fulfillRequests(lk);
@@ -586,26 +640,29 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
[this](ConnectionInterface* connPtr, Status status) {
connPtr->indicateUsed();
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
- auto conn = takeFromProcessingPool(connPtr);
-
- if (conn->getGeneration() != _generation) {
- // If the host and port was dropped, let the
- // connection lapse
- spawnConnections(lk);
- } else if (status.isOK()) {
- addToReady(lk, std::move(conn));
- spawnConnections(lk);
- } else if (status.code() == ErrorCodes::ExceededTimeLimit) {
- // If we've exceeded the time limit, restart the connect, rather than
- // failing all operations. We do this because the various callers
- // have their own time limit which is unrelated to our internal one.
- spawnConnections(lk);
- } else {
- // If the setup failed, cascade the failure edge
- processFailure(status, std::move(lk));
- }
+ runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ if (conn->getGeneration() != _generation) {
+ // If the host and port was dropped, let the
+ // connection lapse
+ spawnConnections(lk);
+ } else if (status.isOK()) {
+ addToReady(lk, std::move(conn));
+ spawnConnections(lk);
+ } else if (status.code() == ErrorCodes::ExceededTimeLimit) {
+ // If we've exceeded the time limit, restart the connect, rather
+ // than
+ // failing all operations. We do this because the various
+ // callers
+ // have their own time limit which is unrelated to our internal
+ // one.
+ spawnConnections(lk);
+ } else {
+ // If the setup failed, cascade the failure edge
+ processFailure(status, std::move(lk));
+ }
+ });
});
// Note that this assumes that the refreshTimeout is sound for the
// setupTimeout
@@ -641,7 +698,7 @@ void ConnectionPool::SpecificPool::shutdown() {
// If we have processing connections, wait for them to finish or timeout
// before shutdown
- if (_processingPool.size() || _droppedProcessingPool.size()) {
+ if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) {
_requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });
return;
@@ -695,27 +752,27 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
_requestTimer->setTimeout(
timeout,
[this]() {
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
- auto now = _parent->_factory->now();
-
- while (_requests.size()) {
- auto& x = _requests.top();
-
- if (x.first <= now) {
- auto cb = std::move(x.second);
- _requests.pop();
-
- lk.unlock();
- cb(Status(ErrorCodes::ExceededTimeLimit,
- "Couldn't get a connection within the time limit"));
- lk.lock();
- } else {
- break;
+ runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ auto now = _parent->_factory->now();
+
+ while (_requests.size()) {
+ auto& x = _requests.top();
+
+ if (x.first <= now) {
+ auto cb = std::move(x.second);
+ _requests.pop();
+
+ lk.unlock();
+ cb(Status(ErrorCodes::ExceededTimeLimit,
+ "Couldn't get a connection within the time limit"));
+ lk.lock();
+ } else {
+ break;
+ }
}
- }
- updateStateInLock();
+ updateStateInLock();
+ });
});
} else if (_checkedOutPool.size()) {
// If we have no requests, but someone's using a connection, we just