diff options
author | George Wangensteen <george.wangensteen@mongodb.com> | 2023-03-15 20:30:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-04-12 20:54:50 +0000 |
commit | 1a6650b6776c2c7cea2e86fe97a7432e801b124b (patch) | |
tree | 9d20146d9a943400f6515bbfe3e3346415d8d390 | |
parent | c821f218c1174e65570b4c1c2495fdd07b7fd4c6 (diff) | |
download | mongo-1a6650b6776c2c7cea2e86fe97a7432e801b124b.tar.gz |
SERVER-73610 Introduce Connection-Leasing from ConnectionPool and NetworkInterface
(cherry picked from commit 3eb6f1f4540a9315434341e447fd4b2830211ef6)
-rw-r--r-- | src/mongo/client/connpool.cpp | 1 | ||||
-rw-r--r-- | src/mongo/executor/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 132 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 37 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_stats.cpp | 15 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_stats.h | 10 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 64 | ||||
-rw-r--r-- | src/mongo/executor/network_interface.h | 41 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_integration_test.cpp | 25 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 29 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 30 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool_controller.cpp | 2 |
13 files changed, 316 insertions, 77 deletions
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp index e52bd54d102..1df0d747090 100644 --- a/src/mongo/client/connpool.cpp +++ b/src/mongo/client/connpool.cpp @@ -589,6 +589,7 @@ void DBConnectionPool::appendConnectionStats(executor::ConnectionPoolStats* stat executor::ConnectionStatsPer hostStats{static_cast<size_t>(i->second.numInUse()), static_cast<size_t>(i->second.numAvailable()), + 0, static_cast<size_t>(i->second.numCreated()), 0, 0}; diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 627675845a2..1a1424865ac 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -320,6 +320,7 @@ env.CppIntegrationTest( 'thread_pool_task_executor_integration_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/client/async_client', '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/db/wire_version', '$BUILD_DIR/mongo/executor/network_interface_factory', diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 5900f65a650..e7b2b1ad58a 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -137,8 +137,8 @@ std::string ConnectionPool::ConnectionControls::toString() const { } std::string ConnectionPool::HostState::toString() const { - return "{{ requests: {}, ready: {}, pending: {}, active: {}, isExpired: {} }}"_format( - requests, ready, pending, active, health.isExpired); + return "{{ requests: {}, ready: {}, pending: {}, active: {}, leased: {}, isExpired: {} }}"_format( + requests, ready, pending, active, leased, health.isExpired); } /** @@ -162,7 +162,7 @@ public: const auto minConns = getPool()->_options.minConnections; const auto maxConns = getPool()->_options.maxConnections; - data.target = stats.requests + stats.active; + data.target = stats.requests + stats.active + stats.leased; if (data.target < minConns) { data.target = minConns; } else if (data.target > maxConns) { @@ -275,7 +275,7 @@ public: * Gets a connection from the specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex */ - Future<ConnectionHandle> getConnection(Milliseconds timeout); + Future<ConnectionHandle> getConnection(Milliseconds timeout, bool lease); /** * Triggers the shutdown procedure. This function sets isShutdown to true @@ -298,6 +298,11 @@ public: size_t inUseConnections() const; /** + * Returns the number of leased connections from the pool. + */ + size_t leasedConnections() const; + + /** * Returns the number of available connections in the pool. */ size_t availableConnections() const; @@ -320,7 +325,7 @@ public: /** * Returns the total number of connections currently open that belong to * this pool. This is the sum of refreshingConnections, availableConnections, - * and inUseConnections. + * inUseConnections, and leasedConnections. */ size_t openConnections() const; @@ -361,14 +366,20 @@ private: using OwnedConnection = std::shared_ptr<ConnectionInterface>; using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>; using LRUOwnershipPool = LRUCache<OwnershipPool::key_type, OwnershipPool::mapped_type>; - using Request = std::pair<Date_t, Promise<ConnectionHandle>>; + struct Request { + Date_t expiration; + Promise<ConnectionHandle> promise; + // Whether or not the requested connection should be "leased". + bool lease; + }; + struct RequestComparator { bool operator()(const Request& a, const Request& b) { - return a.first > b.first; + return a.expiration > b.expiration; } }; - ConnectionHandle makeHandle(ConnectionInterface* connection); + ConnectionHandle makeHandle(ConnectionInterface* connection, bool isLeased); /** * Establishes connections until the ControllerInterface's target is met. @@ -381,11 +392,11 @@ private: void fulfillRequests(); - void returnConnection(ConnectionInterface* connPtr); + void returnConnection(ConnectionInterface* connPtr, bool isLeased); // This internal helper is used both by get and by _fulfillRequests and differs in that it // skips some bookkeeping that the other callers do on their own - ConnectionHandle tryGetConnection(); + ConnectionHandle tryGetConnection(bool lease); template <typename OwnershipPoolType> typename OwnershipPoolType::mapped_type takeFromPool( @@ -414,6 +425,7 @@ private: OwnershipPool _processingPool; OwnershipPool _droppedProcessingPool; OwnershipPool _checkedOutPool; + OwnershipPool _leasedPool; std::vector<Request> _requests; Date_t _lastActiveTime; @@ -548,21 +560,37 @@ void ConnectionPool::mutateTags( pool->mutateTags(mutateFunc); } +void ConnectionPool::retrieve_forTest(RetrieveConnection retrieve, GetConnectionCallback cb) { + // We kick ourselves onto the executor queue to prevent us from deadlocking with our own thread + auto getConnectionFunc = + [this, retrieve = std::move(retrieve), cb = std::move(cb)](Status&&) mutable { + retrieve().thenRunOn(_factory->getExecutor()).getAsync(std::move(cb)); + }; + _factory->getExecutor()->schedule(std::move(getConnectionFunc)); +} + void ConnectionPool::get_forTest(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb) { - // We kick ourselves onto the executor queue to prevent us from deadlocking with our own thread - auto getConnectionFunc = [this, hostAndPort, timeout, cb = std::move(cb)](Status&&) mutable { - get(hostAndPort, transport::kGlobalSSLMode, timeout) - .thenRunOn(_factory->getExecutor()) - .getAsync(std::move(cb)); + auto getConnectionFunc = [this, hostAndPort, timeout]() mutable { + return get(hostAndPort, transport::kGlobalSSLMode, timeout); }; - _factory->getExecutor()->schedule(std::move(getConnectionFunc)); + retrieve_forTest(getConnectionFunc, std::move(cb)); +} + +void ConnectionPool::lease_forTest(const HostAndPort& hostAndPort, + Milliseconds timeout, + GetConnectionCallback cb) { + auto getConnectionFunc = [this, hostAndPort, timeout]() mutable { + return lease(hostAndPort, transport::kGlobalSSLMode, timeout); + }; + retrieve_forTest(getConnectionFunc, std::move(cb)); } -SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort, - transport::ConnectSSLMode sslMode, - Milliseconds timeout) { +SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::_get(const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout, + bool lease) { stdx::lock_guard lk(_mutex); auto& pool = _pools[hostAndPort]; @@ -574,7 +602,7 @@ SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPo invariant(pool); - auto connFuture = pool->getConnection(timeout); + auto connFuture = pool->getConnection(timeout, lease); pool->updateState(); return std::move(connFuture).semi(); @@ -590,6 +618,7 @@ void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const { auto& pool = kv.second; ConnectionStatsPer hostStats{pool->inUseConnections(), pool->availableConnections(), + pool->leasedConnections(), pool->createdConnections(), pool->refreshingConnections(), pool->refreshedConnections()}; @@ -625,6 +654,7 @@ ConnectionPool::SpecificPool::~SpecificPool() { if (shouldInvariantOnPoolCorrectness()) { invariant(_requests.empty()); invariant(_checkedOutPool.empty()); + invariant(_leasedPool.empty()); } } @@ -636,6 +666,10 @@ size_t ConnectionPool::SpecificPool::availableConnections() const { return _readyPool.size(); } +size_t ConnectionPool::SpecificPool::leasedConnections() const { + return _leasedPool.size(); +} + size_t ConnectionPool::SpecificPool::refreshingConnections() const { return _processingPool.size(); } @@ -649,7 +683,7 @@ size_t ConnectionPool::SpecificPool::createdConnections() const { } size_t ConnectionPool::SpecificPool::openConnections() const { - return _checkedOutPool.size() + _readyPool.size() + _processingPool.size(); + return _checkedOutPool.size() + _readyPool.size() + _processingPool.size() + _leasedPool.size(); } size_t ConnectionPool::SpecificPool::requestsPending() const { @@ -657,7 +691,7 @@ size_t ConnectionPool::SpecificPool::requestsPending() const { } Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection( - Milliseconds timeout) { + Milliseconds timeout, bool lease) { // Reset our activity timestamp auto now = _parent->_factory->now(); @@ -665,7 +699,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec // If we do not have requests, then we can fulfill immediately if (_requests.size() == 0) { - auto conn = tryGetConnection(); + auto conn = tryGetConnection(lease); if (conn) { LOGV2_DEBUG(22559, @@ -691,23 +725,24 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec const auto expiration = now + timeout; auto pf = makePromiseFuture<ConnectionHandle>(); - _requests.push_back(make_pair(expiration, std::move(pf.promise))); + _requests.push_back({expiration, std::move(pf.promise), lease}); std::push_heap(begin(_requests), end(_requests), RequestComparator{}); return std::move(pf.future); } -auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle { - auto deleter = [this, anchor = shared_from_this()](ConnectionInterface* connection) { +auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection, bool isLeased) + -> ConnectionHandle { + auto deleter = [this, anchor = shared_from_this(), isLeased](ConnectionInterface* connection) { stdx::lock_guard lk(_parent->_mutex); - returnConnection(connection); + returnConnection(connection, isLeased); _lastActiveTime = _parent->_factory->now(); updateState(); }; return ConnectionHandle(connection, std::move(deleter)); } -ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection() { +ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(bool lease) { while (_readyPool.size()) { // _readyPool is an LRUCache, so its begin() object is the MRU item. auto iter = _readyPool.begin(); @@ -729,12 +764,15 @@ ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection( auto connPtr = conn.get(); - // check out the connection - _checkedOutPool[connPtr] = std::move(conn); + if (lease) { + _leasedPool[connPtr] = std::move(conn); + } else { + _checkedOutPool[connPtr] = std::move(conn); + } // pass it to the user connPtr->resetToUnknown(); - auto handle = makeHandle(connPtr); + auto handle = makeHandle(connPtr, lease); return handle; } @@ -804,10 +842,10 @@ void ConnectionPool::SpecificPool::finishRefresh(ConnectionInterface* connPtr, S fulfillRequests(); } -void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr) { +void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr, bool isLeased) { auto needsRefreshTP = connPtr->getLastUsed() + _parent->_controller->toRefreshTimeout(); - auto conn = takeFromPool(_checkedOutPool, connPtr); + auto conn = takeFromPool(isLeased ? _leasedPool : _checkedOutPool, connPtr); invariant(conn); if (_health.isShutdown) { @@ -842,8 +880,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr if (shouldRefreshConnection) { auto controls = _parent->_controller->getControls(_id); - if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >= - controls.targetConnections) { + if (openConnections() >= controls.targetConnections) { // If we already have minConnections, just let the connection lapse LOGV2(22567, "Ending idle connection to host {hostAndPort} because the pool meets " @@ -910,7 +947,7 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) { connPtr->indicateSuccess(); - returnConnection(connPtr); + returnConnection(connPtr, false); }); connPtr->setTimeout(_parent->_controller->toRefreshTimeout(), std::move(returnConnectionFunc)); } @@ -977,7 +1014,7 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status) { } for (auto& request : _requests) { - request.second.setError(status); + request.promise.setError(status); } LOGV2_DEBUG(22573, @@ -999,14 +1036,14 @@ void ConnectionPool::SpecificPool::fulfillRequests() { // deadlock). // // None of the heap manipulation code throws, but it's something to keep in mind. - auto conn = tryGetConnection(); + auto conn = tryGetConnection(_requests.front().lease); if (!conn) { break; } // Grab the request and callback - auto promise = std::move(_requests.front().second); + auto promise = std::move(_requests.front().promise); std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); _requests.pop_back(); @@ -1114,7 +1151,8 @@ void ConnectionPool::SpecificPool::updateHealth() { const auto now = _parent->_factory->now(); // We're expired if we have no sign of connection use and are past our expiry - _health.isExpired = _requests.empty() && _checkedOutPool.empty() && (_hostExpiration <= now); + _health.isExpired = _requests.empty() && _checkedOutPool.empty() && _leasedPool.empty() && + (_hostExpiration <= now); // We're failed until we get new requests or our timer triggers if (_health.isFailed) { @@ -1132,7 +1170,7 @@ void ConnectionPool::SpecificPool::updateEventTimer() { } // If our expiration comes before our next event, then it is the next event - if (_requests.empty() && _checkedOutPool.empty()) { + if (_requests.empty() && _checkedOutPool.empty() && _leasedPool.empty()) { _hostExpiration = _lastActiveTime + _parent->_controller->hostTimeout(); if ((_hostExpiration > now) && (_hostExpiration < nextEventTime)) { nextEventTime = _hostExpiration; @@ -1140,8 +1178,8 @@ void ConnectionPool::SpecificPool::updateEventTimer() { } // If a request would timeout before the next event, then it is the next event - if (_requests.size() && (_requests.front().first < nextEventTime)) { - nextEventTime = _requests.front().first; + if (_requests.size() && (_requests.front().expiration < nextEventTime)) { + nextEventTime = _requests.front().expiration; } // Clamp next event time to be either now or in the future. Next event time @@ -1169,12 +1207,12 @@ void ConnectionPool::SpecificPool::updateEventTimer() { _health.isFailed = false; - while (_requests.size() && (_requests.front().first <= now)) { + while (_requests.size() && (_requests.front().expiration <= now)) { std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); auto& request = _requests.back(); - request.second.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, - "Couldn't get a connection within the time limit")); + request.promise.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, + "Couldn't get a connection within the time limit")); _requests.pop_back(); // Since we've failed a request, we've interacted with external users @@ -1198,6 +1236,7 @@ void ConnectionPool::SpecificPool::updateController() { refreshingConnections(), availableConnections(), inUseConnections(), + leasedConnections(), }; LOGV2_DEBUG(22578, kDiagnosticLogLevel, @@ -1236,6 +1275,7 @@ void ConnectionPool::SpecificPool::updateController() { if (shouldInvariantOnPoolCorrectness()) { invariant(pool->_checkedOutPool.empty()); invariant(pool->_requests.empty()); + invariant(pool->_leasedPool.empty()); } pool->triggerShutdown(Status(ErrorCodes::ConnectionPoolExpired, diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index 0e5bf90dc9a..1da17ee8897 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -79,6 +79,7 @@ public: using ConnectionHandleDeleter = std::function<void(ConnectionInterface* connection)>; using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>; + using RetrieveConnection = unique_function<SemiFuture<ConnectionHandle>()>; using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>; using PoolId = uint64_t; @@ -210,6 +211,7 @@ public: size_t pending = 0; size_t ready = 0; size_t active = 0; + size_t leased = 0; std::string toString() const; }; @@ -252,13 +254,35 @@ public: const std::function<transport::Session::TagMask(transport::Session::TagMask)>& mutateFunc) override; - SemiFuture<ConnectionHandle> get(const HostAndPort& hostAndPort, - transport::ConnectSSLMode sslMode, - Milliseconds timeout); + inline SemiFuture<ConnectionHandle> get(const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout) { + return _get(hostAndPort, sslMode, timeout, false /*lease*/); + } + void get_forTest(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb); + /** + * "Lease" a connection from the pool. + * + * Connections retrieved via this method are not assumed to be in active use for the duration of + * their lease and are reported separately in metrics. Otherwise, this method behaves similarly + * to `ConnectionPool::get`. + */ + inline SemiFuture<ConnectionHandle> lease( + const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout, + ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit) { + return _get(hostAndPort, sslMode, timeout, true /*lease*/); + } + + void lease_forTest(const HostAndPort& hostAndPort, + Milliseconds timeout, + GetConnectionCallback cb); + void appendConnectionStats(ConnectionPoolStats* stats) const; size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const; @@ -268,6 +292,13 @@ public: } private: + SemiFuture<ConnectionHandle> _get(const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout, + bool leased); + + void retrieve_forTest(RetrieveConnection retrieve, GetConnectionCallback cb); + std::string _name; const std::shared_ptr<DependentTypeFactoryInterface> _factory; diff --git a/src/mongo/executor/connection_pool_stats.cpp b/src/mongo/executor/connection_pool_stats.cpp index a3d1e1fb0b1..85d6743b127 100644 --- a/src/mongo/executor/connection_pool_stats.cpp +++ b/src/mongo/executor/connection_pool_stats.cpp @@ -36,10 +36,15 @@ namespace mongo { namespace executor { -ConnectionStatsPer::ConnectionStatsPer( - size_t nInUse, size_t nAvailable, size_t nCreated, size_t nRefreshing, size_t nRefreshed) +ConnectionStatsPer::ConnectionStatsPer(size_t nInUse, + size_t nAvailable, + size_t nLeased, + size_t nCreated, + size_t nRefreshing, + size_t nRefreshed) : inUse(nInUse), available(nAvailable), + leased(nLeased), created(nCreated), refreshing(nRefreshing), refreshed(nRefreshed) {} @@ -49,6 +54,7 @@ ConnectionStatsPer::ConnectionStatsPer() = default; ConnectionStatsPer& ConnectionStatsPer::operator+=(const ConnectionStatsPer& other) { inUse += other.inUse; available += other.available; + leased += other.leased; created += other.created; refreshing += other.refreshing; refreshed += other.refreshed; @@ -75,6 +81,7 @@ void ConnectionPoolStats::updateStatsForHost(std::string pool, // Update total connection stats. totalInUse += newStats.inUse; totalAvailable += newStats.available; + totalLeased += newStats.leased; totalCreated += newStats.created; totalRefreshing += newStats.refreshing; totalRefreshed += newStats.refreshed; @@ -83,6 +90,7 @@ void ConnectionPoolStats::updateStatsForHost(std::string pool, void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFTDC) { result.appendNumber("totalInUse", static_cast<long long>(totalInUse)); result.appendNumber("totalAvailable", static_cast<long long>(totalAvailable)); + result.appendNumber("totalLeased", static_cast<long long>(totalLeased)); result.appendNumber("totalCreated", static_cast<long long>(totalCreated)); result.appendNumber("totalRefreshing", static_cast<long long>(totalRefreshing)); result.appendNumber("totalRefreshed", static_cast<long long>(totalRefreshed)); @@ -115,6 +123,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT auto& poolStats = pool.second; poolInfo.appendNumber("poolInUse", static_cast<long long>(poolStats.inUse)); poolInfo.appendNumber("poolAvailable", static_cast<long long>(poolStats.available)); + poolInfo.appendNumber("poolLeased", static_cast<long long>(poolStats.leased)); poolInfo.appendNumber("poolCreated", static_cast<long long>(poolStats.created)); poolInfo.appendNumber("poolRefreshing", static_cast<long long>(poolStats.refreshing)); poolInfo.appendNumber("poolRefreshed", static_cast<long long>(poolStats.refreshed)); @@ -124,6 +133,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT auto& hostStats = host.second; hostInfo.appendNumber("inUse", static_cast<long long>(hostStats.inUse)); hostInfo.appendNumber("available", static_cast<long long>(hostStats.available)); + hostInfo.appendNumber("leased", static_cast<long long>(hostStats.leased)); hostInfo.appendNumber("created", static_cast<long long>(hostStats.created)); hostInfo.appendNumber("refreshing", static_cast<long long>(hostStats.refreshing)); hostInfo.appendNumber("refreshed", static_cast<long long>(hostStats.refreshed)); @@ -139,6 +149,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT auto hostStats = host.second; hostInfo.appendNumber("inUse", static_cast<long long>(hostStats.inUse)); hostInfo.appendNumber("available", static_cast<long long>(hostStats.available)); + hostInfo.appendNumber("leased", static_cast<long long>(hostStats.leased)); hostInfo.appendNumber("created", static_cast<long long>(hostStats.created)); hostInfo.appendNumber("refreshing", static_cast<long long>(hostStats.refreshing)); hostInfo.appendNumber("refreshed", static_cast<long long>(hostStats.refreshed)); diff --git a/src/mongo/executor/connection_pool_stats.h b/src/mongo/executor/connection_pool_stats.h index a96739b42be..3f4183adbf2 100644 --- a/src/mongo/executor/connection_pool_stats.h +++ b/src/mongo/executor/connection_pool_stats.h @@ -41,8 +41,12 @@ namespace executor { * a parent ConnectionPoolStats object and should not need to be created directly. */ struct ConnectionStatsPer { - ConnectionStatsPer( - size_t nInUse, size_t nAvailable, size_t nCreated, size_t nRefreshing, size_t nRefreshed); + ConnectionStatsPer(size_t nInUse, + size_t nAvailable, + size_t nLeased, + size_t nCreated, + size_t nRefreshing, + size_t nRefreshed); ConnectionStatsPer(); @@ -50,6 +54,7 @@ struct ConnectionStatsPer { size_t inUse = 0u; size_t available = 0u; + size_t leased = 0u; size_t created = 0u; size_t refreshing = 0u; size_t refreshed = 0u; @@ -68,6 +73,7 @@ struct ConnectionPoolStats { size_t totalInUse = 0u; size_t totalAvailable = 0u; + size_t totalLeased = 0u; size_t totalCreated = 0u; size_t totalRefreshing = 0u; size_t totalRefreshed = 0u; diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 79eaaaf0218..e78d62c4285 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -31,6 +31,8 @@ #include "mongo/executor/connection_pool_test_fixture.h" +#include "mongo/util/duration.h" +#include "mongo/util/net/hostandport.h" #include <algorithm> #include <memory> #include <random> @@ -41,6 +43,7 @@ #include <fmt/ostream.h> #include "mongo/executor/connection_pool.h" +#include "mongo/executor/connection_pool_stats.h" #include "mongo/stdx/future.h" #include "mongo/unittest/thread_assertion_monitor.h" #include "mongo/unittest/unittest.h" @@ -52,6 +55,8 @@ namespace connection_pool_test_details { class ConnectionPoolTest : public unittest::Test { public: + constexpr static Milliseconds kNoTimeout = Milliseconds{-1}; + protected: void setUp() override {} @@ -148,6 +153,8 @@ TEST_F(ConnectionPoolTest, SameConn) { */ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { auto pool = makePool(); + std::random_device rd; + std::mt19937 rng(rd()); // Obtain a set of connections constexpr size_t kSize = 100; @@ -170,17 +177,24 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { } }); + std::uniform_int_distribution<> dist{0, 1}; for (size_t i = 0; i != kSize; ++i) { ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - monitors[i].exec([&]() { - ASSERT(swConn.isOK()); - connections.push_back(std::move(swConn.getValue())); - monitors[i].notifyDone(); - }); - }); + auto cb = [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitors[i].exec([&]() { + ASSERT(swConn.isOK()); + connections.push_back(std::move(swConn.getValue())); + monitors[i].notifyDone(); + }); + }; + auto timeout = Milliseconds(5000); + + // Randomly lease or check out connection. + if (dist(rng)) { + pool->get_forTest(HostAndPort(), timeout, cb); + } else { + pool->lease_forTest(HostAndPort(), timeout, cb); + } } for (auto& monitor : monitors) { @@ -190,8 +204,6 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { ASSERT_EQ(connections.size(), kSize); // Shuffle them into a random order - std::random_device rd; - std::mt19937 rng(rd()); std::shuffle(connections.begin(), connections.end(), rng); // Return them to the pool in that random order, recording IDs in a stack @@ -211,18 +223,24 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { // as the IDs in the stack, since the pool returns them in MRU order. for (size_t i = 0; i != kSize; ++i) { ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - monitors[i].exec([&]() { - ASSERT(swConn.isOK()); - const auto id = verifyAndGetId(swConn); - connections.push_back(std::move(swConn.getValue())); - ASSERT_EQ(id, ids.top()); - ids.pop(); - monitors[i].notifyDone(); - }); - }); + auto cb = [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitors[i].exec([&]() { + ASSERT(swConn.isOK()); + const auto id = verifyAndGetId(swConn); + connections.push_back(std::move(swConn.getValue())); + ASSERT_EQ(id, ids.top()); + ids.pop(); + monitors[i].notifyDone(); + }); + }; + auto timeout = Milliseconds(5000); + + // Randomly lease or check out connection. + if (dist(rng)) { + pool->get_forTest(HostAndPort(), timeout, cb); + } else { + pool->lease_forTest(HostAndPort(), timeout, cb); + } } for (auto& monitor : monitors) { diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 799f473a12d..1867a87e71c 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -34,6 +34,7 @@ #include <string> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/async_client.h" #include "mongo/executor/task_executor.h" #include "mongo/transport/baton.h" #include "mongo/util/fail_point.h" @@ -252,6 +253,46 @@ public: Milliseconds timeout, Status status) = 0; + /** + * An RAII type that NetworkInterface uses to allow users to access a stream corresponding to a + * single network-level (i.e. TCP, UDS) connection on which to run commands directly. Generally + * speaking, users should use the NetworkInterface itself to run commands to take advantage of + * connection-pooling, hedging, and other features - but for special cases where users need to + * borrow their own network-stream for manual use, they can lease one through this type. + * LeasedStreams are minimal and do not offer automated health-management/automated refreshing + * while on lease - users are responsible for examining the health of the stream as-needed if + * they desire. Users are also responsible for reporting on the health of stream before the + * lease ends so that it can be subsequently re-used; see comments below for detail. + */ + class LeasedStream { + public: + virtual ~LeasedStream() = default; + + // AsyncDBClient provides the mongoRPC-API for running commands over this stream. This + // stream owns the AsyncDBClient and no outstanding networking should be scheduled on the + // client when it is destroyed. + virtual AsyncDBClient* getClient() = 0; + + // Indicates that the user is done with this leased stream, and no failures on it occured. + // Users MUST call either this function or indicateFailure before the LeasedStream is + // destroyed. + virtual void indicateSuccess() = 0; + // Indicates that the stream is unhealthy (i.e. the user received a network error indicating + // the stream failed). This prevents the stream from being reused. + virtual void indicateFailure(Status) = 0; + // Indicates that the stream has successfully performed networking over the stream. Updates + // metadata indicating the last healthy networking over the stream so that appropriate + // health-checks can be done after the lease ends. + virtual void indicateUsed() = 0; + }; + + /** + * Lease a stream from this NetworkInterface for manual use. + */ + virtual SemiFuture<std::unique_ptr<LeasedStream>> leaseStream(const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout) = 0; + protected: NetworkInterface(); }; diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp index fdf1ac6f8ba..b3f57157fe8 100644 --- a/src/mongo/executor/network_interface_integration_test.cpp +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -939,6 +939,31 @@ TEST_F(NetworkInterfaceTest, TearDownWaitsForInProgress) { ASSERT_EQ(getInProgress(), 0); } +TEST_F(NetworkInterfaceTest, RunCommandOnLeasedStream) { + auto cs = fixture(); + auto target = cs.getServers().front(); + auto leasedStream = net().leaseStream(target, transport::kGlobalSSLMode, kNoTimeout).get(); + auto* client = leasedStream->getClient(); + + auto request = RemoteCommandRequest(target, "admin", makeEchoCmdObj(), nullptr, kNoTimeout); + auto deferred = client->runCommandRequest(request); + + auto res = deferred.get(); + + ASSERT(res.elapsed); + uassertStatusOK(res.status); + leasedStream->indicateSuccess(); + leasedStream->indicateUsed(); + + // This opmsg request expect the following reply, which is generated below + // { echo: { echo: 1, foo: "bar", $db: "admin" }, ok: 1.0 } + auto cmdObj = res.data.getObjectField("echo"); + ASSERT_EQ(1, cmdObj.getIntField("echo")); + ASSERT_EQ("bar"_sd, cmdObj.getStringField("foo")); + ASSERT_EQ("admin"_sd, cmdObj.getStringField("$db")); + ASSERT_EQ(1, res.data.getIntField("ok")); +} + } // namespace } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index e096589d640..8d717506d40 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -148,6 +148,12 @@ public: void testEgress(const HostAndPort&, transport::ConnectSSLMode, Milliseconds, Status) override {} + // Stream-leasing functionality is not mocked at this time. + SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> leaseStream( + const HostAndPort&, transport::ConnectSSLMode, Milliseconds) override { + MONGO_UNREACHABLE; + } + //////////////////////////////////////////////////////////////////////////////// // // Methods for simulating network operations and the passage of time. diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 1b74e1e9ee5..0f23b2f61d7 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -33,12 +33,14 @@ #include <fmt/format.h> +#include "mongo/base/checked_cast.h" #include "mongo/config.h" #include "mongo/db/auth/security_token.h" #include "mongo/db/server_options.h" #include "mongo/db/wire_version.h" #include "mongo/executor/connection_pool_tl.h" #include "mongo/executor/hedging_metrics.h" +#include "mongo/executor/network_interface.h" #include "mongo/executor/network_interface_tl_gen.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -1349,5 +1351,32 @@ void NetworkInterfaceTL::dropConnections(const HostAndPort& hostAndPort) { _pool->dropConnections(hostAndPort); } +AsyncDBClient* NetworkInterfaceTL::LeasedStream::getClient() { + return checked_cast<connection_pool_tl::TLConnection*>(_conn.get())->client(); +} + +void NetworkInterfaceTL::LeasedStream::indicateSuccess() { + return _conn->indicateSuccess(); +} + +void NetworkInterfaceTL::LeasedStream::indicateFailure(Status status) { + _conn->indicateFailure(status); +} + +void NetworkInterfaceTL::LeasedStream::indicateUsed() { + _conn->indicateUsed(); +} + +SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> NetworkInterfaceTL::leaseStream( + const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, Milliseconds timeout) { + + return _pool->lease(hostAndPort, sslMode, timeout) + .thenRunOn(_reactor) + .then([](auto conn) -> std::unique_ptr<NetworkInterface::LeasedStream> { + auto ptr = std::make_unique<NetworkInterfaceTL::LeasedStream>(std::move(conn)); + return ptr; + }) + .semi(); +} } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 158e4b0a679..96e4ecff2fa 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -31,9 +31,12 @@ #include <deque> +#include <boost/optional.hpp> + #include "mongo/client/async_client.h" #include "mongo/db/service_context.h" #include "mongo/executor/connection_pool.h" +#include "mongo/executor/connection_pool_tl.h" #include "mongo/executor/network_interface.h" #include "mongo/logv2/log_severity.h" #include "mongo/platform/mutex.h" @@ -103,6 +106,33 @@ public: Milliseconds timeout, Status status) override; + /** + * NetworkInterfaceTL's implementation of a leased network-stream + * provided for manual use outside of the NITL's usual RPC API. + * When this type is destroyed, the destructor of the ConnectionHandle + * member will return the connection to this NetworkInterface's ConnectionPool. + */ + class LeasedStream : public NetworkInterface::LeasedStream { + public: + AsyncDBClient* getClient() override; + + LeasedStream(ConnectionPool::ConnectionHandle&& conn) : _conn{std::move(conn)} {} + + // These pass-through indications of the health of the leased + // stream to the underlying ConnectionHandle + void indicateSuccess() override; + void indicateUsed() override; + void indicateFailure(Status) override; + + private: + ConnectionPool::ConnectionHandle _conn; + }; + + SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> leaseStream( + const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout) override; + private: struct RequestState; struct RequestManager; diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp index d95a76a168f..c73967e6c50 100644 --- a/src/mongo/s/sharding_task_executor_pool_controller.cpp +++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp @@ -253,7 +253,7 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState& "maxConns"_attr = maxConns); // Update the target for just the pool first - poolData.target = stats.requests + stats.active; + poolData.target = stats.requests + stats.active + stats.leased; if (poolData.target < minConns) { poolData.target = minConns; |