summaryrefslogtreecommitdiff
path: root/src/mongo/executor/connection_pool.cpp
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2019-04-03 00:50:18 -0400
committerBenety Goh <benety@mongodb.com>2019-04-03 00:50:18 -0400
commitddae7b803ed19bf4bc1af1dcf0f8d4e44575736c (patch)
treeaca26a863f7807c64d978f0c0ab86499499183a3 /src/mongo/executor/connection_pool.cpp
parentedee95798fd655a8da71da29c6081d09e62a3b89 (diff)
downloadmongo-ddae7b803ed19bf4bc1af1dcf0f8d4e44575736c.tar.gz
Revert "SERVER-39814 Add OutOfLineExecutor to ConnectionPool factory interface"
This reverts commit f4b07839c904a73d28e3994af5bc902004fd4f9d.
Diffstat (limited to 'src/mongo/executor/connection_pool.cpp')
-rw-r--r--src/mongo/executor/connection_pool.cpp202
1 files changed, 112 insertions, 90 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 4cbc42dc2ec..41e4e602c65 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -140,6 +140,12 @@ public:
Future<ConnectionHandle> getConnection(Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk);
/**
+ * Gets a connection from the specific pool if a connection is available and there are no
+ * outstanding requests.
+ */
+ boost::optional<ConnectionHandle> tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
+
+ /**
* Triggers the shutdown procedure. This function marks the state as kInShutdown
* and calls processFailure below with the status provided. This may not immediately
* delist or destruct this pool. However, both will happen eventually as ConnectionHandles
@@ -222,21 +228,15 @@ private:
}
};
- ConnectionHandle makeHandle(ConnectionInterface* connection);
-
- void finishRefresh(stdx::unique_lock<stdx::mutex> lk,
- ConnectionInterface* connPtr,
- Status status);
-
void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);
void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);
void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);
- // This internal helper is used both by get and by fulfillRequests and differs in that it
+ // This internal helper is used both by tryGet and by fulfillRequests and differs in that it
// skips some bookkeeping that the other callers do on their own
- ConnectionHandle tryGetConnection(const stdx::unique_lock<stdx::mutex>& lk);
+ boost::optional<ConnectionHandle> tryGetInternal(const stdx::unique_lock<stdx::mutex>& lk);
template <typename OwnershipPoolType>
typename OwnershipPoolType::mapped_type takeFromPool(
@@ -398,6 +398,23 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
return get(hostAndPort, transport::kGlobalSSLMode, timeout).getAsync(std::move(cb));
}
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::tryGet(
+ const HostAndPort& hostAndPort, transport::ConnectSSLMode sslMode) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto iter = _pools.find(hostAndPort);
+
+ if (iter == _pools.end()) {
+ return boost::none;
+ }
+
+ const auto& pool = iter->second;
+ invariant(pool);
+ pool->fassertSSLModeIs(sslMode);
+
+ return pool->tryGetConnection(lk);
+}
+
Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode,
Milliseconds timeout) {
@@ -506,14 +523,6 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
invariant(_state != State::kInShutdown);
- auto conn = tryGetConnection(lk);
-
- updateStateInLock();
-
- if (conn) {
- return Future<ConnectionPool::ConnectionHandle>::makeReady(std::move(conn));
- }
-
if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
timeout = _parent->_options.refreshTimeout;
}
@@ -526,25 +535,28 @@ Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnec
updateStateInLock();
- lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
- fassert(20000, schedStatus);
-
- spawnConnections(lk);
- }));
+ spawnConnections(lk);
+ fulfillRequests(lk);
return std::move(pf.future);
}
-auto ConnectionPool::SpecificPool::makeHandle(ConnectionInterface* connection) -> ConnectionHandle {
- auto fun = guardCallback(
- [this](auto lk, auto connection) { returnConnection(connection, std::move(lk)); });
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetConnection(
+ const stdx::unique_lock<stdx::mutex>& lk) {
+ invariant(_state != State::kInShutdown);
+
+ if (_requests.size()) {
+ return boost::none;
+ }
- auto handle = ConnectionHandle(connection, fun);
- return handle;
+ auto conn = tryGetInternal(lk);
+
+ updateStateInLock();
+
+ return conn;
}
-ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
+boost::optional<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::tryGetInternal(
const stdx::unique_lock<stdx::mutex>&) {
while (_readyPool.size()) {
@@ -570,55 +582,14 @@ ConnectionPool::ConnectionHandle ConnectionPool::SpecificPool::tryGetConnection(
// pass it to the user
connPtr->resetToUnknown();
- auto handle = makeHandle(connPtr);
- return handle;
+ return ConnectionHandle(connPtr,
+ guardCallback([this](stdx::unique_lock<stdx::mutex> localLk,
+ ConnectionPool::ConnectionInterface* conn) {
+ returnConnection(conn, std::move(localLk));
+ }));
}
- return {};
-}
-
-void ConnectionPool::SpecificPool::finishRefresh(stdx::unique_lock<stdx::mutex> lk,
- ConnectionInterface* connPtr,
- Status status) {
- auto conn = takeFromProcessingPool(connPtr);
-
- // If we're in shutdown, we don't need refreshed connections
- if (_state == State::kInShutdown)
- return;
-
- // If we've exceeded the time limit, start a new connect,
- // rather than failing all operations. We do this because the
- // various callers have their own time limit which is unrelated
- // to our internal one.
- if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
- LOG(0) << "Pending connection to host " << _hostAndPort
- << " did not complete within the connection timeout,"
- << " retrying with a new connection;" << openConnections(lk)
- << " connections to that host remain open";
- spawnConnections(lk);
- return;
- }
-
- // Pass a failure on through
- if (!status.isOK()) {
- processFailure(status, std::move(lk));
- return;
- }
-
- // If the host and port were dropped, let this lapse and spawn new connections
- if (conn->getGeneration() != _generation) {
- spawnConnections(lk);
- return;
- }
-
- // If the connection refreshed successfully, throw it back in the ready pool
- addToReady(lk, std::move(conn));
-
- lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
- fassert(20003, schedStatus);
- fulfillRequests(lk);
- }));
+ return boost::none;
}
void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
@@ -660,19 +631,49 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
// Unlock in case refresh can occur immediately
lk.unlock();
connPtr->refresh(_parent->_options.refreshTimeout,
- guardCallback([this](auto lk, auto conn, auto status) {
- finishRefresh(std::move(lk), conn, status);
+ guardCallback([this](stdx::unique_lock<stdx::mutex> lk,
+ ConnectionInterface* connPtr,
+ Status status) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If we're in shutdown, we don't need refreshed connections
+ if (_state == State::kInShutdown)
+ return;
+
+ // If the connection refreshed successfully, throw it back in
+ // the ready pool
+ if (status.isOK()) {
+ // If the host and port were dropped, let this lapse
+ if (conn->getGeneration() == _generation) {
+ addToReady(lk, std::move(conn));
+ fulfillRequests(lk);
+ }
+
+ return;
+ }
+
+ // If we've exceeded the time limit, start a new connect,
+ // rather than failing all operations. We do this because the
+ // various callers have their own time limit which is unrelated
+ // to our internal one.
+ if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ log() << "Pending connection to host " << _hostAndPort
+ << " did not complete within the connection timeout,"
+ << " retrying with a new connection;" << openConnections(lk)
+ << " connections to that host remain open";
+ spawnConnections(lk);
+ return;
+ }
+
+ // Otherwise pass the failure on through
+ processFailure(status, std::move(lk));
}));
lk.lock();
} else {
// If it's fine as it is, just put it in the ready queue
addToReady(lk, std::move(conn));
-
- lk.unlock();
- _parent->_factory->getExecutor().schedule(guardCallback([this](auto lk, auto schedStatus) {
- fassert(20004, schedStatus);
- fulfillRequests(lk);
- }));
+ // TODO This should be scheduled on an executor once we have executor-aware pooling
+ fulfillRequests(lk);
}
updateStateInLock();
@@ -783,7 +784,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
// deadlock).
//
// None of the heap manipulation code throws, but it's something to keep in mind.
- auto conn = tryGetConnection(lk);
+ auto conn = tryGetInternal(lk);
if (!conn) {
break;
@@ -795,7 +796,7 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
_requests.pop_back();
lk.unlock();
- promise.emplaceValue(std::move(conn));
+ promise.emplaceValue(std::move(*conn));
lk.lock();
updateStateInLock();
@@ -846,11 +847,32 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
// Run the setup callback
lk.unlock();
- handle->setup(_parent->_options.refreshTimeout,
- guardCallback([this](auto lk, auto conn, auto status) {
- finishRefresh(std::move(lk), conn, status);
- }));
-
+ handle->setup(
+ _parent->_options.refreshTimeout,
+ guardCallback([this](
+ stdx::unique_lock<stdx::mutex> lk, ConnectionInterface* connPtr, Status status) {
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If we're in shutdown, we don't need this conn
+ if (_state == State::kInShutdown)
+ return;
+
+ if (status.isOK()) {
+ // If the host and port was dropped, let the connection lapse
+ if (conn->getGeneration() == _generation) {
+ addToReady(lk, std::move(conn));
+ fulfillRequests(lk);
+ }
+ } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+ // If we've exceeded the time limit, restart the connect, rather than
+ // failing all operations. We do this because the various callers
+ // have their own time limit which is unrelated to our internal one.
+ spawnConnections(lk);
+ } else {
+ // If the setup failed, cascade the failure edge
+ processFailure(status, std::move(lk));
+ }
+ }));
// Note that this assumes that the refreshTimeout is sound for the
// setupTimeout