summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/executor/connection_pool.cpp82
-rw-r--r--src/mongo/executor/connection_pool.h6
-rw-r--r--src/mongo/executor/connection_pool_test.cpp778
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp4
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.h28
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp10
-rw-r--r--src/mongo/executor/connection_pool_tl.h10
-rw-r--r--src/mongo/executor/network_interface_tl.cpp2
-rw-r--r--src/mongo/executor/network_interface_tl.h2
-rw-r--r--src/mongo/util/out_of_line_executor.h3
10 files changed, 469 insertions, 456 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 8d934581ca2..ac43bbaed6f 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -36,7 +36,6 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
-#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
@@ -94,15 +93,8 @@ class ConnectionPool::SpecificPool final
: public std::enable_shared_from_this<ConnectionPool::SpecificPool> {
public:
/**
- * Whenever a function enters a specific pool, the function needs to be guarded.
- * The presence of one of these guards will bump a counter on the specific pool
- * which will prevent the pool from removing itself from the map of pools.
- *
- * The complexity comes from the need to hold a lock when writing to the
- * _activeClients param on the specific pool. Because the code beneath the client needs to lock
- * and unlock the parent mutex (and can leave unlocked), we want to start the client with the
- * lock acquired, move it into the client, then re-acquire to decrement the counter on the way
- * out.
+ * Whenever a function enters a specific pool,
+ * the function needs to be guarded by the pool lock.
*
* This callback also (perhaps overly aggressively) binds a shared pointer to the guard.
* It is *always* safe to reference the original specific pool in the guarded function object.
@@ -116,19 +108,12 @@ public:
template <typename Callback>
auto guardCallback(Callback&& cb) {
return [ cb = std::forward<Callback>(cb), anchor = shared_from_this() ](auto&&... args) {
- stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
- ++(anchor->_activeClients);
-
- ON_BLOCK_EXIT([anchor]() {
- stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
- --(anchor->_activeClients);
- });
-
- return cb(std::move(lk), std::forward<decltype(args)>(args)...);
+ return cb(stdx::unique_lock(anchor->_parent->_mutex),
+ std::forward<decltype(args)>(args)...);
};
}
- SpecificPool(ConnectionPool* parent,
+ SpecificPool(std::shared_ptr<ConnectionPool> parent,
const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode);
~SpecificPool();
@@ -247,7 +232,7 @@ private:
void updateStateInLock();
private:
- ConnectionPool* const _parent;
+ const std::shared_ptr<ConnectionPool> _parent;
const transport::ConnectSSLMode _sslMode;
const HostAndPort _hostAndPort;
@@ -261,7 +246,6 @@ private:
std::shared_ptr<TimerInterface> _requestTimer;
Date_t _requestTimerExpiration;
- size_t _activeClients;
size_t _generation;
bool _inFulfillRequests;
bool _inSpawnConnections;
@@ -408,7 +392,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort&
auto iter = _pools.find(hostAndPort);
if (iter == _pools.end()) {
- pool = stdx::make_unique<SpecificPool>(this, hostAndPort, sslMode);
+ pool = std::make_shared<SpecificPool>(shared_from_this(), hostAndPort, sslMode);
_pools[hostAndPort] = pool;
} else {
pool = iter->second;
@@ -445,33 +429,21 @@ size_t ConnectionPool::getNumConnectionsPerHost(const HostAndPort& hostAndPort)
return 0;
}
-void ConnectionPool::returnConnection(ConnectionInterface* conn) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- auto iter = _pools.find(conn->getHostAndPort());
-
- invariant(iter != _pools.end(),
- str::stream() << "Tried to return connection but no pool found for "
- << conn->getHostAndPort());
-
- auto pool = iter->second;
- pool->returnConnection(conn, std::move(lk));
-}
-
-ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent,
+ConnectionPool::SpecificPool::SpecificPool(std::shared_ptr<ConnectionPool> parent,
const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode)
- : _parent(parent),
+ : _parent(std::move(parent)),
_sslMode(sslMode),
_hostAndPort(hostAndPort),
_readyPool(std::numeric_limits<size_t>::max()),
- _requestTimer(parent->_factory->makeTimer()),
- _activeClients(0),
_generation(0),
_inFulfillRequests(false),
_inSpawnConnections(false),
_created(0),
- _state(State::kRunning) {}
+ _state(State::kRunning) {
+ invariant(_parent);
+ _requestTimer = _parent->_factory->makeTimer();
+}
ConnectionPool::SpecificPool::~SpecificPool() {
DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();)
@@ -527,7 +499,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
updateStateInLock();
lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+ _parent->_factory->getExecutor()->schedule(guardCallback([this](auto lk, auto schedStatus) {
fassert(51169, schedStatus);
spawnConnections(lk);
@@ -615,7 +587,7 @@ void ConnectionPool::SpecificPool::finishRefresh(stdx::unique_lock<stdx::mutex>
addToReady(lk, std::move(conn));
lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+ _parent->_factory->getExecutor()->schedule(guardCallback([this](auto lk, auto schedStatus) {
fassert(51170, schedStatus);
fulfillRequests(lk);
}));
@@ -669,7 +641,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
addToReady(lk, std::move(conn));
lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+ _parent->_factory->getExecutor()->schedule(guardCallback([this](auto lk, auto schedStatus) {
fassert(51171, schedStatus);
fulfillRequests(lk);
}));
@@ -886,7 +858,7 @@ ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::take
void ConnectionPool::SpecificPool::updateStateInLock() {
if (_state == State::kInShutdown) {
// If we're in shutdown, there is nothing to update. Our clients are all gone.
- if (_processingPool.empty() && !_activeClients) {
+ if (_processingPool.empty()) {
// If we have no more clients that require access to us, delist from the parent pool
LOG(2) << "Delisting connection pool for " << _hostAndPort;
_parent->_pools.erase(_hostAndPort);
@@ -958,16 +930,16 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
auto timeout = _parent->_options.hostTimeout;
// Set the shutdown timer, this gets reset on any request
- _requestTimer->setTimeout(timeout, [ this, anchor = shared_from_this() ]() {
- stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
- if (_state != State::kIdle)
- return;
-
- triggerShutdown(
- Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "Connection pool has been idle for longer than the host timeout"),
- std::move(lk));
- });
+ _requestTimer->setTimeout(
+ timeout, guardCallback([this](auto lk) {
+ if (_state != State::kIdle)
+ return;
+
+ triggerShutdown(
+ Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ "Connection pool has been idle for longer than the host timeout"),
+ std::move(lk));
+ }));
}
}
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index f3bfb691ff5..5944e7a527e 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -62,7 +62,7 @@ struct ConnectionPoolStats;
* The overall workflow here is to manage separate pools for each unique
* HostAndPort. See comments on the various Options for how the pool operates.
*/
-class ConnectionPool : public EgressTagCloser {
+class ConnectionPool : public EgressTagCloser, public std::enable_shared_from_this<ConnectionPool> {
class SpecificPool;
public:
@@ -167,8 +167,6 @@ public:
size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
private:
- void returnConnection(ConnectionInterface* connection);
-
std::string _name;
// Options are set at startup and never changed at run time, so these are
@@ -347,7 +345,7 @@ public:
/**
* Return the executor for use with this factory
*/
- virtual OutOfLineExecutor& getExecutor() = 0;
+ virtual const std::shared_ptr<OutOfLineExecutor>& getExecutor() = 0;
/**
* Makes a new timer
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index b36cf8cf749..0bb27501fcf 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -52,11 +52,22 @@ protected:
void setUp() override {}
void tearDown() override {
+ if (_pool) {
+ _pool->shutdown();
+ }
+
ConnectionImpl::clear();
TimerImpl::clear();
}
+ auto makePool(ConnectionPool::Options options = {}) {
+ _pool =
+ std::make_shared<ConnectionPool>(std::make_shared<PoolImpl>(), "test pool", options);
+ return _pool;
+ }
+
private:
+ std::shared_ptr<ConnectionPool> _pool;
};
void doneWith(const ConnectionPool::ConnectionHandle& conn) {
@@ -76,27 +87,27 @@ auto verifyAndGetId(StatusWithConn& swConn) {
* another.
*/
TEST_F(ConnectionPoolTest, SameConn) {
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool");
+ auto pool = makePool();
// Grab and stash an id for the first request
size_t conn1Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
// Grab and stash an id for the second request
size_t conn2Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn2Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
// Verify that we hit them, and that they're the same
ASSERT(conn1Id);
@@ -108,7 +119,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
* Verify that connections are obtained in MRU order.
*/
TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool");
+ auto pool = makePool();
// Obtain a set of connections
constexpr size_t kSize = 100;
@@ -132,13 +143,14 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
for (size_t i = 0; i != kSize; ++i) {
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- connections.push_back(std::move(swConn.getValue()));
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ connections.push_back(std::move(swConn.getValue()));
+ });
}
+ ASSERT_EQ(connections.size(), kSize);
// Shuffle them into a random order
std::random_device rd;
@@ -153,21 +165,23 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
ids.push(static_cast<ConnectionImpl*>(conn.get())->id());
conn->indicateSuccess();
}
+ ASSERT_EQ(ids.size(), kSize);
// Re-obtain the connections. They should come back in the same order
// as the IDs in the stack, since the pool returns them in MRU order.
for (size_t i = 0; i != kSize; ++i) {
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- const auto id = verifyAndGetId(swConn);
- connections.push_back(std::move(swConn.getValue()));
- ASSERT(id == ids.top());
- ids.pop();
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ const auto id = verifyAndGetId(swConn);
+ connections.push_back(std::move(swConn.getValue()));
+ ASSERT_EQ(id, ids.top());
+ ids.pop();
+ });
}
+ ASSERT(ids.empty());
}
/**
@@ -179,9 +193,9 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
options.refreshRequirement = Milliseconds(1000);
options.refreshTimeout = Milliseconds(5000);
options.hostTimeout = Minutes(1);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
- ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), 0U);
+ ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), 0U);
// Obtain a set of connections
constexpr size_t kSize = 100;
@@ -210,17 +224,17 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
std::set<size_t> original_ids;
for (size_t i = 0; i != kSize; ++i) {
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- original_ids.insert(verifyAndGetId(swConn));
- connections.push_back(std::move(swConn.getValue()));
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ original_ids.insert(verifyAndGetId(swConn));
+ connections.push_back(std::move(swConn.getValue()));
+ });
}
ASSERT_EQ(original_ids.size(), kSize);
- ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize);
+ ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize);
// Shuffle them into a random order
std::random_device rd;
@@ -236,25 +250,25 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
// Advance the time, but not enough to age out connections. We should still have them all.
PoolImpl::setNow(now + Milliseconds(500));
- ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize);
+ ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize);
// Re-obtain a quarter of the connections, and record their IDs in a set.
std::set<size_t> reacquired_ids;
for (size_t i = 0; i < kSize / 4; ++i) {
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- reacquired_ids.insert(verifyAndGetId(swConn));
- connections.push_back(std::move(swConn.getValue()));
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ reacquired_ids.insert(verifyAndGetId(swConn));
+ connections.push_back(std::move(swConn.getValue()));
+ });
}
ASSERT_EQ(reacquired_ids.size(), kSize / 4);
ASSERT(std::includes(
original_ids.begin(), original_ids.end(), reacquired_ids.begin(), reacquired_ids.end()));
- ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize);
+ ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize);
// Put them right back in.
while (!connections.empty()) {
@@ -264,40 +278,40 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
}
// We should still have all of them in the pool
- ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize);
+ ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize);
// Advance across the host timeout for the 75 connections we
// didn't use. Afterwards, the pool should contain only those
// kSize/4 connections we used above.
PoolImpl::setNow(now + Milliseconds(1000));
- ASSERT_EQ(pool.getNumConnectionsPerHost(HostAndPort()), kSize / 4);
+ ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize / 4);
}
/**
* Verify that a failed connection isn't returned to the pool
*/
TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool");
+ auto pool = makePool();
// Grab the first connection and indicate that it failed
size_t conn1Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error"));
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = verifyAndGetId(swConn);
+ swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error"));
+ });
// Grab the second id
size_t conn2Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn2Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
// Verify that we hit them, and that they're different
ASSERT(conn1Id);
@@ -310,27 +324,27 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
* connections.
*/
TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool");
+ auto pool = makePool();
// Conn 1 from port 30000
size_t conn1Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort("localhost:30000"),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort("localhost:30000"),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
// Conn 2 from port 30001
size_t conn2Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort("localhost:30001"),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort("localhost:30001"),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn2Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
// Hit them and not the same
ASSERT(conn1Id);
@@ -342,27 +356,27 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
* Verify that not returning handle's to the pool spins up new connections.
*/
TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) {
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool");
+ auto pool = makePool();
// Get the first connection, move it out rather than letting it return
ConnectionPool::ConnectionHandle conn1;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- conn1 = std::move(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ conn1 = std::move(swConn.getValue());
+ });
// Get the second connection, move it out rather than letting it return
ConnectionPool::ConnectionHandle conn2;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- conn2 = std::move(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ conn2 = std::move(swConn.getValue());
+ });
// Verify that the two connections are different
ASSERT_NE(conn1.get(), conn2.get());
@@ -378,7 +392,7 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) {
* Note that the lack of pushSetup() calls delays the get.
*/
TEST_F(ConnectionPoolTest, TimeoutOnSetup) {
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool");
+ auto pool = makePool();
bool notOk = false;
@@ -386,7 +400,7 @@ TEST_F(ConnectionPoolTest, TimeoutOnSetup) {
PoolImpl::setNow(now);
- pool.get_forTest(
+ pool->get_forTest(
HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { notOk = !swConn.isOK(); });
@@ -414,7 +428,7 @@ TEST_F(ConnectionPoolTest, refreshHappens) {
ConnectionPool::Options options;
options.refreshRequirement = Milliseconds(1000);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
@@ -422,12 +436,12 @@ TEST_F(ConnectionPoolTest, refreshHappens) {
// Get a connection
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ });
// After 1 second, one refresh has occurred
PoolImpl::setNow(now + Milliseconds(1000));
@@ -450,7 +464,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
ConnectionPool::Options options;
options.refreshRequirement = Milliseconds(1000);
options.refreshTimeout = Milliseconds(2000);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
@@ -460,23 +474,23 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
// Grab a connection and verify it's good
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
PoolImpl::setNow(now + Milliseconds(500));
size_t conn2Id = 0;
// Make sure we still get the first one
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn2Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
ASSERT_EQ(conn1Id, conn2Id);
// This should trigger a refresh, but not time it out. So now we have one
@@ -487,13 +501,13 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
// This will wait because we have a refreshing connection, so it'll wait to
// see if that pans out. In this case, we'll get a failure on timeout.
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(!swConn.isOK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(!swConn.isOK());
- reachedA = true;
- });
+ reachedA = true;
+ });
ASSERT(!reachedA);
PoolImpl::setNow(now + Milliseconds(3000));
@@ -503,13 +517,13 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
bool reachedB = false;
// Make sure we can get a new connection
- pool.get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(verifyAndGetId(swConn), conn1Id);
- reachedB = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(verifyAndGetId(swConn), conn1Id);
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedA);
ASSERT(reachedB);
@@ -519,31 +533,31 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
* Verify that requests are served in expiration order, not insertion order
*/
TEST_F(ConnectionPoolTest, requestsServedByUrgency) {
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool");
+ auto pool = makePool();
bool reachedA = false;
bool reachedB = false;
ConnectionPool::ConnectionHandle conn;
- pool.get_forTest(HostAndPort(),
- Milliseconds(2000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
- reachedA = true;
- doneWith(swConn.getValue());
- });
+ reachedA = true;
+ doneWith(swConn.getValue());
+ });
- pool.get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
- reachedB = true;
+ reachedB = true;
- conn = std::move(swConn.getValue());
- });
+ conn = std::move(swConn.getValue());
+ });
ConnectionImpl::pushSetup(Status::OK());
@@ -566,7 +580,7 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) {
ConnectionPool::Options options;
options.minConnections = 1;
options.maxConnections = 2;
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
ConnectionPool::ConnectionHandle conn1;
ConnectionPool::ConnectionHandle conn2;
@@ -574,27 +588,27 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) {
// Make 3 requests, each which keep their connection (don't return it to
// the pool)
- pool.get_forTest(HostAndPort(),
- Milliseconds(3000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
-
- conn3 = std::move(swConn.getValue());
- });
- pool.get_forTest(HostAndPort(),
- Milliseconds(2000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
-
- conn2 = std::move(swConn.getValue());
- });
- pool.get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
-
- conn1 = std::move(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(3000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn3 = std::move(swConn.getValue());
+ });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn2 = std::move(swConn.getValue());
+ });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn1 = std::move(swConn.getValue());
+ });
ConnectionImpl::pushSetup(Status::OK());
ConnectionImpl::pushSetup(Status::OK());
@@ -624,7 +638,7 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) {
ConnectionPool::Options options;
options.minConnections = 1;
options.maxConnecting = 2;
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
ConnectionPool::ConnectionHandle conn1;
ConnectionPool::ConnectionHandle conn2;
@@ -632,27 +646,27 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) {
// Make 3 requests, each which keep their connection (don't return it to
// the pool)
- pool.get_forTest(HostAndPort(),
- Milliseconds(3000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
-
- conn3 = std::move(swConn.getValue());
- });
- pool.get_forTest(HostAndPort(),
- Milliseconds(2000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
-
- conn2 = std::move(swConn.getValue());
- });
- pool.get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
-
- conn1 = std::move(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(3000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn3 = std::move(swConn.getValue());
+ });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn2 = std::move(swConn.getValue());
+ });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn1 = std::move(swConn.getValue());
+ });
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
ConnectionImpl::pushSetup(Status::OK());
@@ -683,7 +697,7 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) {
ConnectionPool::Options options;
options.maxConnecting = 1;
options.refreshRequirement = Milliseconds(1000);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
@@ -691,12 +705,12 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) {
// Get a connection
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ });
ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u);
@@ -707,13 +721,13 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) {
bool reachedA = false;
// Try to get another connection
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- doneWith(swConn.getValue());
- reachedA = true;
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ reachedA = true;
+ });
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
ASSERT(!reachedA);
@@ -730,19 +744,19 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) {
options.maxConnecting = 2;
options.minConnections = 3;
options.refreshRequirement = Milliseconds(1000);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
// Get us spun up to 3 connections in the pool
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ });
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
ConnectionImpl::pushSetup(Status::OK());
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
@@ -758,12 +772,12 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) {
// Start 5 new requests
for (size_t i = 0; i < conns.size(); ++i) {
- pool.get_forTest(HostAndPort(),
- Milliseconds(static_cast<int>(1000 + i)),
- [&conns, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- conns[i] = std::move(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(static_cast<int>(1000 + i)),
+ [&conns, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ conns[i] = std::move(swConn.getValue());
+ });
}
auto firstNBound = [&](size_t n) {
@@ -822,7 +836,7 @@ TEST_F(ConnectionPoolTest, minPoolRespected) {
options.maxConnections = 3;
options.refreshRequirement = Milliseconds(1000);
options.refreshTimeout = Milliseconds(2000);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
@@ -833,13 +847,13 @@ TEST_F(ConnectionPoolTest, minPoolRespected) {
ConnectionPool::ConnectionHandle conn3;
// Grab one connection without returning it
- pool.get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
- conn1 = std::move(swConn.getValue());
- });
+ conn1 = std::move(swConn.getValue());
+ });
bool reachedA = false;
bool reachedB = false;
@@ -865,20 +879,20 @@ TEST_F(ConnectionPoolTest, minPoolRespected) {
ASSERT(!reachedC);
// Two more get's without returns
- pool.get_forTest(HostAndPort(),
- Milliseconds(2000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
- conn2 = std::move(swConn.getValue());
- });
- pool.get_forTest(HostAndPort(),
- Milliseconds(3000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ conn2 = std::move(swConn.getValue());
+ });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(3000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
- conn3 = std::move(swConn.getValue());
- });
+ conn3 = std::move(swConn.getValue());
+ });
ASSERT(conn2);
ASSERT(conn3);
@@ -931,7 +945,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
options.refreshRequirement = Milliseconds(5000);
options.refreshTimeout = Milliseconds(5000);
options.hostTimeout = Milliseconds(1000);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
@@ -942,13 +956,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
bool reachedA = false;
// Grab 1 connection and return it
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = verifyAndGetId(swConn);
- reachedA = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ connId = verifyAndGetId(swConn);
+ reachedA = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedA);
@@ -959,13 +973,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
// Verify that a new connection was spawned
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, verifyAndGetId(swConn));
- reachedB = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(connId, verifyAndGetId(swConn));
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedB);
}
@@ -980,7 +994,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
options.refreshRequirement = Milliseconds(5000);
options.refreshTimeout = Milliseconds(5000);
options.hostTimeout = Milliseconds(1000);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
@@ -992,13 +1006,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
// Grab and return
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = verifyAndGetId(swConn);
- reachedA = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ connId = verifyAndGetId(swConn);
+ reachedA = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedA);
// Jump almost up to the hostTimeout
@@ -1006,26 +1020,26 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
bool reachedB = false;
// Same connection
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, verifyAndGetId(swConn));
- reachedB = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_EQ(connId, verifyAndGetId(swConn));
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedB);
// Now our timeout should be 1999 ms from 'now' instead of 1000 ms
// if we do another 'get' we should still get the original connection
PoolImpl::setNow(now + Milliseconds(1500));
bool reachedB2 = false;
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, verifyAndGetId(swConn));
- reachedB2 = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_EQ(connId, verifyAndGetId(swConn));
+ reachedB2 = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedB2);
// We should time out when we get to 'now' + 2500 ms
@@ -1034,13 +1048,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
bool reachedC = false;
// Different id
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, verifyAndGetId(swConn));
- reachedC = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(connId, verifyAndGetId(swConn));
+ reachedC = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedC);
}
@@ -1055,7 +1069,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
options.refreshRequirement = Milliseconds(5000);
options.refreshTimeout = Milliseconds(5000);
options.hostTimeout = Milliseconds(1000);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
@@ -1067,23 +1081,23 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
// save 1 connection
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- conn1 = std::move(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = verifyAndGetId(swConn);
+ conn1 = std::move(swConn.getValue());
+ });
ASSERT(conn1Id);
// return the second
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn2Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
ASSERT(conn2Id);
@@ -1093,13 +1107,13 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
bool reachedA = false;
// conn 2 is still there
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
- reachedA = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
+ reachedA = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedA);
@@ -1114,14 +1128,14 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
// make sure that this is a new id
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(conn1Id, verifyAndGetId(swConn));
- ASSERT_NE(conn2Id, verifyAndGetId(swConn));
- reachedB = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(conn1Id, verifyAndGetId(swConn));
+ ASSERT_NE(conn2Id, verifyAndGetId(swConn));
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedB);
}
@@ -1135,7 +1149,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
options.maxConnections = 1;
options.refreshRequirement = Seconds(1);
options.refreshTimeout = Seconds(2);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
@@ -1143,22 +1157,22 @@ TEST_F(ConnectionPoolTest, dropConnections) {
// Grab the first connection id
size_t conn1Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
ASSERT(conn1Id);
// Grab it and this time keep it out of the pool
ConnectionPool::ConnectionHandle handle;
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
- handle = std::move(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
+ handle = std::move(swConn.getValue());
+ });
ASSERT(handle);
@@ -1166,16 +1180,16 @@ TEST_F(ConnectionPoolTest, dropConnections) {
// Queue up a request. This won't fire until we drop connections, then it
// will fail.
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(!swConn.isOK());
- reachedA = true;
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(!swConn.isOK());
+ reachedA = true;
+ });
ASSERT(!reachedA);
// fails the previous get
- pool.dropConnections(HostAndPort());
+ pool->dropConnections(HostAndPort());
ASSERT(reachedA);
@@ -1187,20 +1201,20 @@ TEST_F(ConnectionPoolTest, dropConnections) {
// connection
size_t conn2Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- ASSERT_NE(conn2Id, conn1Id);
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn2Id = verifyAndGetId(swConn);
+ ASSERT_NE(conn2Id, conn1Id);
+ doneWith(swConn.getValue());
+ });
ASSERT(conn2Id);
// Push conn2 into refresh
PoolImpl::setNow(now + Milliseconds(1500));
// drop the connections
- pool.dropConnections(HostAndPort());
+ pool->dropConnections(HostAndPort());
// refresh still pending
PoolImpl::setNow(now + Milliseconds(2500));
@@ -1209,13 +1223,13 @@ TEST_F(ConnectionPoolTest, dropConnections) {
// being pending
bool reachedB = false;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(verifyAndGetId(swConn), conn2Id);
- reachedB = true;
- doneWith(swConn.getValue());
- });
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(verifyAndGetId(swConn), conn2Id);
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
ASSERT(reachedB);
}
@@ -1228,13 +1242,13 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) {
options.maxConnections = 1;
options.refreshTimeout = Seconds(2);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn1;
- pool.get_forTest(
+ pool->get_forTest(
HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
conn1 = std::move(swConn);
});
@@ -1248,7 +1262,7 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) {
ASSERT(!conn1);
// Get conn2 (which should have an extra second before the timeout)
- pool.get_forTest(
+ pool->get_forTest(
HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress);
});
@@ -1269,7 +1283,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
options.maxConnections = 1;
options.refreshTimeout = Seconds(2);
options.refreshRequirement = Seconds(3);
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
@@ -1277,7 +1291,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
// Successfully get a new connection
size_t conn1Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(
+ pool->get_forTest(
HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
@@ -1288,7 +1302,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
PoolImpl::setNow(now + Seconds(3));
boost::optional<StatusWith<ConnectionPool::ConnectionHandle>> conn1;
- pool.get_forTest(
+ pool->get_forTest(
HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
conn1 = std::move(swConn);
});
@@ -1301,7 +1315,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
ASSERT(!conn1);
// Get conn2 (which should have an extra second before the timeout)
- pool.get_forTest(
+ pool->get_forTest(
HostAndPort(), Seconds(10), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress);
});
@@ -1313,8 +1327,8 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
ASSERT(conn1->getStatus().code() == ErrorCodes::NetworkInterfaceExceededTimeLimit);
}
-template <typename T>
-void dropConnectionsTest(ConnectionPool& pool, T& t) {
+template <typename Ptr>
+void dropConnectionsTest(std::shared_ptr<ConnectionPool> const& pool, Ptr t) {
auto now = Date_t::now();
PoolImpl::setNow(now);
@@ -1325,69 +1339,70 @@ void dropConnectionsTest(ConnectionPool& pool, T& t) {
// Successfully get connections to two hosts
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(hap1, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ pool->get_forTest(hap1, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
doneWith(swConn.getValue());
});
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(hap2, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ pool->get_forTest(hap2, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
doneWith(swConn.getValue());
});
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(hap3, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ pool->get_forTest(hap3, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
doneWith(swConn.getValue());
});
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap1));
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap2));
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap3));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap2));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap3));
- t.dropConnections(transport::Session::kPending);
+ t->dropConnections(transport::Session::kPending);
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap1));
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap2));
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap3));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap2));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap3));
- t.mutateTags(hap1, [](transport::Session::TagMask tags) {
+ t->mutateTags(hap1, [](transport::Session::TagMask tags) {
return transport::Session::kKeepOpen | transport::Session::kInternalClient;
});
- t.mutateTags(hap2,
- [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; });
+ t->mutateTags(hap2,
+ [](transport::Session::TagMask tags) { return transport::Session::kKeepOpen; });
- t.mutateTags(
+ t->mutateTags(
hap3, [](transport::Session::TagMask tags) { return transport::Session::kEmptyTagMask; });
- t.dropConnections(transport::Session::kKeepOpen);
+ t->dropConnections(transport::Session::kKeepOpen);
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap1));
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap2));
- ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap3));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap2));
+ ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap3));
- t.dropConnections(transport::Session::kInternalClient);
+ t->dropConnections(transport::Session::kInternalClient);
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap1));
- ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap2));
- ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap3));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1));
+ ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap2));
+ ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap3));
ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(hap4, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ pool->get_forTest(hap4, Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
doneWith(swConn.getValue());
});
// drop connections by hostAndPort
- t.dropConnections(hap1);
+ t->dropConnections(hap1);
- ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap1));
- ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap2));
- ASSERT_EQ(0ul, pool.getNumConnectionsPerHost(hap3));
- ASSERT_EQ(1ul, pool.getNumConnectionsPerHost(hap4));
+ ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap1));
+ ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap2));
+ ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap3));
+ ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap4));
}
TEST_F(ConnectionPoolTest, DropConnections) {
ConnectionPool::Options options;
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ options.minConnections = 0;
+ auto pool = makePool(options);
dropConnectionsTest(pool, pool);
}
@@ -1395,16 +1410,17 @@ TEST_F(ConnectionPoolTest, DropConnections) {
TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
EgressTagCloserManager manager;
ConnectionPool::Options options;
+ options.minConnections = 0;
options.egressTagCloserManager = &manager;
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
- dropConnectionsTest(pool, manager);
+ dropConnectionsTest(pool, &manager);
}
TEST_F(ConnectionPoolTest, AsyncGet) {
ConnectionPool::Options options;
options.maxConnections = 1;
- ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
+ auto pool = makePool(options);
auto now = Date_t::now();
PoolImpl::setNow(now);
@@ -1414,7 +1430,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
size_t connId = 0;
// no connections in the pool, our future is not satisfied
- auto connFuture = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
ASSERT_FALSE(connFuture.isReady());
// Successfully get a new connection
@@ -1438,8 +1454,8 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
size_t connId2 = 0;
size_t connId3 = 0;
- auto connFuture1 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
- auto connFuture2 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
+ auto connFuture1 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ auto connFuture2 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
// Queue up the second future to resolve as soon as it is ready
std::move(connFuture2).getAsync([&](StatusWithConn swConn) mutable {
@@ -1455,7 +1471,7 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
decltype(connFuture1) connFuture3;
std::move(connFuture1).getAsync([&](StatusWithConn swConn) mutable {
// Grab our third future while our first one is being fulfilled
- connFuture3 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ connFuture3 = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
connId1 = verifyAndGetId(swConn);
doneWith(swConn.getValue());
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index 86adce43faa..6c0321fa619 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -226,6 +226,10 @@ std::shared_ptr<ConnectionPool::TimerInterface> PoolImpl::makeTimer() {
return stdx::make_unique<TimerImpl>(this);
}
+const std::shared_ptr<OutOfLineExecutor>& PoolImpl::getExecutor() {
+ return _executor;
+}
+
Date_t PoolImpl::now() {
return _now.get_value_or(Date_t::now());
}
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index 2ecb6ca5279..cbd6657a81a 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -139,8 +139,27 @@ private:
class InlineOutOfLineExecutor : public OutOfLineExecutor {
public:
void schedule(Task task) override {
- std::move(task)(Status::OK());
+ // Add the task to our queue
+ _taskQueue.emplace_back(std::move(task));
+
+ // Make sure we're not already inline executing
+ if (std::exchange(_inSchedule, true)) {
+ return;
+ }
+
+ // Clear out our queue
+ while (!_taskQueue.empty()) {
+ auto task = std::move(_taskQueue.front());
+ std::move(task)(Status::OK());
+ _taskQueue.pop_front();
+ }
+
+ // Admit we're not working on the queue anymore
+ _inSchedule = false;
}
+
+ bool _inSchedule;
+ std::deque<Task> _taskQueue;
};
/**
@@ -150,6 +169,7 @@ class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
friend class ConnectionImpl;
public:
+ PoolImpl() = default;
std::shared_ptr<ConnectionPool::ConnectionInterface> makeConnection(
const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode,
@@ -157,10 +177,7 @@ public:
std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
- OutOfLineExecutor& getExecutor() override {
- static InlineOutOfLineExecutor _executor;
- return _executor;
- }
+ const std::shared_ptr<OutOfLineExecutor>& getExecutor() override;
Date_t now() override;
@@ -175,6 +192,7 @@ public:
private:
ConnectionPool* _pool = nullptr;
+ std::shared_ptr<OutOfLineExecutor> _executor = std::make_shared<InlineOutOfLineExecutor>();
static boost::optional<Date_t> _now;
};
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index e4a0d9e35ae..44ca5e60f79 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -327,10 +327,14 @@ void TLConnection::cancelAsync() {
_client->cancel();
}
+auto TLTypeFactory::reactor() {
+ return checked_pointer_cast<transport::Reactor>(_executor);
+}
+
std::shared_ptr<ConnectionPool::ConnectionInterface> TLTypeFactory::makeConnection(
const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, size_t generation) {
auto conn = std::make_shared<TLConnection>(shared_from_this(),
- _reactor,
+ reactor(),
getGlobalServiceContext(),
hostAndPort,
sslMode,
@@ -342,13 +346,13 @@ std::shared_ptr<ConnectionPool::ConnectionInterface> TLTypeFactory::makeConnecti
}
std::shared_ptr<ConnectionPool::TimerInterface> TLTypeFactory::makeTimer() {
- auto timer = std::make_shared<TLTimer>(shared_from_this(), _reactor);
+ auto timer = std::make_shared<TLTimer>(shared_from_this(), reactor());
fasten(timer.get());
return timer;
}
Date_t TLTypeFactory::now() {
- return _reactor->now();
+ return checked_cast<transport::Reactor*>(_executor.get())->now();
}
} // namespace connection_pool_tl
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
index 31317c280ef..7a138589055 100644
--- a/src/mongo/executor/connection_pool_tl.h
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -50,7 +50,7 @@ public:
transport::TransportLayer* tl,
std::unique_ptr<NetworkConnectionHook> onConnectHook,
const ConnectionPool::Options& connPoolOptions)
- : _reactor(std::move(reactor)),
+ : _executor(std::move(reactor)),
_tl(tl),
_onConnectHook(std::move(onConnectHook)),
_connPoolOptions(connPoolOptions) {}
@@ -60,8 +60,8 @@ public:
transport::ConnectSSLMode sslMode,
size_t generation) override;
std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
- OutOfLineExecutor& getExecutor() override {
- return *_reactor;
+ const std::shared_ptr<OutOfLineExecutor>& getExecutor() override {
+ return _executor;
}
Date_t now() override;
@@ -72,7 +72,9 @@ public:
void release(Type* type);
private:
- transport::ReactorHandle _reactor;
+ auto reactor();
+
+ std::shared_ptr<OutOfLineExecutor> _executor; // This is always a transport::Reactor
transport::TransportLayer* _tl;
std::unique_ptr<NetworkConnectionHook> _onConnectHook;
const ConnectionPool::Options _connPoolOptions;
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index a760e6f120d..643cce32ae9 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -98,7 +98,7 @@ void NetworkInterfaceTL::startup() {
_reactor = _tl->getReactor(transport::TransportLayer::kNewReactor);
auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
_reactor, _tl, std::move(_onConnectHook), _connPoolOpts);
- _pool = std::make_unique<ConnectionPool>(
+ _pool = std::make_shared<ConnectionPool>(
std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
_ioThread = stdx::thread([this] {
setThreadName(_instanceName);
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index d6f50032111..1906220a96f 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -141,7 +141,7 @@ private:
mutable stdx::mutex _mutex;
ConnectionPool::Options _connPoolOpts;
std::unique_ptr<NetworkConnectionHook> _onConnectHook;
- std::unique_ptr<ConnectionPool> _pool;
+ std::shared_ptr<ConnectionPool> _pool;
Counters _counters;
std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;
diff --git a/src/mongo/util/out_of_line_executor.h b/src/mongo/util/out_of_line_executor.h
index 8371d878e3f..ff1f4a511a1 100644
--- a/src/mongo/util/out_of_line_executor.h
+++ b/src/mongo/util/out_of_line_executor.h
@@ -70,8 +70,7 @@ public:
*/
virtual void schedule(Task func) = 0;
-protected:
- ~OutOfLineExecutor() noexcept {}
+ virtual ~OutOfLineExecutor() = default;
};
using ExecutorPtr = std::shared_ptr<OutOfLineExecutor>;