-rw-r--r--  src/mongo/executor/connection_pool.cpp            | 202
-rw-r--r--  src/mongo/executor/connection_pool.h              |  11
-rw-r--r--  src/mongo/executor/connection_pool_test.cpp       | 174
-rw-r--r--  src/mongo/executor/connection_pool_test_fixture.h |  15
-rw-r--r--  src/mongo/executor/connection_pool_tl.h           |   3
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp       |  52
-rw-r--r--  src/mongo/executor/network_interface_tl.h         |  14
7 files changed, 225 insertions(+), 246 deletions(-)
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 41e4e602c65..4cbc42dc2ec 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -140,12 +140,6 @@ public:
Future<ConnectionHandle> getConnection(Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk);
/**
- * Gets a connection from the specific pool if a connection is available and there are no
- * outstanding requests.
- */
- boost::optional<ConnectionHandle> tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
-
- /**
* Triggers the shutdown procedure. This function marks the state as kInShutdown
* and calls processFailure below with the status provided. This may not immediately
* delist or destruct this pool. However, both will happen eventually as ConnectionHandles
@@ -228,15 +222,21 @@ private:
}
};
+ ConnectionHandle makeHandle(ConnectionInterface* connection);
+
+ void finishRefresh(stdx::unique_lock<stdx::mutex> lk,
+ ConnectionInterface* connPtr,
+ Status status);
+
void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);
void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);
void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);
- // This internal helper is used both by tryGet and by fulfillRequests and differs in that it
+ // This internal helper is used both by get and by fulfillRequests and differs in that it
// skips some bookkeeping that the other callers do on their own
- boost::optional<ConnectionHandle> tryGetInternal(const stdx::unique_lock<stdx::mutex>& lk);
+ ConnectionHandle tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type takeFromPool(
@@ -398,23 +398,6 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
return get(hostAndPort, transport::kGlobalSSLMode, timeout).getAsync(std::move(cb));
}
-boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::tryGet(
- const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- auto iter = _pools.find(hostAndPort);
-
- if (iter == _pools.end()) {
- return boost::none;
- }
-
- const auto& pool = iter->second;
- invariant(pool);
- pool->fassertSSLModeIs(sslMode);
-
- return pool->tryGetConnection(lk);
-}
-
Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode,
Milliseconds timeout) {
@@ -523,6 +506,14 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
invariant(_state != State::kInShutdown);
+ auto conn = tryGetConnection(lk);
+
+ updateStateInLock();
+
+ if (conn) {
+ return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn));
+ }
+
if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
timeout = _parent->_options.refreshTimeout;
}
@@ -535,28 +526,25 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
updateStateInLock();
- spawnConnections(lk);
- fulfillRequests(lk);
+ lk.unlock();
+ _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+ fassert(20000, schedStatus);
+
+ spawnConnections(lk);
+ }));
return std::move(pf.future);
}
-boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetConnection(
- const stdx::unique_lock<stdx::mutex>& lk) {
- invariant(_state != State::kInShutdown);
-
- if (_requests.size()) {
- return boost::none;
- }
+auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
+ auto fun = guardCallback(
+ [this](auto lk, auto connection) { returnConnection(connection, std::move(lk)); });
- auto conn = tryGetInternal(lk);
-
- updateStateInLock();
-
- return conn;
+ auto handle = ConnectionHandle(connection, fun);
+ return handle;
}
-boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetInternal(
+ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
const stdx::unique_lock<stdx::mutex>&) {
while (_readyPool.size()) {
@@ -582,14 +570,55 @@ boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::
// pass it to the user
connPtr->resetToUnknown();
- return ConnectionHandle(connPtr,
- guardCallback([this](stdx::unique_lock<stdx::mutex> localLk,
- ConnectionPool::ConnectionInterface* conn) {
- returnConnection(conn, std::move(localLk));
- }));
+ auto handle = makeHandle(connPtr);
+ return handle;
}
- return boost::none;
+ return {};
+}
+
+void ConnectionPool::SpecificPool::finishRefresh(stdx::unique_lock<stdx::mutex> lk,
+ ConnectionInterface* connPtr,
+ Status status) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If we're in shutdown, we don't need refreshed connections
+ if (_state == State::kInShutdown)
+ return;
+
+ // If we've exceeded the time limit, start a new connect,
+ // rather than failing all operations. We do this because the
+ // various callers have their own time limit which is unrelated
+ // to our internal one.
+ if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ LOG(0) << "Pending connection to host " << _hostAndPort
+ << " did not complete within the connection timeout,"
+ << " retrying with a new connection;" << openConnections(lk)
+ << " connections to that host remain open";
+ spawnConnections(lk);
+ return;
+ }
+
+ // Pass a failure on through
+ if (!status.isOK()) {
+ processFailure(status, std::move(lk));
+ return;
+ }
+
+ // If the host and port were dropped, let this lapse and spawn new connections
+ if (conn->getGeneration() != _generation) {
+ spawnConnections(lk);
+ return;
+ }
+
+ // If the connection refreshed successfully, throw it back in the ready pool
+ addToReady(lk, std::move(conn));
+
+ lk.unlock();
+ _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+ fassert(20003, schedStatus);
+ fulfillRequests(lk);
+ }));
}
void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
@@ -631,49 +660,19 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// Unlock in case refresh can occur immediately
lk.unlock();
connPtr->refresh(_parent->_options.refreshTimeout,
- guardCallback([this](stdx::unique_lock<stdx::mutex> lk,
- ConnectionInterface* connPtr,
- Status status) {
- auto conn = takeFromProcessingPool(connPtr);
-
- // If we're in shutdown, we don't need refreshed connections
- if (_state == State::kInShutdown)
- return;
-
- // If the connection refreshed successfully, throw it back in
- // the ready pool
- if (status.isOK()) {
- // If the host and port were dropped, let this lapse
- if (conn->getGeneration() == _generation) {
- addToReady(lk, std::move(conn));
- fulfillRequests(lk);
- }
-
- return;
- }
-
- // If we've exceeded the time limit, start a new connect,
- // rather than failing all operations. We do this because the
- // various callers have their own time limit which is unrelated
- // to our internal one.
- if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
- log() << "Pending connection to host " << _hostAndPort
- << " did not complete within the connection timeout,"
- << " retrying with a new connection;" << openConnections(lk)
- << " connections to that host remain open";
- spawnConnections(lk);
- return;
- }
-
- // Otherwise pass the failure on through
- processFailure(status, std::move(lk));
+ guardCallback([this](auto lk, auto conn, auto status) {
+ finishRefresh(std::move(lk), conn, status);
}));
lk.lock();
} else {
// If it's fine as it is, just put it in the ready queue
addToReady(lk, std::move(conn));
- // TODO This should be scheduled on an executor once we have executor-aware pooling
- fulfillRequests(lk);
+
+ lk.unlock();
+ _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
+ fassert(20004, schedStatus);
+ fulfillRequests(lk);
+ }));
}
updateStateInLock();
@@ -784,7 +783,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
// deadlock).
//
// None of the heap manipulation code throws, but it's something to keep in mind.
- auto conn = tryGetInternal(lk);
+ auto conn = tryGetConnection(lk);
if (!conn) {
break;
@@ -796,7 +795,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
_requests.pop_back();
lk.unlock();
- promise.emplaceValue(std::move(*conn));
+ promise.emplaceValue(std::move(conn));
lk.lock();
updateStateInLock();
@@ -847,32 +846,11 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// Run the setup callback
lk.unlock();
- handle->setup(
- _parent->_options.refreshTimeout,
- guardCallback([this](
- stdx::unique_lock<stdx::mutex> lk, ConnectionInterface* connPtr, Status status) {
- auto conn = takeFromProcessingPool(connPtr);
-
- // If we're in shutdown, we don't need this conn
- if (_state == State::kInShutdown)
- return;
-
- if (status.isOK()) {
- // If the host and port was dropped, let the connection lapse
- if (conn->getGeneration() == _generation) {
- addToReady(lk, std::move(conn));
- fulfillRequests(lk);
- }
- } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
- // If we've exceeded the time limit, restart the connect, rather than
- // failing all operations. We do this because the various callers
- // have their own time limit which is unrelated to our internal one.
- spawnConnections(lk);
- } else {
- // If the setup failed, cascade the failure edge
- processFailure(status, std::move(lk));
- }
- }));
+ handle->setup(_parent->_options.refreshTimeout,
+ guardCallback([this](auto lk, auto conn, auto status) {
+ finishRefresh(std::move(lk), conn, status);
+ }));
+
// Note that this assumes that the refreshTimeout is sound for the
// setupTimeout
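
The connection_pool.cpp hunks above repeatedly follow one shape: mutate pool state while holding the mutex, release the lock, then schedule the follow-up step (spawnConnections or fulfillRequests) on the factory's executor instead of calling it re-entrantly. Below is a minimal sketch of that discipline using std-only toy types; ToyPool, InlineExecutor, and the int "connections" are illustrative stand-ins, not the real mongo classes.

```cpp
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <vector>

// Illustrative stand-in for an OutOfLineExecutor; runs tasks immediately.
struct InlineExecutor {
    void schedule(std::function<void()> task) { task(); }
};

// Toy pool mirroring the shape of addToReady + a scheduled fulfillRequests.
struct ToyPool {
    std::mutex mutex;
    std::queue<int> ready;     // stands in for _readyPool
    std::vector<int> waiters;  // stands in for _requests
    InlineExecutor executor;

    void addToReadyAndFulfill(int conn) {
        std::unique_lock<std::mutex> lk(mutex);
        ready.push(conn);
        // Drop the lock before scheduling, as the diff does with lk.unlock().
        lk.unlock();
        executor.schedule([this] {
            std::unique_lock<std::mutex> relock(mutex);
            while (!ready.empty() && !waiters.empty()) {
                std::cout << "fulfilled waiter with conn " << ready.front() << '\n';
                ready.pop();
                waiters.pop_back();
            }
        });
    }
};

int main() {
    ToyPool pool;
    pool.waiters.push_back(1);
    pool.addToReadyAndFulfill(42);
}
```

Running fulfillRequests from a scheduled task rather than on the returning caller's stack is what the deleted "TODO This should be scheduled on an executor once we have executor-aware pooling" comment in returnConnection was asking for: user callbacks fire off the executor instead of underneath whoever happened to return a connection.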
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 5903d1b0bda..7066177e438 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -42,6 +42,7 @@
#include "mongo/transport/transport_layer.h"
#include "mongo/util/future.h"
#include "mongo/util/net/hostandport.h"
+#include "mongo/util/out_of_line_executor.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -72,7 +73,7 @@ public:
using ConnectionHandleDeleter = stdx::function<void(ConnectionInterface* connection)>;
using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
- using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>;
+ using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>;
static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000); // 5mins
static const size_t kDefaultMaxConns;
@@ -156,9 +157,6 @@ public:
Milliseconds timeout,
GetConnectionCallback cb);
- boost::optional<ConnectionHandle> tryGet(const HostAndPort& hostAndPort,
- transport::ConnectSSLMode sslMode);
-
void appendConnectionStats(ConnectionPoolStats* stats) const;
size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
@@ -342,6 +340,11 @@ public:
size_t generation) = 0;
/**
+ * Return the executor for use with this factory
+ */
+ virtual OutOfLineExecutor& getExecutor() = 0;
+
+ /**
* Makes a new timer
*/
virtual std::shared_ptr<TimerInterface> makeTimer() = 0;
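
connection_pool.h now requires every DependentTypeFactoryInterface to expose an executor. Here is a rough sketch of the contract the pool appears to rely on; Status is a stub and ToyFactory an invented name, while the real interface lives in mongo/util/out_of_line_executor.h (included above). That tasks receive a status is grounded in the test fixture below, which invokes them with Status::OK().

```cpp
#include <functional>

// Stub status type; mongo passes a real Status to scheduled tasks.
struct Status {
    bool ok;
    static Status OK() { return {true}; }
};

// Sketch of the executor contract assumed by getExecutor(): run the task
// exactly once, out of line, passing whether scheduling succeeded.
class OutOfLineExecutor {
public:
    using Task = std::function<void(Status)>;
    virtual ~OutOfLineExecutor() = default;
    virtual void schedule(Task task) = 0;
};

// The factory hands the pool a long-lived executor by reference, so the
// pool can keep scheduling work on it for the factory's whole lifetime.
class ToyFactory {
public:
    explicit ToyFactory(OutOfLineExecutor& exec) : _exec(exec) {}
    OutOfLineExecutor& getExecutor() { return _exec; }

private:
    OutOfLineExecutor& _exec;
};
```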
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 0e689456afa..b36cf8cf749 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -32,6 +32,7 @@
#include <algorithm>
#include <random>
#include <stack>
+#include <tuple>
#include "mongo/executor/connection_pool_test_fixture.h"
@@ -58,15 +59,17 @@ protected:
private:
};
-void doneWith(const ConnectionPool::ConnectionHandle& swConn) {
- static_cast<ConnectionImpl*>(swConn.get())->indicateSuccess();
+void doneWith(const ConnectionPool::ConnectionHandle& conn) {
+ dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess();
}
-#define CONN2ID(swConn) \
- [](StatusWith<ConnectionPool::ConnectionHandle>& swConn) { \
- ASSERT(swConn.isOK()); \
- return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \
- }(swConn)
+using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>;
+
+auto verifyAndGetId(StatusWithConn& swConn) {
+ ASSERT(swConn.isOK());
+ auto& conn = swConn.getValue();
+ return dynamic_cast<ConnectionImpl*>(conn.get())->id();
+}
/**
* Verify that we get the same connection if we grab one, return it and grab
@@ -81,7 +84,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -91,7 +94,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -159,7 +162,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- const auto id = CONN2ID(swConn);
+ const auto id = verifyAndGetId(swConn);
connections.push_back(std::move(swConn.getValue()));
ASSERT(id == ids.top());
ids.pop();
@@ -211,7 +214,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- original_ids.insert(CONN2ID(swConn));
+ original_ids.insert(verifyAndGetId(swConn));
connections.push_back(std::move(swConn.getValue()));
});
}
@@ -243,7 +246,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- reacquired_ids.insert(CONN2ID(swConn));
+ reacquired_ids.insert(verifyAndGetId(swConn));
connections.push_back(std::move(swConn.getValue()));
});
}
@@ -282,7 +285,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error"));
});
@@ -292,7 +295,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -315,7 +318,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
pool.get_forTest(HostAndPort("localhost:30000"),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -325,7 +328,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
pool.get_forTest(HostAndPort("localhost:30001"),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -460,7 +463,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -471,7 +474,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
ASSERT_EQ(conn1Id, conn2Id);
@@ -503,7 +506,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(1000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(CONN2ID(swConn), conn1Id);
+ ASSERT_NE(verifyAndGetId(swConn), conn1Id);
reachedB = true;
doneWith(swConn.getValue());
});
@@ -942,7 +945,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = CONN2ID(swConn);
+ connId = verifyAndGetId(swConn);
reachedA = true;
doneWith(swConn.getValue());
});
@@ -959,7 +962,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, CONN2ID(swConn));
+ ASSERT_NE(connId, verifyAndGetId(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -992,7 +995,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = CONN2ID(swConn);
+ connId = verifyAndGetId(swConn);
reachedA = true;
doneWith(swConn.getValue());
});
@@ -1006,7 +1009,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, CONN2ID(swConn));
+ ASSERT_EQ(connId, verifyAndGetId(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1019,7 +1022,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, CONN2ID(swConn));
+ ASSERT_EQ(connId, verifyAndGetId(swConn));
reachedB2 = true;
doneWith(swConn.getValue());
});
@@ -1034,7 +1037,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, CONN2ID(swConn));
+ ASSERT_NE(connId, verifyAndGetId(swConn));
reachedC = true;
doneWith(swConn.getValue());
});
@@ -1067,7 +1070,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
conn1 = std::move(swConn.getValue());
});
@@ -1078,7 +1081,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
@@ -1093,7 +1096,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(conn2Id, CONN2ID(swConn));
+ ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
reachedA = true;
doneWith(swConn.getValue());
});
@@ -1114,8 +1117,8 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(conn1Id, CONN2ID(swConn));
- ASSERT_NE(conn2Id, CONN2ID(swConn));
+ ASSERT_NE(conn1Id, verifyAndGetId(swConn));
+ ASSERT_NE(conn2Id, verifyAndGetId(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1143,7 +1146,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
ASSERT(conn1Id);
@@ -1153,7 +1156,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(CONN2ID(swConn), conn1Id);
+ ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
handle = std::move(swConn.getValue());
});
@@ -1187,7 +1190,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ conn2Id = verifyAndGetId(swConn);
ASSERT_NE(conn2Id, conn1Id);
doneWith(swConn.getValue());
});
@@ -1209,7 +1212,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(CONN2ID(swConn), conn2Id);
+ ASSERT_NE(verifyAndGetId(swConn), conn2Id);
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1276,7 +1279,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
ConnectionImpl::pushSetup(Status::OK());
pool.get_forTest(
HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ conn1Id = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
ASSERT(conn1Id);
@@ -1398,7 +1401,7 @@ TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
dropConnectionsTest(pool, manager);
}
-TEST_F(ConnectionPoolTest, TryGetWorks) {
+TEST_F(ConnectionPoolTest, AsyncGet) {
ConnectionPool::Options options;
options.maxConnections = 1;
ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
@@ -1406,60 +1409,77 @@ TEST_F(ConnectionPoolTest, TryGetWorks) {
auto now = Date_t::now();
PoolImpl::setNow(now);
- // no connections in the pool, tryGet should fail
- ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
+ // Make our initial connection, use and return it
+ {
+ size_t connId = 0;
- // Successfully get a new connection
- size_t conn1Id = 0;
- ConnectionImpl::pushSetup(Status::OK());
- pool.get_forTest(
- HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = CONN2ID(swConn);
+ // no connections in the pool, our future is not satisfied
+ auto connFuture = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ ASSERT_FALSE(connFuture.isReady());
+
+ // Successfully get a new connection
+ ConnectionImpl::pushSetup(Status::OK());
+
+ // Future should be ready now
+ ASSERT_TRUE(connFuture.isReady());
+ std::move(connFuture).getAsync([&](StatusWithConn swConn) mutable {
+ connId = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
- ASSERT(conn1Id);
-
- // 1 connection in the pool, tryGet should succeed
- auto tryGetConn = pool.tryGet(HostAndPort(), transport::kGlobalSSLMode);
- ASSERT(tryGetConn);
+ ASSERT(connId);
+ }
- // No connection available, this waits in the request queue
- size_t conn3Id = 0;
- pool.get_forTest(
- HostAndPort(), Seconds(2), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn3Id = CONN2ID(swConn);
+ // There is one connection in the pool:
+ // * The first get should resolve immediately
+ // * The second get should be queued
+ // * The eventual third should be queued before the second
+ {
+ size_t connId1 = 0;
+ size_t connId2 = 0;
+ size_t connId3 = 0;
+
+ auto connFuture1 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ auto connFuture2 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
+
+ // Queue up the second future to resolve as soon as it is ready
+ std::move(connFuture2).getAsync([&](StatusWithConn swConn) mutable {
+ connId2 = verifyAndGetId(swConn);
doneWith(swConn.getValue());
});
- ASSERT_EQ(conn3Id, 0ul);
+ // The first future should be immediately ready. The second should be in the queue.
+ ASSERT_TRUE(connFuture1.isReady());
+ ASSERT_FALSE(connFuture2.isReady());
- // We want to wait if there are any outstanding requests (to provide fair access to the pool),
- // so we need to call tryGet while fulfilling requests. This triggers that race by actually
- // calling tryGet from within a callback (which works, because we drop locks). Not the cleanest
- // way to do it, but gets us at least the code coverage we need.
- //
- // We run before the previous get because our deadline is 1 sec instead of 2
- size_t conn2Id = 0;
- pool.get_forTest(
- HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = CONN2ID(swConn);
+ // Resolve the first future to return the connection and continue on to the second.
+ decltype(connFuture1) connFuture3;
+ std::move(connFuture1).getAsync([&](StatusWithConn swConn) mutable {
+ // Grab our third future while our first one is being fulfilled
+ connFuture3 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+
+ connId1 = verifyAndGetId(swConn);
doneWith(swConn.getValue());
- swConn.getValue().reset();
+ });
+ ASSERT(connId1);
+ ASSERT_FALSE(connId2);
- // we do have one connection
- ASSERT_EQUALS(pool.getNumConnectionsPerHost(HostAndPort()), 1ul);
+ // Since the third future has a smaller timeout than the second,
+ // it should take priority over the second
+ ASSERT_TRUE(connFuture3.isReady());
+ ASSERT_FALSE(connFuture2.isReady());
- // we fail because there's an outstanding request, even though we do have a good
- // connection
- // available.
- ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
- });
+ // Resolve the third future. This should trigger the second future
+ std::move(connFuture3).getAsync([&](StatusWithConn swConn) mutable {
+ // We run before the second future
+ ASSERT_FALSE(connId2);
- doneWith(*tryGetConn);
- tryGetConn.reset();
+ connId3 = verifyAndGetId(swConn);
+ doneWith(swConn.getValue());
+ });
- ASSERT(conn2Id);
- ASSERT(conn3Id);
+ ASSERT_EQ(connId1, connId2);
+ ASSERT_EQ(connId2, connId3);
+ }
}
} // namespace connection_pool_test_details
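
The rewritten AsyncGet test drives the pool purely through Future<ConnectionHandle>. For readers unfamiliar with the two operations it leans on, here is a toy future with just isReady() and getAsync() semantics; ToyFuture is a deliberately simplified stand-in for mongo's far more general Future<T>.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <utility>

template <typename T>
class ToyFuture {
public:
    // Fulfill the future; if a callback is already queued, fire it now.
    void emplace(T value) {
        _value = std::move(value);
        if (_callback)
            std::exchange(_callback, nullptr)(std::move(*_value));
    }
    bool isReady() const { return _value.has_value(); }
    // Run cb inline if the value is here, otherwise queue it.
    void getAsync(std::function<void(T)> cb) {
        if (_value)
            cb(std::move(*_value));
        else
            _callback = std::move(cb);
    }

private:
    std::optional<T> _value;
    std::function<void(T)> _callback;
};

int main() {
    ToyFuture<int> connFuture;
    assert(!connFuture.isReady());  // no connection yet: not satisfied
    bool ran = false;
    connFuture.getAsync([&](int id) { ran = (id == 1); });
    connFuture.emplace(1);  // setup succeeds; the queued callback fires now
    assert(ran && connFuture.isReady());
}
```

As in the test, a callback registered before the value arrives fires at fulfillment time, and one registered afterwards runs inline.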
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index f41de316950..2ecb6ca5279 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -134,6 +134,16 @@ private:
};
/**
+ * An "OutOfLineExecutor" that actually runs on the same thread of execution
+ */
+class InlineOutOfLineExecutor : public OutOfLineExecutor {
+public:
+ void schedule(Task task) override {
+ std::move(task)(Status::OK());
+ }
+};
+
+/**
* Mock for the pool implementation
*/
class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
@@ -147,6 +157,11 @@ public:
std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
+ OutOfLineExecutor& getExecutor() override {
+ static InlineOutOfLineExecutor _executor;
+ return _executor;
+ }
+
Date_t now() override;
void shutdown() override {
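
The fixture's InlineOutOfLineExecutor runs each "scheduled" task before schedule() returns, so everything the pool now defers to its executor still happens synchronously inside the test. A self-contained sketch of that behavior, with ToyStatus/ToyTask as stand-ins for the real Status/Task types:

```cpp
#include <cassert>

struct ToyStatus { bool ok = true; };
using ToyTask = void (*)(ToyStatus);

// Mirrors the fixture: "scheduling" is just an immediate invocation on the
// caller's own thread.
struct InlineExecutor {
    void schedule(ToyTask task) { task(ToyStatus{}); }
};

static bool g_ran = false;

int main() {
    InlineExecutor exec;
    exec.schedule([](ToyStatus s) { g_ran = s.ok; });
    assert(g_ran);  // already true: nothing was actually deferred
}
```

That choice keeps the unit tests single-threaded and deterministic even though the production code paths now bounce through a real executor.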
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
index 17f000ffaeb..d614436c49d 100644
--- a/src/mongo/executor/connection_pool_tl.h
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -56,6 +56,9 @@ public:
transport::ConnectSSLMode sslMode,
size_t generation) override;
std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
+ OutOfLineExecutor& getExecutor() override {
+ return *_reactor;
+ }
Date_t now() override;
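
connection_pool_tl.h satisfies getExecutor() by returning the reactor itself, which works because the transport reactor already queues work onto its own run loop and so can serve as an executor. A toy model of a reactor doubling as one (ToyReactor is illustrative, not the real transport::Reactor):

```cpp
#include <deque>
#include <functional>

class ToyReactor {
public:
    using Task = std::function<void()>;
    // Executor-style entry point: enqueue onto the run loop.
    void schedule(Task t) { _queue.push_back(std::move(t)); }
    // Event-loop entry point: drain whatever has been scheduled.
    void runOnce() {
        while (!_queue.empty()) {
            auto t = std::move(_queue.front());
            _queue.pop_front();
            t();
        }
    }

private:
    std::deque<Task> _queue;
};

int main() {
    ToyReactor reactor;
    reactor.schedule([] { /* fulfillRequests would run here */ });
    reactor.runOnce();
}
```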
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index a9529d1c825..1ad164f5c44 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -213,48 +213,18 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
return Status::OK();
}
- // Interacting with the connection pool can involve more work than just getting a connection
- // out. In particular, we can end up having to spin up new connections, and fulfilling promises
- // for other requesters. Returning connections has the same issue.
- //
- // To work around it, we make sure to hop onto the reactor thread before getting a connection,
- // then making sure to get back to the client thread to do the work (if on a baton). And we
- // hook up a connection returning unique_ptr that ensures that however we exit, we always do the
- // return on the reactor thread.
- //
- // TODO: get rid of this cruft once we have a connection pool that's executor aware.
-
- auto connFuture = [&] {
- auto conn = _pool->tryGet(request.target, request.sslMode);
-
- if (conn) {
- return Future<ConnectionPool::ConnectionHandle>(std::move(*conn));
- }
-
- return _reactor
- ->execute([this, state, request, baton] {
- return makeReadyFutureWith([this, request] {
- return _pool->get(request.target, request.sslMode, request.timeout);
- });
- })
- .tapError([state](Status error) {
- LOG(2) << "Failed to get connection from pool for request " << state->request.id
- << ": " << error;
- });
- }().then([this, baton](ConnectionPool::ConnectionHandle conn) {
- auto deleter = conn.get_deleter();
-
- // TODO: drop out this shared_ptr once we have a unique_function capable future
- return std::make_shared<CommandState::ConnHandle>(conn.release(),
- CommandState::Deleter{deleter, _reactor});
- });
+ auto connFuture = _pool->get(request.target, request.sslMode, request.timeout)
+ .tapError([state](Status error) {
+ LOG(2) << "Failed to get connection from pool for request "
+ << state->request.id << ": " << error;
+ });
auto remainingWork =
[ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
- StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
makeReadyFutureWith([&] {
- auto connHandle = uassertStatusOK(std::move(swConn));
- return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
+ auto conn = uassertStatusOK(std::move(swConn));
+ return _onAcquireConn(state, std::move(future), std::move(conn), baton);
})
.onError([](Status error) -> StatusWith<RemoteCommandResponse> {
// The TransportLayer has, for historical reasons, returned SocketException for
@@ -283,7 +253,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
// connection
std::move(connFuture)
.getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ](
- StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
baton->schedule([ rw = std::move(rw),
swConn = std::move(swConn) ](OperationContext * opCtx) mutable {
if (opCtx) {
@@ -298,7 +268,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
// otherwise we're happy to run inline
std::move(connFuture)
.getAsync([rw = std::move(remainingWork)](
- StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
std::move(rw)(std::move(swConn));
});
}
@@ -311,7 +281,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
std::shared_ptr<CommandState> state,
Future<RemoteCommandResponse> future,
- CommandState::ConnHandle conn,
+ ConnectionPool::ConnectionHandle conn,
const BatonHandle& baton) {
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
conn->indicateSuccess();
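
With the pool now returning connections via its own executor, startCommand no longer needs the reactor-hopping Deleter that network_interface_tl.h deletes below; a plain ConnectionPool::ConnectionHandle suffices. A toy contrast of the old and new handle shapes (Conn and ReactorHopDeleter are invented names for illustration, and the "reactor hop" is reduced to a log line):

```cpp
#include <iostream>
#include <memory>

struct Conn { int id; };

// Shape of the removed pattern: a deleter that had to bounce the return
// back to the reactor thread before handing the connection to the pool.
struct ReactorHopDeleter {
    void operator()(Conn* ptr) const {
        std::cout << "return conn " << ptr->id << " via reactor hop\n";
        delete ptr;
    }
};

int main() {
    // Old shape: unique_ptr carrying a thread-hopping deleter.
    std::unique_ptr<Conn, ReactorHopDeleter> oldHandle(new Conn{1});
    // New shape: the pool's own handle, modeled here as a plain unique_ptr,
    // because the pool schedules its own bookkeeping on its executor.
    std::unique_ptr<Conn> newHandle(new Conn{2});
}
```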
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 2ab75f49aaa..d6f50032111 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -97,17 +97,7 @@ private:
Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
Date_t start;
- struct Deleter {
- ConnectionPool::ConnectionHandleDeleter returner;
- transport::ReactorHandle reactor;
-
- void operator()(ConnectionPool::ConnectionInterface* ptr) const {
- reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
- }
- };
- using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
-
- ConnHandle conn;
+ ConnectionPool::ConnectionHandle conn;
std::unique_ptr<transport::ReactorTimer> timer;
AtomicWord<bool> done;
@@ -138,7 +128,7 @@ private:
void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
Future<RemoteCommandResponse> future,
- CommandState::ConnHandle conn,
+ ConnectionPool::ConnectionHandle conn,
const BatonHandle& baton);
std::string _instanceName;