summaryrefslogtreecommitdiff
path: root/src/mongo/executor/connection_pool_test.cpp
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-03-05 13:35:34 -0500
committerBen Caimano <ben.caimano@10gen.com>2019-04-02 18:23:30 -0400
commitf4b07839c904a73d28e3994af5bc902004fd4f9d (patch)
treece8b79ce9aa9088e23333c8dfb13b32ed5bd08a6 /src/mongo/executor/connection_pool_test.cpp
parent04ea1d46eb6c4c78e19409f120ae2e61f2a35204 (diff)
downloadmongo-f4b07839c904a73d28e3994af5bc902004fd4f9d.tar.gz
SERVER-39814 Add OutOfLineExecutor to ConnectionPool factory interface
Diffstat (limited to 'src/mongo/executor/connection_pool_test.cpp')
-rw-r--r--src/mongo/executor/connection_pool_test.cpp174
1 files changed, 97 insertions, 77 deletions
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 0e689456afa..b36cf8cf749 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -32,6 +32,7 @@
#include <algorithm>
#include <random>
#include <stack>
+#include <tuple>
#include "mongo/executor/connection_pool_test_fixture.h"
@@ -58,15 +59,17 @@ protected:
private:
};
-void doneWith(const ConnectionPool::ConnectionHandle& swConn) {
- static_cast<ConnectionImpl*>(swConn.get())->indicateSuccess();
+void doneWith(const ConnectionPool::ConnectionHandle& conn) {
+ dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess();
}
-#define CONN2ID(swConn) \
- [](StatusWith<ConnectionPool::ConnectionHandle>& swConn) { \
- ASSERT(swConn.isOK()); \
- return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \
- }(swConn)
+using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>;
+
+auto verifyAndGetId(StatusWithConn& swConn) {
+ ASSERT(swConn.isOK());
+ auto& conn = swConn.getValue();
+ return dynamic_cast<ConnectionImpl*>(conn.get())->id();
+}
/**
* Verify that we get the same connection if we grab one, return it and grab
@@ -81,7 +84,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -91,7 +94,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -159,7 +162,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- const auto id = CONN2ID(swConn);
+ const auto id = verifyAndGetId(swConn);
connections.push_back(std::move(swConn.getValue()));
ASSERT(id == ids.top());
ids.pop();
@@ -211,7 +214,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- original_ids.insert(CONN2ID(swConn));
+ original_ids.insert(verifyAndGetId(swConn));
connections.push_back(std::move(swConn.getValue()));
});
}
@@ -243,7 +246,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- reacquired_ids.insert(CONN2ID(swConn));
+ reacquired_ids.insert(verifyAndGetId(swConn));
connections.push_back(std::move(swConn.getValue()));
});
}
@@ -282,7 +285,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error"));
});
@@ -292,7 +295,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -315,7 +318,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
pool.get_forTest(HostAndPort("localhost:30000"),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -325,7 +328,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
pool.get_forTest(HostAndPort("localhost:30001"),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -460,7 +463,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -471,7 +474,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
ASSERT_EQ(conn1Id, conn2Id);
@@ -503,7 +506,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(1000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(CONN2ID(swConn), conn1Id);
+ ASSERT_NE(verifyAndGetId(swConn), conn1Id);
reachedB = true;
doneWith(swConn.getValue());
});
@@ -942,7 +945,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = CONN2ID(swConn);
+ connId = verifyAndGetId(swConn);
reachedA = true;
doneWith(swConn.getValue());
});
@@ -959,7 +962,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, CONN2ID(swConn));
+ ASSERT_NE(connId, verifyAndGetId(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -992,7 +995,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = CONN2ID(swConn);
+ connId = verifyAndGetId(swConn);
reachedA = true;
doneWith(swConn.getValue());
});
@@ -1006,7 +1009,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, CONN2ID(swConn));
+ ASSERT_EQ(connId, verifyAndGetId(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1019,7 +1022,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, CONN2ID(swConn));
+ ASSERT_EQ(connId, verifyAndGetId(swConn));
reachedB2 = true;
doneWith(swConn.getValue());
});
@@ -1034,7 +1037,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, CONN2ID(swConn));
+ ASSERT_NE(connId, verifyAndGetId(swConn));
reachedC = true;
doneWith(swConn.getValue());
});
@@ -1067,7 +1070,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
conn1 = std::move(swConn.getValue());
});
@@ -1078,7 +1081,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -1093,7 +1096,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(conn2Id, CONN2ID(swConn));
+ ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
reachedA = true;
doneWith(swConn.getValue());
});
@@ -1114,8 +1117,8 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(conn1Id, CONN2ID(swConn));
- ASSERT_NE(conn2Id, CONN2ID(swConn));
+ ASSERT_NE(conn1Id, verifyAndGetId(swConn));
+ ASSERT_NE(conn2Id, verifyAndGetId(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1143,7 +1146,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
ASSERT(conn1Id);
@@ -1153,7 +1156,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(CONN2ID(swConn), conn1Id);
+ ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
handle = std::move(swConn.getValue());
});
@@ -1187,7 +1190,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
ASSERT_NE(conn2Id, conn1Id);
doneWith(swConn.getValue());
});
@@ -1209,7 +1212,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(CONN2ID(swConn), conn2Id);
+ ASSERT_NE(verifyAndGetId(swConn), conn2Id);
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1276,7 +1279,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
ConnectionImpl::pushSetup(Status::OK());
pool.get_forTest(
HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
ASSERT(conn1Id);
@@ -1398,7 +1401,7 @@ TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
dropConnectionsTest(pool, manager);
}
-TEST_F(ConnectionPoolTest, TryGetWorks) {
+TEST_F(ConnectionPoolTest, AsyncGet) {
ConnectionPool::Options options;
options.maxConnections = 1;
ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
@@ -1406,60 +1409,77 @@ TEST_F(ConnectionPoolTest, TryGetWorks) {
auto now = Date_t::now();
PoolImpl::setNow(now);
- // no connections in the pool, tryGet should fail
- ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
+ // Make our initial connection, use and return it
+ {
+ size_t connId = 0;
- // Successfully get a new connection
- size_t conn1Id = 0;
- ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(
- HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ // no connections in the pool, our future is not satisfied
+ auto connFuture = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ ASSERT_FALSE(connFuture.isReady());
+
+ // Successfully get a new connection
+ ConnectionImpl::pushSetup(Status::OK());
+
+ // Future should be ready now
+ ASSERT_TRUE(connFuture.isReady());
+ std::move(connFuture).getAsync([&](StatusWithConn swConn) mutable {
+ connId = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
- ASSERT(conn1Id);
-
- // 1 connection in the pool, tryGet should succeed
- auto tryGetConn = pool.tryGet(HostAndPort(), transport::kGlobalSSLMode);
- ASSERT(tryGetConn);
+ ASSERT(connId);
+ }
- // No connection available, this waits in the request queue
- size_t conn3Id = 0;
- pool.get_forTest(
- HostAndPort(), Seconds(2), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn3Id = CONN2ID(swConn);
+ // There is one connection in the pool:
+ // * The first get should resolve immediately
+ // * The second get should should be queued
+ // * The eventual third should be queued before the second
+ {
+ size_t connId1 = 0;
+ size_t connId2 = 0;
+ size_t connId3 = 0;
+
+ auto connFuture1 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ auto connFuture2 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
+
+ // Queue up the second future to resolve as soon as it is ready
+ std::move(connFuture2).getAsync([&](StatusWithConn swConn) mutable {
+ connId2 = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
- ASSERT_EQ(conn3Id, 0ul);
+ // The first future should be immediately ready. The second should be in the queue.
+ ASSERT_TRUE(connFuture1.isReady());
+ ASSERT_FALSE(connFuture2.isReady());
- // We want to wait if there are any outstanding requests (to provide fair access to the pool),
- // so we need to call tryGet while fulfilling requests. This triggers that race by actually
- // calling tryGet from within a callback (which works, because we drop locks). Not the cleanest
- // way to do it, but gets us at least the code coverage we need.
- //
- // We run before the previous get because our deadline is 1 sec instead of 2
- size_t conn2Id = 0;
- pool.get_forTest(
- HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ // Resolve the first future to return the connection and continue on to the second.
+ decltype(connFuture1) connFuture3;
+ std::move(connFuture1).getAsync([&](StatusWithConn swConn) mutable {
+ // Grab our third future while our first one is being fulfilled
+ connFuture3 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+
+ connId1 = verifyAndGetId(swConn);
doneWith(swConn.getValue());
- swConn.getValue().reset();
+ });
+ ASSERT(connId1);
+ ASSERT_FALSE(connId2);
- // we do have one connection
- ASSERT_EQUALS(pool.getNumConnectionsPerHost(HostAndPort()), 1ul);
+ // Since the third future has a smaller timeout than the second,
+ // it should take priority over the second
+ ASSERT_TRUE(connFuture3.isReady());
+ ASSERT_FALSE(connFuture2.isReady());
- // we fail because there's an outstanding request, even though we do have a good
- // connection
- // available.
- ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
- });
+ // Resolve the third future. This should trigger the second future
+ std::move(connFuture3).getAsync([&](StatusWithConn swConn) mutable {
+ // We run before the second future
+ ASSERT_FALSE(connId2);
- doneWith(*tryGetConn);
- tryGetConn.reset();
+ connId3 = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
- ASSERT(conn2Id);
- ASSERT(conn3Id);
+ ASSERT_EQ(connId1, connId2);
+ ASSERT_EQ(connId2, connId3);
+ }
}
} // namespace connection_pool_test_details