summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/connpool.cpp1
-rw-r--r--src/mongo/executor/SConscript1
-rw-r--r--src/mongo/executor/connection_pool.cpp137
-rw-r--r--src/mongo/executor/connection_pool.h38
-rw-r--r--src/mongo/executor/connection_pool_stats.cpp8
-rw-r--r--src/mongo/executor/connection_pool_stats.h3
-rw-r--r--src/mongo/executor/connection_pool_test.cpp228
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp5
-rw-r--r--src/mongo/executor/network_interface.h41
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp25
-rw-r--r--src/mongo/executor/network_interface_mock.h6
-rw-r--r--src/mongo/executor/network_interface_tl.cpp30
-rw-r--r--src/mongo/executor/network_interface_tl.h30
-rw-r--r--src/mongo/s/sharding_task_executor_pool_controller.cpp2
14 files changed, 481 insertions, 74 deletions
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp
index d155e238c2c..a1fd97e3e9d 100644
--- a/src/mongo/client/connpool.cpp
+++ b/src/mongo/client/connpool.cpp
@@ -618,6 +618,7 @@ void DBConnectionPool::appendConnectionStats(executor::ConnectionPoolStats* stat
executor::ConnectionStatsPer hostStats{static_cast<size_t>(i->second.numInUse()),
static_cast<size_t>(i->second.numAvailable()),
+ 0,
static_cast<size_t>(i->second.numCreated()),
0,
0,
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index d8b929cd37b..095ee4acba7 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -383,6 +383,7 @@ env.CppIntegrationTest(
'thread_pool_task_executor_integration_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/client/async_client',
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/wire_version',
'$BUILD_DIR/mongo/executor/network_interface_factory',
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index c4c70b74916..4cc00399900 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -146,8 +146,8 @@ std::string ConnectionPool::ConnectionControls::toString() const {
}
std::string ConnectionPool::HostState::toString() const {
- return "{{ requests: {}, ready: {}, pending: {}, active: {}, isExpired: {} }}"_format(
- requests, ready, pending, active, health.isExpired);
+ return "{{ requests: {}, ready: {}, pending: {}, active: {}, leased: {}, isExpired: {} }}"_format(
+ requests, ready, pending, active, leased, health.isExpired);
}
/**
@@ -171,7 +171,7 @@ public:
const auto minConns = getPool()->_options.minConnections;
const auto maxConns = getPool()->_options.maxConnections;
- data.target = stats.requests + stats.active;
+ data.target = stats.requests + stats.active + stats.leased;
if (data.target < minConns) {
data.target = minConns;
} else if (data.target > maxConns) {
@@ -284,7 +284,9 @@ public:
* Gets a connection from the specific pool. Sinks a unique_lock from the
* parent to preserve the lock on _mutex
*/
- Future<ConnectionHandle> getConnection(Milliseconds timeout, ErrorCodes::Error timeoutCode);
+ Future<ConnectionHandle> getConnection(Milliseconds timeout,
+ bool lease,
+ ErrorCodes::Error timeoutCode);
/**
* Triggers the shutdown procedure. This function sets isShutdown to true
@@ -307,6 +309,11 @@ public:
size_t inUseConnections() const;
/**
+ * Returns the number of leased connections from the pool.
+ */
+ size_t leasedConnections() const;
+
+ /**
* Returns the number of available connections in the pool.
*/
size_t availableConnections() const;
@@ -344,7 +351,7 @@ public:
/**
* Returns the total number of connections currently open that belong to
* this pool. This is the sum of refreshingConnections, availableConnections,
- * and inUseConnections.
+ * inUseConnections, and leasedConnections.
*/
size_t openConnections() const;
@@ -404,6 +411,8 @@ private:
struct Request {
Date_t expiration;
Promise<ConnectionHandle> promise;
+ // Whether or not the requested connection should be "leased".
+ bool lease;
ErrorCodes::Error timeoutCode;
};
@@ -413,7 +422,7 @@ private:
}
};
- ConnectionHandle makeHandle(ConnectionInterface* connection);
+ ConnectionHandle makeHandle(ConnectionInterface* connection, bool isLeased);
/**
* Given a uniquely-owned OwnedConnection, returns an OwnedConnection
@@ -451,11 +460,11 @@ private:
void fulfillRequests();
- void returnConnection(ConnectionInterface* connPtr);
+ void returnConnection(ConnectionInterface* connPtr, bool isLeased);
// This internal helper is used both by get and by _fulfillRequests and differs in that it
// skips some bookkeeping that the other callers do on their own
- ConnectionHandle tryGetConnection();
+ ConnectionHandle tryGetConnection(bool lease);
template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type takeFromPool(
@@ -484,6 +493,7 @@ private:
OwnershipPool _processingPool;
OwnershipPool _droppedProcessingPool;
OwnershipPool _checkedOutPool;
+ OwnershipPool _leasedPool;
std::vector<Request> _requests;
Date_t _lastActiveTime;
@@ -626,24 +636,40 @@ void ConnectionPool::mutateTags(
pool->mutateTags(mutateFunc);
}
-void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
- Milliseconds timeout,
- ErrorCodes::Error timeoutCode,
- GetConnectionCallback cb) {
+void ConnectionPool::retrieve_forTest(RetrieveConnection retrieve, GetConnectionCallback cb) {
// We kick ourselves onto the executor queue to prevent us from deadlocking with our own thread
auto getConnectionFunc =
- [this, hostAndPort, timeout, timeoutCode, cb = std::move(cb)](Status&&) mutable {
- get(hostAndPort, transport::kGlobalSSLMode, timeout, timeoutCode)
- .thenRunOn(_factory->getExecutor())
- .getAsync(std::move(cb));
+ [this, retrieve = std::move(retrieve), cb = std::move(cb)](Status&&) mutable {
+ retrieve().thenRunOn(_factory->getExecutor()).getAsync(std::move(cb));
};
_factory->getExecutor()->schedule(std::move(getConnectionFunc));
}
-SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
- transport::ConnectSSLMode sslMode,
- Milliseconds timeout,
- ErrorCodes::Error timeoutCode) {
+void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ ErrorCodes::Error timeoutCode,
+ GetConnectionCallback cb) {
+ auto getConnectionFunc = [this, hostAndPort, timeout, timeoutCode]() mutable {
+ return get(hostAndPort, transport::kGlobalSSLMode, timeout, timeoutCode);
+ };
+ retrieve_forTest(getConnectionFunc, std::move(cb));
+}
+
+void ConnectionPool::lease_forTest(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ ErrorCodes::Error timeoutCode,
+ GetConnectionCallback cb) {
+ auto getConnectionFunc = [this, hostAndPort, timeout, timeoutCode]() mutable {
+ return lease(hostAndPort, transport::kGlobalSSLMode, timeout, timeoutCode);
+ };
+ retrieve_forTest(getConnectionFunc, std::move(cb));
+}
+
+SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::_get(const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout,
+ bool lease,
+ ErrorCodes::Error timeoutCode) {
auto connRequestedAt = _factory->now();
stdx::lock_guard lk(_mutex);
@@ -657,10 +683,12 @@ SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPo
invariant(pool);
- auto connFuture = pool->getConnection(timeout, timeoutCode);
+ auto connFuture = pool->getConnection(timeout, lease, timeoutCode);
pool->updateState();
- if (gFeatureFlagConnHealthMetrics.isEnabledAndIgnoreFCV()) {
+ // Only count connections being checked-out for ordinary use, not lease, towards cumulative wait
+ // time.
+ if (gFeatureFlagConnHealthMetrics.isEnabledAndIgnoreFCV() && !lease) {
connFuture = std::move(connFuture).tap([connRequestedAt, pool = pool](const auto& conn) {
pool->recordConnectionWaitTime(connRequestedAt);
});
@@ -679,6 +707,7 @@ void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
auto& pool = kv.second;
ConnectionStatsPer hostStats{pool->inUseConnections(),
pool->availableConnections(),
+ pool->leasedConnections(),
pool->createdConnections(),
pool->refreshingConnections(),
pool->refreshedConnections(),
@@ -721,6 +750,7 @@ ConnectionPool::SpecificPool::~SpecificPool() {
if (shouldInvariantOnPoolCorrectness()) {
invariant(_requests.empty());
invariant(_checkedOutPool.empty());
+ invariant(_leasedPool.empty());
}
}
@@ -732,6 +762,10 @@ size_t ConnectionPool::SpecificPool::availableConnections() const {
return _readyPool.size();
}
+size_t ConnectionPool::SpecificPool::leasedConnections() const {
+ return _leasedPool.size();
+}
+
size_t ConnectionPool::SpecificPool::refreshingConnections() const {
return _processingPool.size();
}
@@ -757,7 +791,7 @@ Milliseconds ConnectionPool::SpecificPool::getTotalConnUsageTime() const {
}
size_t ConnectionPool::SpecificPool::openConnections() const {
- return _checkedOutPool.size() + _readyPool.size() + _processingPool.size();
+ return _checkedOutPool.size() + _readyPool.size() + _processingPool.size() + _leasedPool.size();
}
size_t ConnectionPool::SpecificPool::requestsPending() const {
@@ -765,7 +799,7 @@ size_t ConnectionPool::SpecificPool::requestsPending() const {
}
Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection(
- Milliseconds timeout, ErrorCodes::Error timeoutCode) {
+ Milliseconds timeout, bool lease, ErrorCodes::Error timeoutCode) {
// Reset our activity timestamp
auto now = _parent->_factory->now();
@@ -799,7 +833,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
// If we do not have requests, then we can fulfill immediately
if (_requests.size() == 0) {
- auto conn = tryGetConnection();
+ auto conn = tryGetConnection(lease);
if (conn) {
LOGV2_DEBUG(22559,
@@ -821,26 +855,32 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
const auto expiration = now + timeout;
auto pf = makePromiseFuture<ConnectionHandle>();
- _requests.push_back({expiration, std::move(pf.promise), timeoutCode});
+ _requests.push_back({expiration, std::move(pf.promise), lease, timeoutCode});
std::push_heap(begin(_requests), end(_requests), RequestComparator{});
return std::move(pf.future);
}
-auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
+auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection, bool isLeased)
+ -> ConnectionHandle {
auto connUseStartedAt = _parent->_getFastClockSource()->now();
- auto deleter =
- [this, anchor = shared_from_this(), connUseStartedAt](ConnectionInterface* connection) {
- stdx::lock_guard lk(_parent->_mutex);
+ auto deleter = [this, anchor = shared_from_this(), connUseStartedAt, isLeased](
+ ConnectionInterface* connection) {
+ stdx::lock_guard lk(_parent->_mutex);
+
+ // Leased connections don't count towards the pool's total connection usage time.
+ if (!isLeased) {
_totalConnUsageTime += _parent->_getFastClockSource()->now() - connUseStartedAt;
- returnConnection(connection);
- _lastActiveTime = _parent->_factory->now();
- updateState();
- };
+ }
+
+ returnConnection(connection, isLeased);
+ _lastActiveTime = _parent->_factory->now();
+ updateState();
+ };
return ConnectionHandle(connection, std::move(deleter));
}
-ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection() {
+ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(bool lease) {
while (_readyPool.size()) {
// _readyPool is an LRUCache, so its begin() object is the MRU item.
auto iter = _readyPool.begin();
@@ -862,12 +902,15 @@ ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
auto connPtr = conn.get();
- // check out the connection
- _checkedOutPool[connPtr] = std::move(conn);
+ if (lease) {
+ _leasedPool[connPtr] = std::move(conn);
+ } else {
+ _checkedOutPool[connPtr] = std::move(conn);
+ }
// pass it to the user
connPtr->resetToUnknown();
- auto handle = makeHandle(connPtr);
+ auto handle = makeHandle(connPtr, lease);
return handle;
}
@@ -948,10 +991,10 @@ void ConnectionPool::SpecificPool::finishRefresh(ConnectionInterface* connPtr, S
fulfillRequests();
}
-void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr) {
+void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr, bool isLeased) {
auto needsRefreshTP = connPtr->getLastUsed() + _parent->_controller->toRefreshTimeout();
- auto conn = takeFromPool(_checkedOutPool, connPtr);
+ auto conn = takeFromPool(isLeased ? _leasedPool : _checkedOutPool, connPtr);
invariant(conn);
if (_health.isShutdown) {
@@ -986,8 +1029,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
if (shouldRefreshConnection) {
auto controls = _parent->_controller->getControls(_id);
- if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
- controls.targetConnections) {
+ if (openConnections() >= controls.targetConnections) {
// If we already have minConnections, just let the connection lapse
LOGV2(22567,
"Ending idle connection to host {hostAndPort} because the pool meets "
@@ -1054,7 +1096,7 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) {
connPtr->indicateSuccess();
- returnConnection(connPtr);
+ returnConnection(connPtr, false);
});
connPtr->setTimeout(_parent->_controller->toRefreshTimeout(), std::move(returnConnectionFunc));
}
@@ -1143,7 +1185,7 @@ void ConnectionPool::SpecificPool::fulfillRequests() {
// deadlock).
//
// None of the heap manipulation code throws, but it's something to keep in mind.
- auto conn = tryGetConnection();
+ auto conn = tryGetConnection(_requests.front().lease);
if (!conn) {
break;
@@ -1259,7 +1301,8 @@ void ConnectionPool::SpecificPool::updateHealth() {
const auto now = _parent->_factory->now();
// We're expired if we have no sign of connection use and are past our expiry
- _health.isExpired = _requests.empty() && _checkedOutPool.empty() && (_hostExpiration <= now);
+ _health.isExpired = _requests.empty() && _checkedOutPool.empty() && _leasedPool.empty() &&
+ (_hostExpiration <= now);
// We're failed until we get new requests or our timer triggers
if (_health.isFailed) {
@@ -1277,7 +1320,7 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
}
// If our expiration comes before our next event, then it is the next event
- if (_requests.empty() && _checkedOutPool.empty()) {
+ if (_requests.empty() && _checkedOutPool.empty() && _leasedPool.empty()) {
_hostExpiration = _lastActiveTime + _parent->_controller->hostTimeout();
if ((_hostExpiration > now) && (_hostExpiration < nextEventTime)) {
nextEventTime = _hostExpiration;
@@ -1343,6 +1386,7 @@ void ConnectionPool::SpecificPool::updateController() {
refreshingConnections(),
availableConnections(),
inUseConnections(),
+ leasedConnections(),
};
LOGV2_DEBUG(22578,
kDiagnosticLogLevel,
@@ -1381,6 +1425,7 @@ void ConnectionPool::SpecificPool::updateController() {
if (shouldInvariantOnPoolCorrectness()) {
invariant(pool->_checkedOutPool.empty());
invariant(pool->_requests.empty());
+ invariant(pool->_leasedPool.empty());
}
pool->triggerShutdown(Status(ErrorCodes::ConnectionPoolExpired,
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 74f4a3adc88..de1adc2a5fa 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -81,6 +81,7 @@ public:
using ConnectionHandleDeleter = std::function<void(ConnectionInterface* connection)>;
using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
+ using RetrieveConnection = unique_function<SemiFuture<ConnectionHandle>()>;
using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>;
using PoolId = uint64_t;
@@ -212,6 +213,7 @@ public:
size_t pending = 0;
size_t ready = 0;
size_t active = 0;
+ size_t leased = 0;
std::string toString() const;
};
@@ -254,16 +256,39 @@ public:
const std::function<transport::Session::TagMask(transport::Session::TagMask)>&
mutateFunc) override;
- SemiFuture<ConnectionHandle> get(
+ inline SemiFuture<ConnectionHandle> get(
const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode,
Milliseconds timeout,
- ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit);
+ ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ return _get(hostAndPort, sslMode, timeout, false /*lease*/, timeoutCode);
+ }
+
void get_forTest(const HostAndPort& hostAndPort,
Milliseconds timeout,
ErrorCodes::Error timeoutCode,
GetConnectionCallback cb);
+ /**
+ * "Lease" a connection from the pool.
+ *
+ * Connections retrieved via this method are not assumed to be in active use for the duration of
+ * their lease and are reported separately in metrics. Otherwise, this method behaves similarly
+ * to `ConnectionPool::get`.
+ */
+ inline SemiFuture<ConnectionHandle> lease(
+ const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout,
+ ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ return _get(hostAndPort, sslMode, timeout, true /*lease*/, timeoutCode);
+ }
+
+ void lease_forTest(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ ErrorCodes::Error timeoutCode,
+ GetConnectionCallback cb);
+
void appendConnectionStats(ConnectionPoolStats* stats) const;
size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
@@ -275,6 +300,15 @@ public:
private:
ClockSource* _getFastClockSource() const;
+ SemiFuture<ConnectionHandle> _get(
+ const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout,
+ bool leased,
+ ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit);
+
+ void retrieve_forTest(RetrieveConnection retrieve, GetConnectionCallback cb);
+
std::string _name;
const std::shared_ptr<DependentTypeFactoryInterface> _factory;
diff --git a/src/mongo/executor/connection_pool_stats.cpp b/src/mongo/executor/connection_pool_stats.cpp
index 83f48c923f8..42ea5027b90 100644
--- a/src/mongo/executor/connection_pool_stats.cpp
+++ b/src/mongo/executor/connection_pool_stats.cpp
@@ -48,6 +48,7 @@ constexpr auto kAcquisitionWaitTimesKey = "acquisitionWaitTimes"_sd;
ConnectionStatsPer::ConnectionStatsPer(size_t nInUse,
size_t nAvailable,
+ size_t nLeased,
size_t nCreated,
size_t nRefreshing,
size_t nRefreshed,
@@ -56,6 +57,7 @@ ConnectionStatsPer::ConnectionStatsPer(size_t nInUse,
Milliseconds nConnUsageTime)
: inUse(nInUse),
available(nAvailable),
+ leased(nLeased),
created(nCreated),
refreshing(nRefreshing),
refreshed(nRefreshed),
@@ -68,6 +70,7 @@ ConnectionStatsPer::ConnectionStatsPer() = default;
ConnectionStatsPer& ConnectionStatsPer::operator+=(const ConnectionStatsPer& other) {
inUse += other.inUse;
available += other.available;
+ leased += other.leased;
created += other.created;
refreshing += other.refreshing;
refreshed += other.refreshed;
@@ -98,6 +101,7 @@ void ConnectionPoolStats::updateStatsForHost(std::string pool,
// Update total connection stats.
totalInUse += newStats.inUse;
totalAvailable += newStats.available;
+ totalLeased += newStats.leased;
totalCreated += newStats.created;
totalRefreshing += newStats.refreshing;
totalRefreshed += newStats.refreshed;
@@ -112,6 +116,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT
result.appendNumber("totalInUse", static_cast<long long>(totalInUse));
result.appendNumber("totalAvailable", static_cast<long long>(totalAvailable));
+ result.appendNumber("totalLeased", static_cast<long long>(totalLeased));
result.appendNumber("totalCreated", static_cast<long long>(totalCreated));
result.appendNumber("totalRefreshing", static_cast<long long>(totalRefreshing));
result.appendNumber("totalRefreshed", static_cast<long long>(totalRefreshed));
@@ -156,6 +161,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT
BSONObjBuilder poolInfo(poolBuilder.subobjStart(pool));
poolInfo.appendNumber("poolInUse", static_cast<long long>(stats.inUse));
poolInfo.appendNumber("poolAvailable", static_cast<long long>(stats.available));
+ poolInfo.appendNumber("poolLeased", static_cast<long long>(stats.leased));
poolInfo.appendNumber("poolCreated", static_cast<long long>(stats.created));
poolInfo.appendNumber("poolRefreshing", static_cast<long long>(stats.refreshing));
poolInfo.appendNumber("poolRefreshed", static_cast<long long>(stats.refreshed));
@@ -169,6 +175,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT
BSONObjBuilder hostInfo(poolInfo.subobjStart(host.toString()));
hostInfo.appendNumber("inUse", static_cast<long long>(stats.inUse));
hostInfo.appendNumber("available", static_cast<long long>(stats.available));
+ hostInfo.appendNumber("leased", static_cast<long long>(stats.leased));
hostInfo.appendNumber("created", static_cast<long long>(stats.created));
hostInfo.appendNumber("refreshing", static_cast<long long>(stats.refreshing));
hostInfo.appendNumber("refreshed", static_cast<long long>(stats.refreshed));
@@ -188,6 +195,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT
BSONObjBuilder hostInfo(hostBuilder.subobjStart(host.toString()));
hostInfo.appendNumber("inUse", static_cast<long long>(stats.inUse));
hostInfo.appendNumber("available", static_cast<long long>(stats.available));
+ hostInfo.appendNumber("leased", static_cast<long long>(stats.leased));
hostInfo.appendNumber("created", static_cast<long long>(stats.created));
hostInfo.appendNumber("refreshing", static_cast<long long>(stats.refreshing));
hostInfo.appendNumber("refreshed", static_cast<long long>(stats.refreshed));
diff --git a/src/mongo/executor/connection_pool_stats.h b/src/mongo/executor/connection_pool_stats.h
index f6f62ba2f6c..dec837141b0 100644
--- a/src/mongo/executor/connection_pool_stats.h
+++ b/src/mongo/executor/connection_pool_stats.h
@@ -56,6 +56,7 @@ public:
struct ConnectionStatsPer {
ConnectionStatsPer(size_t nInUse,
size_t nAvailable,
+ size_t nLeased,
size_t nCreated,
size_t nRefreshing,
size_t nRefreshed,
@@ -69,6 +70,7 @@ struct ConnectionStatsPer {
size_t inUse = 0u;
size_t available = 0u;
+ size_t leased = 0u;
size_t created = 0u;
size_t refreshing = 0u;
size_t refreshed = 0u;
@@ -91,6 +93,7 @@ struct ConnectionPoolStats {
size_t totalInUse = 0u;
size_t totalAvailable = 0u;
+ size_t totalLeased = 0u;
size_t totalCreated = 0u;
size_t totalRefreshing = 0u;
size_t totalRefreshed = 0u;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index feaabe0bd32..1c657e26ec8 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -29,6 +29,8 @@
#include "mongo/executor/connection_pool_test_fixture.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/net/hostandport.h"
#include <algorithm>
#include <memory>
#include <random>
@@ -39,6 +41,7 @@
#include <fmt/ostream.h>
#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/connection_pool_stats.h"
#include "mongo/stdx/future.h"
#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
@@ -50,6 +53,8 @@ namespace connection_pool_test_details {
class ConnectionPoolTest : public unittest::Test {
public:
+ constexpr static Milliseconds kNoTimeout = Milliseconds{-1};
+
protected:
void setUp() override {}
@@ -180,6 +185,8 @@ TEST_F(ConnectionPoolTest, SameConn) {
*/
TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
auto pool = makePool();
+ std::random_device rd;
+ std::mt19937 rng(rd());
// Obtain a set of connections
constexpr size_t kSize = 100;
@@ -202,18 +209,25 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
}
});
+ std::uniform_int_distribution<> dist{0, 1};
for (size_t i = 0; i != kSize; ++i) {
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- ErrorCodes::NetworkInterfaceExceededTimeLimit,
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- monitors[i].exec([&]() {
- ASSERT(swConn.isOK());
- connections.push_back(std::move(swConn.getValue()));
- monitors[i].notifyDone();
- });
- });
+ auto cb = [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitors[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ connections.push_back(std::move(swConn.getValue()));
+ monitors[i].notifyDone();
+ });
+ };
+ auto timeout = Milliseconds(5000);
+ auto errorCode = ErrorCodes::NetworkInterfaceExceededTimeLimit;
+
+ // Randomly lease or check out connection.
+ if (dist(rng)) {
+ pool->get_forTest(HostAndPort(), timeout, errorCode, cb);
+ } else {
+ pool->lease_forTest(HostAndPort(), timeout, errorCode, cb);
+ }
}
for (auto& monitor : monitors) {
@@ -223,8 +237,6 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
ASSERT_EQ(connections.size(), kSize);
// Shuffle them into a random order
- std::random_device rd;
- std::mt19937 rng(rd());
std::shuffle(connections.begin(), connections.end(), rng);
// Return them to the pool in that random order, recording IDs in a stack
@@ -244,19 +256,25 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
// as the IDs in the stack, since the pool returns them in MRU order.
for (size_t i = 0; i != kSize; ++i) {
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- ErrorCodes::NetworkInterfaceExceededTimeLimit,
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- monitors[i].exec([&]() {
- ASSERT(swConn.isOK());
- const auto id = verifyAndGetId(swConn);
- connections.push_back(std::move(swConn.getValue()));
- ASSERT_EQ(id, ids.top());
- ids.pop();
- monitors[i].notifyDone();
- });
- });
+ auto cb = [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitors[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ const auto id = verifyAndGetId(swConn);
+ connections.push_back(std::move(swConn.getValue()));
+ ASSERT_EQ(id, ids.top());
+ ids.pop();
+ monitors[i].notifyDone();
+ });
+ };
+ auto timeout = Milliseconds(5000);
+ auto errorCode = ErrorCodes::NetworkInterfaceExceededTimeLimit;
+
+ // Randomly lease or check out connection.
+ if (dist(rng)) {
+ pool->get_forTest(HostAndPort(), timeout, errorCode, cb);
+ } else {
+ pool->lease_forTest(HostAndPort(), timeout, errorCode, cb);
+ }
}
for (auto& monitor : monitors) {
@@ -1845,6 +1863,166 @@ TEST_F(ConnectionPoolTest, ReturnAfterShutdown) {
pool->shutdown();
}
+TEST_F(ConnectionPoolTest, TotalConnUseTimeIncreasedForCheckedOutConnection) {
+ constexpr Milliseconds checkOutLength = Milliseconds(10);
+ auto pool = makePool();
+
+ ConnectionPoolStats initialStats;
+ pool->appendConnectionStats(&initialStats);
+
+ auto startTimePoint = Date_t::now();
+ auto endTimePoint = startTimePoint + checkOutLength;
+ PoolImpl::setNow(startTimePoint);
+
+ ConnectionImpl::pushSetup(Status::OK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ PoolImpl::setNow(endTimePoint);
+ doneWith(swConn.getValue());
+ });
+
+ ConnectionPoolStats finalStats;
+ pool->appendConnectionStats(&finalStats);
+
+ auto totalTimeUsageDelta = finalStats.totalConnUsageTime - initialStats.totalConnUsageTime;
+ ASSERT_GREATER_THAN_OR_EQUALS(totalTimeUsageDelta, checkOutLength);
+}
+
+TEST_F(ConnectionPoolTest, OverlappingCheckoutsAdditivelyContributeToTotalUsageTime) {
+ constexpr Milliseconds checkOutLength = Milliseconds(10);
+ auto pool = makePool();
+
+ ConnectionPoolStats initialStats;
+ pool->appendConnectionStats(&initialStats);
+
+ auto startTimePoint = Date_t::now();
+ auto endTimePoint = startTimePoint + checkOutLength;
+ PoolImpl::setNow(startTimePoint);
+
+ // Check out multiple connections.
+ constexpr int numConnections = 2;
+ std::vector<ConnectionPool::ConnectionHandle> connections;
+ // Ensure that no matter how we leave the test, we mark any
+ // checked out connections as OK before implicitly returning them
+ // to the pool by destroying the 'connections' vector. Otherwise,
+ // this test would cause an invariant failure instead of a normal
+ // test failure if it fails, which would be confusing.
+ ScopeGuard guard([&] {
+ while (!connections.empty()) {
+ try {
+ ConnectionPool::ConnectionHandle conn = std::move(connections.back());
+ connections.pop_back();
+ doneWith(conn);
+ } catch (...) {
+ }
+ }
+ });
+
+ for (int i = 0; i < numConnections; ++i) {
+ ConnectionImpl::pushSetup(Status::OK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ connections.push_back(std::move(swConn.getValue()));
+ });
+ }
+ ASSERT_EQ(connections.size(), numConnections);
+ // Advance the time and return the connections.
+ PoolImpl::setNow(endTimePoint);
+ while (!connections.empty()) {
+ try {
+ ConnectionPool::ConnectionHandle conn = std::move(connections.back());
+ ASSERT(conn);
+ connections.pop_back();
+ doneWith(conn);
+ } catch (...) {
+ }
+ }
+ guard.dismiss();
+
+ ConnectionPoolStats finalStats;
+ pool->appendConnectionStats(&finalStats);
+
+ auto totalTimeUsageDelta = finalStats.totalConnUsageTime - initialStats.totalConnUsageTime;
+ // Since each connection was used for checkOutLength, the total usage time should be >= the
+ // product.
+ ASSERT_GREATER_THAN_OR_EQUALS(totalTimeUsageDelta, checkOutLength * numConnections);
+}
+
+TEST_F(ConnectionPoolTest, LeasedConnectionsDontCountTowardsUsageTime) {
+ constexpr Milliseconds checkOutLength = Milliseconds(10);
+ auto pool = makePool();
+
+ ConnectionPoolStats initialStats;
+ pool->appendConnectionStats(&initialStats);
+
+ auto startTimePoint = Date_t::now();
+ auto endTimePoint = startTimePoint + checkOutLength;
+ PoolImpl::setNow(startTimePoint);
+
+ ConnectionImpl::pushSetup(Status::OK());
+ pool->lease_forTest(HostAndPort(),
+ Milliseconds(5000),
+ ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ PoolImpl::setNow(endTimePoint);
+ doneWith(swConn.getValue());
+ });
+
+ ConnectionPoolStats finalStats;
+ pool->appendConnectionStats(&finalStats);
+
+ auto totalTimeUsageDelta = finalStats.totalConnUsageTime - initialStats.totalConnUsageTime;
+ ASSERT_EQ(totalTimeUsageDelta, Milliseconds(0));
+}
+
+TEST_F(ConnectionPoolTest, LeasedConnectionsDontInterfereWithOrdinaryCheckoutUsageTime) {
+ constexpr Milliseconds checkOutLength = Milliseconds(10);
+ auto pool = makePool();
+
+ ConnectionPoolStats initialStats;
+ pool->appendConnectionStats(&initialStats);
+
+ auto startTimePoint = Date_t::now();
+ auto endTimePoint = startTimePoint + checkOutLength;
+ PoolImpl::setNow(startTimePoint);
+
+ ConnectionImpl::pushSetup(Status::OK());
+
+ // Checkout one connection and lease one connection.
+ ConnectionPool::ConnectionHandle normal;
+ ConnectionPool::ConnectionHandle leased;
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ normal = std::move(swConn.getValue());
+ });
+ pool->lease_forTest(HostAndPort(),
+ Milliseconds(5000),
+ ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ leased = std::move(swConn.getValue());
+ });
+
+ // Advance the time and return the connections.
+ PoolImpl::setNow(endTimePoint);
+ doneWith(normal);
+ doneWith(leased);
+
+ ConnectionPoolStats finalStats;
+ pool->appendConnectionStats(&finalStats);
+
+ auto totalTimeUsageDelta = finalStats.totalConnUsageTime - initialStats.totalConnUsageTime;
+ // Should only include usage time from the checkout, not the lease.
+ ASSERT_GREATER_THAN_OR_EQUALS(totalTimeUsageDelta, checkOutLength);
+ ASSERT_LESS_THAN(totalTimeUsageDelta, checkOutLength * 2);
+}
+
} // namespace connection_pool_test_details
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index c83529ad482..bb98af43b54 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -238,6 +238,11 @@ Date_t PoolImpl::now() {
}
void PoolImpl::setNow(Date_t now) {
+ if (_now) {
+ // If we're not initializing the virtual clock, advance the fast clock source as well.
+ Milliseconds diff = now - *_now;
+ _fastClockSource.advance(diff);
+ }
_now = now;
TimerImpl::fireIfNecessary();
}
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 91cd5fd3a64..8547dc2d81a 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -34,6 +34,7 @@
#include <string>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/async_client.h"
#include "mongo/executor/task_executor.h"
#include "mongo/transport/baton.h"
#include "mongo/util/fail_point.h"
@@ -251,6 +252,46 @@ public:
Milliseconds timeout,
Status status) = 0;
+ /**
+ * An RAII type that NetworkInterface uses to allow users to access a stream corresponding to a
+ * single network-level (i.e. TCP, UDS) connection on which to run commands directly. Generally
+ * speaking, users should use the NetworkInterface itself to run commands to take advantage of
+ * connection-pooling, hedging, and other features - but for special cases where users need to
+ * borrow their own network-stream for manual use, they can lease one through this type.
+ * LeasedStreams are minimal and do not offer automated health-management/automated refreshing
+ * while on lease - users are responsible for examining the health of the stream as-needed if
+ * they desire. Users are also responsible for reporting on the health of stream before the
+ * lease ends so that it can be subsequently re-used; see comments below for detail.
+ */
+ class LeasedStream {
+ public:
+ virtual ~LeasedStream() = default;
+
+ // AsyncDBClient provides the mongoRPC-API for running commands over this stream. This
+ // stream owns the AsyncDBClient and no outstanding networking should be scheduled on the
+ // client when it is destroyed.
+ virtual AsyncDBClient* getClient() = 0;
+
+    // Indicates that the user is done with this leased stream, and no failures on it occurred.
+ // Users MUST call either this function or indicateFailure before the LeasedStream is
+ // destroyed.
+ virtual void indicateSuccess() = 0;
+ // Indicates that the stream is unhealthy (i.e. the user received a network error indicating
+ // the stream failed). This prevents the stream from being reused.
+ virtual void indicateFailure(Status) = 0;
+ // Indicates that the stream has successfully performed networking over the stream. Updates
+ // metadata indicating the last healthy networking over the stream so that appropriate
+ // health-checks can be done after the lease ends.
+ virtual void indicateUsed() = 0;
+ };
+
+ /**
+ * Lease a stream from this NetworkInterface for manual use.
+ */
+ virtual SemiFuture<std::unique_ptr<LeasedStream>> leaseStream(const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout) = 0;
+
protected:
NetworkInterface();
};
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index c2b7c7a6c0a..3d84a640cbf 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -1117,6 +1117,31 @@ TEST_F(NetworkInterfaceTest, TearDownWaitsForInProgress) {
ASSERT_EQ(getInProgress(), 0);
}
+TEST_F(NetworkInterfaceTest, RunCommandOnLeasedStream) {
+ auto cs = fixture();
+ auto target = cs.getServers().front();
+ auto leasedStream = net().leaseStream(target, transport::kGlobalSSLMode, kNoTimeout).get();
+ auto* client = leasedStream->getClient();
+
+ auto request = RemoteCommandRequest(target, "admin", makeEchoCmdObj(), nullptr, kNoTimeout);
+ auto deferred = client->runCommandRequest(request);
+
+ auto res = deferred.get();
+
+ ASSERT(res.elapsed);
+ uassertStatusOK(res.status);
+ leasedStream->indicateSuccess();
+ leasedStream->indicateUsed();
+
+    // This opmsg request expects the following reply, which is generated below:
+ // { echo: { echo: 1, foo: "bar", $db: "admin" }, ok: 1.0 }
+ auto cmdObj = res.data.getObjectField("echo");
+ ASSERT_EQ(1, cmdObj.getIntField("echo"));
+ ASSERT_EQ("bar"_sd, cmdObj.getStringField("foo"));
+ ASSERT_EQ("admin"_sd, cmdObj.getStringField("$db"));
+ ASSERT_EQ(1, res.data.getIntField("ok"));
+}
+
} // namespace
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index b63b3c6ba77..d528338674d 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -148,6 +148,12 @@ public:
void testEgress(const HostAndPort&, transport::ConnectSSLMode, Milliseconds, Status) override {}
+ // Stream-leasing functionality is not mocked at this time.
+ SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> leaseStream(
+ const HostAndPort&, transport::ConnectSSLMode, Milliseconds) override {
+ MONGO_UNIMPLEMENTED;
+ }
+
////////////////////////////////////////////////////////////////////////////////
//
// Methods for simulating network operations and the passage of time.
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index c1b3def21ef..794603e356c 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -32,6 +32,7 @@
#include <fmt/format.h>
+#include "mongo/base/checked_cast.h"
#include "mongo/config.h"
#include "mongo/db/auth/validated_tenancy_scope.h"
#include "mongo/db/commands/server_status_metric.h"
@@ -42,6 +43,7 @@
#include "mongo/executor/connection_pool_tl.h"
#include "mongo/executor/hedge_options_util.h"
#include "mongo/executor/hedging_metrics.h"
+#include "mongo/executor/network_interface.h"
#include "mongo/executor/network_interface_tl_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -1465,5 +1467,33 @@ bool NetworkInterfaceTL::onNetworkThread() {
void NetworkInterfaceTL::dropConnections(const HostAndPort& hostAndPort) {
_pool->dropConnections(hostAndPort);
}
+
+AsyncDBClient* NetworkInterfaceTL::LeasedStream::getClient() {
+ return checked_cast<connection_pool_tl::TLConnection*>(_conn.get())->client();
+}
+
+void NetworkInterfaceTL::LeasedStream::indicateSuccess() {
+ return _conn->indicateSuccess();
+}
+
+void NetworkInterfaceTL::LeasedStream::indicateFailure(Status status) {
+ _conn->indicateFailure(status);
+}
+
+void NetworkInterfaceTL::LeasedStream::indicateUsed() {
+ _conn->indicateUsed();
+}
+
+SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> NetworkInterfaceTL::leaseStream(
+ const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, Milliseconds timeout) {
+
+ return _pool->lease(hostAndPort, sslMode, timeout)
+ .thenRunOn(_reactor)
+ .then([](auto conn) -> std::unique_ptr<NetworkInterface::LeasedStream> {
+ auto ptr = std::make_unique<NetworkInterfaceTL::LeasedStream>(std::move(conn));
+ return ptr;
+ })
+ .semi();
+}
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index a9351ca689f..52bbe07273a 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -31,9 +31,12 @@
#include <deque>
+#include <boost/optional.hpp>
+
#include "mongo/client/async_client.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/connection_pool_tl.h"
#include "mongo/executor/network_interface.h"
#include "mongo/logv2/log_severity.h"
#include "mongo/platform/mutex.h"
@@ -103,6 +106,33 @@ public:
Milliseconds timeout,
Status status) override;
+ /**
+ * NetworkInterfaceTL's implementation of a leased network-stream
+ * provided for manual use outside of the NITL's usual RPC API.
+ * When this type is destroyed, the destructor of the ConnectionHandle
+ * member will return the connection to this NetworkInterface's ConnectionPool.
+ */
+ class LeasedStream : public NetworkInterface::LeasedStream {
+ public:
+ AsyncDBClient* getClient() override;
+
+ LeasedStream(ConnectionPool::ConnectionHandle&& conn) : _conn{std::move(conn)} {}
+
+        // These pass through indications of the health of the leased
+        // stream to the underlying ConnectionHandle.
+ void indicateSuccess() override;
+ void indicateUsed() override;
+ void indicateFailure(Status) override;
+
+ private:
+ ConnectionPool::ConnectionHandle _conn;
+ };
+
+ SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> leaseStream(
+ const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout) override;
+
private:
struct RequestState;
struct RequestManager;
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
index 4b416d86f05..826145e3339 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.cpp
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -252,7 +252,7 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
"maxConns"_attr = maxConns);
// Update the target for just the pool first
- poolData.target = stats.requests + stats.active;
+ poolData.target = stats.requests + stats.active + stats.leased;
if (poolData.target < minConns) {
poolData.target = minConns;