diff options
-rw-r--r-- | src/mongo/client/connpool.cpp | 1 | ||||
-rw-r--r-- | src/mongo/executor/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 137 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 38 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_stats.cpp | 8 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_stats.h | 3 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 228 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.cpp | 5 | ||||
-rw-r--r-- | src/mongo/executor/network_interface.h | 41 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_integration_test.cpp | 25 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 30 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 30 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool_controller.cpp | 2 |
14 files changed, 481 insertions, 74 deletions
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp index d155e238c2c..a1fd97e3e9d 100644 --- a/src/mongo/client/connpool.cpp +++ b/src/mongo/client/connpool.cpp @@ -618,6 +618,7 @@ void DBConnectionPool::appendConnectionStats(executor::ConnectionPoolStats* stat executor::ConnectionStatsPer hostStats{static_cast<size_t>(i->second.numInUse()), static_cast<size_t>(i->second.numAvailable()), + 0, static_cast<size_t>(i->second.numCreated()), 0, 0, diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index d8b929cd37b..095ee4acba7 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -383,6 +383,7 @@ env.CppIntegrationTest( 'thread_pool_task_executor_integration_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/client/async_client', '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/db/wire_version', '$BUILD_DIR/mongo/executor/network_interface_factory', diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index c4c70b74916..4cc00399900 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -146,8 +146,8 @@ std::string ConnectionPool::ConnectionControls::toString() const { } std::string ConnectionPool::HostState::toString() const { - return "{{ requests: {}, ready: {}, pending: {}, active: {}, isExpired: {} }}"_format( - requests, ready, pending, active, health.isExpired); + return "{{ requests: {}, ready: {}, pending: {}, active: {}, leased: {}, isExpired: {} }}"_format( + requests, ready, pending, active, leased, health.isExpired); } /** @@ -171,7 +171,7 @@ public: const auto minConns = getPool()->_options.minConnections; const auto maxConns = getPool()->_options.maxConnections; - data.target = stats.requests + stats.active; + data.target = stats.requests + stats.active + stats.leased; if (data.target < minConns) { data.target = minConns; } else if (data.target > maxConns) { @@ -284,7 +284,9 @@ public: * 
Gets a connection from the specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex */ - Future<ConnectionHandle> getConnection(Milliseconds timeout, ErrorCodes::Error timeoutCode); + Future<ConnectionHandle> getConnection(Milliseconds timeout, + bool lease, + ErrorCodes::Error timeoutCode); /** * Triggers the shutdown procedure. This function sets isShutdown to true @@ -307,6 +309,11 @@ public: size_t inUseConnections() const; /** + * Returns the number of leased connections from the pool. + */ + size_t leasedConnections() const; + + /** * Returns the number of available connections in the pool. */ size_t availableConnections() const; @@ -344,7 +351,7 @@ public: /** * Returns the total number of connections currently open that belong to * this pool. This is the sum of refreshingConnections, availableConnections, - * and inUseConnections. + * inUseConnections, and leasedConnections. */ size_t openConnections() const; @@ -404,6 +411,8 @@ private: struct Request { Date_t expiration; Promise<ConnectionHandle> promise; + // Whether or not the requested connection should be "leased". 
+ bool lease; ErrorCodes::Error timeoutCode; }; @@ -413,7 +422,7 @@ private: } }; - ConnectionHandle makeHandle(ConnectionInterface* connection); + ConnectionHandle makeHandle(ConnectionInterface* connection, bool isLeased); /** * Given a uniquely-owned OwnedConnection, returns an OwnedConnection @@ -451,11 +460,11 @@ private: void fulfillRequests(); - void returnConnection(ConnectionInterface* connPtr); + void returnConnection(ConnectionInterface* connPtr, bool isLeased); // This internal helper is used both by get and by _fulfillRequests and differs in that it // skips some bookkeeping that the other callers do on their own - ConnectionHandle tryGetConnection(); + ConnectionHandle tryGetConnection(bool lease); template <typename OwnershipPoolType> typename OwnershipPoolType::mapped_type takeFromPool( @@ -484,6 +493,7 @@ private: OwnershipPool _processingPool; OwnershipPool _droppedProcessingPool; OwnershipPool _checkedOutPool; + OwnershipPool _leasedPool; std::vector<Request> _requests; Date_t _lastActiveTime; @@ -626,24 +636,40 @@ void ConnectionPool::mutateTags( pool->mutateTags(mutateFunc); } -void ConnectionPool::get_forTest(const HostAndPort& hostAndPort, - Milliseconds timeout, - ErrorCodes::Error timeoutCode, - GetConnectionCallback cb) { +void ConnectionPool::retrieve_forTest(RetrieveConnection retrieve, GetConnectionCallback cb) { // We kick ourselves onto the executor queue to prevent us from deadlocking with our own thread auto getConnectionFunc = - [this, hostAndPort, timeout, timeoutCode, cb = std::move(cb)](Status&&) mutable { - get(hostAndPort, transport::kGlobalSSLMode, timeout, timeoutCode) - .thenRunOn(_factory->getExecutor()) - .getAsync(std::move(cb)); + [this, retrieve = std::move(retrieve), cb = std::move(cb)](Status&&) mutable { + retrieve().thenRunOn(_factory->getExecutor()).getAsync(std::move(cb)); }; _factory->getExecutor()->schedule(std::move(getConnectionFunc)); } -SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const 
HostAndPort& hostAndPort, - transport::ConnectSSLMode sslMode, - Milliseconds timeout, - ErrorCodes::Error timeoutCode) { +void ConnectionPool::get_forTest(const HostAndPort& hostAndPort, + Milliseconds timeout, + ErrorCodes::Error timeoutCode, + GetConnectionCallback cb) { + auto getConnectionFunc = [this, hostAndPort, timeout, timeoutCode]() mutable { + return get(hostAndPort, transport::kGlobalSSLMode, timeout, timeoutCode); + }; + retrieve_forTest(getConnectionFunc, std::move(cb)); +} + +void ConnectionPool::lease_forTest(const HostAndPort& hostAndPort, + Milliseconds timeout, + ErrorCodes::Error timeoutCode, + GetConnectionCallback cb) { + auto getConnectionFunc = [this, hostAndPort, timeout, timeoutCode]() mutable { + return lease(hostAndPort, transport::kGlobalSSLMode, timeout, timeoutCode); + }; + retrieve_forTest(getConnectionFunc, std::move(cb)); +} + +SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::_get(const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout, + bool lease, + ErrorCodes::Error timeoutCode) { auto connRequestedAt = _factory->now(); stdx::lock_guard lk(_mutex); @@ -657,10 +683,12 @@ SemiFuture<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPo invariant(pool); - auto connFuture = pool->getConnection(timeout, timeoutCode); + auto connFuture = pool->getConnection(timeout, lease, timeoutCode); pool->updateState(); - if (gFeatureFlagConnHealthMetrics.isEnabledAndIgnoreFCV()) { + // Only count connections being checked-out for ordinary use, not lease, towards cumulative wait + // time. 
+ if (gFeatureFlagConnHealthMetrics.isEnabledAndIgnoreFCV() && !lease) { connFuture = std::move(connFuture).tap([connRequestedAt, pool = pool](const auto& conn) { pool->recordConnectionWaitTime(connRequestedAt); }); @@ -679,6 +707,7 @@ void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const { auto& pool = kv.second; ConnectionStatsPer hostStats{pool->inUseConnections(), pool->availableConnections(), + pool->leasedConnections(), pool->createdConnections(), pool->refreshingConnections(), pool->refreshedConnections(), @@ -721,6 +750,7 @@ ConnectionPool::SpecificPool::~SpecificPool() { if (shouldInvariantOnPoolCorrectness()) { invariant(_requests.empty()); invariant(_checkedOutPool.empty()); + invariant(_leasedPool.empty()); } } @@ -732,6 +762,10 @@ size_t ConnectionPool::SpecificPool::availableConnections() const { return _readyPool.size(); } +size_t ConnectionPool::SpecificPool::leasedConnections() const { + return _leasedPool.size(); +} + size_t ConnectionPool::SpecificPool::refreshingConnections() const { return _processingPool.size(); } @@ -757,7 +791,7 @@ Milliseconds ConnectionPool::SpecificPool::getTotalConnUsageTime() const { } size_t ConnectionPool::SpecificPool::openConnections() const { - return _checkedOutPool.size() + _readyPool.size() + _processingPool.size(); + return _checkedOutPool.size() + _readyPool.size() + _processingPool.size() + _leasedPool.size(); } size_t ConnectionPool::SpecificPool::requestsPending() const { @@ -765,7 +799,7 @@ size_t ConnectionPool::SpecificPool::requestsPending() const { } Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection( - Milliseconds timeout, ErrorCodes::Error timeoutCode) { + Milliseconds timeout, bool lease, ErrorCodes::Error timeoutCode) { // Reset our activity timestamp auto now = _parent->_factory->now(); @@ -799,7 +833,7 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec // If we do not have requests, then we can fulfill 
immediately if (_requests.size() == 0) { - auto conn = tryGetConnection(); + auto conn = tryGetConnection(lease); if (conn) { LOGV2_DEBUG(22559, @@ -821,26 +855,32 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec const auto expiration = now + timeout; auto pf = makePromiseFuture<ConnectionHandle>(); - _requests.push_back({expiration, std::move(pf.promise), timeoutCode}); + _requests.push_back({expiration, std::move(pf.promise), lease, timeoutCode}); std::push_heap(begin(_requests), end(_requests), RequestComparator{}); return std::move(pf.future); } -auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle { +auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection, bool isLeased) + -> ConnectionHandle { auto connUseStartedAt = _parent->_getFastClockSource()->now(); - auto deleter = - [this, anchor = shared_from_this(), connUseStartedAt](ConnectionInterface* connection) { - stdx::lock_guard lk(_parent->_mutex); + auto deleter = [this, anchor = shared_from_this(), connUseStartedAt, isLeased]( + ConnectionInterface* connection) { + stdx::lock_guard lk(_parent->_mutex); + + // Leased connections don't count towards the pool's total connection usage time. + if (!isLeased) { _totalConnUsageTime += _parent->_getFastClockSource()->now() - connUseStartedAt; - returnConnection(connection); - _lastActiveTime = _parent->_factory->now(); - updateState(); - }; + } + + returnConnection(connection, isLeased); + _lastActiveTime = _parent->_factory->now(); + updateState(); + }; return ConnectionHandle(connection, std::move(deleter)); } -ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection() { +ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(bool lease) { while (_readyPool.size()) { // _readyPool is an LRUCache, so its begin() object is the MRU item. 
auto iter = _readyPool.begin(); @@ -862,12 +902,15 @@ ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection( auto connPtr = conn.get(); - // check out the connection - _checkedOutPool[connPtr] = std::move(conn); + if (lease) { + _leasedPool[connPtr] = std::move(conn); + } else { + _checkedOutPool[connPtr] = std::move(conn); + } // pass it to the user connPtr->resetToUnknown(); - auto handle = makeHandle(connPtr); + auto handle = makeHandle(connPtr, lease); return handle; } @@ -948,10 +991,10 @@ void ConnectionPool::SpecificPool::finishRefresh(ConnectionInterface* connPtr, S fulfillRequests(); } -void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr) { +void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr, bool isLeased) { auto needsRefreshTP = connPtr->getLastUsed() + _parent->_controller->toRefreshTimeout(); - auto conn = takeFromPool(_checkedOutPool, connPtr); + auto conn = takeFromPool(isLeased ? _leasedPool : _checkedOutPool, connPtr); invariant(conn); if (_health.isShutdown) { @@ -986,8 +1029,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr if (shouldRefreshConnection) { auto controls = _parent->_controller->getControls(_id); - if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >= - controls.targetConnections) { + if (openConnections() >= controls.targetConnections) { // If we already have minConnections, just let the connection lapse LOGV2(22567, "Ending idle connection to host {hostAndPort} because the pool meets " @@ -1054,7 +1096,7 @@ void ConnectionPool::SpecificPool::addToReady(OwnedConnection conn) { connPtr->indicateSuccess(); - returnConnection(connPtr); + returnConnection(connPtr, false); }); connPtr->setTimeout(_parent->_controller->toRefreshTimeout(), std::move(returnConnectionFunc)); } @@ -1143,7 +1185,7 @@ void ConnectionPool::SpecificPool::fulfillRequests() { // deadlock). 
// // None of the heap manipulation code throws, but it's something to keep in mind. - auto conn = tryGetConnection(); + auto conn = tryGetConnection(_requests.front().lease); if (!conn) { break; @@ -1259,7 +1301,8 @@ void ConnectionPool::SpecificPool::updateHealth() { const auto now = _parent->_factory->now(); // We're expired if we have no sign of connection use and are past our expiry - _health.isExpired = _requests.empty() && _checkedOutPool.empty() && (_hostExpiration <= now); + _health.isExpired = _requests.empty() && _checkedOutPool.empty() && _leasedPool.empty() && + (_hostExpiration <= now); // We're failed until we get new requests or our timer triggers if (_health.isFailed) { @@ -1277,7 +1320,7 @@ void ConnectionPool::SpecificPool::updateEventTimer() { } // If our expiration comes before our next event, then it is the next event - if (_requests.empty() && _checkedOutPool.empty()) { + if (_requests.empty() && _checkedOutPool.empty() && _leasedPool.empty()) { _hostExpiration = _lastActiveTime + _parent->_controller->hostTimeout(); if ((_hostExpiration > now) && (_hostExpiration < nextEventTime)) { nextEventTime = _hostExpiration; @@ -1343,6 +1386,7 @@ void ConnectionPool::SpecificPool::updateController() { refreshingConnections(), availableConnections(), inUseConnections(), + leasedConnections(), }; LOGV2_DEBUG(22578, kDiagnosticLogLevel, @@ -1381,6 +1425,7 @@ void ConnectionPool::SpecificPool::updateController() { if (shouldInvariantOnPoolCorrectness()) { invariant(pool->_checkedOutPool.empty()); invariant(pool->_requests.empty()); + invariant(pool->_leasedPool.empty()); } pool->triggerShutdown(Status(ErrorCodes::ConnectionPoolExpired, diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index 74f4a3adc88..de1adc2a5fa 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -81,6 +81,7 @@ public: using ConnectionHandleDeleter = std::function<void(ConnectionInterface* connection)>; 
using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>; + using RetrieveConnection = unique_function<SemiFuture<ConnectionHandle>()>; using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>; using PoolId = uint64_t; @@ -212,6 +213,7 @@ public: size_t pending = 0; size_t ready = 0; size_t active = 0; + size_t leased = 0; std::string toString() const; }; @@ -254,16 +256,39 @@ public: const std::function<transport::Session::TagMask(transport::Session::TagMask)>& mutateFunc) override; - SemiFuture<ConnectionHandle> get( + inline SemiFuture<ConnectionHandle> get( const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, Milliseconds timeout, - ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit); + ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit) { + return _get(hostAndPort, sslMode, timeout, false /*lease*/, timeoutCode); + } + void get_forTest(const HostAndPort& hostAndPort, Milliseconds timeout, ErrorCodes::Error timeoutCode, GetConnectionCallback cb); + /** + * "Lease" a connection from the pool. + * + * Connections retrieved via this method are not assumed to be in active use for the duration of + * their lease and are reported separately in metrics. Otherwise, this method behaves similarly + * to `ConnectionPool::get`. 
+ */ + inline SemiFuture<ConnectionHandle> lease( + const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout, + ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit) { + return _get(hostAndPort, sslMode, timeout, true /*lease*/, timeoutCode); + } + + void lease_forTest(const HostAndPort& hostAndPort, + Milliseconds timeout, + ErrorCodes::Error timeoutCode, + GetConnectionCallback cb); + void appendConnectionStats(ConnectionPoolStats* stats) const; size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const; @@ -275,6 +300,15 @@ public: private: ClockSource* _getFastClockSource() const; + SemiFuture<ConnectionHandle> _get( + const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout, + bool leased, + ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit); + + void retrieve_forTest(RetrieveConnection retrieve, GetConnectionCallback cb); + std::string _name; const std::shared_ptr<DependentTypeFactoryInterface> _factory; diff --git a/src/mongo/executor/connection_pool_stats.cpp b/src/mongo/executor/connection_pool_stats.cpp index 83f48c923f8..42ea5027b90 100644 --- a/src/mongo/executor/connection_pool_stats.cpp +++ b/src/mongo/executor/connection_pool_stats.cpp @@ -48,6 +48,7 @@ constexpr auto kAcquisitionWaitTimesKey = "acquisitionWaitTimes"_sd; ConnectionStatsPer::ConnectionStatsPer(size_t nInUse, size_t nAvailable, + size_t nLeased, size_t nCreated, size_t nRefreshing, size_t nRefreshed, @@ -56,6 +57,7 @@ ConnectionStatsPer::ConnectionStatsPer(size_t nInUse, Milliseconds nConnUsageTime) : inUse(nInUse), available(nAvailable), + leased(nLeased), created(nCreated), refreshing(nRefreshing), refreshed(nRefreshed), @@ -68,6 +70,7 @@ ConnectionStatsPer::ConnectionStatsPer() = default; ConnectionStatsPer& ConnectionStatsPer::operator+=(const ConnectionStatsPer& other) { inUse += other.inUse; available += other.available; + leased += other.leased; 
created += other.created; refreshing += other.refreshing; refreshed += other.refreshed; @@ -98,6 +101,7 @@ void ConnectionPoolStats::updateStatsForHost(std::string pool, // Update total connection stats. totalInUse += newStats.inUse; totalAvailable += newStats.available; + totalLeased += newStats.leased; totalCreated += newStats.created; totalRefreshing += newStats.refreshing; totalRefreshed += newStats.refreshed; @@ -112,6 +116,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT result.appendNumber("totalInUse", static_cast<long long>(totalInUse)); result.appendNumber("totalAvailable", static_cast<long long>(totalAvailable)); + result.appendNumber("totalLeased", static_cast<long long>(totalLeased)); result.appendNumber("totalCreated", static_cast<long long>(totalCreated)); result.appendNumber("totalRefreshing", static_cast<long long>(totalRefreshing)); result.appendNumber("totalRefreshed", static_cast<long long>(totalRefreshed)); @@ -156,6 +161,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT BSONObjBuilder poolInfo(poolBuilder.subobjStart(pool)); poolInfo.appendNumber("poolInUse", static_cast<long long>(stats.inUse)); poolInfo.appendNumber("poolAvailable", static_cast<long long>(stats.available)); + poolInfo.appendNumber("poolLeased", static_cast<long long>(stats.leased)); poolInfo.appendNumber("poolCreated", static_cast<long long>(stats.created)); poolInfo.appendNumber("poolRefreshing", static_cast<long long>(stats.refreshing)); poolInfo.appendNumber("poolRefreshed", static_cast<long long>(stats.refreshed)); @@ -169,6 +175,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT BSONObjBuilder hostInfo(poolInfo.subobjStart(host.toString())); hostInfo.appendNumber("inUse", static_cast<long long>(stats.inUse)); hostInfo.appendNumber("available", static_cast<long long>(stats.available)); + hostInfo.appendNumber("leased", static_cast<long long>(stats.leased)); 
hostInfo.appendNumber("created", static_cast<long long>(stats.created)); hostInfo.appendNumber("refreshing", static_cast<long long>(stats.refreshing)); hostInfo.appendNumber("refreshed", static_cast<long long>(stats.refreshed)); @@ -188,6 +195,7 @@ void ConnectionPoolStats::appendToBSON(mongo::BSONObjBuilder& result, bool forFT BSONObjBuilder hostInfo(hostBuilder.subobjStart(host.toString())); hostInfo.appendNumber("inUse", static_cast<long long>(stats.inUse)); hostInfo.appendNumber("available", static_cast<long long>(stats.available)); + hostInfo.appendNumber("leased", static_cast<long long>(stats.leased)); hostInfo.appendNumber("created", static_cast<long long>(stats.created)); hostInfo.appendNumber("refreshing", static_cast<long long>(stats.refreshing)); hostInfo.appendNumber("refreshed", static_cast<long long>(stats.refreshed)); diff --git a/src/mongo/executor/connection_pool_stats.h b/src/mongo/executor/connection_pool_stats.h index f6f62ba2f6c..dec837141b0 100644 --- a/src/mongo/executor/connection_pool_stats.h +++ b/src/mongo/executor/connection_pool_stats.h @@ -56,6 +56,7 @@ public: struct ConnectionStatsPer { ConnectionStatsPer(size_t nInUse, size_t nAvailable, + size_t nLeased, size_t nCreated, size_t nRefreshing, size_t nRefreshed, @@ -69,6 +70,7 @@ struct ConnectionStatsPer { size_t inUse = 0u; size_t available = 0u; + size_t leased = 0u; size_t created = 0u; size_t refreshing = 0u; size_t refreshed = 0u; @@ -91,6 +93,7 @@ struct ConnectionPoolStats { size_t totalInUse = 0u; size_t totalAvailable = 0u; + size_t totalLeased = 0u; size_t totalCreated = 0u; size_t totalRefreshing = 0u; size_t totalRefreshed = 0u; diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index feaabe0bd32..1c657e26ec8 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -29,6 +29,8 @@ #include "mongo/executor/connection_pool_test_fixture.h" +#include "mongo/util/duration.h" 
+#include "mongo/util/net/hostandport.h" #include <algorithm> #include <memory> #include <random> @@ -39,6 +41,7 @@ #include <fmt/ostream.h> #include "mongo/executor/connection_pool.h" +#include "mongo/executor/connection_pool_stats.h" #include "mongo/stdx/future.h" #include "mongo/unittest/thread_assertion_monitor.h" #include "mongo/unittest/unittest.h" @@ -50,6 +53,8 @@ namespace connection_pool_test_details { class ConnectionPoolTest : public unittest::Test { public: + constexpr static Milliseconds kNoTimeout = Milliseconds{-1}; + protected: void setUp() override {} @@ -180,6 +185,8 @@ TEST_F(ConnectionPoolTest, SameConn) { */ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { auto pool = makePool(); + std::random_device rd; + std::mt19937 rng(rd()); // Obtain a set of connections constexpr size_t kSize = 100; @@ -202,18 +209,25 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { } }); + std::uniform_int_distribution<> dist{0, 1}; for (size_t i = 0; i != kSize; ++i) { ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - ErrorCodes::NetworkInterfaceExceededTimeLimit, - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - monitors[i].exec([&]() { - ASSERT(swConn.isOK()); - connections.push_back(std::move(swConn.getValue())); - monitors[i].notifyDone(); - }); - }); + auto cb = [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitors[i].exec([&]() { + ASSERT(swConn.isOK()); + connections.push_back(std::move(swConn.getValue())); + monitors[i].notifyDone(); + }); + }; + auto timeout = Milliseconds(5000); + auto errorCode = ErrorCodes::NetworkInterfaceExceededTimeLimit; + + // Randomly lease or check out connection. 
+ if (dist(rng)) { + pool->get_forTest(HostAndPort(), timeout, errorCode, cb); + } else { + pool->lease_forTest(HostAndPort(), timeout, errorCode, cb); + } } for (auto& monitor : monitors) { @@ -223,8 +237,6 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { ASSERT_EQ(connections.size(), kSize); // Shuffle them into a random order - std::random_device rd; - std::mt19937 rng(rd()); std::shuffle(connections.begin(), connections.end(), rng); // Return them to the pool in that random order, recording IDs in a stack @@ -244,19 +256,25 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { // as the IDs in the stack, since the pool returns them in MRU order. for (size_t i = 0; i != kSize; ++i) { ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - ErrorCodes::NetworkInterfaceExceededTimeLimit, - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - monitors[i].exec([&]() { - ASSERT(swConn.isOK()); - const auto id = verifyAndGetId(swConn); - connections.push_back(std::move(swConn.getValue())); - ASSERT_EQ(id, ids.top()); - ids.pop(); - monitors[i].notifyDone(); - }); - }); + auto cb = [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitors[i].exec([&]() { + ASSERT(swConn.isOK()); + const auto id = verifyAndGetId(swConn); + connections.push_back(std::move(swConn.getValue())); + ASSERT_EQ(id, ids.top()); + ids.pop(); + monitors[i].notifyDone(); + }); + }; + auto timeout = Milliseconds(5000); + auto errorCode = ErrorCodes::NetworkInterfaceExceededTimeLimit; + + // Randomly lease or check out connection. 
+ if (dist(rng)) { + pool->get_forTest(HostAndPort(), timeout, errorCode, cb); + } else { + pool->lease_forTest(HostAndPort(), timeout, errorCode, cb); + } } for (auto& monitor : monitors) { @@ -1845,6 +1863,166 @@ TEST_F(ConnectionPoolTest, ReturnAfterShutdown) { pool->shutdown(); } +TEST_F(ConnectionPoolTest, TotalConnUseTimeIncreasedForCheckedOutConnection) { + constexpr Milliseconds checkOutLength = Milliseconds(10); + auto pool = makePool(); + + ConnectionPoolStats initialStats; + pool->appendConnectionStats(&initialStats); + + auto startTimePoint = Date_t::now(); + auto endTimePoint = startTimePoint + checkOutLength; + PoolImpl::setNow(startTimePoint); + + ConnectionImpl::pushSetup(Status::OK()); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + PoolImpl::setNow(endTimePoint); + doneWith(swConn.getValue()); + }); + + ConnectionPoolStats finalStats; + pool->appendConnectionStats(&finalStats); + + auto totalTimeUsageDelta = finalStats.totalConnUsageTime - initialStats.totalConnUsageTime; + ASSERT_GREATER_THAN_OR_EQUALS(totalTimeUsageDelta, checkOutLength); +} + +TEST_F(ConnectionPoolTest, OverlappingCheckoutsAdditivelyContributeToTotalUsageTime) { + constexpr Milliseconds checkOutLength = Milliseconds(10); + auto pool = makePool(); + + ConnectionPoolStats initialStats; + pool->appendConnectionStats(&initialStats); + + auto startTimePoint = Date_t::now(); + auto endTimePoint = startTimePoint + checkOutLength; + PoolImpl::setNow(startTimePoint); + + // Check out multiple connections. + constexpr int numConnections = 2; + std::vector<ConnectionPool::ConnectionHandle> connections; + // Ensure that no matter how we leave the test, we mark any + // checked out connections as OK before implicity returning them + // to the pool by destroying the 'connections' vector. 
Otherwise, + // this test would cause an invariant failure instead of a normal + // test failure if it fails, which would be confusing. + ScopeGuard guard([&] { + while (!connections.empty()) { + try { + ConnectionPool::ConnectionHandle conn = std::move(connections.back()); + connections.pop_back(); + doneWith(conn); + } catch (...) { + } + } + }); + + for (int i = 0; i < numConnections; ++i) { + ConnectionImpl::pushSetup(Status::OK()); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ASSERT(swConn.isOK()); + connections.push_back(std::move(swConn.getValue())); + }); + } + ASSERT_EQ(connections.size(), numConnections); + // Advance the time and return the connections. + PoolImpl::setNow(endTimePoint); + while (!connections.empty()) { + try { + ConnectionPool::ConnectionHandle conn = std::move(connections.back()); + ASSERT(conn); + connections.pop_back(); + doneWith(conn); + } catch (...) { + } + } + guard.dismiss(); + + ConnectionPoolStats finalStats; + pool->appendConnectionStats(&finalStats); + + auto totalTimeUsageDelta = finalStats.totalConnUsageTime - initialStats.totalConnUsageTime; + // Since each connection was used for checkOutLength, the total usage time should be >= the + // product. 
+ ASSERT_GREATER_THAN_OR_EQUALS(totalTimeUsageDelta, checkOutLength * numConnections); +} + +TEST_F(ConnectionPoolTest, LeasedConnectionsDontCountTowardsUsageTime) { + constexpr Milliseconds checkOutLength = Milliseconds(10); + auto pool = makePool(); + + ConnectionPoolStats initialStats; + pool->appendConnectionStats(&initialStats); + + auto startTimePoint = Date_t::now(); + auto endTimePoint = startTimePoint + checkOutLength; + PoolImpl::setNow(startTimePoint); + + ConnectionImpl::pushSetup(Status::OK()); + pool->lease_forTest(HostAndPort(), + Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + PoolImpl::setNow(endTimePoint); + doneWith(swConn.getValue()); + }); + + ConnectionPoolStats finalStats; + pool->appendConnectionStats(&finalStats); + + auto totalTimeUsageDelta = finalStats.totalConnUsageTime - initialStats.totalConnUsageTime; + ASSERT_EQ(totalTimeUsageDelta, Milliseconds(0)); +} + +TEST_F(ConnectionPoolTest, LeasedConnectionsDontInterfereWithOrdinaryCheckoutUsageTime) { + constexpr Milliseconds checkOutLength = Milliseconds(10); + auto pool = makePool(); + + ConnectionPoolStats initialStats; + pool->appendConnectionStats(&initialStats); + + auto startTimePoint = Date_t::now(); + auto endTimePoint = startTimePoint + checkOutLength; + PoolImpl::setNow(startTimePoint); + + ConnectionImpl::pushSetup(Status::OK()); + + // Checkout one connection and lease one connection. 
+ ConnectionPool::ConnectionHandle normal; + ConnectionPool::ConnectionHandle leased; + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + normal = std::move(swConn.getValue()); + }); + pool->lease_forTest(HostAndPort(), + Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + leased = std::move(swConn.getValue()); + }); + + // Advance the time and return the connections. + PoolImpl::setNow(endTimePoint); + doneWith(normal); + doneWith(leased); + + ConnectionPoolStats finalStats; + pool->appendConnectionStats(&finalStats); + + auto totalTimeUsageDelta = finalStats.totalConnUsageTime - initialStats.totalConnUsageTime; + // Should only include usage time from the checkout, not the lease + ASSERT_GREATER_THAN_OR_EQUALS(totalTimeUsageDelta, checkOutLength); + ASSERT_LESS_THAN(totalTimeUsageDelta, checkOutLength * 2); +} + } // namespace connection_pool_test_details } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index c83529ad482..bb98af43b54 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -238,6 +238,11 @@ Date_t PoolImpl::now() { } void PoolImpl::setNow(Date_t now) { + if (_now) { + // If we're not initializing the virtual clock, advance the fast clock source as well. 
+ Milliseconds diff = now - *_now; + _fastClockSource.advance(diff); + } _now = now; TimerImpl::fireIfNecessary(); } diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 91cd5fd3a64..8547dc2d81a 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -34,6 +34,7 @@ #include <string> #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/async_client.h" #include "mongo/executor/task_executor.h" #include "mongo/transport/baton.h" #include "mongo/util/fail_point.h" @@ -251,6 +252,46 @@ public: Milliseconds timeout, Status status) = 0; + /** + * An RAII type that NetworkInterface uses to allow users to access a stream corresponding to a + * single network-level (i.e. TCP, UDS) connection on which to run commands directly. Generally + * speaking, users should use the NetworkInterface itself to run commands to take advantage of + * connection-pooling, hedging, and other features - but for special cases where users need to + * borrow their own network-stream for manual use, they can lease one through this type. + * LeasedStreams are minimal and do not offer automated health-management/automated refreshing + * while on lease - users are responsible for examining the health of the stream as-needed if + * they desire. Users are also responsible for reporting on the health of stream before the + * lease ends so that it can be subsequently re-used; see comments below for detail. + */ + class LeasedStream { + public: + virtual ~LeasedStream() = default; + + // AsyncDBClient provides the mongoRPC-API for running commands over this stream. This + // stream owns the AsyncDBClient and no outstanding networking should be scheduled on the + // client when it is destroyed. + virtual AsyncDBClient* getClient() = 0; + + // Indicates that the user is done with this leased stream, and no failures on it occurred. 
+ // Users MUST call either this function or indicateFailure before the LeasedStream is + // destroyed. + virtual void indicateSuccess() = 0; + // Indicates that the stream is unhealthy (i.e. the user received a network error indicating + // the stream failed). This prevents the stream from being reused. + virtual void indicateFailure(Status) = 0; + // Indicates that the stream has successfully performed networking over the stream. Updates + // metadata indicating the last healthy networking over the stream so that appropriate + // health-checks can be done after the lease ends. + virtual void indicateUsed() = 0; + }; + + /** + * Lease a stream from this NetworkInterface for manual use. + */ + virtual SemiFuture<std::unique_ptr<LeasedStream>> leaseStream(const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout) = 0; + protected: NetworkInterface(); }; diff --git a/src/mongo/executor/network_interface_integration_test.cpp b/src/mongo/executor/network_interface_integration_test.cpp index c2b7c7a6c0a..3d84a640cbf 100644 --- a/src/mongo/executor/network_interface_integration_test.cpp +++ b/src/mongo/executor/network_interface_integration_test.cpp @@ -1117,6 +1117,31 @@ TEST_F(NetworkInterfaceTest, TearDownWaitsForInProgress) { ASSERT_EQ(getInProgress(), 0); } +TEST_F(NetworkInterfaceTest, RunCommandOnLeasedStream) { + auto cs = fixture(); + auto target = cs.getServers().front(); + auto leasedStream = net().leaseStream(target, transport::kGlobalSSLMode, kNoTimeout).get(); + auto* client = leasedStream->getClient(); + + auto request = RemoteCommandRequest(target, "admin", makeEchoCmdObj(), nullptr, kNoTimeout); + auto deferred = client->runCommandRequest(request); + + auto res = deferred.get(); + + ASSERT(res.elapsed); + uassertStatusOK(res.status); + leasedStream->indicateSuccess(); + leasedStream->indicateUsed(); + + // This opmsg request expects the following reply, which is generated below + // { echo: { echo: 1, foo: "bar", $db: 
"admin" }, ok: 1.0 } + auto cmdObj = res.data.getObjectField("echo"); + ASSERT_EQ(1, cmdObj.getIntField("echo")); + ASSERT_EQ("bar"_sd, cmdObj.getStringField("foo")); + ASSERT_EQ("admin"_sd, cmdObj.getStringField("$db")); + ASSERT_EQ(1, res.data.getIntField("ok")); +} + } // namespace } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index b63b3c6ba77..d528338674d 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -148,6 +148,12 @@ public: void testEgress(const HostAndPort&, transport::ConnectSSLMode, Milliseconds, Status) override {} + // Stream-leasing functionality is not mocked at this time. + SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> leaseStream( + const HostAndPort&, transport::ConnectSSLMode, Milliseconds) override { + MONGO_UNIMPLEMENTED; + } + //////////////////////////////////////////////////////////////////////////////// // // Methods for simulating network operations and the passage of time. 
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index c1b3def21ef..794603e356c 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -32,6 +32,7 @@ #include <fmt/format.h> +#include "mongo/base/checked_cast.h" #include "mongo/config.h" #include "mongo/db/auth/validated_tenancy_scope.h" #include "mongo/db/commands/server_status_metric.h" @@ -42,6 +43,7 @@ #include "mongo/executor/connection_pool_tl.h" #include "mongo/executor/hedge_options_util.h" #include "mongo/executor/hedging_metrics.h" +#include "mongo/executor/network_interface.h" #include "mongo/executor/network_interface_tl_gen.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -1465,5 +1467,33 @@ bool NetworkInterfaceTL::onNetworkThread() { void NetworkInterfaceTL::dropConnections(const HostAndPort& hostAndPort) { _pool->dropConnections(hostAndPort); } + +AsyncDBClient* NetworkInterfaceTL::LeasedStream::getClient() { + return checked_cast<connection_pool_tl::TLConnection*>(_conn.get())->client(); +} + +void NetworkInterfaceTL::LeasedStream::indicateSuccess() { + return _conn->indicateSuccess(); +} + +void NetworkInterfaceTL::LeasedStream::indicateFailure(Status status) { + _conn->indicateFailure(status); +} + +void NetworkInterfaceTL::LeasedStream::indicateUsed() { + _conn->indicateUsed(); +} + +SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> NetworkInterfaceTL::leaseStream( + const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, Milliseconds timeout) { + + return _pool->lease(hostAndPort, sslMode, timeout) + .thenRunOn(_reactor) + .then([](auto conn) -> std::unique_ptr<NetworkInterface::LeasedStream> { + auto ptr = std::make_unique<NetworkInterfaceTL::LeasedStream>(std::move(conn)); + return ptr; + }) + .semi(); +} } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_tl.h 
b/src/mongo/executor/network_interface_tl.h index a9351ca689f..52bbe07273a 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -31,9 +31,12 @@ #include <deque> +#include <boost/optional.hpp> + #include "mongo/client/async_client.h" #include "mongo/db/service_context.h" #include "mongo/executor/connection_pool.h" +#include "mongo/executor/connection_pool_tl.h" #include "mongo/executor/network_interface.h" #include "mongo/logv2/log_severity.h" #include "mongo/platform/mutex.h" @@ -103,6 +106,33 @@ public: Milliseconds timeout, Status status) override; + /** + * NetworkInterfaceTL's implementation of a leased network-stream + * provided for manual use outside of the NITL's usual RPC API. + * When this type is destroyed, the destructor of the ConnectionHandle + * member will return the connection to this NetworkInterface's ConnectionPool. + */ + class LeasedStream : public NetworkInterface::LeasedStream { + public: + AsyncDBClient* getClient() override; + + LeasedStream(ConnectionPool::ConnectionHandle&& conn) : _conn{std::move(conn)} {} + + // These pass through indications of the health of the leased + // stream to the underlying ConnectionHandle + void indicateSuccess() override; + void indicateUsed() override; + void indicateFailure(Status) override; + + private: + ConnectionPool::ConnectionHandle _conn; + }; + + SemiFuture<std::unique_ptr<NetworkInterface::LeasedStream>> leaseStream( + const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout) override; + private: struct RequestState; struct RequestManager; diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp index 4b416d86f05..826145e3339 100644 --- a/src/mongo/s/sharding_task_executor_pool_controller.cpp +++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp @@ -252,7 +252,7 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const 
HostState& "maxConns"_attr = maxConns); // Update the target for just the pool first - poolData.target = stats.requests + stats.active; + poolData.target = stats.requests + stats.active + stats.leased; if (poolData.target < minConns) { poolData.target = minConns; |