From 81cd7f7f3d69937a7c7b2a9ee58ac326ce059fe3 Mon Sep 17 00:00:00 2001 From: Daniel Morilha Date: Thu, 23 Jun 2022 05:42:07 +0000 Subject: SERVER-43155 Queries which exceed `maxTimeMS` may return `NetworkInterfaceExceededTimeLimit` After careful evaluation and research the service-arch team settled into adding a new argument into connection pool's interface allowing their users to specify a custom timeout status in case the pool fails to deliver a connection within a specified time. This timeout status now competes with `NetworkInterfaceExceededTimeLimit` which is now returned either when the Connection Pool Controller is rather used to compute the timeout or when no custom timeout status code is specified. This commit also: - Refactors portions of `ConnectionPoolTest` to reflect changes in the API. - Adds `jstests/sharding/max_time_ms_connection_pool.js` integration test to ensure different timeouts return different statuses. - Fixes small glitch in `jstests/core/tnxs/many_txns.js` where `NetworkInterfaceExceededTimeLimit` might be validly returned under rare resource intense conditions and fail the test, reported as BF-20554. (cherry picked from commit 3fd2a61085f8c72a13d1bc00ad95c460c69eb8a8) --- jstests/core/txns/many_txns.js | 11 +- jstests/sharding/max_time_ms_connection_pool.js | 44 +++++ src/mongo/db/query/max_time_ms_parser.cpp | 17 +- src/mongo/executor/connection_pool.cpp | 82 +++++++--- src/mongo/executor/connection_pool.h | 9 +- src/mongo/executor/connection_pool_test.cpp | 209 ++++++++++++++++++------ src/mongo/executor/network_interface_tl.cpp | 3 +- 7 files changed, 284 insertions(+), 91 deletions(-) create mode 100644 jstests/sharding/max_time_ms_connection_pool.js diff --git a/jstests/core/txns/many_txns.js b/jstests/core/txns/many_txns.js index 2dfda376423..a3c80222461 100644 --- a/jstests/core/txns/many_txns.js +++ b/jstests/core/txns/many_txns.js @@ -74,7 +74,16 @@ for (let txnNr = 0; txnNr < numTxns; ++txnNr) { continue; } assert.commandWorked(commitRes, "couldn't commit transaction " + txnNr); - assert.commandFailedWithCode(insertRes, ErrorCodes.MaxTimeMSExpired, tojson({insertCmd})); + // This assertion relies on the fact a previous transaction with the same insertion + // has started and is still pending. + // The test assumes inserting the same record will invariably return `MaxTimeMSExpired` + // but errors might be raised. + // `NetworkInterfaceExceededTimeLimit` is raised in the case the process runs out of + // resources to either create or repurpose a network connection for this operation. + assert.commandFailedWithCode( + insertRes, + [ErrorCodes.MaxTimeMSExpired, ErrorCodes.NetworkInterfaceExceededTimeLimit], + tojson({insertCmd})); // Read with default read concern sees the committed transaction. assert.eq(doc(1), coll.findOne(doc(1))); diff --git a/jstests/sharding/max_time_ms_connection_pool.js b/jstests/sharding/max_time_ms_connection_pool.js new file mode 100644 index 00000000000..40d7832470e --- /dev/null +++ b/jstests/sharding/max_time_ms_connection_pool.js @@ -0,0 +1,44 @@ +/** + * + * Tests the rewrite of NetworkInterfaceExceededTimeLimit exception coming from + * `executor/connection_pool.cpp` into MaxTimeMSError when MaxTimeMS option is set for a given + * sharding command. + * + * @tags: [requires_fcv_50] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); + +const databaseName = "my-database"; +const collectionName = "my-collection"; + +function generateInsertCommand(maxTimeMS) { + return {insert: collectionName, documents: [{}], maxTimeMS: maxTimeMS}; +} + +const st = new ShardingTest({shards: 1, mongos: 1}); +const database = st.s0.getDB(databaseName); +const collection = database.getCollection(collectionName); +const session = database.getMongo().startSession({causalConsistency: false}); + +assert.commandWorked( + database.runCommand({create: collection.getName(), writeConcern: {w: "majority"}})); + +assert.commandWorked(database.runCommand(generateInsertCommand(1000))); + +const failpoint = + configureFailPoint(st.s, "forceExecutorConnectionPoolTimeout", {"timeout": 1000}, "alwaysOn"); + +assert.commandFailedWithCode(database.runCommand(generateInsertCommand(1)), + ErrorCodes.MaxTimeMSExpired); + +assert.commandFailedWithCode(database.runCommand(generateInsertCommand(30000)), + ErrorCodes.NetworkInterfaceExceededTimeLimit); + +failpoint.off(); + +st.stop(); +}()); diff --git a/src/mongo/db/query/max_time_ms_parser.cpp b/src/mongo/db/query/max_time_ms_parser.cpp index 054b2b76ead..38cf4a3d6d7 100644 --- a/src/mongo/db/query/max_time_ms_parser.cpp +++ b/src/mongo/db/query/max_time_ms_parser.cpp @@ -29,6 +29,8 @@ #include "mongo/db/query/max_time_ms_parser.h" +#include + #include "mongo/bson/bsonobjbuilder.h" #include "mongo/util/assert_util.h" @@ -45,13 +47,14 @@ StatusWith parseMaxTimeMS(BSONElement maxTimeMSElt) { const long long maxVal = maxTimeMSElt.fieldNameStringData() == kMaxTimeMSOpOnlyField ? (long long)(INT_MAX) + kMaxTimeMSOpOnlyMaxPadding : INT_MAX; - if (maxTimeMSLongLong < 0 || maxTimeMSLongLong > maxVal) { - return StatusWith(ErrorCodes::BadValue, - (StringBuilder() - << maxTimeMSLongLong << " value for " - << maxTimeMSElt.fieldNameStringData() << " is out of range") - .str()); - } + + using namespace fmt::literals; + + if (maxTimeMSLongLong < 0 || maxTimeMSLongLong > maxVal) + return Status(ErrorCodes::BadValue, + "{} value for {} is out of range [{}, {}]"_format( + maxTimeMSLongLong, maxTimeMSElt.fieldNameStringData(), 0, maxVal)); + double maxTimeMSDouble = maxTimeMSElt.numberDouble(); if (maxTimeMSElt.type() == mongo::NumberDouble && floor(maxTimeMSDouble) != maxTimeMSDouble) { return StatusWith( diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 96a8410fd9e..a8889ba6aaa 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -29,8 +29,6 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kConnectionPool -#include "mongo/platform/basic.h" - #include "mongo/executor/connection_pool.h" #include @@ -61,6 +59,8 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(forceExecutorConnectionPoolTimeout); + auto makeSeveritySuppressor() { return std::make_unique>( Seconds{1}, logv2::LogSeverity::Log(), logv2::LogSeverity::Debug(2)); @@ -273,7 +273,7 @@ public: * Gets a connection from the specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex */ - Future getConnection(Milliseconds timeout); + Future getConnection(Milliseconds timeout, ErrorCodes::Error timeoutCode); /** * Triggers the shutdown procedure. This function sets isShutdown to true @@ -354,10 +354,15 @@ private: using OwnedConnection = std::shared_ptr; using OwnershipPool = stdx::unordered_map; using LRUOwnershipPool = LRUCache; - using Request = std::pair>; + struct Request { + Date_t expiration; + Promise promise; + ErrorCodes::Error timeoutCode; + }; + struct RequestComparator { - bool operator()(const Request& a, const Request& b) { - return a.first > b.first; + bool operator()(const Request& a, const Request& b) const { + return a.expiration > b.expiration; } }; @@ -541,19 +546,22 @@ void ConnectionPool::mutateTags( void ConnectionPool::get_forTest(const HostAndPort& hostAndPort, Milliseconds timeout, + ErrorCodes::Error timeoutCode, GetConnectionCallback cb) { // We kick ourselves onto the executor queue to prevent us from deadlocking with our own thread - auto getConnectionFunc = [this, hostAndPort, timeout, cb = std::move(cb)](Status&&) mutable { - get(hostAndPort, transport::kGlobalSSLMode, timeout) - .thenRunOn(_factory->getExecutor()) - .getAsync(std::move(cb)); - }; + auto getConnectionFunc = + [this, hostAndPort, timeout, timeoutCode, cb = std::move(cb)](Status&&) mutable { + get(hostAndPort, transport::kGlobalSSLMode, timeout, timeoutCode) + .thenRunOn(_factory->getExecutor()) + .getAsync(std::move(cb)); + }; _factory->getExecutor()->schedule(std::move(getConnectionFunc)); } SemiFuture ConnectionPool::get(const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode, - Milliseconds timeout) { + Milliseconds timeout, + ErrorCodes::Error timeoutCode) { stdx::lock_guard lk(_mutex); auto& pool = _pools[hostAndPort]; @@ -565,7 +573,7 @@ SemiFuture ConnectionPool::get(const HostAndPo invariant(pool); - auto connFuture = pool->getConnection(timeout); + auto connFuture = pool->getConnection(timeout, timeoutCode); pool->updateState(); return std::move(connFuture).semi(); @@ -643,12 +651,38 @@ size_t ConnectionPool::SpecificPool::requestsPending() const { } Future ConnectionPool::SpecificPool::getConnection( - Milliseconds timeout) { + Milliseconds timeout, ErrorCodes::Error timeoutCode) { // Reset our activity timestamp auto now = _parent->_factory->now(); _lastActiveTime = now; + auto pendingTimeout = _parent->_controller->pendingTimeout(); + if (timeout < Milliseconds(0) || timeout > pendingTimeout) { + timeout = pendingTimeout; + // If controller's pending timeout is closest, timeoutCode is rewritten to the internal time + // limit error + timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit; + } + + if (auto sfp = forceExecutorConnectionPoolTimeout.scoped(); MONGO_unlikely(sfp.isActive())) { + if (const Milliseconds failpointTimeout{sfp.getData()["timeout"].numberInt()}; + failpointTimeout > Milliseconds{0}) { + auto pf = makePromiseFuture(); + auto request = std::make_shared(); + request->expiration = now + failpointTimeout; + request->promise = std::move(pf.promise); + request->timeoutCode = timeoutCode; + auto timeoutTimer = _parent->_factory->makeTimer(); + timeoutTimer->setTimeout(failpointTimeout, [request, timeoutTimer]() mutable { + request->promise.setError(Status( + request->timeoutCode, + "Connection timed out due to forceExecutorConnectionPoolTimeout failpoint")); + }); + return std::move(pf.future); + } + } + // If we do not have requests, then we can fulfill immediately if (_requests.size() == 0) { auto conn = tryGetConnection(); @@ -663,10 +697,6 @@ Future ConnectionPool::SpecificPool::getConnec } } - auto pendingTimeout = _parent->_controller->pendingTimeout(); - if (timeout < Milliseconds(0) || timeout > pendingTimeout) { - timeout = pendingTimeout; - } LOGV2_DEBUG(22560, kDiagnosticLogLevel, "Requesting new connection to {hostAndPort} with timeout {timeout}", @@ -677,7 +707,7 @@ Future ConnectionPool::SpecificPool::getConnec const auto expiration = now + timeout; auto pf = makePromiseFuture(); - _requests.push_back(make_pair(expiration, std::move(pf.promise))); + _requests.push_back({expiration, std::move(pf.promise), timeoutCode}); std::push_heap(begin(_requests), end(_requests), RequestComparator{}); return std::move(pf.future); @@ -954,7 +984,7 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status) { } for (auto& request : _requests) { - request.second.setError(status); + request.promise.setError(status); } LOGV2_DEBUG(22573, @@ -983,7 +1013,7 @@ void ConnectionPool::SpecificPool::fulfillRequests() { } // Grab the request and callback - auto promise = std::move(_requests.front().second); + auto promise = std::move(_requests.front().promise); std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); _requests.pop_back(); @@ -1116,8 +1146,8 @@ void ConnectionPool::SpecificPool::updateEventTimer() { } // If a request would timeout before the next event, then it is the next event - if (_requests.size() && (_requests.front().first < nextEventTime)) { - nextEventTime = _requests.front().first; + if (_requests.size() && (_requests.front().expiration < nextEventTime)) { + nextEventTime = _requests.front().expiration; } // If our timer is already set to the next event, then we're done @@ -1137,12 +1167,12 @@ void ConnectionPool::SpecificPool::updateEventTimer() { _health.isFailed = false; - while (_requests.size() && (_requests.front().first <= now)) { + while (_requests.size() && (_requests.front().expiration <= now)) { std::pop_heap(begin(_requests), end(_requests), RequestComparator{}); auto& request = _requests.back(); - request.second.setError(Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, - "Couldn't get a connection within the time limit")); + request.promise.setError( + Status(request.timeoutCode, "Couldn't get a connection within the time limit")); _requests.pop_back(); // Since we've failed a request, we've interacted with external users diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index 86fa370bd9a..60c4a09f127 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -252,11 +252,14 @@ public: const std::function& mutateFunc) override; - SemiFuture get(const HostAndPort& hostAndPort, - transport::ConnectSSLMode sslMode, - Milliseconds timeout); + SemiFuture get( + const HostAndPort& hostAndPort, + transport::ConnectSSLMode sslMode, + Milliseconds timeout, + ErrorCodes::Error timeoutCode = ErrorCodes::NetworkInterfaceExceededTimeLimit); void get_forTest(const HostAndPort& hostAndPort, Milliseconds timeout, + ErrorCodes::Error timeoutCode, GetConnectionCallback cb); void appendConnectionStats(ConnectionPoolStats* stats) const; diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index b08f8d3900a..250c4af935f 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -27,8 +27,6 @@ * it in the license file. */ -#include "mongo/platform/basic.h" - #include "mongo/executor/connection_pool_test_fixture.h" #include @@ -104,6 +102,38 @@ protected: template void dropConnectionsTest(std::shared_ptr const& pool, Ptr t); + /** + * Helper for asserting the various connection pool time-out behaviours. + * + * Gets a connection from a new pool with a different timeout duration and code, + * asserting the connection times out with the appropriate code. + * + * The controller's refresh timeout is set to 250ms. + */ + void assertTimeoutHelper(Milliseconds hostTimeout, + ErrorCodes::Error expectedErrorCode, + ErrorCodes::Error timeoutCode) { + ConnectionPool::Options options; + options.refreshTimeout = Milliseconds{250}; + auto pool = makePool(options); + auto now = Date_t::now(); + + PoolImpl::setNow(now); + + StatusWith connectionHandle{nullptr}; + pool->get_forTest(HostAndPort(), + hostTimeout, + timeoutCode, + [&](StatusWith swConn) { + connectionHandle = std::move(swConn); + }); + + PoolImpl::setNow(now + hostTimeout); + + ASSERT(!connectionHandle.isOK()); + ASSERT_EQ(connectionHandle.getStatus(), expectedErrorCode); + } + private: std::shared_ptr _executor = InlineQueuedCountingExecutor::make(); std::shared_ptr _pool; @@ -121,6 +151,7 @@ TEST_F(ConnectionPoolTest, SameConn) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -131,6 +162,7 @@ TEST_F(ConnectionPoolTest, SameConn) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -172,6 +204,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); connections.push_back(std::move(swConn.getValue())); @@ -200,6 +233,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); const auto id = verifyAndGetId(swConn); @@ -253,6 +287,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); original_ids.insert(verifyAndGetId(swConn)); @@ -285,6 +320,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); reacquired_ids.insert(verifyAndGetId(swConn)); @@ -325,6 +361,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn1Id = verifyAndGetId(swConn); swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error")); @@ -335,6 +372,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -358,6 +396,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort("localhost:30000"), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -368,6 +407,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort("localhost:30001"), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -390,6 +430,7 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); conn1 = std::move(swConn.getValue()); @@ -400,6 +441,7 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); conn2 = std::move(swConn.getValue()); @@ -413,30 +455,40 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) { } /** - * Verify that timing out on setup works as expected (a bad status is - * returned). - * - * Note that the lack of pushSetup() calls delays the get. + * When the timeout duration comes from the parameter, and no timeout code is specified the + * connection timeout status should always be `NetworkInterfaceExceededTimeLimit` */ TEST_F(ConnectionPoolTest, TimeoutOnSetup) { - auto pool = makePool(); - - auto now = Date_t::now(); - - Milliseconds hostTimeout = Milliseconds(5000); - - PoolImpl::setNow(now); - - boost::optional> conn; - pool->get_forTest( - HostAndPort(), hostTimeout, [&](StatusWith swConn) { - conn = std::move(swConn); - }); + assertTimeoutHelper( + /* timeout duration */ Milliseconds{100}, + /* expected timeout code */ ErrorCodes::NetworkInterfaceExceededTimeLimit, + /* user-defined timeout code */ ErrorCodes::NetworkInterfaceExceededTimeLimit); +} - PoolImpl::setNow(now + hostTimeout); +/** + * When the timeout duration and timeout code come from the parameters, the connection timeout + * status should be the same as specified. + */ +TEST_F(ConnectionPoolTest, TimeoutOnSetupWithErrorCode) { + assertTimeoutHelper( + /* timeout duration */ Milliseconds{100}, + /* expected timeout code */ ErrorCodes::MaxTimeMSExpired, + /* user-defined timeout code */ ErrorCodes::MaxTimeMSExpired); +} - ASSERT(!conn->isOK()); - ASSERT_EQ(conn->getStatus(), ErrorCodes::NetworkInterfaceExceededTimeLimit); +/** + * When the timeout duration comes from controller, the connection timeout status should always + * be `NetworkInterfaceExceededTimeLimit`. + * + * This test verifies that for a duration longer than what is specified in the connection pool + * controller, the controller timeout duration takes precendence over what was requested and + * the returned status is always `NetworkInterfaceExceededTimeLimit` + */ +TEST_F(ConnectionPoolTest, ControllerTimeoutOnSetup) { + assertTimeoutHelper( + /* timeout duration */ Milliseconds{500}, + /* expected timeout code */ ErrorCodes::NetworkInterfaceExceededTimeLimit, + /* user-defined timeout code */ ErrorCodes::MaxTimeMSExpired); } /** @@ -467,6 +519,7 @@ TEST_F(ConnectionPoolTest, refreshHappens) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); doneWith(swConn.getValue()); @@ -505,6 +558,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -516,6 +570,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { // Make sure we still get the first one pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -532,6 +587,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(1000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(!swConn.isOK()); @@ -548,6 +604,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { // Make sure we can get a new connection pool->get_forTest(HostAndPort(), Milliseconds(1000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_NE(verifyAndGetId(swConn), conn1Id); reachedB = true; @@ -571,6 +628,7 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) { pool->get_forTest(HostAndPort(), Milliseconds(2000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -580,6 +638,7 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) { pool->get_forTest(HostAndPort(), Milliseconds(1000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -618,6 +677,7 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { // the pool) pool->get_forTest(HostAndPort(), Milliseconds(3000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -625,6 +685,7 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { }); pool->get_forTest(HostAndPort(), Milliseconds(2000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -632,6 +693,7 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { }); pool->get_forTest(HostAndPort(), Milliseconds(1000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -675,6 +737,7 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) { // the pool) pool->get_forTest(HostAndPort(), Milliseconds(3000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -682,6 +745,7 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) { }); pool->get_forTest(HostAndPort(), Milliseconds(2000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -689,6 +753,7 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) { }); pool->get_forTest(HostAndPort(), Milliseconds(1000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -734,6 +799,7 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); doneWith(swConn.getValue()); @@ -750,6 +816,7 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { // Try to get another connection pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); doneWith(swConn.getValue()); @@ -780,6 +847,7 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { // Get us spun up to 3 connections in the pool pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); doneWith(swConn.getValue()); @@ -801,6 +869,7 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { for (size_t i = 0; i < conns.size(); ++i) { pool->get_forTest(HostAndPort(), Milliseconds(static_cast(1000 + i)), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&conns, i](StatusWith swConn) { ASSERT(swConn.isOK()); conns[i] = std::move(swConn.getValue()); @@ -876,6 +945,7 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { // Grab one connection without returning it pool->get_forTest(HostAndPort(), Milliseconds(1000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -908,6 +978,7 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { // Two more get's without returns pool->get_forTest(HostAndPort(), Milliseconds(2000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -915,6 +986,7 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { }); pool->get_forTest(HostAndPort(), Milliseconds(3000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(swConn.isOK()); @@ -982,6 +1054,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { connId = verifyAndGetId(swConn); reachedA = true; @@ -999,6 +1072,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_NE(connId, verifyAndGetId(swConn)); reachedB = true; @@ -1032,6 +1106,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { connId = verifyAndGetId(swConn); reachedA = true; @@ -1046,6 +1121,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { // Same connection pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_EQ(connId, verifyAndGetId(swConn)); reachedB = true; @@ -1059,6 +1135,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { bool reachedB2 = false; pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_EQ(connId, verifyAndGetId(swConn)); reachedB2 = true; @@ -1074,6 +1151,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_NE(connId, verifyAndGetId(swConn)); reachedC = true; @@ -1107,6 +1185,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn1Id = verifyAndGetId(swConn); conn1 = std::move(swConn.getValue()); @@ -1118,6 +1197,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -1133,6 +1213,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { // conn 2 is still there pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_EQ(conn2Id, verifyAndGetId(swConn)); reachedA = true; @@ -1153,6 +1234,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_NE(conn1Id, verifyAndGetId(swConn)); ASSERT_NE(conn2Id, verifyAndGetId(swConn)); @@ -1182,6 +1264,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); @@ -1192,6 +1275,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { ConnectionPool::ConnectionHandle handle; pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_EQ(verifyAndGetId(swConn), conn1Id); handle = std::move(swConn.getValue()); @@ -1205,6 +1289,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { // will fail. pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT(!swConn.isOK()); reachedA = true; @@ -1225,6 +1310,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { conn2Id = verifyAndGetId(swConn); ASSERT_NE(conn2Id, conn1Id); @@ -1247,6 +1333,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), + ErrorCodes::NetworkInterfaceExceededTimeLimit, [&](StatusWith swConn) { ASSERT_NE(verifyAndGetId(swConn), conn2Id); reachedB = true; @@ -1271,9 +1358,10 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) { boost::optional> conn1; pool->get_forTest( - HostAndPort(), Seconds(10), [&](StatusWith swConn) { - conn1 = std::move(swConn); - }); + HostAndPort(), + Seconds(10), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { conn1 = std::move(swConn); }); // initially we haven't called our callback ASSERT(!conn1); @@ -1284,10 +1372,12 @@ TEST_F(ConnectionPoolTest, SetupTimeoutsDontTimeoutUnrelatedRequests) { ASSERT(!conn1); // Get conn2 (which should have an extra second before the timeout) - pool->get_forTest( - HostAndPort(), Seconds(10), [&](StatusWith swConn) { - ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress); - }); + pool->get_forTest(HostAndPort(), + Seconds(10), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { + ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress); + }); PoolImpl::setNow(now + Seconds(2)); @@ -1313,11 +1403,13 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { // Successfully get a new connection size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest( - HostAndPort(), Seconds(1), [&](StatusWith swConn) { - conn1Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + pool->get_forTest(HostAndPort(), + Seconds(1), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { + conn1Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); ASSERT(conn1Id); // Force it into refresh @@ -1325,9 +1417,10 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { boost::optional> conn1; pool->get_forTest( - HostAndPort(), Seconds(10), [&](StatusWith swConn) { - conn1 = std::move(swConn); - }); + HostAndPort(), + Seconds(10), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { conn1 = std::move(swConn); }); // initially we haven't called our callback ASSERT(!conn1); @@ -1337,10 +1430,12 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { ASSERT(!conn1); // Get conn2 (which should have an extra second before the timeout) - pool->get_forTest( - HostAndPort(), Seconds(10), [&](StatusWith swConn) { - ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress); - }); + pool->get_forTest(HostAndPort(), + Seconds(10), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { + ASSERT_EQ(swConn.getStatus(), ErrorCodes::ShutdownInProgress); + }); PoolImpl::setNow(now + Seconds(5)); @@ -1361,19 +1456,25 @@ void ConnectionPoolTest::dropConnectionsTest(std::shared_ptr con // Successfully get connections to two hosts ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(hap1, Seconds(1), [&](StatusWith swConn) { - doneWith(swConn.getValue()); - }); + pool->get_forTest( + hap1, + Seconds(1), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { doneWith(swConn.getValue()); }); ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(hap2, Seconds(1), [&](StatusWith swConn) { - doneWith(swConn.getValue()); - }); + pool->get_forTest( + hap2, + Seconds(1), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { doneWith(swConn.getValue()); }); ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(hap3, Seconds(1), [&](StatusWith swConn) { - doneWith(swConn.getValue()); - }); + pool->get_forTest( + hap3, + Seconds(1), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { doneWith(swConn.getValue()); }); ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap1)); ASSERT_EQ(1ul, pool->getNumConnectionsPerHost(hap2)); @@ -1408,9 +1509,11 @@ void ConnectionPoolTest::dropConnectionsTest(std::shared_ptr con ASSERT_EQ(0ul, pool->getNumConnectionsPerHost(hap3)); ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(hap4, Seconds(1), [&](StatusWith swConn) { - doneWith(swConn.getValue()); - }); + pool->get_forTest( + hap4, + Seconds(1), + ErrorCodes::NetworkInterfaceExceededTimeLimit, + [&](StatusWith swConn) { doneWith(swConn.getValue()); }); // drop connections by hostAndPort t->dropConnections(hap1); diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 40cb04a7012..e3fa6b3bc15 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -544,7 +544,8 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa // Attempt to get a connection to every target host for (size_t idx = 0; idx < request.target.size(); ++idx) { - auto connFuture = _pool->get(request.target[idx], request.sslMode, request.timeout); + auto connFuture = + _pool->get(request.target[idx], request.sslMode, request.timeout, request.timeoutCode); // If connection future is ready or requests should be sent in order, send the request // immediately. -- cgit v1.2.1