diff options
author | Ben Caimano <ben.caimano@10gen.com> | 2019-03-05 13:35:34 -0500 |
---|---|---|
committer | Ben Caimano <ben.caimano@10gen.com> | 2019-04-02 18:23:30 -0400 |
commit | f4b07839c904a73d28e3994af5bc902004fd4f9d (patch) | |
tree | ce8b79ce9aa9088e23333c8dfb13b32ed5bd08a6 /src/mongo/executor/connection_pool_test.cpp | |
parent | 04ea1d46eb6c4c78e19409f120ae2e61f2a35204 (diff) | |
download | mongo-f4b07839c904a73d28e3994af5bc902004fd4f9d.tar.gz |
SERVER-39814 Add OutOfLineExecutor to ConnectionPool factory interface
Diffstat (limited to 'src/mongo/executor/connection_pool_test.cpp')
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 174 |
1 files changed, 97 insertions, 77 deletions
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 0e689456afa..b36cf8cf749 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -32,6 +32,7 @@ #include <algorithm> #include <random> #include <stack> +#include <tuple> #include "mongo/executor/connection_pool_test_fixture.h" @@ -58,15 +59,17 @@ protected: private: }; -void doneWith(const ConnectionPool::ConnectionHandle& swConn) { - static_cast<ConnectionImpl*>(swConn.get())->indicateSuccess(); +void doneWith(const ConnectionPool::ConnectionHandle& conn) { + dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess(); } -#define CONN2ID(swConn) \ - [](StatusWith<ConnectionPool::ConnectionHandle>& swConn) { \ - ASSERT(swConn.isOK()); \ - return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \ - }(swConn) +using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>; + +auto verifyAndGetId(StatusWithConn& swConn) { + ASSERT(swConn.isOK()); + auto& conn = swConn.getValue(); + return dynamic_cast<ConnectionImpl*>(conn.get())->id(); +} /** * Verify that we get the same connection if we grab one, return it and grab @@ -81,7 +84,7 @@ TEST_F(ConnectionPoolTest, SameConn) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = CONN2ID(swConn); + conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); @@ -91,7 +94,7 @@ TEST_F(ConnectionPoolTest, SameConn) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = CONN2ID(swConn); + conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); @@ -159,7 +162,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) { Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT(swConn.isOK()); - const auto id = CONN2ID(swConn); + const auto id = verifyAndGetId(swConn); connections.push_back(std::move(swConn.getValue())); ASSERT(id == ids.top()); ids.pop(); @@ -211,7 +214,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT(swConn.isOK()); - original_ids.insert(CONN2ID(swConn)); + original_ids.insert(verifyAndGetId(swConn)); connections.push_back(std::move(swConn.getValue())); }); } @@ -243,7 +246,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) { Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT(swConn.isOK()); - reacquired_ids.insert(CONN2ID(swConn)); + reacquired_ids.insert(verifyAndGetId(swConn)); connections.push_back(std::move(swConn.getValue())); }); } @@ -282,7 +285,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = CONN2ID(swConn); + conn1Id = verifyAndGetId(swConn); swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error")); }); @@ -292,7 +295,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = CONN2ID(swConn); + conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); @@ -315,7 +318,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) { pool.get_forTest(HostAndPort("localhost:30000"), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = CONN2ID(swConn); + conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); @@ -325,7 +328,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) { pool.get_forTest(HostAndPort("localhost:30001"), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = CONN2ID(swConn); + conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); @@ -460,7 +463,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = CONN2ID(swConn); + conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); @@ -471,7 +474,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = CONN2ID(swConn); + conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); ASSERT_EQ(conn1Id, conn2Id); @@ -503,7 +506,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) { pool.get_forTest(HostAndPort(), Milliseconds(1000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(CONN2ID(swConn), conn1Id); + ASSERT_NE(verifyAndGetId(swConn), conn1Id); reachedB = true; doneWith(swConn.getValue()); }); @@ -942,7 +945,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - connId = CONN2ID(swConn); + connId = verifyAndGetId(swConn); reachedA = true; doneWith(swConn.getValue()); }); @@ -959,7 +962,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(connId, CONN2ID(swConn)); + ASSERT_NE(connId, verifyAndGetId(swConn)); reachedB = true; doneWith(swConn.getValue()); }); @@ -992,7 +995,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - connId = CONN2ID(swConn); + connId = verifyAndGetId(swConn); reachedA = true; doneWith(swConn.getValue()); }); @@ -1006,7 +1009,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(connId, CONN2ID(swConn)); + ASSERT_EQ(connId, verifyAndGetId(swConn)); reachedB = true; doneWith(swConn.getValue()); }); @@ -1019,7 +1022,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(connId, CONN2ID(swConn)); + ASSERT_EQ(connId, verifyAndGetId(swConn)); reachedB2 = true; doneWith(swConn.getValue()); }); @@ -1034,7 +1037,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(connId, CONN2ID(swConn)); + ASSERT_NE(connId, verifyAndGetId(swConn)); reachedC = true; doneWith(swConn.getValue()); }); @@ -1067,7 +1070,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = CONN2ID(swConn); + conn1Id = verifyAndGetId(swConn); conn1 = std::move(swConn.getValue()); }); @@ -1078,7 +1081,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = CONN2ID(swConn); + conn2Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); @@ -1093,7 +1096,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(conn2Id, CONN2ID(swConn)); + ASSERT_EQ(conn2Id, verifyAndGetId(swConn)); reachedA = true; doneWith(swConn.getValue()); }); @@ -1114,8 +1117,8 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(conn1Id, CONN2ID(swConn)); - ASSERT_NE(conn2Id, CONN2ID(swConn)); + ASSERT_NE(conn1Id, verifyAndGetId(swConn)); + ASSERT_NE(conn2Id, verifyAndGetId(swConn)); reachedB = true; doneWith(swConn.getValue()); }); @@ -1143,7 +1146,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = CONN2ID(swConn); + conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); ASSERT(conn1Id); @@ -1153,7 +1156,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_EQ(CONN2ID(swConn), conn1Id); + ASSERT_EQ(verifyAndGetId(swConn), conn1Id); handle = std::move(swConn.getValue()); }); @@ -1187,7 +1190,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = CONN2ID(swConn); + conn2Id = verifyAndGetId(swConn); ASSERT_NE(conn2Id, conn1Id); doneWith(swConn.getValue()); }); @@ -1209,7 +1212,7 @@ TEST_F(ConnectionPoolTest, dropConnections) { pool.get_forTest(HostAndPort(), Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - ASSERT_NE(CONN2ID(swConn), conn2Id); + ASSERT_NE(verifyAndGetId(swConn), conn2Id); reachedB = true; doneWith(swConn.getValue()); }); @@ -1276,7 +1279,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) { ConnectionImpl::pushSetup(Status::OK()); pool.get_forTest( HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = CONN2ID(swConn); + conn1Id = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); ASSERT(conn1Id); @@ -1398,7 +1401,7 @@ TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) { dropConnectionsTest(pool, manager); } -TEST_F(ConnectionPoolTest, TryGetWorks) { +TEST_F(ConnectionPoolTest, AsyncGet) { ConnectionPool::Options options; options.maxConnections = 1; ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options); @@ -1406,60 +1409,77 @@ TEST_F(ConnectionPoolTest, TryGetWorks) { auto now = Date_t::now(); PoolImpl::setNow(now); - // no connections in the pool, tryGet should fail - ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode)); + // Make our initial connection, use and return it + { + size_t connId = 0; - // Successfully get a new connection - size_t conn1Id = 0; - ConnectionImpl::pushSetup(Status::OK()); - pool.get_forTest( - HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn1Id = CONN2ID(swConn); + // no connections in the pool, our future is not satisfied + auto connFuture = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + ASSERT_FALSE(connFuture.isReady()); + + // Successfully get a new connection + ConnectionImpl::pushSetup(Status::OK()); + + // Future should be ready now + ASSERT_TRUE(connFuture.isReady()); + std::move(connFuture).getAsync([&](StatusWithConn swConn) mutable { + connId = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); - ASSERT(conn1Id); - - // 1 connection in the pool, tryGet should succeed - auto tryGetConn = pool.tryGet(HostAndPort(), transport::kGlobalSSLMode); - ASSERT(tryGetConn); + ASSERT(connId); + } - // No connection available, this waits in the request queue - size_t conn3Id = 0; - pool.get_forTest( - HostAndPort(), Seconds(2), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn3Id = CONN2ID(swConn); + // There is one connection in the pool: + // * The first get should resolve immediately + // * The second get should should be queued + // * The eventual third should be queued before the second + { + size_t connId1 = 0; + size_t connId2 = 0; + size_t connId3 = 0; + + auto connFuture1 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + auto connFuture2 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10}); + + // Queue up the second future to resolve as soon as it is ready + std::move(connFuture2).getAsync([&](StatusWithConn swConn) mutable { + connId2 = verifyAndGetId(swConn); doneWith(swConn.getValue()); }); - ASSERT_EQ(conn3Id, 0ul); + // The first future should be immediately ready. The second should be in the queue. + ASSERT_TRUE(connFuture1.isReady()); + ASSERT_FALSE(connFuture2.isReady()); - // We want to wait if there are any outstanding requests (to provide fair access to the pool), - // so we need to call tryGet while fulfilling requests. This triggers that race by actually - // calling tryGet from within a callback (which works, because we drop locks). Not the cleanest - // way to do it, but gets us at least the code coverage we need. - // - // We run before the previous get because our deadline is 1 sec instead of 2 - size_t conn2Id = 0; - pool.get_forTest( - HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { - conn2Id = CONN2ID(swConn); + // Resolve the first future to return the connection and continue on to the second. + decltype(connFuture1) connFuture3; + std::move(connFuture1).getAsync([&](StatusWithConn swConn) mutable { + // Grab our third future while our first one is being fulfilled + connFuture3 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1}); + + connId1 = verifyAndGetId(swConn); doneWith(swConn.getValue()); - swConn.getValue().reset(); + }); + ASSERT(connId1); + ASSERT_FALSE(connId2); - // we do have one connection - ASSERT_EQUALS(pool.getNumConnectionsPerHost(HostAndPort()), 1ul); + // Since the third future has a smaller timeout than the second, + // it should take priority over the second + ASSERT_TRUE(connFuture3.isReady()); + ASSERT_FALSE(connFuture2.isReady()); - // we fail because there's an outstanding request, even though we do have a good - // connection - // available. - ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode)); - }); + // Resolve the third future. This should trigger the second future + std::move(connFuture3).getAsync([&](StatusWithConn swConn) mutable { + // We run before the second future + ASSERT_FALSE(connId2); - doneWith(*tryGetConn); - tryGetConn.reset(); + connId3 = verifyAndGetId(swConn); + doneWith(swConn.getValue()); + }); - ASSERT(conn2Id); - ASSERT(conn3Id); + ASSERT_EQ(connId1, connId2); + ASSERT_EQ(connId2, connId3); + } } } // namespace connection_pool_test_details |