summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGeorge Wangensteen <george.wangensteen@mongodb.com>2023-03-15 20:30:31 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-12 20:54:50 +0000
commit1a6650b6776c2c7cea2e86fe97a7432e801b124b (patch)
tree9d20146d9a943400f6515bbfe3e3346415d8d390
parentc821f218c1174e65570b4c1c2495fdd07b7fd4c6 (diff)
downloadmongo-1a6650b6776c2c7cea2e86fe97a7432e801b124b.tar.gz
SERVER-73610 Introduce Connection-Leasing from ConnectionPool and NetworkInterface
(cherry picked from commit 3eb6f1f4540a9315434341e447fd4b2830211ef6)
-rw-r--r--src/mongo/client/connpool.cpp1
-rw-r--r--src/mongo/executor/SConscript1
-rw-r--r--src/mongo/executor/connection_pool.cpp132
-rw-r--r--src/mongo/executor/connection_pool.h37
-rw-r--r--src/mongo/executor/connection_pool_stats.cpp15
-rw-r--r--src/mongo/executor/connection_pool_stats.h10
-rw-r--r--src/mongo/executor/connection_pool_test.cpp64
-rw-r--r--src/mongo/executor/network_interface.h41
-rw-r--r--src/mongo/executor/network_interface_integration_test.cpp25
-rw-r--r--src/mongo/executor/network_interface_mock.h6
-rw-r--r--src/mongo/executor/network_interface_tl.cpp29
-rw-r--r--src/mongo/executor/network_interface_tl.h30
-rw-r--r--src/mongo/s/sharding_task_executor_pool_controller.cpp2
13 files changed, 316 insertions, 77 deletions
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp
index e52bd54d102..1df0d747090 100644
--- a/src/mongo/client/connpool.cpp
+++ b/src/mongo/client/connpool.cpp
@@ -589,6 +589,7 @@ void DBConnectionPool::appendConnectionStats(executor::ConnectionPoolStats* stat
executor::ConnectionStatsPer hostStats{static_cast<size_t>(i->second.numInUse()),
static_cast<size_t>(i->second.numAvailable()),
+ 0,
static_cast<size_t>(i->second.numCreated()),
0,
0};
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 627675845a2..1a1424865ac 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -320,6 +320,7 @@ env.CppIntegrationTest(
'thread_pool_task_executor_integration_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/client/async_client',
'$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/wire_version',
'$BUILD_DIR/mongo/executor/network_interface_factory',
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 5900f65a650..e7b2b1ad58a 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -137,8 +137,8 @@ std::string ConnectionPool::ConnectionControls::toString() const {
}
std::string ConnectionPool::HostState::toString() const {
- return "{{ requests: {}, ready: {}, pending: {}, active: {}, isExpired: {} }}"_format(
- requests, ready, pending, active, health.isExpired);
+ return "{{ requests: {}, ready: {}, pending: {}, active: {}, leased: {}, isExpired: {} }}"_format(
+ requests, ready, pending, active, leased, health.isExpired);
}
/**
@@ -162,7 +162,7 @@ public:
const auto minConns = getPool()->_options.minConnections;
const auto maxConns = getPool()->_options.maxConnections;
- data.target = stats.requests + stats.active;
+ data.target = stats.requests + stats.active + stats.leased;
if (data.target < minConns) {
data.target = minConns;
} else if (data.target > maxConns) {
@@ -275,7 +275,7 @@ public:
* Gets a connection from the specific pool. Sinks a unique_lock from the
* parent to preserve the lock on _mutex
*/
- Future<ConnectionHandle> getConnection(Milliseconds timeout);
+ Future<ConnectionHandle> getConnection(Milliseconds timeout, bool lease);
/**
* Triggers the shutdown procedure. This function sets isShutdown to true
@@ -298,6 +298,11 @@ public:
size_t inUseConnections() const;
/**
+ * Returns the number of leased connections from the pool.
+ */
+ size_t leasedConnections() const;
+
+ /**
* Returns the number of available connections in the pool.
*/
size_t availableConnections() const;
@@ -320,7 +325,7 @@ public:
/**
* Returns the total number of connections currently open that belong to
* this pool. This is the sum of refreshingConnections, availableConnections,
- * and inUseConnections.
+ * inUseConnections, and leasedConnections.
*/
size_t openConnections() const;
@@ -361,14 +366,20 @@ private:
using OwnedConnection = std::shared_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>;
- using Request = std::pair<Date_t, Promise<ConnectionHandle>>;
+ struct Request {
+ Date_t expiration;
+ Promise<ConnectionHandle> promise;
+ // Whether or not the requested connection should be "leased".
+ bool lease;
+ };
+
struct RequestComparator {
bool operator()(const Request& a, const Request& b) {
- return a.first > b.first;
+ return a.expiration > b.expiration;
}
};
- ConnectionHandle makeHandle(ConnectionInterface* connection);
+ ConnectionHandle makeHandle(ConnectionInterface* connection, bool isLeased);
/**
* Establishes connections until the ControllerInterface's target is met.
@@ -381,11 +392,11 @@ private:
void fulfillRequests();
- void returnConnection(ConnectionInterface* connPtr);
+ void returnConnection(ConnectionInterface* connPtr, bool isLeased);
// This internal helper is used both by get and by _fulfillRequests and differs in that it
// skips some bookkeeping that the other callers do on their own
- ConnectionHandle tryGetConnection();
+ ConnectionHandle tryGetConnection(bool lease);
template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type takeFromPool(
@@ -414,6 +425,7 @@ private:
OwnershipPool _processingPool;
OwnershipPool _droppedProcessingPool;
OwnershipPool _checkedOutPool;
+ OwnershipPool _leasedPool;
std::vector<Request> _requests;
Date_t _lastActiveTime;
@@ -548,21 +560,37 @@ void ConnectionPool::mutateTags(
pool->mutateTags(mutateFunc);
}
+void ConnectionPool::retrieve_forTest(RetrieveConnection retrieve, GetConnectionCallback cb) {
+ // We kick ourselves onto the executor queue to prevent us from deadlocking with our own thread
+ auto getConnectionFunc =
+ [this, retrieve = std::move(retrieve), cb = std::move(cb)](Status&&) mutable {
+ retrieve().thenRunOn(_factory->getExecutor()).getAsync(std::move(cb));
+ };
+ _factory->getExecutor()->schedule(std::move(getConnectionFunc));
+}
+
void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
Milliseconds timeout,
GetConnectionCallback cb) {
- // We kick ourselves onto the executor queue to prevent us from deadlocking with our own thread
- auto getConnectionFunc = [this, hostAndPort, timeout, cb = std::move(cb)](Status&&) mutable {
- get(hostAndPort, transport::kGlobalSSLMode, timeout)
- .thenRunOn(_factory->getExecutor())
- .getAsync(std::move(cb));
+ auto getConnectionFunc = [this, hostAndPort, timeout]() mutable {
+ return get(hostAndPort, transport::kGlobalSSLMode, timeout);
};
- _factory->getExecutor()->schedule(std::move(getConnectionFunc));
+ retrieve_forTest(getConnectionFunc, std::move(cb));
+}
+
+void ConnectionPool::lease_forTest(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ GetConnectionCallback cb) {
+ auto getConnectionFunc = [this, hostAndPort, timeout]() mutable {
+ return lease(hostAndPort, transport::kGlobalSSLMode, timeout);
+ };
+ retrieve_forTest(getConnectionFunc, std::move(cb));
}
-SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
- transport::ConnectSSLMode sslMode,
- Milliseconds timeout) {
+SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::_get(const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout,
+ bool lease) {
stdx::lock_guard lk(_mutex);
auto& pool = _pools[hostAndPort];
@@ -574,7 +602,7 @@ SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPo
invariant(pool);
- auto connFuture = pool->getConnection(timeout);
+ auto connFuture = pool->getConnection(timeout, lease);
pool->updateState();
return std::move(connFuture).semi();
@@ -590,6 +618,7 @@ void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
auto& pool = kv.second;
ConnectionStatsPer hostStats{pool->inUseConnections(),
pool->availableConnections(),
+ pool->leasedConnections(),
pool->createdConnections(),
pool->refreshingConnections(),
pool->refreshedConnections()};
@@ -625,6 +654,7 @@ ConnectionPool::SpecificPool::~SpecificPool() {
if (shouldInvariantOnPoolCorrectness()) {
invariant(_requests.empty());
invariant(_checkedOutPool.empty());
+ invariant(_leasedPool.empty());
}
}
@@ -636,6 +666,10 @@ size_t ConnectionPool::SpecificPool::availableConnections() const {
return _readyPool.size();
}
+size_t ConnectionPool::SpecificPool::leasedConnections() const {
+ return _leasedPool.size();
+}
+
size_t ConnectionPool::SpecificPool::refreshingConnections() const {
return _processingPool.size();
}
@@ -649,7 +683,7 @@ size_t ConnectionPool::SpecificPool::createdConnections() const {
}
size_t ConnectionPool::SpecificPool::openConnections() const {
- return _checkedOutPool.size() + _readyPool.size() + _processingPool.size();
+ return _checkedOutPool.size() + _readyPool.size() + _processingPool.size() + _leasedPool.size();
}
size_t ConnectionPool::SpecificPool::requestsPending() const {
@@ -657,7 +691,7 @@ size_t ConnectionPool::SpecificPool::requestsPending() const {
}
Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection(
- Milliseconds timeout) {
+ Milliseconds timeout, bool lease) {
// Reset our activity timestamp
auto now = _parent->_factory->now();
@@ -665,7 +699,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
// If we do not have requests, then we can fulfill immediately
if (_requests.size() == 0) {
- auto conn = tryGetConnection();
+ auto conn = tryGetConnection(lease);
if (conn) {
LOGV2_DEBUG(22559,
@@ -691,23 +725,24 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
const auto expiration = now + timeout;
auto pf = makePromiseFuture<ConnectionHandle>();
- _requests.push_back(make_pair(expiration, std::move(pf.promise)));
+ _requests.push_back({expiration, std::move(pf.promise), lease});
std::push_heap(begin(_requests), end(_requests), RequestComparator{});
return std::move(pf.future);
}
-auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
- auto deleter = [this, anchor = shared_from_this()](ConnectionInterface* connection) {
+auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection, bool isLeased)
+ -> ConnectionHandle {
+ auto deleter = [this, anchor = shared_from_this(), isLeased](ConnectionInterface* connection) {
stdx::lock_guard lk(_parent->_mutex);
- returnConnection(connection);
+ returnConnection(connection, isLeased);
_lastActiveTime = _parent->_factory->now();
updateState();
};
return ConnectionHandle(connection, std::move(deleter));
}
-ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection() {
+ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(bool lease) {
while (_readyPool.size()) {
// _readyPool is an LRUCache, so its begin() object is the MRU item.
auto iter = _readyPool.begin();
@@ -729,12 +764,15 @@ ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
auto connPtr = conn.get();
- // check out the connection
- _checkedOutPool[connPtr] = std::move(conn);
+ if (lease) {
+ _leasedPool[connPtr] = std::move(conn);
+ } else {
+ _checkedOutPool[connPtr] = std::move(conn);
+ }
// pass it to the user
connPtr->resetToUnknown();
- auto handle = makeHandle(connPtr);
+ auto handle = makeHandle(connPtr, lease);
return handle;
}
@@ -804,10 +842,10 @@ void ConnectionPool::SpecificPool::finishRefresh(ConnectionInterface* connPtr, S
fulfillRequests();
}
-void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr) {
+void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr, bool isLeased) {
auto needsRefreshTP = connPtr->getLastUsed() + _parent->_controller->toRefreshTimeout();
- auto conn = takeFromPool(_checkedOutPool, connPtr);
+ auto conn = takeFromPool(isLeased ? _leasedPool : _checkedOutPool, connPtr);
invariant(conn);
if (_health.isShutdown) {
@@ -842,8 +880,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
if (shouldRefreshConnection) {
auto controls = _parent->_controller->getControls(_id);
- if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
- controls.targetConnections) {
+ if (openConnections() >= controls.targetConnections) {
// If we already have minConnections, just let the connection lapse
LOGV2(22567,
"Ending idle connection to host {hostAndPort} because the pool meets "
@@ -910,7 +947,7 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) {
connPtr->indicateSuccess();
- returnConnection(connPtr);
+ returnConnection(connPtr, false);
});
connPtr->setTimeout(_parent->_controller->toRefreshTimeout(), std::move(returnConnectionFunc));
}
@@ -977,7 +1014,7 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status) {
}
for (auto& request : _requests) {
- request.second.setError(status);
+ request.promise.setError(status);
}
LOGV2_DEBUG(22573,
@@ -999,14 +1036,14 @@ void ConnectionPool::SpecificPool::fulfillRequests() {
// deadlock).
//
// None of the heap manipulation code throws, but it's something to keep in mind.
- auto conn = tryGetConnection();
+ auto conn = tryGetConnection(_requests.front().lease);
if (!conn) {
break;
}
// Grab the request and callback
- auto promise = std::move(_requests.front().second);
+ auto promise = std::move(_requests.front().promise);
std::pop_heap(begin(_requests), end(_requests), RequestComparator{});
_requests.pop_back();
@@ -1114,7 +1151,8 @@ void ConnectionPool::SpecificPool::updateHealth() {
const auto now = _parent->_factory->now();
// We're expired if we have no sign of connection use and are past our expiry
- _health.isExpired = _requests.empty() && _checkedOutPool.empty() && (_hostExpiration <= now);
+ _health.isExpired = _requests.empty() && _checkedOutPool.empty() && _leasedPool.empty() &&
+ (_hostExpiration <= now);
// We're failed until we get new requests or our timer triggers
if (_health.isFailed) {
@@ -1132,7 +1170,7 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
}
// If our expiration comes before our next event, then it is the next event
- if (_requests.empty() && _checkedOutPool.empty()) {
+ if (_requests.empty() && _checkedOutPool.empty() && _leasedPool.empty()) {
_hostExpiration = _lastActiveTime + _parent->_controller->hostTimeout();
if ((_hostExpiration > now) && (_hostExpiration < nextEventTime)) {
nextEventTime = _hostExpiration;
@@ -1140,8 +1178,8 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
}
// If a request would timeout before the next event, then it is the next event
- if (_requests.size() && (_requests.front().first < nextEventTime)) {
- nextEventTime = _requests.front().first;
+ if (_requests.size() && (_requests.front().expiration < nextEventTime)) {
+ nextEventTime = _requests.front().expiration;
}
// Clamp next event time to be either now or in the future. Next event time
@@ -1169,12 +1207,12 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
_health.isFailed = false;
- while (_requests.size() && (_requests.front().first <= now)) {
+ while (_requests.size() && (_requests.front().expiration <= now)) {
std::pop_heap(begin(_requests), end(_requests), RequestComparator{});
auto& request = _requests.back();
- request.second.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
- "Couldn't get a connection within the time limit"));
+ request.promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+ "Couldn't get a connection within the time limit"));
_requests.pop_back();
// Since we've failed a request, we've interacted with external users
@@ -1198,6 +1236,7 @@ void ConnectionPool::SpecificPool::updateController() {
refreshingConnections(),
availableConnections(),
inUseConnections(),
+ leasedConnections(),
};
LOGV2_DEBUG(22578,
kDiagnosticLogLevel,
@@ -1236,6 +1275,7 @@ void ConnectionPool::SpecificPool::updateController() {
if (shouldInvariantOnPoolCorrectness()) {
invariant(pool->_checkedOutPool.empty());
invariant(pool->_requests.empty());
+ invariant(pool->_leasedPool.empty());
}
pool->triggerShutdown(Status(ErrorCodes::ConnectionPoolExpired,
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 0e5bf90dc9a..1da17ee8897 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -79,6 +79,7 @@ public:
using ConnectionHandleDeleter = std::function<void(ConnectionInterface* connection)>;
using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
+ using RetrieveConnection = unique_function<SemiFuture<ConnectionHandle>()>;
using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>;
using PoolId = uint64_t;
@@ -210,6 +211,7 @@ public:
size_t pending = 0;
size_t ready = 0;
size_t active = 0;
+ size_t leased = 0;
std::string toString() const;
};
@@ -252,13 +254,35 @@ public:
const std::function<transport::Session::TagMask(transport::Session::TagMask)>&
mutateFunc) override;
- SemiFuture<ConnectionHandle> get(const HostAndPort& hostAndPort,
- transport::ConnectSSLMode sslMode,
- Milliseconds timeout);
+ inline SemiFuture<ConnectionHandle> get(const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout) {
+ return _get(hostAndPort, sslMode, timeout, false /*lease*/);
+ }
+
void get_forTest(const HostAndPort& hostAndPort,
Milliseconds timeout,
GetConnectionCallback cb);
+ /**
+ * "Lease" a connection from the pool.
+ *
+ * Connections retrieved via this method are not assumed to be in active use for the duration of
+ * their lease and are reported separately in metrics. Otherwise, this method behaves similarly
+ * to `ConnectionPool::get`.
+ */
+ inline SemiFuture<ConnectionHandle> lease(
+ const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout,
+ ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ return _get(hostAndPort, sslMode, timeout, true /*lease*/);
+ }
+
+ void lease_forTest(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ GetConnectionCallback cb);
+
void appendConnectionStats(ConnectionPoolStats* stats) const;
size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
@@ -268,6 +292,13 @@ public:
}
private:
+ SemiFuture<ConnectionHandle> _get(const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout,
+ bool leased);
+
+ void retrieve_forTest(RetrieveConnection retrieve, GetConnectionCallback cb);
+
std::string _name;
const std::shared_ptr<DependentTypeFactoryInterface> _factory;
diff --git a/src/mongo/executor/connection_pool_stats.cpp b/src/mongo/executor/connection_pool_stats.cpp
index a3d1e1fb0b1..85d6743b127 100644
--- a/src/mongo/executor/connection_pool_stats.cpp
+++ b/src/mongo/executor/connection_pool_stats.cpp
@@ -36,10 +36,15 @@
namespace mongo {
namespace executor {
-ConnectionStatsPer::ConnectionStatsPer(
- size_t nInUse, size_t nAvailable, size_t nCreated, size_t nRefreshing, size_t nRefreshed)
+ConnectionStatsPer::ConnectionStatsPer(size_t nInUse,
+ size_t nAvailable,
+ size_t nLeased,
+ size_t nCreated,
+ size_t nRefreshing,
+ size_t nRefreshed)
: inUse(nInUse),
available(nAvailable),
+ leased(nLeased),
created(nCreated),
refreshing(nRefreshing),
refreshed(nRefreshed) {}
@@ -49,6 +54,7 @@ ConnectionStatsPer::ConnectionStatsPer() = default;
ConnectionStatsPer& ConnectionStatsPer::operator+=(const ConnectionStatsPer& other) {
inUse += other.inUse;
available += other.available;
+ leased += other.leased;
created += other.created;
refreshing += other.refreshing;
refreshed += other.refreshed;
@@ -75,6 +81,7 @@ void ConnectionPoolStats::updateStatsForHost(std::string pool,
// Update total connection stats.
totalInUse += newStats.inUse;
totalAvailable += newStats.available;
+ totalLeased += newStats.leased;
totalCreated += newStats.created;
totalRefreshing += newStats.refreshing;
totalRefreshed += newStats.refreshed;
@@ -83,6 +90,7 @@ void ConnectionPoolStats::updateStatsForHost(std::string pool,
void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFTDC) {
result.appendNumber("totalInUse", static_cast<long long>(totalInUse));
result.appendNumber("totalAvailable", static_cast<long long>(totalAvailable));
+ result.appendNumber("totalLeased", static_cast<long long>(totalLeased));
result.appendNumber("totalCreated", static_cast<long long>(totalCreated));
result.appendNumber("totalRefreshing", static_cast<long long>(totalRefreshing));
result.appendNumber("totalRefreshed", static_cast<long long>(totalRefreshed));
@@ -115,6 +123,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT
auto& poolStats = pool.second;
poolInfo.appendNumber("poolInUse", static_cast<long long>(poolStats.inUse));
poolInfo.appendNumber("poolAvailable", static_cast<long long>(poolStats.available));
+ poolInfo.appendNumber("poolLeased", static_cast<long long>(poolStats.leased));
poolInfo.appendNumber("poolCreated", static_cast<long long>(poolStats.created));
poolInfo.appendNumber("poolRefreshing", static_cast<long long>(poolStats.refreshing));
poolInfo.appendNumber("poolRefreshed", static_cast<long long>(poolStats.refreshed));
@@ -124,6 +133,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT
auto& hostStats = host.second;
hostInfo.appendNumber("inUse", static_cast<long long>(hostStats.inUse));
hostInfo.appendNumber("available", static_cast<long long>(hostStats.available));
+ hostInfo.appendNumber("leased", static_cast<long long>(hostStats.leased));
hostInfo.appendNumber("created", static_cast<long long>(hostStats.created));
hostInfo.appendNumber("refreshing", static_cast<long long>(hostStats.refreshing));
hostInfo.appendNumber("refreshed", static_cast<long long>(hostStats.refreshed));
@@ -139,6 +149,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT
auto hostStats = host.second;
hostInfo.appendNumber("inUse", static_cast<long long>(hostStats.inUse));
hostInfo.appendNumber("available", static_cast<long long>(hostStats.available));
+ hostInfo.appendNumber("leased", static_cast<long long>(hostStats.leased));
hostInfo.appendNumber("created", static_cast<long long>(hostStats.created));
hostInfo.appendNumber("refreshing", static_cast<long long>(hostStats.refreshing));
hostInfo.appendNumber("refreshed", static_cast<long long>(hostStats.refreshed));
diff --git a/src/mongo/executor/connection_pool_stats.h b/src/mongo/executor/connection_pool_stats.h
index a96739b42be..3f4183adbf2 100644
--- a/src/mongo/executor/connection_pool_stats.h
+++ b/src/mongo/executor/connection_pool_stats.h
@@ -41,8 +41,12 @@ namespace executor {
* a parent ConnectionPoolStats object and should not need to be created directly.
*/
struct ConnectionStatsPer {
- ConnectionStatsPer(
- size_t nInUse, size_t nAvailable, size_t nCreated, size_t nRefreshing, size_t nRefreshed);
+ ConnectionStatsPer(size_t nInUse,
+ size_t nAvailable,
+ size_t nLeased,
+ size_t nCreated,
+ size_t nRefreshing,
+ size_t nRefreshed);
ConnectionStatsPer();
@@ -50,6 +54,7 @@ struct ConnectionStatsPer {
size_t inUse = 0u;
size_t available = 0u;
+ size_t leased = 0u;
size_t created = 0u;
size_t refreshing = 0u;
size_t refreshed = 0u;
@@ -68,6 +73,7 @@ struct ConnectionPoolStats {
size_t totalInUse = 0u;
size_t totalAvailable = 0u;
+ size_t totalLeased = 0u;
size_t totalCreated = 0u;
size_t totalRefreshing = 0u;
size_t totalRefreshed = 0u;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 79eaaaf0218..e78d62c4285 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -31,6 +31,8 @@
#include "mongo/executor/connection_pool_test_fixture.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/net/hostandport.h"
#include <algorithm>
#include <memory>
#include <random>
@@ -41,6 +43,7 @@
#include <fmt/ostream.h>
#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/connection_pool_stats.h"
#include "mongo/stdx/future.h"
#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
@@ -52,6 +55,8 @@ namespace connection_pool_test_details {
class ConnectionPoolTest : public unittest::Test {
public:
+ constexpr static Milliseconds kNoTimeout = Milliseconds{-1};
+
protected:
void setUp() override {}
@@ -148,6 +153,8 @@ TEST_F(ConnectionPoolTest, SameConn) {
*/
TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
auto pool = makePool();
+ std::random_device rd;
+ std::mt19937 rng(rd());
// Obtain a set of connections
constexpr size_t kSize = 100;
@@ -170,17 +177,24 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
}
});
+ std::uniform_int_distribution<> dist{0, 1};
for (size_t i = 0; i != kSize; ++i) {
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- monitors[i].exec([&]() {
- ASSERT(swConn.isOK());
- connections.push_back(std::move(swConn.getValue()));
- monitors[i].notifyDone();
- });
- });
+ auto cb = [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitors[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ connections.push_back(std::move(swConn.getValue()));
+ monitors[i].notifyDone();
+ });
+ };
+ auto timeout = Milliseconds(5000);
+
+ // Randomly lease or check out connection.
+ if (dist(rng)) {
+ pool->get_forTest(HostAndPort(), timeout, cb);
+ } else {
+ pool->lease_forTest(HostAndPort(), timeout, cb);
+ }
}
for (auto& monitor : monitors) {
@@ -190,8 +204,6 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
ASSERT_EQ(connections.size(), kSize);
// Shuffle them into a random order
- std::random_device rd;
- std::mt19937 rng(rd());
std::shuffle(connections.begin(), connections.end(), rng);
// Return them to the pool in that random order, recording IDs in a stack
@@ -211,18 +223,24 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
// as the IDs in the stack, since the pool returns them in MRU order.
for (size_t i = 0; i != kSize; ++i) {
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- monitors[i].exec([&]() {
- ASSERT(swConn.isOK());
- const auto id = verifyAndGetId(swConn);
- connections.push_back(std::move(swConn.getValue()));
- ASSERT_EQ(id, ids.top());
- ids.pop();
- monitors[i].notifyDone();
- });
- });
+ auto cb = [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitors[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ const auto id = verifyAndGetId(swConn);
+ connections.push_back(std::move(swConn.getValue()));
+ ASSERT_EQ(id, ids.top());
+ ids.pop();
+ monitors[i].notifyDone();
+ });
+ };
+ auto timeout = Milliseconds(5000);
+
+ // Randomly lease or check out connection.
+ if (dist(rng)) {
+ pool->get_forTest(HostAndPort(), timeout, cb);
+ } else {
+ pool->lease_forTest(HostAndPort(), timeout, cb);
+ }
}
for (auto& monitor : monitors) {
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 799f473a12d..1867a87e71c 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -34,6 +34,7 @@
#include <string>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/async_client.h"
#include "mongo/executor/task_executor.h"
#include "mongo/transport/baton.h"
#include "mongo/util/fail_point.h"
@@ -252,6 +253,46 @@ public:
Milliseconds timeout,
Status status) = 0;
+ /**
+ * An RAII type that NetworkInterface uses to allow users to access a stream corresponding to a
+ * single network-level (i.e. TCP, UDS) connection on which to run commands directly. Generally
+ * speaking, users should use the NetworkInterface itself to run commands to take advantage of
+ * connection-pooling, hedging, and other features - but for special cases where users need to
+ * borrow their own network-stream for manual use, they can lease one through this type.
+ * LeasedStreams are minimal and do not offer automated health-management/automated refreshing
+ * while on lease - users are responsible for examining the health of the stream as-needed if
+ * they desire. Users are also responsible for reporting on the health of stream before the
+ * lease ends so that it can be subsequently re-used; see comments below for detail.
+ */
+ class LeasedStream {
+ public:
+ virtual ~LeasedStream() = default;
+
+ // AsyncDBClient provides the mongoRPC-API for running commands over this stream. This
+ // stream owns the AsyncDBClient and no outstanding networking should be scheduled on the
+ // client when it is destroyed.
+ virtual AsyncDBClient* getClient() = 0;
+
+ // Indicates that the user is done with this leased stream, and no failures on it occured.
+ // Users MUST call either this function or indicateFailure before the LeasedStream is
+ // destroyed.
+ virtual void indicateSuccess() = 0;
+ // Indicates that the stream is unhealthy (i.e. the user received a network error indicating
+ // the stream failed). This prevents the stream from being reused.
+ virtual void indicateFailure(Status) = 0;
+ // Indicates that the stream has successfully performed networking over the stream. Updates
+ // metadata indicating the last healthy networking over the stream so that appropriate
+ // health-checks can be done after the lease ends.
+ virtual void indicateUsed() = 0;
+ };
+
+ /**
+ * Lease a stream from this NetworkInterface for manual use.
+ */
+ virtual SemiFuture<std::unique_ptr<LeasedStream>> leaseStream(const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout) = 0;
+
protected:
NetworkInterface();
};
diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp
index fdf1ac6f8ba..b3f57157fe8 100644
--- a/src/mongo/executor/network_interface_integration_test.cpp
+++ b/src/mongo/executor/network_interface_integration_test.cpp
@@ -939,6 +939,31 @@ TEST_F(NetworkInterfaceTest, TearDownWaitsForInProgress) {
ASSERT_EQ(getInProgress(), 0);
}
+TEST_F(NetworkInterfaceTest, RunCommandOnLeasedStream) {
+ auto cs = fixture();
+ auto target = cs.getServers().front();
+ auto leasedStream = net().leaseStream(target, transport::kGlobalSSLMode, kNoTimeout).get();
+ auto* client = leasedStream->getClient();
+
+ auto request = RemoteCommandRequest(target, "admin", makeEchoCmdObj(), nullptr, kNoTimeout);
+ auto deferred = client->runCommandRequest(request);
+
+ auto res = deferred.get();
+
+ ASSERT(res.elapsed);
+ uassertStatusOK(res.status);
+ leasedStream->indicateSuccess();
+ leasedStream->indicateUsed();
+
+ // This opmsg request expect the following reply, which is generated below
+ // { echo: { echo: 1, foo: "bar", $db: "admin" }, ok: 1.0 }
+ auto cmdObj = res.data.getObjectField("echo");
+ ASSERT_EQ(1, cmdObj.getIntField("echo"));
+ ASSERT_EQ("bar"_sd, cmdObj.getStringField("foo"));
+ ASSERT_EQ("admin"_sd, cmdObj.getStringField("$db"));
+ ASSERT_EQ(1, res.data.getIntField("ok"));
+}
+
} // namespace
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index e096589d640..8d717506d40 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -148,6 +148,12 @@ public:
void testEgress(const HostAndPort&, transport::ConnectSSLMode, Milliseconds, Status) override {}
+ // Stream-leasing functionality is not mocked at this time.
+ SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> leaseStream(
+ const HostAndPort&, transport::ConnectSSLMode, Milliseconds) override {
+ MONGO_UNREACHABLE;
+ }
+
////////////////////////////////////////////////////////////////////////////////
//
// Methods for simulating network operations and the passage of time.
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 1b74e1e9ee5..0f23b2f61d7 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -33,12 +33,14 @@
#include <fmt/format.h>
+#include "mongo/base/checked_cast.h"
#include "mongo/config.h"
#include "mongo/db/auth/security_token.h"
#include "mongo/db/server_options.h"
#include "mongo/db/wire_version.h"
#include "mongo/executor/connection_pool_tl.h"
#include "mongo/executor/hedging_metrics.h"
+#include "mongo/executor/network_interface.h"
#include "mongo/executor/network_interface_tl_gen.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -1349,5 +1351,32 @@ void NetworkInterfaceTL::dropConnections(const HostAndPort& hostAndPort) {
_pool->dropConnections(hostAndPort);
}
+AsyncDBClient* NetworkInterfaceTL::LeasedStream::getClient() {
+ return checked_cast<connection_pool_tl::TLConnection*>(_conn.get())->client();
+}
+
+void NetworkInterfaceTL::LeasedStream::indicateSuccess() {
+ return _conn->indicateSuccess();
+}
+
+void NetworkInterfaceTL::LeasedStream::indicateFailure(Status status) {
+ _conn->indicateFailure(status);
+}
+
+void NetworkInterfaceTL::LeasedStream::indicateUsed() {
+ _conn->indicateUsed();
+}
+
+SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> NetworkInterfaceTL::leaseStream(
+ const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, Milliseconds timeout) {
+
+ return _pool->lease(hostAndPort, sslMode, timeout)
+ .thenRunOn(_reactor)
+ .then([](auto conn) -> std::unique_ptr<NetworkInterface::LeasedStream> {
+ auto ptr = std::make_unique<NetworkInterfaceTL::LeasedStream>(std::move(conn));
+ return ptr;
+ })
+ .semi();
+}
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 158e4b0a679..96e4ecff2fa 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -31,9 +31,12 @@
#include <deque>
+#include <boost/optional.hpp>
+
#include "mongo/client/async_client.h"
#include "mongo/db/service_context.h"
#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/connection_pool_tl.h"
#include "mongo/executor/network_interface.h"
#include "mongo/logv2/log_severity.h"
#include "mongo/platform/mutex.h"
@@ -103,6 +106,33 @@ public:
Milliseconds timeout,
Status status) override;
+ /**
+ * NetworkInterfaceTL's implementation of a leased network-stream
+ * provided for manual use outside of the NITL's usual RPC API.
+ * When this type is destroyed, the destructor of the ConnectionHandle
+ * member will return the connection to this NetworkInterface's ConnectionPool.
+ */
+ class LeasedStream : public NetworkInterface::LeasedStream {
+ public:
+ AsyncDBClient* getClient() override;
+
+ LeasedStream(ConnectionPool::ConnectionHandle&& conn) : _conn{std::move(conn)} {}
+
+ // These pass-through indications of the health of the leased
+ // stream to the underlying ConnectionHandle
+ void indicateSuccess() override;
+ void indicateUsed() override;
+ void indicateFailure(Status) override;
+
+ private:
+ ConnectionPool::ConnectionHandle _conn;
+ };
+
+ SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> leaseStream(
+ const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode,
+ Milliseconds timeout) override;
+
private:
struct RequestState;
struct RequestManager;
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
index d95a76a168f..c73967e6c50 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.cpp
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -253,7 +253,7 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
"maxConns"_attr = maxConns);
// Update the target for just the pool first
- poolData.target = stats.requests + stats.active;
+ poolData.target = stats.requests + stats.active + stats.leased;
if (poolData.target < minConns) {
poolData.target = minConns;