diff options
author | Tyler Seip <Tyler.Seip@mongodb.com> | 2021-07-08 17:46:39 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-09-21 16:59:56 +0000 |
commit | ed2e91304a83771f20e3df874222984c61a3e884 (patch) | |
tree | ebcb62e3a3544414b127377485d87bd3021044d3 | |
parent | 130476fc56a711b489525112c855766bd2a1bb23 (diff) | |
download | mongo-ed2e91304a83771f20e3df874222984c61a3e884.tar.gz |
SERVER-55816: Clamp negative timeouts in connection pools' connections
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 12 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 709 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.cpp | 5 |
3 files changed, 473 insertions, 253 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 1fa7790a300..18e8cbc0d4c 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -1111,7 +1111,7 @@ void ConnectionPool::SpecificPool::updateEventTimer() { // If our expiration comes before our next event, then it is the next event if (_requests.empty() && _checkedOutPool.empty()) { _hostExpiration = _lastActiveTime + _parent->_controller->hostTimeout(); - if ((_hostExpiration > now) && (_hostExpiration < nextEventTime)) { + if (_hostExpiration < nextEventTime) { nextEventTime = _hostExpiration; } } @@ -1121,6 +1121,14 @@ void ConnectionPool::SpecificPool::updateEventTimer() { nextEventTime = _requests.front().first; } + // Clamp next event time to be either now or in the future. Next event time + // can be in the past anytime we wait a long time between invocations of + // updateState; in these cases, we want to set our event timer to expire + // immediately. + if (nextEventTime < now) { + nextEventTime = now; + } + // If our timer is already set to the next event, then we're done if (nextEventTime == _eventTimerExpiration) { return; @@ -1133,7 +1141,7 @@ void ConnectionPool::SpecificPool::updateEventTimer() { _eventTimer->cancelTimeout(); // Set our event timer to timeout requests, refresh the state, and potentially expire this pool - auto deferredStateUpdateFunc = guardCallback([this, timeout]() { + auto deferredStateUpdateFunc = guardCallback([this]() { auto now = _parent->_factory->now(); _health.isFailed = false; diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 9ac648e0bad..79eaaaf0218 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -42,6 +42,7 @@ #include "mongo/executor/connection_pool.h" #include "mongo/stdx/future.h" +#include "mongo/unittest/thread_assertion_monitor.h" #include "mongo/unittest/unittest.h" #include "mongo/util/scopeguard.h" @@ -151,6 +152,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { // Obtain a set of connections constexpr size_t kSize = 100; std::vector<ConnectionPool::ConnectionHandle> connections; + std::vector<unittest::ThreadAssertionMonitor> monitors(kSize); // Ensure that no matter how we leave the test, we mark any // checked out connections as OK before implicity returning them @@ -173,10 +175,18 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { pool->get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - connections.push_back(std::move(swConn.getValue())); + monitors[i].exec([&]() { + ASSERT(swConn.isOK()); + connections.push_back(std::move(swConn.getValue())); + monitors[i].notifyDone(); + }); }); } + + for (auto& monitor : monitors) { + monitor.wait(); + } + ASSERT_EQ(connections.size(), kSize); // Shuffle them into a random order @@ -194,6 +204,9 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { } ASSERT_EQ(ids.size(), kSize); + // Replace the thread monitors with fresh ones. + monitors = std::vector<unittest::ThreadAssertionMonitor>(kSize); + // Re-obtain the connections. They should come back in the same order // as the IDs in the stack, since the pool returns them in MRU order. for (size_t i = 0; i != kSize; ++i) { @@ -201,13 +214,21 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { pool->get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - const auto id = verifyAndGetId(swConn); - connections.push_back(std::move(swConn.getValue())); - ASSERT_EQ(id, ids.top()); - ids.pop(); + monitors[i].exec([&]() { + ASSERT(swConn.isOK()); + const auto id = verifyAndGetId(swConn); + connections.push_back(std::move(swConn.getValue())); + ASSERT_EQ(id, ids.top()); + ids.pop(); + monitors[i].notifyDone(); + }); }); } + + for (auto& monitor : monitors) { + monitor.wait(); + } + ASSERT(ids.empty()); } @@ -227,6 +248,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { // Obtain a set of connections constexpr size_t kSize = 100; std::vector<ConnectionPool::ConnectionHandle> connections; + std::vector<unittest::ThreadAssertionMonitor> monitors(kSize); // Ensure that no matter how we leave the test, we mark any // checked out connections as OK before implicity returning them @@ -254,12 +276,19 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { pool->get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - original_ids.insert(verifyAndGetId(swConn)); - connections.push_back(std::move(swConn.getValue())); + monitors[i].exec([&]() { + ASSERT(swConn.isOK()); + original_ids.insert(verifyAndGetId(swConn)); + connections.push_back(std::move(swConn.getValue())); + monitors[i].notifyDone(); + }); }); } + for (auto& monitor : monitors) { + monitor.wait(); + } + ASSERT_EQ(original_ids.size(), kSize); ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize); @@ -280,18 +309,26 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize); // Re-obtain a quarter of the connections, and record their IDs in a set. + monitors = std::vector<unittest::ThreadAssertionMonitor>(kSize / 4); std::set<size_t> reacquired_ids; for (size_t i = 0; i < kSize / 4; ++i) { ConnectionImpl::pushSetup(Status::OK()); pool->get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - reacquired_ids.insert(verifyAndGetId(swConn)); - connections.push_back(std::move(swConn.getValue())); + monitors[i].exec([&]() { + ASSERT(swConn.isOK()); + reacquired_ids.insert(verifyAndGetId(swConn)); + connections.push_back(std::move(swConn.getValue())); + monitors[i].notifyDone(); + }); }); } + for (auto& monitor : monitors) { + monitor.wait(); + } + ASSERT_EQ(reacquired_ids.size(), kSize / 4); ASSERT(std::includes( original_ids.begin(), original_ids.end(), reacquired_ids.begin(), reacquired_ids.end())); @@ -388,22 +425,31 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) { // Get the first connection, move it out rather than letting it return ConnectionPool::ConnectionHandle conn1; ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - conn1 = std::move(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(swConn.isOK()); + conn1 = std::move(swConn.getValue()); + }); + }); + }); // Get the second connection, move it out rather than letting it return ConnectionPool::ConnectionHandle conn2; ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - conn2 = std::move(swConn.getValue()); - }); + + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(swConn.isOK()); + conn2 = std::move(swConn.getValue()); + }); + }); + }); // Verify that the two connections are different ASSERT_NE(conn1.get(), conn2.get()); @@ -465,12 +511,16 @@ TEST_F(ConnectionPoolTest, refreshHappens) { // Get a connection ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - doneWith(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); + }); + }); // After 1 second, one refresh has occurred PoolImpl::setNow(now + Milliseconds(1000)); @@ -502,25 +552,33 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { size_t conn1Id = 0; // Grab a connection and verify it's good - ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); - - PoolImpl::setNow(now + Milliseconds(500)); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + ConnectionImpl::pushSetup(Status::OK()); + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + conn1Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); + }); + PoolImpl::setNow(now + Milliseconds(500)); + }); size_t conn2Id = 0; - // Make sure we still get the first one - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); - ASSERT_EQ(conn1Id, conn2Id); + + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + // Make sure we still get the first one + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + conn2Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); + }); + ASSERT_EQ(conn1Id, conn2Id); + }); // This should trigger a refresh, but not time it out. So now we have one // connection sitting in refresh. @@ -529,30 +587,37 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { // This will wait because we have a refreshing connection, so it'll wait to // see if that pans out. In this case, we'll get a failure on timeout. - ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(!swConn.isOK()); - - reachedA = true; - }); - ASSERT(!reachedA); - PoolImpl::setNow(now + Milliseconds(3000)); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + ConnectionImpl::pushSetup(Status::OK()); + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(!swConn.isOK()); - // Let the refresh timeout - PoolImpl::setNow(now + Milliseconds(4000)); + reachedA = true; + }); + }); + ASSERT(!reachedA); + PoolImpl::setNow(now + Milliseconds(3000)); + // Let the refresh timeout + PoolImpl::setNow(now + Milliseconds(4000)); + }); bool reachedB = false; // Make sure we can get a new connection - pool->get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(verifyAndGetId(swConn), conn1Id); - reachedB = true; - doneWith(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_NE(verifyAndGetId(swConn), conn1Id); + reachedB = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedA); ASSERT(reachedB); @@ -568,29 +633,36 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) { bool reachedB = false; ConnectionPool::ConnectionHandle conn; - + unittest::ThreadAssertionMonitor c1; pool->get_forTest(HostAndPort(), Milliseconds(2000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + c1.exec([&]() { + ASSERT(swConn.isOK()); - reachedA = true; - doneWith(swConn.getValue()); + reachedA = true; + doneWith(swConn.getValue()); + c1.notifyDone(); + }); }); - + unittest::ThreadAssertionMonitor c2; pool->get_forTest(HostAndPort(), Milliseconds(1000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + c2.exec([&]() { + ASSERT(swConn.isOK()); - reachedB = true; + reachedB = true; - conn = std::move(swConn.getValue()); + conn = std::move(swConn.getValue()); + c2.notifyDone(); + }); }); ConnectionImpl::pushSetup(Status::OK()); // Note thate we hit the 1 second request, but not the 2 second + c2.wait(); ASSERT(reachedB); ASSERT(!reachedA); @@ -598,6 +670,7 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) { // Now that we've returned the connection, we see the second has been // called + c1.wait(); ASSERT(reachedA); } @@ -616,30 +689,44 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { // Make 3 requests, each which keep their connection (don't return it to // the pool) + unittest::ThreadAssertionMonitor c3; pool->get_forTest(HostAndPort(), Milliseconds(3000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + c3.exec([&]() { + ASSERT(swConn.isOK()); - conn3 = std::move(swConn.getValue()); + conn3 = std::move(swConn.getValue()); + c3.notifyDone(); + }); }); + unittest::ThreadAssertionMonitor c2; pool->get_forTest(HostAndPort(), Milliseconds(2000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + c2.exec([&]() { + ASSERT(swConn.isOK()); - conn2 = std::move(swConn.getValue()); + conn2 = std::move(swConn.getValue()); + c2.notifyDone(); + }); }); + unittest::ThreadAssertionMonitor c1; pool->get_forTest(HostAndPort(), Milliseconds(1000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + c1.exec([&]() { + ASSERT(swConn.isOK()); - conn1 = std::move(swConn.getValue()); + conn1 = std::move(swConn.getValue()); + c1.notifyDone(); + }); }); ConnectionImpl::pushSetup(Status::OK()); + c1.wait(); ConnectionImpl::pushSetup(Status::OK()); + c2.wait(); ConnectionImpl::pushSetup(Status::OK()); // Note that only two have run @@ -652,6 +739,7 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) { doneWith(conn1); // Verify that it's the one that pops out for request 3 + c3.wait(); ASSERT_EQ(conn1Ptr, conn3.get()); doneWith(conn2); @@ -673,34 +761,49 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) { // Make 3 requests, each which keep their connection (don't return it to // the pool) + unittest::ThreadAssertionMonitor c3; pool->get_forTest(HostAndPort(), Milliseconds(3000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + c3.exec([&]() { + ASSERT(swConn.isOK()); - conn3 = std::move(swConn.getValue()); + conn3 = std::move(swConn.getValue()); + c3.notifyDone(); + }); }); + unittest::ThreadAssertionMonitor c2; pool->get_forTest(HostAndPort(), Milliseconds(2000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + c2.exec([&]() { + ASSERT(swConn.isOK()); - conn2 = std::move(swConn.getValue()); + conn2 = std::move(swConn.getValue()); + c2.notifyDone(); + }); }); + unittest::ThreadAssertionMonitor c1; pool->get_forTest(HostAndPort(), Milliseconds(1000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + c1.exec([&]() { + ASSERT(swConn.isOK()); - conn1 = std::move(swConn.getValue()); + conn1 = std::move(swConn.getValue()); + c1.notifyDone(); + }); }); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); ConnectionImpl::pushSetup(Status::OK()); + c1.wait(); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); ConnectionImpl::pushSetup(Status::OK()); + c2.wait(); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); ConnectionImpl::pushSetup(Status::OK()); + c3.wait(); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); ASSERT(conn1); @@ -732,11 +835,15 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { // Get a connection ConnectionImpl::pushSetup(Status::OK()); + unittest::ThreadAssertionMonitor c1; pool->get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - doneWith(swConn.getValue()); + c1.exec([&]() { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + c1.notifyDone(); + }); }); ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); @@ -748,19 +855,25 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) { bool reachedA = false; // Try to get another connection + unittest::ThreadAssertionMonitor c2; pool->get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - doneWith(swConn.getValue()); - reachedA = true; + c2.exec([&]() { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + reachedA = true; + c2.notifyDone(); + }); }); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); ASSERT(!reachedA); + c1.wait(); ConnectionImpl::pushRefresh(Status::OK()); ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); ASSERT(reachedA); + c2.wait(); } /** @@ -778,14 +891,18 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { PoolImpl::setNow(now); // Get us spun up to 3 connections in the pool - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - doneWith(swConn.getValue()); - }); - ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); - ConnectionImpl::pushSetup(Status::OK()); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(swConn.isOK()); + doneWith(swConn.getValue()); + }); + }); + ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ConnectionImpl::pushSetup(Status::OK()); + }); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); ConnectionImpl::pushSetup(Status::OK()); ConnectionImpl::pushSetup(Status::OK()); @@ -796,14 +913,18 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 3u); std::array<ConnectionPool::ConnectionHandle, 5> conns; + std::array<unittest::ThreadAssertionMonitor, 5> ams; // Start 5 new requests for (size_t i = 0; i < conns.size(); ++i) { pool->get_forTest(HostAndPort(), Milliseconds(static_cast<int>(1000 + i)), - [&conns, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - conns[i] = std::move(swConn.getValue()); + [&conns, &ams, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + ams[i].exec([&]() { + ASSERT(swConn.isOK()); + conns[i] = std::move(swConn.getValue()); + ams[i].notifyDone(); + }); }); } @@ -825,28 +946,33 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) { ConnectionImpl::pushRefresh(Status::OK()); ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 2u); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + ams[0].wait(); firstNBound(1); // After two refresh, one enters the setup queue, one refreshed connection gets handed out ConnectionImpl::pushRefresh(Status::OK()); ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 1u); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); + ams[1].wait(); firstNBound(2); // After three refresh, we're done refreshing. Two queued in setup ConnectionImpl::pushRefresh(Status::OK()); ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u); + ams[2].wait(); firstNBound(3); // now pushing setup gets us a new connection ConnectionImpl::pushSetup(Status::OK()); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u); + ams[3].wait(); firstNBound(4); // and we're done ConnectionImpl::pushSetup(Status::OK()); ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u); + ams[4].wait(); firstNBound(5); for (auto& conn : conns) { @@ -873,30 +999,34 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { ConnectionPool::ConnectionHandle conn2; ConnectionPool::ConnectionHandle conn3; - // Grab one connection without returning it - pool->get_forTest(HostAndPort(), - Milliseconds(1000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - - conn1 = std::move(swConn.getValue()); - }); - bool reachedA = false; bool reachedB = false; bool reachedC = false; - ConnectionImpl::pushSetup([&]() { - reachedA = true; - return Status::OK(); - }); - ConnectionImpl::pushSetup([&]() { - reachedB = true; - return Status::OK(); - }); - ConnectionImpl::pushSetup([&]() { - reachedC = true; - return Status::OK(); + // Grab one connection without returning it + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(swConn.isOK()); + + conn1 = std::move(swConn.getValue()); + }); + }); + + ConnectionImpl::pushSetup([&]() { + reachedA = true; + return Status::OK(); + }); + ConnectionImpl::pushSetup([&]() { + reachedB = true; + return Status::OK(); + }); + ConnectionImpl::pushSetup([&]() { + reachedC = true; + return Status::OK(); + }); }); // Verify that two setups were invoked, even without two requests (the @@ -906,21 +1036,28 @@ TEST_F(ConnectionPoolTest, minPoolRespected) { ASSERT(!reachedC); // Two more get's without returns - pool->get_forTest(HostAndPort(), - Milliseconds(2000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); - - conn2 = std::move(swConn.getValue()); - }); - pool->get_forTest(HostAndPort(), - Milliseconds(3000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(swConn.isOK()); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(2000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(swConn.isOK()); - conn3 = std::move(swConn.getValue()); - }); + conn2 = std::move(swConn.getValue()); + }); + }); + }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(3000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(swConn.isOK()); + conn3 = std::move(swConn.getValue()); + }); + }); + }); ASSERT(conn2); ASSERT(conn3); @@ -980,14 +1117,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { bool reachedA = false; // Grab 1 connection and return it ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - connId = verifyAndGetId(swConn); - reachedA = true; - doneWith(swConn.getValue()); - }); - + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + connId = verifyAndGetId(swConn); + reachedA = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedA); // Jump pass the hostTimeout @@ -997,14 +1137,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { // Verify that a new connection was spawned ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(connId, verifyAndGetId(swConn)); - reachedB = true; - doneWith(swConn.getValue()); - }); - + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_NE(connId, verifyAndGetId(swConn)); + reachedB = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedB); } @@ -1030,13 +1173,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { // Grab and return ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - connId = verifyAndGetId(swConn); - reachedA = true; - doneWith(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + connId = verifyAndGetId(swConn); + reachedA = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedA); // Jump almost up to the hostTimeout @@ -1044,26 +1191,34 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { bool reachedB = false; // Same connection - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(connId, verifyAndGetId(swConn)); - reachedB = true; - doneWith(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_EQ(connId, verifyAndGetId(swConn)); + reachedB = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedB); // Now our timeout should be 1999 ms from 'now' instead of 1000 ms // if we do another 'get' we should still get the original connection PoolImpl::setNow(now + Milliseconds(1500)); bool reachedB2 = false; - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(connId, verifyAndGetId(swConn)); - reachedB2 = true; - doneWith(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_EQ(connId, verifyAndGetId(swConn)); + reachedB2 = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedB2); // We should time out when we get to 'now' + 2500 ms @@ -1072,14 +1227,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { bool reachedC = false; // Different id ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(connId, verifyAndGetId(swConn)); - reachedC = true; - doneWith(swConn.getValue()); - }); - + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_NE(connId, verifyAndGetId(swConn)); + reachedC = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedC); } @@ -1105,24 +1263,30 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { // save 1 connection ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - conn1 = std::move(swConn.getValue()); - }); - + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + conn1Id = verifyAndGetId(swConn); + conn1 = std::move(swConn.getValue()); + }); + }); + }); ASSERT(conn1Id); // return the second ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); - + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + conn2Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(conn2Id); // hostTimeout has passed @@ -1131,14 +1295,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { bool reachedA = false; // conn 2 is still there - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(conn2Id, verifyAndGetId(swConn)); - reachedA = true; - doneWith(swConn.getValue()); - }); - + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_EQ(conn2Id, verifyAndGetId(swConn)); + reachedA = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedA); // return conn 1 @@ -1151,14 +1318,18 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { // make sure that this is a new id ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(conn1Id, verifyAndGetId(swConn)); - ASSERT_NE(conn2Id, verifyAndGetId(swConn)); - reachedB = true; - doneWith(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_NE(conn1Id, verifyAndGetId(swConn)); + ASSERT_NE(conn2Id, verifyAndGetId(swConn)); + reachedB = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedB); } @@ -1180,40 +1351,50 @@ TEST_F(ConnectionPoolTest, dropConnections) { // Grab the first connection id size_t conn1Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = verifyAndGetId(swConn); - doneWith(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + conn1Id = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(conn1Id); // Grab it and this time keep it out of the pool ConnectionPool::ConnectionHandle handle; - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(verifyAndGetId(swConn), conn1Id); - handle = std::move(swConn.getValue()); - }); - + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_EQ(verifyAndGetId(swConn), conn1Id); + handle = std::move(swConn.getValue()); + }); + }); + }); ASSERT(handle); bool reachedA = false; // Queue up a request. This won't fire until we drop connections, then it // will fail. - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT(!swConn.isOK()); - reachedA = true; - }); - ASSERT(!reachedA); - - // fails the previous get - pool->dropConnections(HostAndPort()); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT(!swConn.isOK()); + reachedA = true; + }); + }); + ASSERT(!reachedA); + // fails the previous get + pool->dropConnections(HostAndPort()); + }); ASSERT(reachedA); // return the connection @@ -1223,13 +1404,17 @@ TEST_F(ConnectionPoolTest, dropConnections) { // connection size_t conn2Id = 0; ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = verifyAndGetId(swConn); - ASSERT_NE(conn2Id, conn1Id); - doneWith(swConn.getValue()); - }); + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + conn2Id = verifyAndGetId(swConn); + ASSERT_NE(conn2Id, conn1Id); + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(conn2Id); // Push conn2 into refresh @@ -1245,14 +1430,17 @@ TEST_F(ConnectionPoolTest, dropConnections) { // being pending bool reachedB = false; ConnectionImpl::pushSetup(Status::OK()); - pool->get_forTest(HostAndPort(), - Milliseconds(5000), - [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(verifyAndGetId(swConn), conn2Id); - reachedB = true; - doneWith(swConn.getValue()); - }); - + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(5000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { + ASSERT_NE(verifyAndGetId(swConn), conn2Id); + reachedB = true; + doneWith(swConn.getValue()); + }); + }); + }); ASSERT(reachedB); } @@ -1518,6 +1706,29 @@ TEST_F(ConnectionPoolTest, AsyncGet) { } } +TEST_F(ConnectionPoolTest, NegativeTimeout) { + ConnectionPool::Options options; + options.maxConnections = 1; + auto pool = makePool(options); + + auto now = Date_t::now(); + PoolImpl::setNow(now); + + unittest::threadAssertionMonitoredTest([&](auto& monitor) { + pool->get_forTest(HostAndPort(), + Milliseconds(1000), + [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { + monitor.exec([&]() { ASSERT(!swConn.isOK()); }); + }); + // Advance our timer past the request timeout, so that the resultant internal + // timeout in updateEventTimer is negative, and make sure we don't trip any + // assertions. + PoolImpl::setNow(now + Milliseconds(2000)); + + ConnectionImpl::pushSetup(Status::OK()); + }); +} + TEST_F(ConnectionPoolTest, ReturnAfterShutdown) { auto pool = makePool(); diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index 09f2e6b958a..ba1c7c75c7c 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -27,10 +27,10 @@ * it in the license file. */ -#include "mongo/platform/basic.h" - #include "mongo/executor/connection_pool_test_fixture.h" +#include "mongo/util/assert_util.h" + #include <memory> @@ -46,6 +46,7 @@ TimerImpl::~TimerImpl() { void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) { _cb = std::move(cb); + invariant(timeout >= Milliseconds(0)); _expiration = _global->now() + timeout; _timers.emplace(this); |