summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/connection_pool.cpp202
-rw-r--r--src/mongo/executor/connection_pool.h11
-rw-r--r--src/mongo/executor/connection_pool_test.cpp174
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.h15
-rw-r--r--src/mongo/executor/connection_pool_tl.h3
-rw-r--r--src/mongo/executor/network_interface_tl.cpp52
-rw-r--r--src/mongo/executor/network_interface_tl.h14
7 files changed, 246 insertions, 225 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 4cbc42dc2ec..41e4e602c65 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -140,6 +140,12 @@ public:
Future<ConnectionHandle> getConnection(Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk);
/**
+ * Gets a connection from the specific pool if a connection is available and there are no
+ * outstanding requests.
+ */
+ boost::optional<ConnectionHandle> tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
+
+ /**
* Triggers the shutdown procedure. This function marks the state as kInShutdown
* and calls processFailure below with the status provided. This may not immediately
* delist or destruct this pool. However, both will happen eventually as ConnectionHandles
@@ -222,21 +228,15 @@ private:
}
};
- ConnectionHandle makeHandle(ConnectionInterface* connection);
-
- void finishRefresh(stdx::unique_lock<stdx::mutex> lk,
- ConnectionInterface* connPtr,
- Status status);
-
void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);
void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);
void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);
- // This internal helper is used both by get and by fulfillRequests and differs in that it
+ // This internal helper is used both by tryGet and by fulfillRequests and differs in that it
// skips some bookkeeping that the other callers do on their own
- ConnectionHandle tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
+ boost::optional<ConnectionHandle> tryGetInternal(const stdx::unique_lock<stdx::mutex>& lk);
template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type takeFromPool(
@@ -398,6 +398,23 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
return get(hostAndPort, transport::kGlobalSSLMode, timeout).getAsync(std::move(cb));
}
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::tryGet(
+ const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto iter = _pools.find(hostAndPort);
+
+ if (iter == _pools.end()) {
+ return boost::none;
+ }
+
+ const auto& pool = iter->second;
+ invariant(pool);
+ pool->fassertSSLModeIs(sslMode);
+
+ return pool->tryGetConnection(lk);
+}
+
Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode,
Milliseconds timeout) {
@@ -506,14 +523,6 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
invariant(_state != State::kInShutdown);
- auto conn = tryGetConnection(lk);
-
- updateStateInLock();
-
- if (conn) {
- return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn));
- }
-
if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
timeout = _parent->_options.refreshTimeout;
}
@@ -526,25 +535,28 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
updateStateInLock();
- lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
- fassert(20000, schedStatus);
-
- spawnConnections(lk);
- }));
+ spawnConnections(lk);
+ fulfillRequests(lk);
return std::move(pf.future);
}
-auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
- auto fun = guardCallback(
- [this](auto lk, auto connection) { returnConnection(connection, std::move(lk)); });
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetConnection(
+ const stdx::unique_lock<stdx::mutex>& lk) {
+ invariant(_state != State::kInShutdown);
+
+ if (_requests.size()) {
+ return boost::none;
+ }
- auto handle = ConnectionHandle(connection, fun);
- return handle;
+ auto conn = tryGetInternal(lk);
+
+ updateStateInLock();
+
+ return conn;
}
-ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetInternal(
const stdx::unique_lock<stdx::mutex>&) {
while (_readyPool.size()) {
@@ -570,55 +582,14 @@ ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
// pass it to the user
connPtr->resetToUnknown();
- auto handle = makeHandle(connPtr);
- return handle;
+ return ConnectionHandle(connPtr,
+ guardCallback([this](stdx::unique_lock<stdx::mutex> localLk,
+ ConnectionPool::ConnectionInterface* conn) {
+ returnConnection(conn, std::move(localLk));
+ }));
}
- return {};
-}
-
-void ConnectionPool::SpecificPool::finishRefresh(stdx::unique_lock<stdx::mutex> lk,
- ConnectionInterface* connPtr,
- Status status) {
- auto conn = takeFromProcessingPool(connPtr);
-
- // If we're in shutdown, we don't need refreshed connections
- if (_state == State::kInShutdown)
- return;
-
- // If we've exceeded the time limit, start a new connect,
- // rather than failing all operations. We do this because the
- // various callers have their own time limit which is unrelated
- // to our internal one.
- if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
- LOG(0) << "Pending connection to host " << _hostAndPort
- << " did not complete within the connection timeout,"
- << " retrying with a new connection;" << openConnections(lk)
- << " connections to that host remain open";
- spawnConnections(lk);
- return;
- }
-
- // Pass a failure on through
- if (!status.isOK()) {
- processFailure(status, std::move(lk));
- return;
- }
-
- // If the host and port were dropped, let this lapse and spawn new connections
- if (conn->getGeneration() != _generation) {
- spawnConnections(lk);
- return;
- }
-
- // If the connection refreshed successfully, throw it back in the ready pool
- addToReady(lk, std::move(conn));
-
- lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
- fassert(20003, schedStatus);
- fulfillRequests(lk);
- }));
+ return boost::none;
}
void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
@@ -660,19 +631,49 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// Unlock in case refresh can occur immediately
lk.unlock();
connPtr->refresh(_parent->_options.refreshTimeout,
- guardCallback([this](auto lk, auto conn, auto status) {
- finishRefresh(std::move(lk), conn, status);
+ guardCallback([this](stdx::unique_lock<stdx::mutex> lk,
+ ConnectionInterface* connPtr,
+ Status status) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If we're in shutdown, we don't need refreshed connections
+ if (_state == State::kInShutdown)
+ return;
+
+ // If the connection refreshed successfully, throw it back in
+ // the ready pool
+ if (status.isOK()) {
+ // If the host and port were dropped, let this lapse
+ if (conn->getGeneration() == _generation) {
+ addToReady(lk, std::move(conn));
+ fulfillRequests(lk);
+ }
+
+ return;
+ }
+
+ // If we've exceeded the time limit, start a new connect,
+ // rather than failing all operations. We do this because the
+ // various callers have their own time limit which is unrelated
+ // to our internal one.
+ if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ log() << "Pending connection to host " << _hostAndPort
+ << " did not complete within the connection timeout,"
+ << " retrying with a new connection;" << openConnections(lk)
+ << " connections to that host remain open";
+ spawnConnections(lk);
+ return;
+ }
+
+ // Otherwise pass the failure on through
+ processFailure(status, std::move(lk));
}));
lk.lock();
} else {
// If it's fine as it is, just put it in the ready queue
addToReady(lk, std::move(conn));
-
- lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
- fassert(20004, schedStatus);
- fulfillRequests(lk);
- }));
+ // TODO This should be scheduled on an executor once we have executor-aware pooling
+ fulfillRequests(lk);
}
updateStateInLock();
@@ -783,7 +784,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
// deadlock).
//
// None of the heap manipulation code throws, but it's something to keep in mind.
- auto conn = tryGetConnection(lk);
+ auto conn = tryGetInternal(lk);
if (!conn) {
break;
@@ -795,7 +796,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
_requests.pop_back();
lk.unlock();
- promise.emplaceValue(std::move(conn));
+ promise.emplaceValue(std::move(*conn));
lk.lock();
updateStateInLock();
@@ -846,11 +847,32 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// Run the setup callback
lk.unlock();
- handle->setup(_parent->_options.refreshTimeout,
- guardCallback([this](auto lk, auto conn, auto status) {
- finishRefresh(std::move(lk), conn, status);
- }));
-
+ handle->setup(
+ _parent->_options.refreshTimeout,
+ guardCallback([this](
+ stdx::unique_lock<stdx::mutex> lk, ConnectionInterface* connPtr, Status status) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If we're in shutdown, we don't need this conn
+ if (_state == State::kInShutdown)
+ return;
+
+ if (status.isOK()) {
+ // If the host and port was dropped, let the connection lapse
+ if (conn->getGeneration() == _generation) {
+ addToReady(lk, std::move(conn));
+ fulfillRequests(lk);
+ }
+ } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ // If we've exceeded the time limit, restart the connect, rather than
+ // failing all operations. We do this because the various callers
+ // have their own time limit which is unrelated to our internal one.
+ spawnConnections(lk);
+ } else {
+ // If the setup failed, cascade the failure edge
+ processFailure(status, std::move(lk));
+ }
+ }));
// Note that this assumes that the refreshTimeout is sound for the
// setupTimeout
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 7066177e438..5903d1b0bda 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -42,7 +42,6 @@
#include "mongo/transport/transport_layer.h"
#include "mongo/util/future.h"
#include "mongo/util/net/hostandport.h"
-#include "mongo/util/out_of_line_executor.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -73,7 +72,7 @@ public:
using ConnectionHandleDeleter = stdx::function<void(ConnectionInterface* connection)>;
using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
- using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>;
+ using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>;
static constexpr Milliseconds kDefaultHostTimeout = Milliseconds(300000); // 5mins
static const size_t kDefaultMaxConns;
@@ -157,6 +156,9 @@ public:
Milliseconds timeout,
GetConnectionCallback cb);
+ boost::optional<ConnectionHandle> tryGet(const HostAndPort& hostAndPort,
+ transport::ConnectSSLMode sslMode);
+
void appendConnectionStats(ConnectionPoolStats* stats) const;
size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
@@ -340,11 +342,6 @@ public:
size_t generation) = 0;
/**
- * Return the executor for use with this factory
- */
- virtual OutOfLineExecutor& getExecutor() = 0;
-
- /**
* Makes a new timer
*/
virtual std::shared_ptr<TimerInterface> makeTimer() = 0;
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index b36cf8cf749..0e689456afa 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -32,7 +32,6 @@
#include <algorithm>
#include <random>
#include <stack>
-#include <tuple>
#include "mongo/executor/connection_pool_test_fixture.h"
@@ -59,17 +58,15 @@ protected:
private:
};
-void doneWith(const ConnectionPool::ConnectionHandle& conn) {
- dynamic_cast<ConnectionImpl*>(conn.get())->indicateSuccess();
+void doneWith(const ConnectionPool::ConnectionHandle& swConn) {
+ static_cast<ConnectionImpl*>(swConn.get())->indicateSuccess();
}
-using StatusWithConn = StatusWith<ConnectionPool::ConnectionHandle>;
-
-auto verifyAndGetId(StatusWithConn& swConn) {
- ASSERT(swConn.isOK());
- auto& conn = swConn.getValue();
- return dynamic_cast<ConnectionImpl*>(conn.get())->id();
-}
+#define CONN2ID(swConn) \
+ [](StatusWith<ConnectionPool::ConnectionHandle>& swConn) { \
+ ASSERT(swConn.isOK()); \
+ return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \
+ }(swConn)
/**
* Verify that we get the same connection if we grab one, return it and grab
@@ -84,7 +81,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
+ conn1Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
@@ -94,7 +91,7 @@ TEST_F(ConnectionPoolTest, SameConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
+ conn2Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
@@ -162,7 +159,7 @@ TEST_F(ConnectionPoolTest, ConnectionsAreAcquiredInMRUOrder) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- const auto id = verifyAndGetId(swConn);
+ const auto id = CONN2ID(swConn);
connections.push_back(std::move(swConn.getValue()));
ASSERT(id == ids.top());
ids.pop();
@@ -214,7 +211,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- original_ids.insert(verifyAndGetId(swConn));
+ original_ids.insert(CONN2ID(swConn));
connections.push_back(std::move(swConn.getValue()));
});
}
@@ -246,7 +243,7 @@ TEST_F(ConnectionPoolTest, ConnectionsNotUsedRecentlyArePurged) {
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
ASSERT(swConn.isOK());
- reacquired_ids.insert(verifyAndGetId(swConn));
+ reacquired_ids.insert(CONN2ID(swConn));
connections.push_back(std::move(swConn.getValue()));
});
}
@@ -285,7 +282,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
+ conn1Id = CONN2ID(swConn);
swConn.getValue()->indicateFailure(Status(ErrorCodes::BadValue, "error"));
});
@@ -295,7 +292,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
+ conn2Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
@@ -318,7 +315,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
pool.get_forTest(HostAndPort("localhost:30000"),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
+ conn1Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
@@ -328,7 +325,7 @@ TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
pool.get_forTest(HostAndPort("localhost:30001"),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
+ conn2Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
@@ -463,7 +460,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
+ conn1Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
@@ -474,7 +471,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
+ conn2Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
ASSERT_EQ(conn1Id, conn2Id);
@@ -506,7 +503,7 @@ TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(1000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(verifyAndGetId(swConn), conn1Id);
+ ASSERT_NE(CONN2ID(swConn), conn1Id);
reachedB = true;
doneWith(swConn.getValue());
});
@@ -945,7 +942,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = verifyAndGetId(swConn);
+ connId = CONN2ID(swConn);
reachedA = true;
doneWith(swConn.getValue());
});
@@ -962,7 +959,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, verifyAndGetId(swConn));
+ ASSERT_NE(connId, CONN2ID(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -995,7 +992,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- connId = verifyAndGetId(swConn);
+ connId = CONN2ID(swConn);
reachedA = true;
doneWith(swConn.getValue());
});
@@ -1009,7 +1006,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, verifyAndGetId(swConn));
+ ASSERT_EQ(connId, CONN2ID(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1022,7 +1019,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(connId, verifyAndGetId(swConn));
+ ASSERT_EQ(connId, CONN2ID(swConn));
reachedB2 = true;
doneWith(swConn.getValue());
});
@@ -1037,7 +1034,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(connId, verifyAndGetId(swConn));
+ ASSERT_NE(connId, CONN2ID(swConn));
reachedC = true;
doneWith(swConn.getValue());
});
@@ -1070,7 +1067,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
+ conn1Id = CONN2ID(swConn);
conn1 = std::move(swConn.getValue());
});
@@ -1081,7 +1078,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
+ conn2Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
@@ -1096,7 +1093,7 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(conn2Id, verifyAndGetId(swConn));
+ ASSERT_EQ(conn2Id, CONN2ID(swConn));
reachedA = true;
doneWith(swConn.getValue());
});
@@ -1117,8 +1114,8 @@ TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(conn1Id, verifyAndGetId(swConn));
- ASSERT_NE(conn2Id, verifyAndGetId(swConn));
+ ASSERT_NE(conn1Id, CONN2ID(swConn));
+ ASSERT_NE(conn2Id, CONN2ID(swConn));
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1146,7 +1143,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
+ conn1Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
ASSERT(conn1Id);
@@ -1156,7 +1153,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_EQ(verifyAndGetId(swConn), conn1Id);
+ ASSERT_EQ(CONN2ID(swConn), conn1Id);
handle = std::move(swConn.getValue());
});
@@ -1190,7 +1187,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn2Id = verifyAndGetId(swConn);
+ conn2Id = CONN2ID(swConn);
ASSERT_NE(conn2Id, conn1Id);
doneWith(swConn.getValue());
});
@@ -1212,7 +1209,7 @@ TEST_F(ConnectionPoolTest, dropConnections) {
pool.get_forTest(HostAndPort(),
Milliseconds(5000),
[&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- ASSERT_NE(verifyAndGetId(swConn), conn2Id);
+ ASSERT_NE(CONN2ID(swConn), conn2Id);
reachedB = true;
doneWith(swConn.getValue());
});
@@ -1279,7 +1276,7 @@ TEST_F(ConnectionPoolTest, RefreshTimeoutsDontTimeoutRequests) {
ConnectionImpl::pushSetup(Status::OK());
pool.get_forTest(
HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
- conn1Id = verifyAndGetId(swConn);
+ conn1Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
ASSERT(conn1Id);
@@ -1401,7 +1398,7 @@ TEST_F(ConnectionPoolTest, DropConnectionsInMultipleViaManager) {
dropConnectionsTest(pool, manager);
}
-TEST_F(ConnectionPoolTest, AsyncGet) {
+TEST_F(ConnectionPoolTest, TryGetWorks) {
ConnectionPool::Options options;
options.maxConnections = 1;
ConnectionPool pool(stdx::make_unique<PoolImpl>(), "test pool", options);
@@ -1409,77 +1406,60 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
auto now = Date_t::now();
PoolImpl::setNow(now);
- // Make our initial connection, use and return it
- {
- size_t connId = 0;
-
- // no connections in the pool, our future is not satisfied
- auto connFuture = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
- ASSERT_FALSE(connFuture.isReady());
+ // no connections in the pool, tryGet should fail
+ ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
- // Successfully get a new connection
- ConnectionImpl::pushSetup(Status::OK());
-
- // Future should be ready now
- ASSERT_TRUE(connFuture.isReady());
- std::move(connFuture).getAsync([&](StatusWithConn swConn) mutable {
- connId = verifyAndGetId(swConn);
+ // Successfully get a new connection
+ size_t conn1Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get_forTest(
+ HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
- ASSERT(connId);
- }
+ ASSERT(conn1Id);
- // There is one connection in the pool:
- // * The first get should resolve immediately
- // * The second get should should be queued
- // * The eventual third should be queued before the second
- {
- size_t connId1 = 0;
- size_t connId2 = 0;
- size_t connId3 = 0;
-
- auto connFuture1 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
- auto connFuture2 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{10});
-
- // Queue up the second future to resolve as soon as it is ready
- std::move(connFuture2).getAsync([&](StatusWithConn swConn) mutable {
- connId2 = verifyAndGetId(swConn);
+ // 1 connection in the pool, tryGet should succeed
+ auto tryGetConn = pool.tryGet(HostAndPort(), transport::kGlobalSSLMode);
+ ASSERT(tryGetConn);
+
+ // No connection available, this waits in the request queue
+ size_t conn3Id = 0;
+ pool.get_forTest(
+ HostAndPort(), Seconds(2), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn3Id = CONN2ID(swConn);
doneWith(swConn.getValue());
});
- // The first future should be immediately ready. The second should be in the queue.
- ASSERT_TRUE(connFuture1.isReady());
- ASSERT_FALSE(connFuture2.isReady());
-
- // Resolve the first future to return the connection and continue on to the second.
- decltype(connFuture1) connFuture3;
- std::move(connFuture1).getAsync([&](StatusWithConn swConn) mutable {
- // Grab our third future while our first one is being fulfilled
- connFuture3 = pool.get(HostAndPort(), transport::kGlobalSSLMode, Seconds{1});
+ ASSERT_EQ(conn3Id, 0ul);
- connId1 = verifyAndGetId(swConn);
+ // We want to wait if there are any outstanding requests (to provide fair access to the pool),
+ // so we need to call tryGet while fulfilling requests. This triggers that race by actually
+ // calling tryGet from within a callback (which works, because we drop locks). Not the cleanest
+ // way to do it, but gets us at least the code coverage we need.
+ //
+ // We run before the previous get because our deadline is 1 sec instead of 2
+ size_t conn2Id = 0;
+ pool.get_forTest(
+ HostAndPort(), Seconds(1), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn2Id = CONN2ID(swConn);
doneWith(swConn.getValue());
- });
- ASSERT(connId1);
- ASSERT_FALSE(connId2);
-
- // Since the third future has a smaller timeout than the second,
- // it should take priority over the second
- ASSERT_TRUE(connFuture3.isReady());
- ASSERT_FALSE(connFuture2.isReady());
+ swConn.getValue().reset();
- // Resolve the third future. This should trigger the second future
- std::move(connFuture3).getAsync([&](StatusWithConn swConn) mutable {
- // We run before the second future
- ASSERT_FALSE(connId2);
+ // we do have one connection
+ ASSERT_EQUALS(pool.getNumConnectionsPerHost(HostAndPort()), 1ul);
- connId3 = verifyAndGetId(swConn);
- doneWith(swConn.getValue());
+ // we fail because there's an outstanding request, even though we do have a good
+ // connection
+ // available.
+ ASSERT_FALSE(pool.tryGet(HostAndPort(), transport::kGlobalSSLMode));
});
- ASSERT_EQ(connId1, connId2);
- ASSERT_EQ(connId2, connId3);
- }
+ doneWith(*tryGetConn);
+ tryGetConn.reset();
+
+ ASSERT(conn2Id);
+ ASSERT(conn3Id);
}
} // namespace connection_pool_test_details
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
index 2ecb6ca5279..f41de316950 100644
--- a/src/mongo/executor/connection_pool_test_fixture.h
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -134,16 +134,6 @@ private:
};
/**
- * An "OutOfLineExecutor" that actually runs on the same thread of execution
- */
-class InlineOutOfLineExecutor : public OutOfLineExecutor {
-public:
- void schedule(Task task) override {
- std::move(task)(Status::OK());
- }
-};
-
-/**
* Mock for the pool implementation
*/
class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
@@ -157,11 +147,6 @@ public:
std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
- OutOfLineExecutor& getExecutor() override {
- static InlineOutOfLineExecutor _executor;
- return _executor;
- }
-
Date_t now() override;
void shutdown() override {
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
index d614436c49d..17f000ffaeb 100644
--- a/src/mongo/executor/connection_pool_tl.h
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -56,9 +56,6 @@ public:
transport::ConnectSSLMode sslMode,
size_t generation) override;
std::shared_ptr<ConnectionPool::TimerInterface> makeTimer() override;
- OutOfLineExecutor& getExecutor() override {
- return *_reactor;
- }
Date_t now() override;
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 1ad164f5c44..a9529d1c825 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -213,18 +213,48 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
return Status::OK();
}
- auto connFuture = _pool->get(request.target, request.sslMode, request.timeout)
- .tapError([state](Status error) {
- LOG(2) << "Failed to get connection from pool for request "
- << state->request.id << ": " << error;
- });
+ // Interacting with the connection pool can involve more work than just getting a connection
+ // out. In particular, we can end up having to spin up new connections, and fulfilling promises
+ // for other requesters. Returning connections has the same issue.
+ //
+ // To work around it, we make sure to hop onto the reactor thread before getting a connection,
+ // then making sure to get back to the client thread to do the work (if on a baton). And we
+ // hook up a connection returning unique_ptr that ensures that however we exit, we always do the
+ // return on the reactor thread.
+ //
+ // TODO: get rid of this cruft once we have a connection pool that's executor aware.
+
+ auto connFuture = [&] {
+ auto conn = _pool->tryGet(request.target, request.sslMode);
+
+ if (conn) {
+ return Future<ConnectionPool::ConnectionHandle>(std::move(*conn));
+ }
+
+ return _reactor
+ ->execute([this, state, request, baton] {
+ return makeReadyFutureWith([this, request] {
+ return _pool->get(request.target, request.sslMode, request.timeout);
+ });
+ })
+ .tapError([state](Status error) {
+ LOG(2) << "Failed to get connection from pool for request " << state->request.id
+ << ": " << error;
+ });
+ }().then([this, baton](ConnectionPool::ConnectionHandle conn) {
+ auto deleter = conn.get_deleter();
+
+ // TODO: drop out this shared_ptr once we have a unique_function capable future
+ return std::make_shared<CommandState::ConnHandle>(conn.release(),
+ CommandState::Deleter{deleter, _reactor});
+ });
auto remainingWork =
[ this, state, future = std::move(pf.future), baton, onFinish = std::move(onFinish) ](
- StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
makeReadyFutureWith([&] {
- auto conn = uassertStatusOK(std::move(swConn));
- return _onAcquireConn(state, std::move(future), std::move(conn), baton);
+ auto connHandle = uassertStatusOK(std::move(swConn));
+ return _onAcquireConn(state, std::move(future), std::move(*connHandle), baton);
})
.onError([](Status error) -> StatusWith<RemoteCommandResponse> {
// The TransportLayer has, for historical reasons returned SocketException for
@@ -253,7 +283,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
// connection
std::move(connFuture)
.getAsync([ baton, reactor = _reactor.get(), rw = std::move(remainingWork) ](
- StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
baton->schedule([ rw = std::move(rw),
swConn = std::move(swConn) ](OperationContext * opCtx) mutable {
if (opCtx) {
@@ -268,7 +298,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
// otherwise we're happy to run inline
std::move(connFuture)
.getAsync([rw = std::move(remainingWork)](
- StatusWith<ConnectionPool::ConnectionHandle> swConn) mutable {
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
std::move(rw)(std::move(swConn));
});
}
@@ -281,7 +311,7 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
std::shared_ptr<CommandState> state,
Future<RemoteCommandResponse> future,
- ConnectionPool::ConnectionHandle conn,
+ CommandState::ConnHandle conn,
const BatonHandle& baton) {
if (MONGO_FAIL_POINT(networkInterfaceDiscardCommandsAfterAcquireConn)) {
conn->indicateSuccess();
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index d6f50032111..2ab75f49aaa 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -97,7 +97,17 @@ private:
Date_t deadline = RemoteCommandRequest::kNoExpirationDate;
Date_t start;
- ConnectionPool::ConnectionHandle conn;
+ struct Deleter {
+ ConnectionPool::ConnectionHandleDeleter returner;
+ transport::ReactorHandle reactor;
+
+ void operator()(ConnectionPool::ConnectionInterface* ptr) const {
+ reactor->dispatch([ ret = returner, ptr ](auto) { ret(ptr); });
+ }
+ };
+ using ConnHandle = std::unique_ptr<ConnectionPool::ConnectionInterface, Deleter>;
+
+ ConnHandle conn;
std::unique_ptr<transport::ReactorTimer> timer;
AtomicWord<bool> done;
@@ -128,7 +138,7 @@ private:
void _eraseInUseConn(const TaskExecutor::CallbackHandle& handle);
Future<RemoteCommandResponse> _onAcquireConn(std::shared_ptr<CommandState> state,
Future<RemoteCommandResponse> future,
- ConnectionPool::ConnectionHandle conn,
+ CommandState::ConnHandle conn,
const BatonHandle& baton);
std::string _instanceName;