summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTyler Seip <Tyler.Seip@mongodb.com>2021-07-08 17:46:39 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-09-21 16:59:56 +0000
commited2e91304a83771f20e3df874222984c61a3e884 (patch)
treeebcb62e3a3544414b127377485d87bd3021044d3
parent130476fc56a711b489525112c855766bd2a1bb23 (diff)
downloadmongo-ed2e91304a83771f20e3df874222984c61a3e884.tar.gz
SERVER-55816: Clamp negative timeouts in connection pools' connections
-rw-r--r--src/mongo/executor/connection_pool.cpp12
-rw-r--r--src/mongo/executor/connection_pool_test.cpp709
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp5
3 files changed, 473 insertions, 253 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 1fa7790a300..18e8cbc0d4c 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -1111,7 +1111,7 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
// If our expiration comes before our next event, then it is the next event
if (_requests.empty() && _checkedOutPool.empty()) {
_hostExpiration = _lastActiveTime + _parent->_controller->hostTimeout();
- if ((_hostExpiration > now) && (_hostExpiration < nextEventTime)) {
+ if (_hostExpiration < nextEventTime) {
nextEventTime = _hostExpiration;
}
}
@@ -1121,6 +1121,14 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
nextEventTime = _requests.front().first;
}
+ // Clamp next event time to be either now or in the future. Next event time
+ // can be in the past anytime we wait a long time between invocations of
+ // updateState; in these cases, we want to set our event timer to expire
+ // immediately.
+ if (nextEventTime < now) {
+ nextEventTime = now;
+ }
+
// If our timer is already set to the next event, then we're done
if (nextEventTime == _eventTimerExpiration) {
return;
@@ -1133,7 +1141,7 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
_eventTimer->cancelTimeout();
// Set our event timer to timeout requests, refresh the state, and potentially expire this pool
- auto deferredStateUpdateFunc = guardCallback([this, timeout]() {
+ auto deferredStateUpdateFunc = guardCallback([this]() {
auto now = _parent->_factory->now();
_health.isFailed = false;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 9ac648e0bad..79eaaaf0218 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/executor/connection_pool.h"
#include "mongo/stdx/future.h"
+#include "mongo/unittest/thread_assertion_monitor.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/scopeguard.h"
@@ -151,6 +152,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
// Obtain a set of connections
constexpr size_t kSize = 100;
std::vector<ConnectionPool::ConnectionHandle> connections;
+ std::vector<unittest::ThreadAssertionMonitor> monitors(kSize);
// Ensure that no matter how we leave the test, we mark any
// checked out connections as OK before implicity returning them
@@ -173,10 +175,18 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- connections.push_back(std::move(swConn.getValue()));
+ monitors[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ connections.push_back(std::move(swConn.getValue()));
+ monitors[i].notifyDone();
+ });
});
}
+
+ for (auto& monitor : monitors) {
+ monitor.wait();
+ }
+
ASSERT_EQ(connections.size(), kSize);
// Shuffle them into a random order
@@ -194,6 +204,9 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
}
ASSERT_EQ(ids.size(), kSize);
+ // Replace the thread monitors with fresh ones.
+ monitors = std::vector<unittest::ThreadAssertionMonitor>(kSize);
+
// Re-obtain the connections. They should come back in the same order
// as the IDs in the stack, since the pool returns them in MRU order.
for (size_t i = 0; i != kSize; ++i) {
@@ -201,13 +214,21 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- const auto id = verifyAndGetId(swConn);
- connections.push_back(std::move(swConn.getValue()));
- ASSERT_EQ(id, ids.top());
- ids.pop();
+ monitors[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ const auto id = verifyAndGetId(swConn);
+ connections.push_back(std::move(swConn.getValue()));
+ ASSERT_EQ(id, ids.top());
+ ids.pop();
+ monitors[i].notifyDone();
+ });
});
}
+
+ for (auto& monitor : monitors) {
+ monitor.wait();
+ }
+
ASSERT(ids.empty());
}
@@ -227,6 +248,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
// Obtain a set of connections
constexpr size_t kSize = 100;
std::vector<ConnectionPool::ConnectionHandle> connections;
+ std::vector<unittest::ThreadAssertionMonitor> monitors(kSize);
// Ensure that no matter how we leave the test, we mark any
// checked out connections as OK before implicity returning them
@@ -254,12 +276,19 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- original_ids.insert(verifyAndGetId(swConn));
- connections.push_back(std::move(swConn.getValue()));
+ monitors[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ original_ids.insert(verifyAndGetId(swConn));
+ connections.push_back(std::move(swConn.getValue()));
+ monitors[i].notifyDone();
+ });
});
}
+ for (auto& monitor : monitors) {
+ monitor.wait();
+ }
+
ASSERT_EQ(original_ids.size(), kSize);
ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize);
@@ -280,18 +309,26 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
ASSERT_EQ(pool->getNumConnectionsPerHost(HostAndPort()), kSize);
// Re-obtain a quarter of the connections, and record their IDs in a set.
+ monitors = std::vector<unittest::ThreadAssertionMonitor>(kSize / 4);
std::set<size_t> reacquired_ids;
for (size_t i = 0; i < kSize / 4; ++i) {
ConnectionImpl::pushSetup(Status::OK());
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- reacquired_ids.insert(verifyAndGetId(swConn));
- connections.push_back(std::move(swConn.getValue()));
+ monitors[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ reacquired_ids.insert(verifyAndGetId(swConn));
+ connections.push_back(std::move(swConn.getValue()));
+ monitors[i].notifyDone();
+ });
});
}
+ for (auto& monitor : monitors) {
+ monitor.wait();
+ }
+
ASSERT_EQ(reacquired_ids.size(), kSize / 4);
ASSERT(std::includes(
original_ids.begin(), original_ids.end(), reacquired_ids.begin(), reacquired_ids.end()));
@@ -388,22 +425,31 @@ TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) {
// Get the first connection, move it out rather than letting it return
ConnectionPool::ConnectionHandle conn1;
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- conn1 = std::move(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(swConn.isOK());
+ conn1 = std::move(swConn.getValue());
+ });
+ });
+ });
// Get the second connection, move it out rather than letting it return
ConnectionPool::ConnectionHandle conn2;
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- conn2 = std::move(swConn.getValue());
- });
+
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(swConn.isOK());
+ conn2 = std::move(swConn.getValue());
+ });
+ });
+ });
// Verify that the two connections are different
ASSERT_NE(conn1.get(), conn2.get());
@@ -465,12 +511,16 @@ TEST_F(ConnectionPoolTest, refreshHappens) {
// Get a connection
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- doneWith(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ });
+ });
+ });
// After 1 second, one refresh has occurred
PoolImpl::setNow(now + Milliseconds(1000));
@@ -502,25 +552,33 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
size_t conn1Id = 0;
// Grab a connection and verify it's good
- ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
-
- PoolImpl::setNow(now + Milliseconds(500));
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ ConnectionImpl::pushSetup(Status::OK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ conn1Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
+ });
+ PoolImpl::setNow(now + Milliseconds(500));
+ });
size_t conn2Id = 0;
- // Make sure we still get the first one
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
- ASSERT_EQ(conn1Id, conn2Id);
+
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ // Make sure we still get the first one
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ conn2Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
+ });
+ ASSERT_EQ(conn1Id, conn2Id);
+ });
// This should trigger a refresh, but not time it out. So now we have one
// connection sitting in refresh.
@@ -529,30 +587,37 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
// This will wait because we have a refreshing connection, so it'll wait to
// see if that pans out. In this case, we'll get a failure on timeout.
- ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(!swConn.isOK());
-
- reachedA = true;
- });
- ASSERT(!reachedA);
- PoolImpl::setNow(now + Milliseconds(3000));
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ ConnectionImpl::pushSetup(Status::OK());
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(!swConn.isOK());
- // Let the refresh timeout
- PoolImpl::setNow(now + Milliseconds(4000));
+ reachedA = true;
+ });
+ });
+ ASSERT(!reachedA);
+ PoolImpl::setNow(now + Milliseconds(3000));
+ // Let the refresh timeout
+ PoolImpl::setNow(now + Milliseconds(4000));
+ });
bool reachedB = false;
// Make sure we can get a new connection
- pool->get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(verifyAndGetId(swConn), conn1Id);
- reachedB = true;
- doneWith(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_NE(verifyAndGetId(swConn), conn1Id);
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedA);
ASSERT(reachedB);
@@ -568,29 +633,36 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) {
bool reachedB = false;
ConnectionPool::ConnectionHandle conn;
-
+ unittest::ThreadAssertionMonitor c1;
pool->get_forTest(HostAndPort(),
Milliseconds(2000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ c1.exec([&]() {
+ ASSERT(swConn.isOK());
- reachedA = true;
- doneWith(swConn.getValue());
+ reachedA = true;
+ doneWith(swConn.getValue());
+ c1.notifyDone();
+ });
});
-
+ unittest::ThreadAssertionMonitor c2;
pool->get_forTest(HostAndPort(),
Milliseconds(1000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ c2.exec([&]() {
+ ASSERT(swConn.isOK());
- reachedB = true;
+ reachedB = true;
- conn = std::move(swConn.getValue());
+ conn = std::move(swConn.getValue());
+ c2.notifyDone();
+ });
});
ConnectionImpl::pushSetup(Status::OK());
// Note thate we hit the 1 second request, but not the 2 second
+ c2.wait();
ASSERT(reachedB);
ASSERT(!reachedA);
@@ -598,6 +670,7 @@ TEST_F(ConnectionPoolTest, requestsServedByUrgency) {
// Now that we've returned the connection, we see the second has been
// called
+ c1.wait();
ASSERT(reachedA);
}
@@ -616,30 +689,44 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) {
// Make 3 requests, each which keep their connection (don't return it to
// the pool)
+ unittest::ThreadAssertionMonitor c3;
pool->get_forTest(HostAndPort(),
Milliseconds(3000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ c3.exec([&]() {
+ ASSERT(swConn.isOK());
- conn3 = std::move(swConn.getValue());
+ conn3 = std::move(swConn.getValue());
+ c3.notifyDone();
+ });
});
+ unittest::ThreadAssertionMonitor c2;
pool->get_forTest(HostAndPort(),
Milliseconds(2000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ c2.exec([&]() {
+ ASSERT(swConn.isOK());
- conn2 = std::move(swConn.getValue());
+ conn2 = std::move(swConn.getValue());
+ c2.notifyDone();
+ });
});
+ unittest::ThreadAssertionMonitor c1;
pool->get_forTest(HostAndPort(),
Milliseconds(1000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ c1.exec([&]() {
+ ASSERT(swConn.isOK());
- conn1 = std::move(swConn.getValue());
+ conn1 = std::move(swConn.getValue());
+ c1.notifyDone();
+ });
});
ConnectionImpl::pushSetup(Status::OK());
+ c1.wait();
ConnectionImpl::pushSetup(Status::OK());
+ c2.wait();
ConnectionImpl::pushSetup(Status::OK());
// Note that only two have run
@@ -652,6 +739,7 @@ TEST_F(ConnectionPoolTest, maxPoolRespected) {
doneWith(conn1);
// Verify that it's the one that pops out for request 3
+ c3.wait();
ASSERT_EQ(conn1Ptr, conn3.get());
doneWith(conn2);
@@ -673,34 +761,49 @@ TEST_F(ConnectionPoolTest, maxConnectingRespected) {
// Make 3 requests, each which keep their connection (don't return it to
// the pool)
+ unittest::ThreadAssertionMonitor c3;
pool->get_forTest(HostAndPort(),
Milliseconds(3000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ c3.exec([&]() {
+ ASSERT(swConn.isOK());
- conn3 = std::move(swConn.getValue());
+ conn3 = std::move(swConn.getValue());
+ c3.notifyDone();
+ });
});
+ unittest::ThreadAssertionMonitor c2;
pool->get_forTest(HostAndPort(),
Milliseconds(2000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ c2.exec([&]() {
+ ASSERT(swConn.isOK());
- conn2 = std::move(swConn.getValue());
+ conn2 = std::move(swConn.getValue());
+ c2.notifyDone();
+ });
});
+ unittest::ThreadAssertionMonitor c1;
pool->get_forTest(HostAndPort(),
Milliseconds(1000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ c1.exec([&]() {
+ ASSERT(swConn.isOK());
- conn1 = std::move(swConn.getValue());
+ conn1 = std::move(swConn.getValue());
+ c1.notifyDone();
+ });
});
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
ConnectionImpl::pushSetup(Status::OK());
+ c1.wait();
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
ConnectionImpl::pushSetup(Status::OK());
+ c2.wait();
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u);
ConnectionImpl::pushSetup(Status::OK());
+ c3.wait();
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
ASSERT(conn1);
@@ -732,11 +835,15 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) {
// Get a connection
ConnectionImpl::pushSetup(Status::OK());
+ unittest::ThreadAssertionMonitor c1;
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- doneWith(swConn.getValue());
+ c1.exec([&]() {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ c1.notifyDone();
+ });
});
ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u);
@@ -748,19 +855,25 @@ TEST_F(ConnectionPoolTest, maxConnectingWithRefresh) {
bool reachedA = false;
// Try to get another connection
+ unittest::ThreadAssertionMonitor c2;
pool->get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- doneWith(swConn.getValue());
- reachedA = true;
+ c2.exec([&]() {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ reachedA = true;
+ c2.notifyDone();
+ });
});
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
ASSERT(!reachedA);
+ c1.wait();
ConnectionImpl::pushRefresh(Status::OK());
ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u);
ASSERT(reachedA);
+ c2.wait();
}
/**
@@ -778,14 +891,18 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) {
PoolImpl::setNow(now);
// Get us spun up to 3 connections in the pool
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- doneWith(swConn.getValue());
- });
- ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
- ConnectionImpl::pushSetup(Status::OK());
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(swConn.isOK());
+ doneWith(swConn.getValue());
+ });
+ });
+ ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
+ ConnectionImpl::pushSetup(Status::OK());
+ });
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
ConnectionImpl::pushSetup(Status::OK());
ConnectionImpl::pushSetup(Status::OK());
@@ -796,14 +913,18 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) {
ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 3u);
std::array<ConnectionPool::ConnectionHandle, 5> conns;
+ std::array<unittest::ThreadAssertionMonitor, 5> ams;
// Start 5 new requests
for (size_t i = 0; i < conns.size(); ++i) {
pool->get_forTest(HostAndPort(),
Milliseconds(static_cast<int>(1000 + i)),
- [&conns, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
- conns[i] = std::move(swConn.getValue());
+ [&conns, &ams, i](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ams[i].exec([&]() {
+ ASSERT(swConn.isOK());
+ conns[i] = std::move(swConn.getValue());
+ ams[i].notifyDone();
+ });
});
}
@@ -825,28 +946,33 @@ TEST_F(ConnectionPoolTest, maxConnectingWithMultipleRefresh) {
ConnectionImpl::pushRefresh(Status::OK());
ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 2u);
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
+ ams[0].wait();
firstNBound(1);
// After two refresh, one enters the setup queue, one refreshed connection gets handed out
ConnectionImpl::pushRefresh(Status::OK());
ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 1u);
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u);
+ ams[1].wait();
firstNBound(2);
// After three refresh, we're done refreshing. Two queued in setup
ConnectionImpl::pushRefresh(Status::OK());
ASSERT_EQ(ConnectionImpl::refreshQueueDepth(), 0u);
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 2u);
+ ams[2].wait();
firstNBound(3);
// now pushing setup gets us a new connection
ConnectionImpl::pushSetup(Status::OK());
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 1u);
+ ams[3].wait();
firstNBound(4);
// and we're done
ConnectionImpl::pushSetup(Status::OK());
ASSERT_EQ(ConnectionImpl::setupQueueDepth(), 0u);
+ ams[4].wait();
firstNBound(5);
for (auto& conn : conns) {
@@ -873,30 +999,34 @@ TEST_F(ConnectionPoolTest, minPoolRespected) {
ConnectionPool::ConnectionHandle conn2;
ConnectionPool::ConnectionHandle conn3;
- // Grab one connection without returning it
- pool->get_forTest(HostAndPort(),
- Milliseconds(1000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
-
- conn1 = std::move(swConn.getValue());
- });
-
bool reachedA = false;
bool reachedB = false;
bool reachedC = false;
- ConnectionImpl::pushSetup([&]() {
- reachedA = true;
- return Status::OK();
- });
- ConnectionImpl::pushSetup([&]() {
- reachedB = true;
- return Status::OK();
- });
- ConnectionImpl::pushSetup([&]() {
- reachedC = true;
- return Status::OK();
+ // Grab one connection without returning it
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(swConn.isOK());
+
+ conn1 = std::move(swConn.getValue());
+ });
+ });
+
+ ConnectionImpl::pushSetup([&]() {
+ reachedA = true;
+ return Status::OK();
+ });
+ ConnectionImpl::pushSetup([&]() {
+ reachedB = true;
+ return Status::OK();
+ });
+ ConnectionImpl::pushSetup([&]() {
+ reachedC = true;
+ return Status::OK();
+ });
});
// Verify that two setups were invoked, even without two requests (the
@@ -906,21 +1036,28 @@ TEST_F(ConnectionPoolTest, minPoolRespected) {
ASSERT(!reachedC);
// Two more get's without returns
- pool->get_forTest(HostAndPort(),
- Milliseconds(2000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
-
- conn2 = std::move(swConn.getValue());
- });
- pool->get_forTest(HostAndPort(),
- Milliseconds(3000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(swConn.isOK());
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(swConn.isOK());
- conn3 = std::move(swConn.getValue());
- });
+ conn2 = std::move(swConn.getValue());
+ });
+ });
+ });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(3000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(swConn.isOK());
+ conn3 = std::move(swConn.getValue());
+ });
+ });
+ });
ASSERT(conn2);
ASSERT(conn3);
@@ -980,14 +1117,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
bool reachedA = false;
// Grab 1 connection and return it
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = verifyAndGetId(swConn);
- reachedA = true;
- doneWith(swConn.getValue());
- });
-
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ connId = verifyAndGetId(swConn);
+ reachedA = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedA);
// Jump pass the hostTimeout
@@ -997,14 +1137,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
// Verify that a new connection was spawned
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, verifyAndGetId(swConn));
- reachedB = true;
- doneWith(swConn.getValue());
- });
-
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_NE(connId, verifyAndGetId(swConn));
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedB);
}
@@ -1030,13 +1173,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
// Grab and return
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = verifyAndGetId(swConn);
- reachedA = true;
- doneWith(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ connId = verifyAndGetId(swConn);
+ reachedA = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedA);
// Jump almost up to the hostTimeout
@@ -1044,26 +1191,34 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
bool reachedB = false;
// Same connection
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, verifyAndGetId(swConn));
- reachedB = true;
- doneWith(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_EQ(connId, verifyAndGetId(swConn));
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedB);
// Now our timeout should be 1999 ms from 'now' instead of 1000 ms
// if we do another 'get' we should still get the original connection
PoolImpl::setNow(now + Milliseconds(1500));
bool reachedB2 = false;
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, verifyAndGetId(swConn));
- reachedB2 = true;
- doneWith(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_EQ(connId, verifyAndGetId(swConn));
+ reachedB2 = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedB2);
// We should time out when we get to 'now' + 2500 ms
@@ -1072,14 +1227,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
bool reachedC = false;
// Different id
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, verifyAndGetId(swConn));
- reachedC = true;
- doneWith(swConn.getValue());
- });
-
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_NE(connId, verifyAndGetId(swConn));
+ reachedC = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedC);
}
@@ -1105,24 +1263,30 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
// save 1 connection
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- conn1 = std::move(swConn.getValue());
- });
-
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ conn1Id = verifyAndGetId(swConn);
+ conn1 = std::move(swConn.getValue());
+ });
+ });
+ });
ASSERT(conn1Id);
// return the second
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
-
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ conn2Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(conn2Id);
// hostTimeout has passed
@@ -1131,14 +1295,17 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
bool reachedA = false;
// conn 2 is still there
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
- reachedA = true;
- doneWith(swConn.getValue());
- });
-
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
+ reachedA = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedA);
// return conn 1
@@ -1151,14 +1318,18 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
// make sure that this is a new id
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(conn1Id, verifyAndGetId(swConn));
- ASSERT_NE(conn2Id, verifyAndGetId(swConn));
- reachedB = true;
- doneWith(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_NE(conn1Id, verifyAndGetId(swConn));
+ ASSERT_NE(conn2Id, verifyAndGetId(swConn));
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedB);
}
@@ -1180,40 +1351,50 @@ TEST_F(ConnectionPoolTest, dropConnections) {
// Grab the first connection id
size_t conn1Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ conn1Id = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(conn1Id);
// Grab it and this time keep it out of the pool
ConnectionPool::ConnectionHandle handle;
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
- handle = std::move(swConn.getValue());
- });
-
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
+ handle = std::move(swConn.getValue());
+ });
+ });
+ });
ASSERT(handle);
bool reachedA = false;
// Queue up a request. This won't fire until we drop connections, then it
// will fail.
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT(!swConn.isOK());
- reachedA = true;
- });
- ASSERT(!reachedA);
-
- // fails the previous get
- pool->dropConnections(HostAndPort());
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT(!swConn.isOK());
+ reachedA = true;
+ });
+ });
+ ASSERT(!reachedA);
+ // fails the previous get
+ pool->dropConnections(HostAndPort());
+ });
ASSERT(reachedA);
// return the connection
@@ -1223,13 +1404,17 @@ TEST_F(ConnectionPoolTest, dropConnections) {
// connection
size_t conn2Id = 0;
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
- ASSERT_NE(conn2Id, conn1Id);
- doneWith(swConn.getValue());
- });
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ conn2Id = verifyAndGetId(swConn);
+ ASSERT_NE(conn2Id, conn1Id);
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(conn2Id);
// Push conn2 into refresh
@@ -1245,14 +1430,17 @@ TEST_F(ConnectionPoolTest, dropConnections) {
// being pending
bool reachedB = false;
ConnectionImpl::pushSetup(Status::OK());
- pool->get_forTest(HostAndPort(),
- Milliseconds(5000),
- [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(verifyAndGetId(swConn), conn2Id);
- reachedB = true;
- doneWith(swConn.getValue());
- });
-
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() {
+ ASSERT_NE(verifyAndGetId(swConn), conn2Id);
+ reachedB = true;
+ doneWith(swConn.getValue());
+ });
+ });
+ });
ASSERT(reachedB);
}
@@ -1518,6 +1706,29 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
}
}
+TEST_F(ConnectionPoolTest, NegativeTimeout) {
+ ConnectionPool::Options options;
+ options.maxConnections = 1;
+ auto pool = makePool(options);
+
+ auto now = Date_t::now();
+ PoolImpl::setNow(now);
+
+ unittest::threadAssertionMonitoredTest([&](auto& monitor) {
+ pool->get_forTest(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ monitor.exec([&]() { ASSERT(!swConn.isOK()); });
+ });
+ // Advance our timer past the request timeout, so that the resultant internal
+ // timeout in updateEventTimer is negative, and make sure we don't trip any
+ // assertions.
+ PoolImpl::setNow(now + Milliseconds(2000));
+
+ ConnectionImpl::pushSetup(Status::OK());
+ });
+}
+
TEST_F(ConnectionPoolTest, ReturnAfterShutdown) {
auto pool = makePool();
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index 09f2e6b958a..ba1c7c75c7c 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -27,10 +27,10 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
-
#include "mongo/executor/connection_pool_test_fixture.h"
+#include "mongo/util/assert_util.h"
+
#include <memory>
@@ -46,6 +46,7 @@ TimerImpl::~TimerImpl() {
void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
_cb = std::move(cb);
+ invariant(timeout >= Milliseconds(0));
_expiration = _global->now() + timeout;
_timers.emplace(this);