-rw-r--r--  src/mongo/executor/connection_pool.cpp                     | 412
-rw-r--r--  src/mongo/executor/connection_pool.h                       |  16
-rw-r--r--  src/mongo/executor/network_interface_integration_test.cpp  |  18
3 files changed, 200 insertions(+), 246 deletions(-)
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 337e0b52c49..581ec8e3194 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -63,9 +63,9 @@ class ConnectionPool::SpecificPool final
: public std::enable_shared_from_this<ConnectionPool::SpecificPool> {
public:
/**
- * These active client methods must be used whenever entering a specific pool outside of the
- * shutdown background task. The presence of an active client will bump a counter on the
- * specific pool which will prevent the shutdown thread from deleting it.
+ * Any code path that enters a specific pool must be wrapped with one of these guards.
+ * The guard bumps a counter on the specific pool, which prevents the pool from removing
+ * itself from the map of pools while the guarded callback runs.
*
* The complexity comes from the need to hold a lock when writing to the
* _activeClients param on the specific pool. Because the code beneath the client needs to lock
@@ -73,32 +73,28 @@ public:
* lock acquired, move it into the client, then re-acquire to decrement the counter on the way
* out.
*
- * It's used like:
+ * The returned callback also (perhaps overly aggressively) binds a shared pointer to this
+ * specific pool. It is therefore *always* safe to reference the original specific pool from
+ * inside the guarded function object.
*
- * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
+ * For a function object of signature:
+ * R riskyBusiness(stdx::unique_lock<stdx::mutex>, ArgTypes...);
+ *
+ * It returns a function object of signature:
+ * R safeCallback(ArgTypes...);
*/
template <typename Callback>
- auto runWithActiveClient(Callback&& cb) {
- return runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
- std::forward<Callback>(cb));
- }
-
- template <typename Callback>
- auto runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
- invariant(lk.owns_lock());
-
- _activeClients++;
-
- const auto guard = MakeGuard([&] {
- invariant(!lk.owns_lock());
- stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
- _activeClients--;
- });
+ auto guardCallback(Callback&& cb) {
+ return [ cb = std::forward<Callback>(cb), anchor = shared_from_this() ](auto&&... args) {
+ stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
+ ++(anchor->_activeClients);
+
+ ON_BLOCK_EXIT([anchor]() {
+ stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
+ --(anchor->_activeClients);
+ });
- {
- decltype(lk) localLk(std::move(lk));
- return cb(std::move(localLk));
- }
+ return cb(std::move(lk), std::forward<decltype(args)>(args)...);
+ };
}
SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
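The doc comment above describes guardCallback only by its signatures. As a rough standalone sketch (illustrative names, plain std:: primitives in place of the stdx:: types and ON_BLOCK_EXIT), the guard pattern looks like this: the returned callable anchors the pool with shared_from_this, takes the pool mutex, bumps the active-client counter, hands the lock to the wrapped callback, and restores the counter on the way out:

    // Minimal sketch of the guard-callback pattern; Pool, mutex, activeClients are illustrative.
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <utility>

    template <typename F>
    struct ScopeGuard {                  // stands in for mongo's ON_BLOCK_EXIT
        F onExit;
        ~ScopeGuard() { onExit(); }
    };

    struct Pool : std::enable_shared_from_this<Pool> {
        std::mutex mutex;                // plays the role of _parent->_mutex
        int activeClients = 0;           // plays the role of _activeClients

        template <typename Callback>
        auto guardCallback(Callback&& cb) {
            return [cb = std::forward<Callback>(cb), anchor = shared_from_this()](auto&&... args) {
                std::unique_lock<std::mutex> lk(anchor->mutex);
                ++anchor->activeClients;
                ScopeGuard<std::function<void()>> guard{[anchor] {
                    // By now the callback has released (or destroyed) the lock it was handed.
                    std::lock_guard<std::mutex> relock(anchor->mutex);
                    --anchor->activeClients;
                }};
                // The lock is moved into the callback, which may unlock it as needed.
                return cb(std::move(lk), std::forward<decltype(args)>(args)...);
            };
        }
    };

A callback of signature R f(std::unique_lock<std::mutex>, Args...) thus becomes a callable of signature R f(Args...), and it stays safe to touch the pool from inside it even after the pool has been delisted.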
@@ -113,6 +109,14 @@ public:
stdx::unique_lock<stdx::mutex> lk);
/**
+ * Triggers the shutdown procedure. This function marks the state as kInShutdown
+ * and calls processFailure below with the status provided. This may not immediately
+ * delist or destruct this pool. However, both will happen eventually as ConnectionHandles
+ * are deleted.
+ */
+ void triggerShutdown(const Status& status, stdx::unique_lock<stdx::mutex> lk);
+
+ /**
* Cascades a failure across existing connections and requests. Invoking
* this function drops all current connections and fails all current
* requests with the passed status.
@@ -169,18 +173,6 @@ public:
_tags = mutateFunc(_tags);
}
- /**
- * See runWithActiveClient for what this controls, and be very very careful to manage the
- * refcount correctly.
- */
- void incActiveClients(const stdx::unique_lock<stdx::mutex>& lk) {
- _activeClients++;
- }
-
- void decActiveClients(const stdx::unique_lock<stdx::mutex>& lk) {
- _activeClients--;
- }
-
private:
using OwnedConnection = std::shared_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
@@ -198,8 +190,6 @@ private:
void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);
- void shutdown();
-
template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type takeFromPool(
OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr);
@@ -292,33 +282,15 @@ ConnectionPool::~ConnectionPool() {
void ConnectionPool::shutdown() {
_factory->shutdown();
- std::vector<SpecificPool*> pools;
-
- // Ensure we decrement active clients for all pools that we inc on (because we intend to process
- // failures)
- const auto guard = MakeGuard([&] {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- for (const auto& pool : pools) {
- pool->decActiveClients(lk);
- }
- });
-
// Grab all current pools (under the lock)
- {
+ auto pools = [&] {
stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _pools;
+ }();
- for (auto& pair : _pools) {
- pools.push_back(pair.second.get());
- pair.second->incActiveClients(lk);
- }
- }
-
- // Reacquire the lock per pool and process failures. We'll dec active clients when we're all
- // through in the guard
- for (const auto& pool : pools) {
+ for (const auto& pair : pools) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- pool->processFailure(
+ pair.second->triggerShutdown(
Status(ErrorCodes::ShutdownInProgress, "Shutting down the connection pool"),
std::move(lk));
}
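Because _pools now stores shared_ptr<SpecificPool> (as the get() hunk further down shows), shutdown can simply copy the map under the lock: the copied shared_ptrs keep every pool alive for the duration of the loop, which is what the removed incActiveClients/decActiveClients bookkeeping used to guarantee. A rough sketch of that snapshot-then-act pattern, with illustrative Registry/HostPool names:

    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <utility>

    struct HostPool {
        // Takes ownership of the lock, fails requests, drops connections, releases the lock.
        void shutdown(std::unique_lock<std::mutex> lk) { /* ... */ }
    };

    struct Registry {
        std::mutex mutex;
        std::map<std::string, std::shared_ptr<HostPool>> pools;

        void shutdownAll() {
            // Snapshot under the lock; the copied shared_ptrs pin each pool.
            auto snapshot = [&] {
                std::unique_lock<std::mutex> lk(mutex);
                return pools;
            }();
            // Reacquire the lock per pool, exactly as the loop above does.
            for (auto& entry : snapshot) {
                std::unique_lock<std::mutex> lk(mutex);
                entry.second->shutdown(std::move(lk));
            }
        }
    };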
@@ -332,42 +304,25 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
if (iter == _pools.end())
return;
- iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
- iter->second->processFailure(
- Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
- std::move(lk));
- });
+ auto pool = iter->second;
+ pool->processFailure(Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
+ std::move(lk));
}
void ConnectionPool::dropConnections(transport::Session::TagMask tags) {
- std::vector<SpecificPool*> pools;
-
- // Ensure we decrement active clients for all pools that we inc on (because we intend to process
- // failures)
- const auto guard = MakeGuard([&] {
+ // Grab all current pools (under the lock)
+ auto pools = [&] {
stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _pools;
+ }();
- for (const auto& pool : pools) {
- pool->decActiveClients(lk);
- }
- });
+ for (const auto& pair : pools) {
+ auto& pool = pair.second;
- // Grab all current pools that don't match tags (under the lock)
- {
stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (pool->matchesTags(lk, tags))
+ continue;
- for (auto& pair : _pools) {
- if (!pair.second->matchesTags(lk, tags)) {
- pools.push_back(pair.second.get());
- pair.second->incActiveClients(lk);
- }
- }
- }
-
- // Reacquire the lock per pool and process failures. We'll dec active clients when we're all
- // through in the guard
- for (const auto& pool : pools) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
pool->processFailure(
Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
std::move(lk));
@@ -384,7 +339,8 @@ void ConnectionPool::mutateTags(
if (iter == _pools.end())
return;
- iter->second->mutateTags(lk, mutateFunc);
+ auto pool = iter->second;
+ pool->mutateTags(lk, mutateFunc);
}
void ConnectionPool::get(const HostAndPort& hostAndPort,
@@ -395,25 +351,22 @@ void ConnectionPool::get(const HostAndPort& hostAndPort,
Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
Milliseconds timeout) {
- SpecificPool* pool;
+ std::shared_ptr<SpecificPool> pool;
stdx::unique_lock<stdx::mutex> lk(_mutex);
auto iter = _pools.find(hostAndPort);
if (iter == _pools.end()) {
- auto handle = stdx::make_unique<SpecificPool>(this, hostAndPort);
- pool = handle.get();
- _pools[hostAndPort] = std::move(handle);
+ pool = stdx::make_unique<SpecificPool>(this, hostAndPort);
+ _pools[hostAndPort] = pool;
} else {
- pool = iter->second.get();
+ pool = iter->second;
}
invariant(pool);
- return pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
- return pool->getConnection(hostAndPort, timeout, std::move(lk));
- });
+ return pool->getConnection(hostAndPort, timeout, std::move(lk));
}
void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
@@ -450,9 +403,8 @@ void ConnectionPool::returnConnection(ConnectionInterface* conn) {
str::stream() << "Tried to return connection but no pool found for "
<< conn->getHostAndPort());
- iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
- iter->second->returnConnection(conn, std::move(lk));
- });
+ auto pool = iter->second;
+ pool->returnConnection(conn, std::move(lk));
}
ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
@@ -469,6 +421,9 @@ ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAnd
ConnectionPool::SpecificPool::~SpecificPool() {
DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();)
+
+ invariant(_requests.empty());
+ invariant(_checkedOutPool.empty());
}
size_t ConnectionPool::SpecificPool::inUseConnections(const stdx::unique_lock<stdx::mutex>& lk) {
@@ -495,6 +450,8 @@ size_t ConnectionPool::SpecificPool::openConnections(const stdx::unique_lock<std
Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection(
const HostAndPort& hostAndPort, Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
+ invariant(_state != State::kInShutdown);
+
if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
timeout = _parent->_options.refreshTimeout;
}
@@ -555,46 +512,43 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// Unlock in case refresh can occur immediately
lk.unlock();
- connPtr->refresh(
- _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
- runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
- auto conn = takeFromProcessingPool(connPtr);
-
- // If the host and port were dropped, let this lapse
- if (conn->getGeneration() != _generation) {
- spawnConnections(lk);
- return;
- }
-
- // If we're in shutdown, we don't need refreshed connections
- if (_state == State::kInShutdown)
- return;
-
- // If the connection refreshed successfully, throw it back in
- // the ready pool
- if (status.isOK()) {
- addToReady(lk, std::move(conn));
- spawnConnections(lk);
- return;
- }
-
- // If we've exceeded the time limit, start a new connect,
- // rather than failing all operations. We do this because the
- // various callers have their own time limit which is unrelated
- // to our internal one.
- if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
- log() << "Pending connection to host " << _hostAndPort
- << " did not complete within the connection timeout,"
- << " retrying with a new connection;" << openConnections(lk)
- << " connections to that host remain open";
- spawnConnections(lk);
- return;
- }
-
- // Otherwise pass the failure on through
- processFailure(status, std::move(lk));
- });
- });
+ connPtr->refresh(_parent->_options.refreshTimeout,
+ guardCallback([this](stdx::unique_lock<stdx::mutex> lk,
+ ConnectionInterface* connPtr,
+ Status status) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If we're in shutdown, we don't need refreshed connections
+ if (_state == State::kInShutdown)
+ return;
+
+ // If the connection refreshed successfully, throw it back in
+ // the ready pool
+ if (status.isOK()) {
+ // If the host and port were dropped, let this lapse
+ if (conn->getGeneration() == _generation) {
+ addToReady(lk, std::move(conn));
+ }
+ spawnConnections(lk);
+ return;
+ }
+
+ // If we've exceeded the time limit, start a new connect,
+ // rather than failing all operations. We do this because the
+ // various callers have their own time limit which is unrelated
+ // to our internal one.
+ if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ log() << "Pending connection to host " << _hostAndPort
+ << " did not complete within the connection timeout,"
+ << " retrying with a new connection;" << openConnections(lk)
+ << " connections to that host remain open";
+ spawnConnections(lk);
+ return;
+ }
+
+ // Otherwise pass the failure on through
+ processFailure(status, std::move(lk));
+ }));
lk.lock();
} else {
// If it's fine as it is, just put it in the ready queue
@@ -616,30 +570,36 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
// immediately check them back in (which kicks off the refresh logic in
// returnConnection
connPtr->setTimeout(_parent->_options.refreshRequirement,
- [ this, connPtr, anchor = shared_from_this() ]() {
- runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
- auto conn = takeFromPool(_readyPool, connPtr);
+ guardCallback([this, connPtr](stdx::unique_lock<stdx::mutex> lk) {
+ auto conn = takeFromPool(_readyPool, connPtr);
- // We've already been checked out. We don't need to refresh
- // ourselves.
- if (!conn)
- return;
+ // We've already been checked out. We don't need to refresh
+ // ourselves.
+ if (!conn)
+ return;
- // If we're in shutdown, we don't need to refresh connections
- if (_state == State::kInShutdown)
- return;
+ // If we're in shutdown, we don't need to refresh connections
+ if (_state == State::kInShutdown)
+ return;
- _checkedOutPool[connPtr] = std::move(conn);
+ _checkedOutPool[connPtr] = std::move(conn);
- connPtr->indicateSuccess();
+ connPtr->indicateSuccess();
- returnConnection(connPtr, std::move(lk));
- });
- });
+ returnConnection(connPtr, std::move(lk));
+ }));
fulfillRequests(lk);
}
+// Sets state to shutdown and kicks off the failure protocol to tank existing connections
+void ConnectionPool::SpecificPool::triggerShutdown(const Status& status,
+ stdx::unique_lock<stdx::mutex> lk) {
+ _state = State::kInShutdown;
+ _droppedProcessingPool.clear();
+ processFailure(status, std::move(lk));
+}
+
// Drop connections and fail all requests
void ConnectionPool::SpecificPool::processFailure(const Status& status,
stdx::unique_lock<stdx::mutex> lk) {
@@ -647,7 +607,11 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
// connections
_generation++;
- // Drop ready connections
+ // When a connection enters the ready pool, its timer is set to eventually refresh the
+ // connection. That timer extends the lifetime of the specific pool, because it is tied to the
+ // lifetime of the connection rather than the pool. Since we own these connections, though, we
+ // can destroy them, and their timers with them.
+ // In short, clearing the ready pool helps the SpecificPool drain.
_readyPool.clear();
// Log something helpful
@@ -655,7 +619,10 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
// Migrate processing connections to the dropped pool
for (auto&& x : _processingPool) {
- _droppedProcessingPool[x.first] = std::move(x.second);
+ if (_state != State::kInShutdown) {
+ // If we're just dropping the pool, we can reuse them later
+ _droppedProcessingPool[x.first] = std::move(x.second);
+ }
}
_processingPool.clear();
@@ -731,7 +698,12 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
// pass it to the user
connPtr->resetToUnknown();
lk.unlock();
- promise.emplaceValue(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent)));
+ ConnectionHandle handle(connPtr,
+ guardCallback([this](stdx::unique_lock<stdx::mutex> localLk,
+ ConnectionPool::ConnectionInterface* conn) {
+ returnConnection(conn, std::move(localLk));
+ }));
+ promise.emplaceValue(std::move(handle));
lk.lock();
}
}
@@ -755,8 +727,10 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
};
// While all of our inflight connections are less than our target
- while ((_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) &&
+ while ((_state != State::kInShutdown) &&
+ (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) &&
(_processingPool.size() < _parent->_options.maxConnecting)) {
+
OwnedConnection handle;
try {
// make a new connection and put it in processing
@@ -773,28 +747,31 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// Run the setup callback
lk.unlock();
handle->setup(
- _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
- runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
- auto conn = takeFromProcessingPool(connPtr);
-
- if (conn->getGeneration() != _generation) {
- // If the host and port was dropped, let the
- // connection lapse
- spawnConnections(lk);
- } else if (status.isOK()) {
+ _parent->_options.refreshTimeout,
+ guardCallback([this](
+ stdx::unique_lock<stdx::mutex> lk, ConnectionInterface* connPtr, Status status) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If we're in shutdown, we don't need this conn
+ if (_state == State::kInShutdown)
+ return;
+
+ if (status.isOK()) {
+ // If the host and port was dropped, let the connection lapse
+ if (conn->getGeneration() == _generation) {
addToReady(lk, std::move(conn));
- spawnConnections(lk);
- } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
- // If we've exceeded the time limit, restart the connect, rather than
- // failing all operations. We do this because the various callers
- // have their own time limit which is unrelated to our internal one.
- spawnConnections(lk);
- } else {
- // If the setup failed, cascade the failure edge
- processFailure(status, std::move(lk));
}
- });
- });
+ spawnConnections(lk);
+ } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ // If we've exceeded the time limit, restart the connect, rather than
+ // failing all operations. We do this because the various callers
+ // have their own time limit which is unrelated to our internal one.
+ spawnConnections(lk);
+ } else {
+ // If the setup failed, cascade the failure edge
+ processFailure(status, std::move(lk));
+ }
+ }));
// Note that this assumes that the refreshTimeout is sound for the
// setupTimeout
@@ -802,45 +779,6 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
}
}
-// Called every second after hostTimeout until all processing connections reap
-void ConnectionPool::SpecificPool::shutdown() {
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
- // We're racing:
- //
- // Thread A (this thread)
- // * Fired the shutdown timer
- // * Came into shutdown() and blocked
- //
- // Thread B (some new consumer)
- // * Requested a new connection
- // * Beat thread A to the mutex
- // * Cancelled timer (but thread A already made it in)
- // * Set state to running
- // * released the mutex
- //
- // So we end up in shutdown, but with kRunning. If we're here we raced and
- // we should just bail.
- if (_state == State::kRunning) {
- return;
- }
-
- _state = State::kInShutdown;
-
- // If we have processing connections, wait for them to finish or timeout
- // before shutdown
- if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) {
- _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });
-
- return;
- }
-
- invariant(_requests.empty());
- invariant(_checkedOutPool.empty());
-
- _parent->_pools.erase(_hostAndPort);
-}
-
template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPool(
OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr) {
@@ -856,8 +794,10 @@ typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPo
ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromProcessingPool(
ConnectionInterface* connPtr) {
auto conn = takeFromPool(_processingPool, connPtr);
- if (conn)
+ if (conn) {
+ invariant(_state != State::kInShutdown);
return conn;
+ }
return takeFromPool(_droppedProcessingPool, connPtr);
}
@@ -865,6 +805,16 @@ ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::take
// Updates our state and manages the request timer
void ConnectionPool::SpecificPool::updateStateInLock() {
+ if (_state == State::kInShutdown) {
+ // If we're in shutdown, there is nothing to update. Our clients are all gone.
+ if (_processingPool.empty() && !_activeClients) {
+ // If we have no more clients that require access to us, delist from the parent pool
+ LOG(2) << "Delisting connection pool for " << _hostAndPort;
+ _parent->_pools.erase(_hostAndPort);
+ }
+ return;
+ }
+
if (_requests.size()) {
// We have some outstanding requests, we're live
@@ -883,8 +833,8 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
// We set a timer for the most recent request, then invoke each timed
// out request we couldn't service
- _requestTimer->setTimeout(timeout, [this]() {
- runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+ _requestTimer->setTimeout(
+ timeout, guardCallback([this](stdx::unique_lock<stdx::mutex> lk) {
auto now = _parent->_factory->now();
while (_requests.size()) {
@@ -905,8 +855,7 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
}
updateStateInLock();
- });
- });
+ }));
} else if (_checkedOutPool.size()) {
// If we have no requests, but someone's using a connection, we just
// hang around until the next request or a return
@@ -929,8 +878,17 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
auto timeout = _parent->_options.hostTimeout;
- // Set the shutdown timer
- _requestTimer->setTimeout(timeout, [this]() { shutdown(); });
+ // Set the shutdown timer, this gets reset on any request
+ _requestTimer->setTimeout(timeout, [ this, anchor = shared_from_this() ]() {
+ stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
+ if (_state != State::kIdle)
+ return;
+
+ triggerShutdown(
+ Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ "Connection pool has been idle for longer than the host timeout"),
+ std::move(lk));
+ });
}
}
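The idle-timeout callback above is the one place that does not go through guardCallback: it captures anchor = shared_from_this() directly and checks the state under the lock before triggering shutdown. A stripped-down sketch (illustrative IdlePool and registry names) of why that anchor matters when an object arranges its own removal from the parent's map:

    #include <functional>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    struct IdlePool : std::enable_shared_from_this<IdlePool> {
        std::mutex* parentMutex;                                         // the parent's mutex
        std::map<std::string, std::shared_ptr<IdlePool>>* parentPools;   // the parent's pool map
        std::string host;
        bool idle = true;

        std::function<void()> makeIdleTimeoutCallback() {
            return [this, anchor = shared_from_this()] {
                std::unique_lock<std::mutex> lk(*parentMutex);
                if (!idle)                 // a request raced in and reset our state; bail
                    return;
                parentPools->erase(host);  // may drop the map's (last long-lived) reference...
                // ...but `anchor` still owns one, so `this` stays valid until we return.
            };
        }
    };

In the diff the erase itself happens in updateStateInLock() once _processingPool is empty and _activeClients has dropped to zero, but the lifetime argument is the same: every outstanding callback holds its own anchor, so delisting the pool never leaves a dangling this.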
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index a6539cf8beb..25f98fa63cd 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -63,11 +63,11 @@ class ConnectionPool : public EgressTagCloser {
class SpecificPool;
public:
- class ConnectionHandleDeleter;
class ConnectionInterface;
class DependentTypeFactoryInterface;
class TimerInterface;
+ using ConnectionHandleDeleter = stdx::function<void(ConnectionInterface* connection)>;
using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>;
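Replacing the ConnectionHandleDeleter class with a stdx::function means each handle carries its own return path; in the .cpp diff above, fulfillRequests builds that deleter with guardCallback, so returning a connection also anchors its pool. A minimal sketch of the shape of this handle type, with illustrative Conn/MiniPool names and std::function in place of stdx::function:

    #include <functional>
    #include <memory>

    struct Conn { /* connection state */ };

    using HandleDeleter = std::function<void(Conn*)>;
    using Handle = std::unique_ptr<Conn, HandleDeleter>;

    struct MiniPool : std::enable_shared_from_this<MiniPool> {
        void returnConnection(Conn* conn) {
            // hand the connection back to the ready/processing bookkeeping
        }

        Handle checkOut(Conn* conn) {
            // The deleter anchors the pool, so destroying a Handle is safe even after
            // the pool has been delisted from its parent.
            return Handle(conn, [anchor = shared_from_this()](Conn* c) {
                anchor->returnConnection(c);
            });
        }
    };

Unlike the removed deleter class, a default-constructed stdx::function deleter is empty, so the pool supplies a real deleter whenever it hands out a connection (as fulfillRequests does above).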
@@ -172,20 +172,6 @@ private:
EgressTagCloserManager* _manager;
};
-class ConnectionPool::ConnectionHandleDeleter {
-public:
- ConnectionHandleDeleter() = default;
- ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {}
-
- void operator()(ConnectionInterface* connection) const {
- if (_pool && connection)
- _pool->returnConnection(connection);
- }
-
-private:
- ConnectionPool* _pool = nullptr;
-};
-
/**
* Interface for a basic timer
*
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index 75f5811baca..c18f229ba28 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -98,8 +98,7 @@ class HangingHook : public executor::NetworkConnectionHook {
return response.status;
}
- return {ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "No ping command. Simulating timeout"};
+ return {ErrorCodes::ExceededTimeLimit, "No ping command. Returning pseudo-timeout."};
}
};
@@ -108,8 +107,19 @@ class HangingHook : public executor::NetworkConnectionHook {
TEST_F(NetworkInterfaceIntegrationFixture, HookHangs) {
startNet(stdx::make_unique<HangingHook>());
- assertCommandFailsOnClient(
- "admin", BSON("ping" << 1), ErrorCodes::NetworkInterfaceExceededTimeLimit, Seconds(1));
+ /**
+ * Since mongos's have no ping command, we effectively skip this test by returning
+ * ExceededTimeLimit above. (That ErrorCode is used heavily in repl and sharding code.)
+ * If we return NetworkInterfaceExceededTimeLimit, it will make the ConnectionPool
+ * attempt to reform the connection, which can lead to an accepted but unfortunate
+ * race between TLConnection::setup and TLTypeFactory::shutdown.
+ * We assert here that the error code we get is in the error class of timeouts,
+ * which covers both NetworkInterfaceExceededTimeLimit and ExceededTimeLimit.
+ */
+ RemoteCommandRequest request{
+ fixture().getServers()[0], "admin", BSON("ping" << 1), BSONObj(), nullptr, Seconds(1)};
+ auto res = runCommandSync(request);
+ ASSERT(ErrorCodes::isExceededTimeLimitError(res.status.code()));
}
using ResponseStatus = TaskExecutor::ResponseStatus;