author     Jason Carey <jcarey@argv.me>    2015-08-05 15:53:30 -0400
committer  Jason Carey <jcarey@argv.me>    2015-09-01 18:41:36 -0400
commit     7bb78ed5a98c25009d2f5295d091b3afd3b625f6 (patch)
tree       95608bc13ef9395cb83999adc5a4f812a31ecd9f
parent     0a6c20bdc7128d1f13e967a7cc6219b1dfc38b6b (diff)
SERVER-19769 Pool connections in NetworkInterfaceASIO
-rw-r--r--  src/mongo/base/error_codes.err  1
-rw-r--r--  src/mongo/executor/SConscript  35
-rw-r--r--  src/mongo/executor/async_mock_stream_factory.cpp  8
-rw-r--r--  src/mongo/executor/async_mock_stream_factory.h  2
-rw-r--r--  src/mongo/executor/async_secure_stream.cpp  4
-rw-r--r--  src/mongo/executor/async_secure_stream.h  7
-rw-r--r--  src/mongo/executor/async_stream.cpp  4
-rw-r--r--  src/mongo/executor/async_stream.h  4
-rw-r--r--  src/mongo/executor/async_stream_interface.h  2
-rw-r--r--  src/mongo/executor/connection_pool.cpp  552
-rw-r--r--  src/mongo/executor/connection_pool.h  281
-rw-r--r--  src/mongo/executor/connection_pool_asio.cpp  182
-rw-r--r--  src/mongo/executor/connection_pool_asio.h  119
-rw-r--r--  src/mongo/executor/connection_pool_asio_integration_test.cpp  128
-rw-r--r--  src/mongo/executor/connection_pool_test.cpp  790
-rw-r--r--  src/mongo/executor/connection_pool_test_fixture.cpp  211
-rw-r--r--  src/mongo/executor/connection_pool_test_fixture.h  163
-rw-r--r--  src/mongo/executor/network_interface_asio.cpp  73
-rw-r--r--  src/mongo/executor/network_interface_asio.h  36
-rw-r--r--  src/mongo/executor/network_interface_asio_command.cpp  38
-rw-r--r--  src/mongo/executor/network_interface_asio_operation.cpp  6
21 files changed, 2625 insertions(+), 21 deletions(-)
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index d3cf2de6562..fc768259d24 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -144,6 +144,7 @@ error_code("SSLHandshakeFailed", 141)
error_code("JSUncatchableError", 142)
error_code("CursorInUse", 143)
error_code("IncompatibleCatalogManager", 144)
+error_code("PooledConnectionsDropped", 145)
# Non-sequential error codes (for compatibility only)
error_code("RecvStaleConfig", 9996)
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index c3d5f578552..2976a5bee6f 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -51,6 +51,39 @@ env.Library('network_interface_mock',
'task_executor_interface',
])
+env.Library(
+ target='connection_pool',
+ source=[
+ 'connection_pool.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/util/net/hostandport',
+ ],
+)
+
+env.CppUnitTest(
+ target='connection_pool_test',
+ source=[
+ 'connection_pool_test.cpp',
+ 'connection_pool_test_fixture.cpp',
+ ],
+ LIBDEPS=[
+ 'connection_pool',
+ ],
+)
+
+env.CppIntegrationTest(
+ target='connection_pool_asio_integration_test',
+ source=[
+ 'connection_pool_asio_integration_test.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/rpc/command_status',
+ 'network_interface_asio',
+ ],
+)
+
env.CppUnitTest(
target='network_interface_mock_test',
source=[
@@ -76,6 +109,7 @@ env.Library(
'async_secure_stream_factory.cpp',
'async_stream.cpp',
'async_stream_factory.cpp',
+ 'connection_pool_asio.cpp',
'network_interface_asio.cpp',
'network_interface_asio_auth.cpp',
'network_interface_asio_command.cpp',
@@ -88,6 +122,7 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/authcommon',
'$BUILD_DIR/mongo/rpc/rpc',
'$BUILD_DIR/third_party/shim_asio',
+ 'connection_pool',
'downconvert_find_and_getmore_commands',
'network_interface',
'task_executor_interface',
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp
index 37693d14047..80e289c2b8c 100644
--- a/src/mongo/executor/async_mock_stream_factory.cpp
+++ b/src/mongo/executor/async_mock_stream_factory.cpp
@@ -114,6 +114,14 @@ void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf,
_io_service->post([writeHandler, size] { writeHandler(std::error_code(), size); });
}
+void AsyncMockStreamFactory::MockStream::cancel() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ log() << "cancel() for: " << _target;
+
+ // TODO: do we actually need to mock cancel?
+ // We may want a cancel queue the same way we have read and write queues
+}
+
void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf,
StreamHandler&& readHandler) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h
index d5e1cc8722b..23d7fbe625c 100644
--- a/src/mongo/executor/async_mock_stream_factory.h
+++ b/src/mongo/executor/async_mock_stream_factory.h
@@ -82,6 +82,8 @@ public:
StreamState waitUntilBlocked();
+ void cancel() override;
+
std::vector<uint8_t> popWrite();
void pushRead(std::vector<uint8_t> toRead);
diff --git a/src/mongo/executor/async_secure_stream.cpp b/src/mongo/executor/async_secure_stream.cpp
index e49d6e22e1e..b06667c66c8 100644
--- a/src/mongo/executor/async_secure_stream.cpp
+++ b/src/mongo/executor/async_secure_stream.cpp
@@ -87,6 +87,10 @@ void AsyncSecureStream::_handleHandshake(std::error_code ec, const std::string&
_userHandler(make_error_code(certStatus.getStatus().code()));
}
+void AsyncSecureStream::cancel() {
+ _stream.lowest_layer().cancel();
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/async_secure_stream.h b/src/mongo/executor/async_secure_stream.h
index b24b628d4a8..c74e2d72ee2 100644
--- a/src/mongo/executor/async_secure_stream.h
+++ b/src/mongo/executor/async_secure_stream.h
@@ -46,9 +46,12 @@ public:
void connect(const asio::ip::tcp::resolver::iterator endpoints,
ConnectHandler&& connectHandler) override;
- void write(asio::const_buffer buffer, StreamHandler&& streamHandler);
- void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler);
+ void write(asio::const_buffer buffer, StreamHandler&& streamHandler) override;
+
+ void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) override;
+
+ void cancel() override;
private:
void _handleConnect(asio::ip::tcp::resolver::iterator iter);
diff --git a/src/mongo/executor/async_stream.cpp b/src/mongo/executor/async_stream.cpp
index 454ac3f0f42..39e339c7e83 100644
--- a/src/mongo/executor/async_stream.cpp
+++ b/src/mongo/executor/async_stream.cpp
@@ -56,5 +56,9 @@ void AsyncStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandle
asio::async_read(_stream, asio::buffer(buffer), std::move(streamHandler));
}
+void AsyncStream::cancel() {
+ _stream.cancel();
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/async_stream.h b/src/mongo/executor/async_stream.h
index 686d7532e20..80c07810193 100644
--- a/src/mongo/executor/async_stream.h
+++ b/src/mongo/executor/async_stream.h
@@ -39,12 +39,14 @@ class AsyncStream final : public AsyncStreamInterface {
public:
AsyncStream(asio::io_service* io_service);
- void connect(asio::ip::tcp::resolver::iterator iter, ConnectHandler&& connectHandler);
+ void connect(asio::ip::tcp::resolver::iterator iter, ConnectHandler&& connectHandler) override;
void write(asio::const_buffer buffer, StreamHandler&& streamHandler) override;
void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) override;
+ void cancel() override;
+
private:
asio::ip::tcp::socket _stream;
};
diff --git a/src/mongo/executor/async_stream_interface.h b/src/mongo/executor/async_stream_interface.h
index 7f81f9ce219..cc9373dfe0b 100644
--- a/src/mongo/executor/async_stream_interface.h
+++ b/src/mongo/executor/async_stream_interface.h
@@ -58,6 +58,8 @@ public:
virtual void read(asio::mutable_buffer buf, StreamHandler&& readHandler) = 0;
+ virtual void cancel() = 0;
+
protected:
AsyncStreamInterface() = default;
};
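The cancel() hook added above is what lets the pool abort in-flight work when a timeout fires: connection_pool_asio.cpp later in this patch arms a timer whose handler posts a stream cancel onto the io_service. A minimal standalone sketch of that pattern in plain asio (illustrative names only, not code from this patch):

    // Arm a timer that cancels a socket's outstanding async operations,
    // mirroring how the pool's refresh timeout uses the new cancel() hook.
    #include <asio.hpp>
    #include <chrono>
    #include <iostream>

    int main() {
        asio::io_service io;
        asio::ip::tcp::socket sock(io);
        asio::steady_timer timer(io);

        // ... assume an async_connect/async_read is outstanding on `sock` ...

        timer.expires_after(std::chrono::milliseconds(500));
        timer.async_wait([&sock](const asio::error_code& ec) {
            if (ec == asio::error::operation_aborted)
                return;                // the timer itself was cancelled
            asio::error_code ignored;
            sock.cancel(ignored);      // pending handlers complete with operation_aborted
        });

        io.run();
        std::cout << "timed out and cancelled\n";
        return 0;
    }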
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
new file mode 100644
index 00000000000..a5e2d9e2e91
--- /dev/null
+++ b/src/mongo/executor/connection_pool.cpp
@@ -0,0 +1,552 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool.h"
+
+#include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/scopeguard.h"
+
+
+// One implementation note: setup() and refresh() are invoked outside of the
+// global lock, but setTimeout() is not. Calling setup/refresh without the lock
+// lets the mocks complete them synchronously, whereas having timeouts fire
+// instantly adds little value. Dropping the lock is safe in practice because
+// we restrict ourselves to operations on the connection itself.
+
+namespace mongo {
+namespace executor {
+
+/**
+ * A pool for a specific HostAndPort
+ *
+ * Pools come into existence the first time a connection is requested and
+ * go out of existence after hostTimeout passes without any of their
+ * connections being used.
+ */
+class ConnectionPool::SpecificPool {
+public:
+ SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
+ ~SpecificPool();
+
+ void dropConnections(stdx::unique_lock<stdx::mutex> lk);
+
+ /**
+ * Gets a connection from the specific pool. Sinks a unique_lock from the
+ * parent to preserve the lock on _mutex
+ */
+ void getConnection(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ stdx::unique_lock<stdx::mutex> lk,
+ GetConnectionCallback cb);
+
+ /**
+ * Returns a connection to a specific pool. Sinks a unique_lock from the
+ * parent to preserve the lock on _mutex
+ */
+ void returnConnection(ConnectionInterface* connection, stdx::unique_lock<stdx::mutex> lk);
+
+private:
+ using OwnedConnection = std::unique_ptr<ConnectionInterface>;
+ using OwnershipPool = std::unordered_map<ConnectionInterface*, OwnedConnection>;
+ using Request = std::pair<Date_t, GetConnectionCallback>;
+ struct RequestComparator {
+ bool operator()(const Request& a, const Request& b) {
+ return a.first > b.first;
+ }
+ };
+
+ void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn);
+
+ void failAllRequests(const Status& status, stdx::unique_lock<stdx::mutex> lk);
+
+ void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk);
+
+ void spawnConnections(stdx::unique_lock<stdx::mutex>& lk, const HostAndPort& hostAndPort);
+
+ void shutdown();
+
+ OwnedConnection takeFromPool(OwnershipPool& pool, ConnectionInterface* connection);
+ OwnedConnection takeFromProcessingPool(ConnectionInterface* connection);
+
+ void updateStateInLock();
+
+private:
+ ConnectionPool* const _parent;
+
+ const HostAndPort _hostAndPort;
+
+ OwnershipPool _readyPool;
+ OwnershipPool _processingPool;
+ OwnershipPool _droppedProcessingPool;
+ OwnershipPool _checkedOutPool;
+
+ std::priority_queue<Request, std::vector<Request>, RequestComparator> _requests;
+
+ std::unique_ptr<TimerInterface> _requestTimer;
+ Date_t _requestTimerExpiration;
+ size_t _generation;
+ bool _inFulfillRequests;
+
+ /**
+ * The current state of the pool
+ *
+ * The pool begins in a running state. Moves to idle when no requests
+ * are pending and no connections are checked out. It finally enters
+ * shutdown after hostTimeout has passed (and waits there for current
+ * refreshes to process out).
+ *
+ * At any point a new request sets the state back to running and
+ * restarts all timers.
+ */
+ enum class State {
+ // The pool is active
+ kRunning,
+
+ // No current activity, waiting for hostTimeout to pass
+ kIdle,
+
+ // hostTimeout has passed; we're waiting for any processing
+ // connections to finish before shutting down
+ kInShutdown,
+ };
+
+ State _state;
+};
+
+// TODO: revisit these durations when we come up with a more pervasive solution
+// for NetworkInterfaceASIO's timers
+Milliseconds const ConnectionPool::kDefaultRefreshTimeout = Seconds(30);
+Milliseconds const ConnectionPool::kDefaultRefreshRequirement = Minutes(1);
+Milliseconds const ConnectionPool::kDefaultHostTimeout = Minutes(5);
+
+ConnectionPool::ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl, Options options)
+ : _options(std::move(options)), _factory(std::move(impl)) {}
+
+ConnectionPool::~ConnectionPool() = default;
+
+void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto iter = _pools.find(hostAndPort);
+
+ if (iter == _pools.end())
+ return;
+
+ iter->second.get()->dropConnections(std::move(lk));
+}
+
+void ConnectionPool::get(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ GetConnectionCallback cb) {
+ SpecificPool* pool;
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto iter = _pools.find(hostAndPort);
+
+ if (iter == _pools.end()) {
+ auto handle = stdx::make_unique<SpecificPool>(this, hostAndPort);
+ pool = handle.get();
+ _pools[hostAndPort] = std::move(handle);
+ } else {
+ pool = iter->second.get();
+ }
+
+ invariant(pool);
+
+ pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb));
+}
+
+void ConnectionPool::returnConnection(ConnectionInterface* conn) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto iter = _pools.find(conn->getHostAndPort());
+
+ invariant(iter != _pools.end());
+
+ iter->second.get()->returnConnection(conn, std::move(lk));
+}
+
+ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
+ : _parent(parent),
+ _hostAndPort(hostAndPort),
+ _requestTimer(parent->_factory->makeTimer()),
+ _generation(0),
+ _inFulfillRequests(false),
+ _state(State::kRunning) {}
+
+ConnectionPool::SpecificPool::~SpecificPool() {
+ DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();)
+}
+
+void ConnectionPool::SpecificPool::dropConnections(stdx::unique_lock<stdx::mutex> lk) {
+ _generation++;
+
+ _readyPool.clear();
+
+ for (auto&& x : _processingPool) {
+ _droppedProcessingPool[x.first] = std::move(x.second);
+ }
+
+ _processingPool.clear();
+
+ failAllRequests(Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
+ std::move(lk));
+}
+
+void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort,
+ Milliseconds timeout,
+ stdx::unique_lock<stdx::mutex> lk,
+ GetConnectionCallback cb) {
+ auto expiration = _parent->_factory->now() + timeout;
+
+ _requests.push(make_pair(expiration, std::move(cb)));
+
+ updateStateInLock();
+
+ spawnConnections(lk, hostAndPort);
+ fulfillRequests(lk);
+}
+
+void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr,
+ stdx::unique_lock<stdx::mutex> lk) {
+ auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement;
+ auto now = _parent->_factory->now();
+
+ auto conn = takeFromPool(_checkedOutPool, connPtr);
+
+ if (conn->isFailed() || conn->getGeneration() != _generation) {
+ // If the connection failed or has been dropped, simply let it lapse
+ //
+ // TODO: alert via some callback if the host is bad
+ } else if (needsRefreshTP <= now) {
+ // If we need to refresh this connection
+
+ if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
+ _parent->_options.minConnections) {
+ // If we already have minConnections, just let the connection lapse
+ return;
+ }
+
+ _processingPool[connPtr] = std::move(conn);
+
+ // Unlock in case refresh can occur immediately
+ lk.unlock();
+ connPtr->refresh(_parent->_options.refreshTimeout,
+ [this](ConnectionInterface* connPtr, Status status) {
+ connPtr->indicateUsed();
+
+ stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+
+ auto conn = takeFromProcessingPool(connPtr);
+
+ // If the host and port were dropped, let this lapse
+ if (conn->getGeneration() != _generation)
+ return;
+
+ // If we're in shutdown, we don't need refreshed connections
+ if (_state == State::kInShutdown)
+ return;
+
+ // If the connection refreshed successfully, throw it back in the ready
+ // pool
+ if (status.isOK()) {
+ addToReady(lk, std::move(conn));
+ return;
+ }
+
+ // Otherwise pass the failure on through
+ failAllRequests(status, std::move(lk));
+ });
+ lk.lock();
+ } else {
+ // If it's fine as it is, just put it in the ready queue
+ addToReady(lk, std::move(conn));
+ }
+
+ updateStateInLock();
+}
+
+/**
+ * Adds a live connection to the ready pool
+ */
+void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk,
+ OwnedConnection conn) {
+ auto connPtr = conn.get();
+
+ _readyPool[connPtr] = std::move(conn);
+
+ // Our strategy for refreshing connections is to check them out and
+ // immediately check them back in (which kicks off the refresh logic in
+ // returnConnection)
+ connPtr->setTimeout(_parent->_options.refreshRequirement,
+ [this, connPtr]() {
+ OwnedConnection conn;
+
+ stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+
+ conn = takeFromPool(_readyPool, connPtr);
+
+ // If we're in shutdown, we don't need to refresh connections
+ if (_state == State::kInShutdown)
+ return;
+
+ _checkedOutPool[connPtr] = std::move(conn);
+
+ returnConnection(connPtr, std::move(lk));
+ });
+
+ fulfillRequests(lk);
+}
+
+void ConnectionPool::SpecificPool::failAllRequests(const Status& status,
+ stdx::unique_lock<stdx::mutex> lk) {
+ // Move the requests out so they aren't visible
+ // in other threads
+ decltype(_requests) requestsToFail;
+
+ {
+ using std::swap;
+ swap(requestsToFail, _requests);
+ }
+
+ // Update state to reflect the lack of requests
+ updateStateInLock();
+
+ // Drop the lock and process all of the requests
+ // with the same failed status
+ lk.unlock();
+
+ while (requestsToFail.size()) {
+ requestsToFail.top().second(status);
+ requestsToFail.pop();
+ }
+}
+
+// fulfills as many outstanding requests as possible
+void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex>& lk) {
+ // If some other thread (possibly this thread) is fulfilling requests,
+ // don't keep padding the callstack.
+ if (_inFulfillRequests)
+ return;
+
+ _inFulfillRequests = true;
+ auto guard = MakeGuard([&] { _inFulfillRequests = false; });
+
+ while (_requests.size()) {
+ auto iter = _readyPool.begin();
+
+ if (iter == _readyPool.end())
+ break;
+
+ // Grab the connection and cancel its timeout
+ auto conn = std::move(iter->second);
+ _readyPool.erase(iter);
+ conn->cancelTimeout();
+
+ // Grab the request and callback
+ auto cb = std::move(_requests.top().second);
+ _requests.pop();
+
+ updateStateInLock();
+
+ auto connPtr = conn.get();
+
+ // check out the connection
+ _checkedOutPool[connPtr] = std::move(conn);
+
+ // pass it to the user
+ lk.unlock();
+ cb(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent)));
+ lk.lock();
+ }
+}
+
+// spawn enough connections to satisfy open requests and minConnections, while
+// honoring maxConnections
+void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mutex>& lk,
+ const HostAndPort& hostAndPort) {
+ // We want minConnections <= outstanding requests <= maxConnections
+ auto target = [&] {
+ return std::max(
+ _parent->_options.minConnections,
+ std::min(_requests.size() + _checkedOutPool.size(), _parent->_options.maxConnections));
+ };
+
+ // While all of our inflight connections are less than our target
+ while (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) {
+ // make a new connection and put it in processing
+ auto handle = _parent->_factory->makeConnection(hostAndPort, _generation);
+ auto connPtr = handle.get();
+ _processingPool[connPtr] = std::move(handle);
+
+ // Run the setup callback
+ lk.unlock();
+ connPtr->setup(_parent->_options.refreshTimeout,
+ [this](ConnectionInterface* connPtr, Status status) {
+ connPtr->indicateUsed();
+
+ stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+
+ auto conn = takeFromProcessingPool(connPtr);
+
+ if (conn->getGeneration() != _generation) {
+ // If the host and port was dropped, let the
+ // connection lapse
+ } else if (status.isOK()) {
+ addToReady(lk, std::move(conn));
+ } else if (_requests.size()) {
+ // If the setup request failed, immediately fail
+ // all other pending requests.
+
+ failAllRequests(status, std::move(lk));
+ }
+ });
+ // Note that this assumes the refreshTimeout is also a sensible timeout
+ // for setup
+
+ lk.lock();
+ }
+}
+
+// Called every second after hostTimeout until all processing connections have drained
+void ConnectionPool::SpecificPool::shutdown() {
+ stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+
+ _state = State::kInShutdown;
+
+ // If we have processing connections, wait for them to finish or timeout
+ // before shutdown
+ if (_processingPool.size() || _droppedProcessingPool.size()) {
+ _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });
+
+ return;
+ }
+
+ invariant(_requests.empty());
+ invariant(_checkedOutPool.empty());
+
+ _parent->_pools.erase(_hostAndPort);
+}
+
+ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromPool(
+ OwnershipPool& pool, ConnectionInterface* connPtr) {
+ auto iter = pool.find(connPtr);
+ invariant(iter != pool.end());
+
+ auto conn = std::move(iter->second);
+ pool.erase(iter);
+ return conn;
+}
+
+ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromProcessingPool(
+ ConnectionInterface* connPtr) {
+ if (_processingPool.count(connPtr))
+ return takeFromPool(_processingPool, connPtr);
+
+ return takeFromPool(_droppedProcessingPool, connPtr);
+}
+
+
+// Updates our state and manages the request timer
+void ConnectionPool::SpecificPool::updateStateInLock() {
+ if (_requests.size()) {
+ // We have some outstanding requests, we're live
+
+ // If we were already running and the timer is the same as it was
+ // before, nothing to do
+ if (_state == State::kRunning && _requestTimerExpiration == _requests.top().first)
+ return;
+
+ _state = State::kRunning;
+
+ _requestTimer->cancelTimeout();
+
+ _requestTimerExpiration = _requests.top().first;
+
+ auto timeout = _requests.top().first - _parent->_factory->now();
+
+ // We set a timer for the request that expires soonest, then fail each
+ // timed-out request we couldn't service
+ _requestTimer->setTimeout(
+ timeout,
+ [this]() {
+ stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+
+ auto now = _parent->_factory->now();
+
+ while (_requests.size()) {
+ auto& x = _requests.top();
+
+ if (x.first <= now) {
+ auto cb = std::move(x.second);
+ _requests.pop();
+
+ lk.unlock();
+ cb(Status(ErrorCodes::ExceededTimeLimit,
+ "Couldn't get a connection within the time limit"));
+ lk.lock();
+ } else {
+ break;
+ }
+ }
+
+ updateStateInLock();
+ });
+ } else if (_checkedOutPool.size()) {
+ // If we have no requests, but someone's using a connection, we just
+ // hang around until the next request or a return
+
+ _requestTimer->cancelTimeout();
+ _state = State::kRunning;
+ _requestTimerExpiration = _requestTimerExpiration.max();
+ } else {
+ // If we don't have any live requests and no one has checked out connections
+
+ // If we used to be idle, just bail
+ if (_state == State::kIdle)
+ return;
+
+ _state = State::kIdle;
+
+ _requestTimer->cancelTimeout();
+
+ _requestTimerExpiration = _parent->_factory->now() + _parent->_options.hostTimeout;
+
+ auto timeout = _parent->_options.hostTimeout;
+
+ // Set the shutdown timer
+ _requestTimer->setTimeout(timeout, [this]() { shutdown(); });
+ }
+}
+
+} // namespace executor
+} // namespace mongo
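Two details of the pool logic above are easy to check in isolation: spawnConnections() keeps the total of ready + processing + checked-out connections at max(minConnections, min(requests + checkedOut, maxConnections)), and RequestComparator's use of ">" turns std::priority_queue (a max-heap) into a min-heap on expiration time, so top() is always the request that expires soonest. A self-contained sketch with simplified stand-in types (not the pool's own classes):

    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <queue>
    #include <utility>
    #include <vector>

    int main() {
        // (1) The spawn target clamp. With minConnections = 1, maxConnections = 2
        //     (as in the maxPoolRespected test), three pending requests yield a
        //     target of two connections.
        std::size_t minConnections = 1, maxConnections = 2;
        std::size_t requests = 3, checkedOut = 0;
        std::size_t target =
            std::max(minConnections, std::min(requests + checkedOut, maxConnections));
        assert(target == 2);

        // (2) A ">" comparator on the expiration makes top() the soonest-to-expire
        //     request, which is why the 1-second request in requestsServedByUrgency
        //     is served before the 2-second one.
        using Request = std::pair<int /*expiration*/, const char* /*name*/>;
        struct RequestComparator {
            bool operator()(const Request& a, const Request& b) const {
                return a.first > b.first;
            }
        };
        std::priority_queue<Request, std::vector<Request>, RequestComparator> queue;
        queue.push({2000, "two-second request"});
        queue.push({1000, "one-second request"});
        assert(queue.top().first == 1000);
        return 0;
    }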
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
new file mode 100644
index 00000000000..d5a0332b5cb
--- /dev/null
+++ b/src/mongo/executor/connection_pool.h
@@ -0,0 +1,281 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <unordered_map>
+#include <queue>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/stdx/chrono.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/util/net/hostandport.h"
+
+namespace mongo {
+namespace executor {
+
+/**
+ * The actual user visible connection pool.
+ *
+ * This pool is constructed with a DependentTypeFactoryInterface which provides the tools it
+ * needs to generate connections and manage them over time.
+ *
+ * The overall workflow here is to manage separate pools for each unique
+ * HostAndPort. See comments on the various Options for how the pool operates.
+ */
+class ConnectionPool {
+ class ConnectionHandleDeleter;
+ class SpecificPool;
+
+public:
+ class ConnectionInterface;
+ class DependentTypeFactoryInterface;
+ class TimerInterface;
+
+ using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>;
+
+ using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>;
+
+ static const Milliseconds kDefaultRefreshTimeout;
+ static const Milliseconds kDefaultRefreshRequirement;
+ static const Milliseconds kDefaultHostTimeout;
+
+ struct Options {
+ Options() {}
+
+ /**
+ * The minimum number of connections to keep alive while the pool is in
+ * operation
+ */
+ size_t minConnections = 1;
+
+ /**
+ * The maximum number of connections to spawn for a host. This includes
+ * pending connections in setup and connections checked out of the pool
+ * as well as the obvious live connections in the pool.
+ */
+ size_t maxConnections = std::numeric_limits<size_t>::max();
+
+ /**
+ * Amount of time to wait before timing out a refresh attempt
+ */
+ Milliseconds refreshTimeout = kDefaultRefreshTimeout;
+
+ /**
+ * Amount of time a connection may be idle before it cannot be returned
+ * for a user request and must instead be checked out and refreshed
+ * before being handed to a user.
+ */
+ Milliseconds refreshRequirement = kDefaultRefreshRequirement;
+
+ /**
+ * Amount of time to keep a specific pool around without any checked
+ * out connections or new requests
+ */
+ Milliseconds hostTimeout = kDefaultHostTimeout;
+ };
+
+ explicit ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl,
+ Options options = Options{});
+
+ ~ConnectionPool();
+
+ void dropConnections(const HostAndPort& hostAndPort);
+
+ void get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb);
+
+ /**
+ * TODO add a function returning connection pool stats
+ */
+
+private:
+ void returnConnection(ConnectionInterface* connection);
+
+ // Options are set at startup and never changed at run time, so these are
+ // accessed outside the lock
+ const Options _options;
+
+ const std::unique_ptr<DependentTypeFactoryInterface> _factory;
+
+ // The global mutex for specific pool access and the generation counter
+ stdx::mutex _mutex;
+ std::unordered_map<HostAndPort, std::unique_ptr<SpecificPool>> _pools;
+};
+
+class ConnectionPool::ConnectionHandleDeleter {
+public:
+ ConnectionHandleDeleter() = default;
+ ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {}
+
+ void operator()(ConnectionInterface* connection) {
+ if (_pool && connection)
+ _pool->returnConnection(connection);
+ }
+
+private:
+ ConnectionPool* _pool = nullptr;
+};
+
+/**
+ * Interface for a basic timer
+ *
+ * A minimal interface: set a timer with a callback, and cancel it.
+ */
+class ConnectionPool::TimerInterface {
+ MONGO_DISALLOW_COPYING(TimerInterface);
+
+public:
+ TimerInterface() = default;
+
+ using TimeoutCallback = stdx::function<void()>;
+
+ virtual ~TimerInterface() = default;
+
+ /**
+ * Sets the timeout for the timer. Setting an already set timer should
+ * override the previous timer.
+ */
+ virtual void setTimeout(Milliseconds timeout, TimeoutCallback cb) = 0;
+
+ /**
+ * It should be safe to cancel a previously canceled, or never set, timer.
+ */
+ virtual void cancelTimeout() = 0;
+};
+
+/**
+ * Interface for connection pool connections
+ *
+ * Provides a minimal interface to manipulate connections within the pool,
+ * specifically callbacks to set them up (connect + auth + whatever else),
+ * refresh them (issue some kind of ping) and manage a timer.
+ */
+class ConnectionPool::ConnectionInterface : public TimerInterface {
+ MONGO_DISALLOW_COPYING(ConnectionInterface);
+
+ friend class ConnectionPool;
+
+public:
+ ConnectionInterface() = default;
+
+ virtual ~ConnectionInterface() = default;
+
+ /**
+ * Intended to be called whenever a socket is used in a way that indicates
+ * liveness, i.e. when an operation is executed over the connection.
+ */
+ virtual void indicateUsed() = 0;
+
+ /**
+ * Indicates that a connection has failed. This will prevent the connection
+ * from re-entering the connection pool.
+ */
+ virtual void indicateFailed() = 0;
+
+ /**
+ * The HostAndPort for the connection. This should be the same as the
+ * HostAndPort passed to DependentTypeFactoryInterface::makeConnection.
+ */
+ virtual const HostAndPort& getHostAndPort() const = 0;
+
+protected:
+ /**
+ * These callback typedefs are protected so that derived classes can refer to
+ * them when overriding the private virtual methods below.
+ */
+ using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>;
+ using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>;
+
+private:
+ /**
+ * Returns the last used time point for the connection
+ */
+ virtual Date_t getLastUsed() const = 0;
+
+ /**
+ * Returns true if the connection is failed. This implies that it should
+ * not be returned to the pool.
+ */
+ virtual bool isFailed() const = 0;
+
+ /**
+ * Sets up the connection. This should include connection + auth + any
+ * other associated hooks.
+ */
+ virtual void setup(Milliseconds timeout, SetupCallback cb) = 0;
+
+ /**
+ * Refreshes the connection. This should involve a network round trip and
+ * should strongly imply an active connection
+ */
+ virtual void refresh(Milliseconds timeout, RefreshCallback cb) = 0;
+
+ /**
+ * Get the generation of the connection. This is used to track whether to
+ * continue using a connection after a call to dropConnections() by noting
+ * if the generation on the specific pool is the same as the generation on
+ * a connection (if not, the connection is from a previous era and should
+ * not be reused).
+ */
+ virtual size_t getGeneration() const = 0;
+};
+
+/**
+ * Implementation interface for the connection pool
+ *
+ * This factory provides generators for connections, timers and a clock for the
+ * connection pool.
+ */
+class ConnectionPool::DependentTypeFactoryInterface {
+ MONGO_DISALLOW_COPYING(DependentTypeFactoryInterface);
+
+public:
+ DependentTypeFactoryInterface() = default;
+
+ virtual ~DependentTypeFactoryInterface() = default;
+
+ /**
+ * Makes a new connection given a host and port
+ */
+ virtual std::unique_ptr<ConnectionInterface> makeConnection(const HostAndPort& hostAndPort,
+ size_t generation) = 0;
+
+ /**
+ * Makes a new timer
+ */
+ virtual std::unique_ptr<TimerInterface> makeTimer() = 0;
+
+ /**
+ * Returns the current time point
+ */
+ virtual Date_t now() = 0;
+};
+
+} // namespace executor
+} // namespace mongo
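The central type in this header is ConnectionHandle: a std::unique_ptr whose deleter (ConnectionHandleDeleter) hands the connection back to ConnectionPool::returnConnection() rather than destroying it, so letting the handle fall out of scope checks the connection back in. A self-contained analogue of that idiom with hypothetical Pool/Connection types (not the classes above):

    #include <iostream>
    #include <memory>

    struct Connection { int id; };

    class Pool {
    public:
        // Deleter that returns the connection instead of freeing it.
        struct Returner {
            Pool* pool = nullptr;
            void operator()(Connection* conn) const {
                if (pool && conn)
                    pool->returnConnection(conn);
            }
        };
        using Handle = std::unique_ptr<Connection, Returner>;

        Handle get() { return Handle(&_conn, Returner{this}); }

        void returnConnection(Connection* conn) {
            std::cout << "connection " << conn->id << " returned to pool\n";
        }

    private:
        Connection _conn{42};  // a one-connection "pool" keeps the sketch short
    };

    int main() {
        Pool pool;
        {
            Pool::Handle handle = pool.get();
            std::cout << "using connection " << handle->id << "\n";
        }  // handle goes out of scope -> deleter hands the connection back
        return 0;
    }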
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp
new file mode 100644
index 00000000000..803d0096ddd
--- /dev/null
+++ b/src/mongo/executor/connection_pool_asio.cpp
@@ -0,0 +1,182 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_asio.h"
+
+#include <asio.hpp>
+
+#include "mongo/executor/async_stream_factory_interface.h"
+
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/reply_interface.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_asio {
+
+ASIOTimer::ASIOTimer(asio::io_service* io_service) : _io_service(io_service), _impl(*io_service) {}
+
+void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+ _cb = std::move(cb);
+
+ cancelTimeout();
+ _impl.expires_after(timeout);
+ _impl.async_wait([this](const asio::error_code& error) {
+ auto cb = std::move(_cb);
+
+ if (error != asio::error::operation_aborted)
+ cb();
+ });
+}
+
+void ASIOTimer::cancelTimeout() {
+ _impl.cancel();
+}
+
+ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global)
+ : _global(global),
+ _timer(&global->_impl->_io_service),
+ _hostAndPort(hostAndPort),
+ _generation(generation),
+ _impl(makeAsyncOp(this)) {}
+
+void ASIOConnection::indicateUsed() {
+ _lastUsed = _global->now();
+}
+
+void ASIOConnection::indicateFailed() {
+ _isFailed = true;
+}
+
+const HostAndPort& ASIOConnection::getHostAndPort() const {
+ return _hostAndPort;
+}
+
+Date_t ASIOConnection::getLastUsed() const {
+ return _lastUsed;
+}
+
+bool ASIOConnection::isFailed() const {
+ return _isFailed;
+}
+
+size_t ASIOConnection::getGeneration() const {
+ return _generation;
+}
+
+std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOConnection* conn) {
+ return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>(
+ conn->_global->_impl,
+ TaskExecutor::CallbackHandle(),
+ makeIsMasterRequest(conn),
+ [conn](const TaskExecutor::ResponseStatus& status) {
+ auto cb = std::move(conn->_setupCallback);
+ cb(conn, status.isOK() ? Status::OK() : status.getStatus());
+ },
+ conn->_global->now());
+}
+
+RemoteCommandRequest ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) {
+ return {conn->_hostAndPort, std::string("admin"), BSON("isMaster" << 1), BSONObj()};
+}
+
+void ASIOConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+ _timer.setTimeout(timeout, std::move(cb));
+}
+
+void ASIOConnection::cancelTimeout() {
+ _timer.cancelTimeout();
+}
+
+void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) {
+ _setupCallback = std::move(cb);
+
+ _global->_impl->_connect(_impl.get());
+}
+
+void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
+ auto op = _impl.get();
+
+ _refreshCallback = std::move(cb);
+
+ // Actually time out refreshes
+ setTimeout(timeout,
+ [this]() {
+ asio::post(_global->_impl->_io_service,
+ [this] { _impl->connection().stream().cancel(); });
+ });
+
+ // Our pings are isMaster commands
+ auto beginStatus = op->beginCommand(makeIsMasterRequest(this));
+ if (!beginStatus.isOK()) {
+ auto cb = std::move(_refreshCallback);
+ cb(this, beginStatus);
+ }
+
+ _global->_impl->_asyncRunCommand(
+ op->command(),
+ [this, op](std::error_code ec, size_t bytes) {
+ cancelTimeout();
+
+ if (ec) {
+ return _refreshCallback(this, Status(ErrorCodes::HostUnreachable, ec.message()));
+ }
+
+ auto cb = std::move(_refreshCallback);
+ cb(this, Status::OK());
+ });
+}
+
+std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() {
+ return std::move(_impl);
+}
+
+void ASIOConnection::bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op) {
+ _impl = std::move(op);
+}
+
+ASIOImpl::ASIOImpl(NetworkInterfaceASIO* impl) : _impl(impl) {}
+
+Date_t ASIOImpl::now() {
+ return _impl->now();
+}
+
+std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() {
+ return stdx::make_unique<ASIOTimer>(&_impl->_io_service);
+}
+
+std::unique_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection(
+ const HostAndPort& hostAndPort, size_t generation) {
+ return stdx::make_unique<ASIOConnection>(hostAndPort, generation, this);
+}
+
+} // namespace connection_pool_asio
+} // namespace executor
+} // namespace mongo
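ASIOTimer above leans on two asio behaviours: calling expires_after() on a steady_timer with a pending wait aborts that wait, and the wait handler skips the callback on operation_aborted, so re-arming the timer overrides the previous timeout exactly as the TimerInterface contract requires. A standalone sketch of just that contract (plain asio, no pool types):

    #include <asio.hpp>
    #include <chrono>
    #include <iostream>

    int main() {
        asio::io_service io;
        asio::steady_timer timer(io);

        auto arm = [&timer](std::chrono::milliseconds after, const char* label) {
            timer.expires_after(after);  // aborts any outstanding wait
            timer.async_wait([label](const asio::error_code& ec) {
                if (ec == asio::error::operation_aborted)
                    return;  // an overriding setTimeout() cancelled this wait
                std::cout << label << " fired\n";
            });
        };

        arm(std::chrono::milliseconds(100), "first timeout");
        arm(std::chrono::milliseconds(50), "second timeout");  // overrides the first

        io.run();  // prints only "second timeout fired"
        return 0;
    }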
diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h
new file mode 100644
index 00000000000..296a5d20859
--- /dev/null
+++ b/src/mongo/executor/connection_pool_asio.h
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/network_interface_asio.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/async_stream_interface.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_asio {
+
+/**
+ * Implements connection pool timers on top of asio
+ */
+class ASIOTimer final : public ConnectionPool::TimerInterface {
+public:
+ ASIOTimer(asio::io_service* service);
+
+ void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+ void cancelTimeout() override;
+
+private:
+ TimeoutCallback _cb;
+ asio::io_service* const _io_service;
+ asio::steady_timer _impl;
+};
+
+/**
+ * Implements connection pool connections on top of asio
+ *
+ * Owns an async op when it's out of the pool
+ */
+class ASIOConnection final : public ConnectionPool::ConnectionInterface {
+public:
+ ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global);
+
+ void indicateUsed() override;
+ void indicateFailed() override;
+ const HostAndPort& getHostAndPort() const override;
+
+ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> releaseAsyncOp();
+ void bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op);
+
+private:
+ Date_t getLastUsed() const override;
+ bool isFailed() const override;
+
+ void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+ void cancelTimeout() override;
+
+ void setup(Milliseconds timeout, SetupCallback cb) override;
+ void refresh(Milliseconds timeout, RefreshCallback cb) override;
+
+ size_t getGeneration() const override;
+
+ static std::unique_ptr<NetworkInterfaceASIO::AsyncOp> makeAsyncOp(ASIOConnection* conn);
+ static RemoteCommandRequest makeIsMasterRequest(ASIOConnection* conn);
+
+private:
+ SetupCallback _setupCallback;
+ RefreshCallback _refreshCallback;
+ ASIOImpl* const _global;
+ ASIOTimer _timer;
+ Date_t _lastUsed;
+ bool _isFailed = false;
+ HostAndPort _hostAndPort;
+ size_t _generation;
+ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl;
+};
+
+/**
+ * Implements the connection pool's dependent type factory on top of asio
+ */
+class ASIOImpl final : public ConnectionPool::DependentTypeFactoryInterface {
+ friend class ASIOConnection;
+
+public:
+ ASIOImpl(NetworkInterfaceASIO* impl);
+
+ std::unique_ptr<ConnectionPool::ConnectionInterface> makeConnection(
+ const HostAndPort& hostAndPort, size_t generation) override;
+ std::unique_ptr<ConnectionPool::TimerInterface> makeTimer() override;
+
+ Date_t now() override;
+
+private:
+ NetworkInterfaceASIO* const _impl;
+};
+
+} // namespace connection_pool_asio
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp
new file mode 100644
index 00000000000..782e00aa28b
--- /dev/null
+++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_asio.h"
+
+#include "mongo/client/connection_string.h"
+#include "mongo/executor/async_stream_factory.h"
+#include "mongo/executor/network_connection_hook.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/stdx/future.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/integration_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+class MyNetworkConnectionHook : public NetworkConnectionHook {
+public:
+ Status validateHost(const HostAndPort& remoteHost,
+ const RemoteCommandResponse& isMasterReply) override {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ _count++;
+
+ return Status::OK();
+ }
+
+ StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(
+ const HostAndPort& remoteHost) override {
+ return StatusWith<boost::optional<RemoteCommandRequest>>(boost::none);
+ }
+
+ Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) override {
+ MONGO_UNREACHABLE;
+ }
+
+ static int count() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ return _count;
+ }
+
+private:
+ static stdx::mutex _mutex;
+ static int _count;
+};
+
+stdx::mutex MyNetworkConnectionHook::_mutex;
+int MyNetworkConnectionHook::_count = 0;
+
+TEST(ConnectionPoolASIO, TestPing) {
+ auto fixture = unittest::getFixtureConnectionString();
+
+ NetworkInterfaceASIO::Options options;
+ options.connectionPoolOptions.maxConnections = 10;
+
+ NetworkInterfaceASIO net{stdx::make_unique<AsyncStreamFactory>(),
+ stdx::make_unique<MyNetworkConnectionHook>(),
+ options};
+
+ net.startup();
+ auto guard = MakeGuard([&] { net.shutdown(); });
+
+ const int N = 500;
+
+ std::array<stdx::thread, N> threads;
+
+ for (auto& thread : threads) {
+ thread = stdx::thread([&net, &fixture]() {
+ auto status = Status::OK();
+ stdx::promise<void> result;
+
+ net.startCommand(TaskExecutor::CallbackHandle(),
+ RemoteCommandRequest{
+ fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()},
+ [&result, &status](StatusWith<RemoteCommandResponse> resp) {
+ if (!resp.isOK()) {
+ status = std::move(resp.getStatus());
+ }
+ result.set_value();
+ });
+
+ result.get_future().get();
+
+ ASSERT_OK(status);
+ });
+ }
+
+ for (auto&& thread : threads) {
+ thread.join();
+ }
+
+ ASSERT_EQ(MyNetworkConnectionHook::count(), 10);
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo
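Each thread in the test above turns the callback-based startCommand() into a synchronous wait using a stdx::promise/stdx::future pair. The same pattern in isolation, with a hypothetical asyncPing() standing in for the network interface and plain std types instead of the stdx wrappers:

    #include <functional>
    #include <future>
    #include <iostream>
    #include <thread>

    // Stand-in for a callback-based async API such as startCommand().
    std::thread asyncPing(std::function<void(bool ok)> cb) {
        return std::thread([cb] { cb(true); });
    }

    int main() {
        std::promise<void> done;
        bool ok = false;

        std::thread worker = asyncPing([&](bool result) {
            ok = result;
            done.set_value();  // unblock the waiting thread
        });

        done.get_future().get();  // block until the callback has run
        std::cout << (ok ? "ping ok\n" : "ping failed\n");

        worker.join();
        return 0;
    }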
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
new file mode 100644
index 00000000000..be951ccbafc
--- /dev/null
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -0,0 +1,790 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_test_fixture.h"
+
+#include "mongo/executor/connection_pool.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/future.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_test_details {
+
+class ConnectionPoolTest : public unittest::Test {
+public:
+protected:
+ void setUp() override {}
+
+ void tearDown() override {
+ ConnectionImpl::clear();
+ TimerImpl::clear();
+ }
+
+private:
+};
+
+#define CONN2ID(swConn) \
+ [](StatusWith<ConnectionPool::ConnectionHandle> & swConn) { \
+ ASSERT(swConn.isOK()); \
+ return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \
+ }(swConn)
+
+/**
+ * Verify that we get the same connection if we grab one, return it and grab
+ * another.
+ */
+TEST_F(ConnectionPoolTest, SameConn) {
+ ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+ // Grab and stash an id for the first request
+ size_t conn1Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(
+ HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); });
+
+ // Grab and stash an id for the second request
+ size_t conn2Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(
+ HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+
+ // Verify that we hit them, and that they're the same
+ ASSERT(conn1Id);
+ ASSERT(conn2Id);
+ ASSERT_EQ(conn1Id, conn2Id);
+}
+
+/**
+ * Verify that a failed connection isn't returned to the pool
+ */
+TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
+ ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+ // Grab the first connection and indicate that it failed
+ size_t conn1Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = CONN2ID(swConn);
+ swConn.getValue()->indicateFailed();
+ });
+
+ // Grab the second id
+ size_t conn2Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(
+ HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+
+ // Verify that we hit them, and that they're different
+ ASSERT(conn1Id);
+ ASSERT(conn2Id);
+ ASSERT_NE(conn1Id, conn2Id);
+}
+
+/**
+ * Verify that providing different host and ports gives you different
+ * connections.
+ */
+TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
+ ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+ // Conn 1 from port 30000
+ size_t conn1Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(
+ HostAndPort("localhost:30000"),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); });
+
+ // Conn 2 from port 30001
+ size_t conn2Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(
+ HostAndPort("localhost:30001"),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+
+ // Hit them and not the same
+ ASSERT(conn1Id);
+ ASSERT(conn2Id);
+ ASSERT_NE(conn1Id, conn2Id);
+}
+
+/**
+ * Verify that not returning handles to the pool spins up new connections.
+ */
+TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) {
+ ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+ // Get the first connection, move it out rather than letting it return
+ ConnectionPool::ConnectionHandle conn1;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ conn1 = std::move(swConn.getValue());
+ });
+
+ // Get the second connection, move it out rather than letting it return
+ ConnectionPool::ConnectionHandle conn2;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+ conn2 = std::move(swConn.getValue());
+ });
+
+ // Verify that the two connections are different
+ ASSERT_NE(conn1.get(), conn2.get());
+}
+
+/**
+ * Verify that timing out on setup works as expected (a bad status is
+ * returned).
+ *
+ * Note that the lack of pushSetup() calls delays the get.
+ */
+TEST_F(ConnectionPoolTest, TimeoutOnSetup) {
+ ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+ bool notOk = false;
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { notOk = !swConn.isOK(); });
+
+ PoolImpl::setNow(now + Milliseconds(5000));
+
+ ASSERT(notOk);
+}
+
+/**
+ * Verify that refresh callbacks happen at the appropriate moments.
+ */
+TEST_F(ConnectionPoolTest, refreshHappens) {
+ bool refreshedA = false;
+ bool refreshedB = false;
+ ConnectionImpl::pushRefresh([&]() {
+ refreshedA = true;
+ return Status::OK();
+ });
+
+ ConnectionImpl::pushRefresh([&]() {
+ refreshedB = true;
+ return Status::OK();
+ });
+
+ ConnectionPool::Options options;
+ options.refreshRequirement = Milliseconds(1000);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ // Get a connection
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT(swConn.isOK()); });
+
+ // After 1 second, one refresh has occurred
+ PoolImpl::setNow(now + Milliseconds(1000));
+ ASSERT(refreshedA);
+ ASSERT(!refreshedB);
+
+ // After 1.5 seconds, the second refresh still hasn't triggered
+ PoolImpl::setNow(now + Milliseconds(1500));
+ ASSERT(!refreshedB);
+
+ // At 2 seconds, the second refresh has triggered
+ PoolImpl::setNow(now + Milliseconds(2000));
+ ASSERT(refreshedB);
+}
+
+/**
+ * Verify that refresh can timeout.
+ */
+TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
+ ConnectionPool::Options options;
+ options.refreshRequirement = Milliseconds(1000);
+ options.refreshTimeout = Milliseconds(2000);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ size_t conn1Id = 0;
+
+ // Grab a connection and verify it's good
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(
+ HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); });
+
+ PoolImpl::setNow(now + Milliseconds(500));
+
+ size_t conn2Id = 0;
+ // Make sure we still get the first one
+ pool.get(
+ HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+ ASSERT_EQ(conn1Id, conn2Id);
+
+ // This should trigger a refresh, but not time it out. So now we have one
+ // connection sitting in refresh.
+ PoolImpl::setNow(now + Milliseconds(2000));
+ bool reachedA = false;
+
+ // This request will wait: there is a connection mid-refresh, so we wait to
+ // see if that pans out. In this case, we fail when the refresh times out.
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(10000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(!swConn.isOK());
+
+ reachedA = true;
+ });
+ ASSERT(!reachedA);
+
+ // Let the refresh timeout
+ PoolImpl::setNow(now + Milliseconds(4000));
+
+ bool reachedB = false;
+
+ // Make sure we can get a new connection
+ pool.get(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(CONN2ID(swConn), conn1Id);
+ reachedB = true;
+ });
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+}
+
+/**
+ * Verify that requests are served in expiration order, not insertion order
+ */
+TEST_F(ConnectionPoolTest, requestsServedByUrgency) {
+ ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+ bool reachedA = false;
+ bool reachedB = false;
+
+ ConnectionPool::ConnectionHandle conn;
+
+ pool.get(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ reachedA = true;
+ });
+
+ pool.get(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ reachedB = true;
+
+ conn = std::move(swConn.getValue());
+ });
+
+ ConnectionImpl::pushSetup(Status::OK());
+
+ // Note that we hit the 1 second request, but not the 2 second one
+ ASSERT(reachedB);
+ ASSERT(!reachedA);
+
+ conn.reset();
+
+ // Now that we've returned the connection, the remaining (2 second) request
+ // has been served
+ ASSERT(reachedA);
+}
+
+/**
+ * Verify that we respect maxConnections
+ */
+TEST_F(ConnectionPoolTest, maxPoolRespected) {
+ ConnectionPool::Options options;
+ options.minConnections = 1;
+ options.maxConnections = 2;
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+ ConnectionPool::ConnectionHandle conn1;
+ ConnectionPool::ConnectionHandle conn2;
+ ConnectionPool::ConnectionHandle conn3;
+
+ // Make 3 requests, each which keep their connection (don't return it to
+ // the pool)
+ pool.get(HostAndPort(),
+ Milliseconds(3000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn3 = std::move(swConn.getValue());
+ });
+ pool.get(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn2 = std::move(swConn.getValue());
+ });
+ pool.get(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn1 = std::move(swConn.getValue());
+ });
+
+ ConnectionImpl::pushSetup(Status::OK());
+ ConnectionImpl::pushSetup(Status::OK());
+ ConnectionImpl::pushSetup(Status::OK());
+
+ // Note that only two have run
+ ASSERT(conn1);
+ ASSERT(conn2);
+ ASSERT(!conn3);
+
+ // Return 1
+ ConnectionPool::ConnectionInterface* conn1Ptr = conn1.get();
+ conn1.reset();
+
+ // Verify that it's the one that pops out for request 3
+ ASSERT_EQ(conn1Ptr, conn3.get());
+}
+
+/**
+ * Verify that minConnections is respected
+ */
+TEST_F(ConnectionPoolTest, minPoolRespected) {
+ ConnectionPool::Options options;
+ options.minConnections = 2;
+ options.maxConnections = 3;
+ options.refreshRequirement = Milliseconds(1000);
+ options.refreshTimeout = Milliseconds(2000);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ ConnectionPool::ConnectionHandle conn1;
+ ConnectionPool::ConnectionHandle conn2;
+ ConnectionPool::ConnectionHandle conn3;
+
+ // Grab one connection without returning it
+ pool.get(HostAndPort(),
+ Milliseconds(1000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn1 = std::move(swConn.getValue());
+ });
+
+ bool reachedA = false;
+ bool reachedB = false;
+ bool reachedC = false;
+
+ ConnectionImpl::pushSetup([&]() {
+ reachedA = true;
+ return Status::OK();
+ });
+ ConnectionImpl::pushSetup([&]() {
+ reachedB = true;
+ return Status::OK();
+ });
+ ConnectionImpl::pushSetup([&]() {
+ reachedC = true;
+ return Status::OK();
+ });
+
+ // Verify that two setups were invoked, even though only one request was
+ // made, because minConnections == 2
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+ ASSERT(!reachedC);
+
+ // Two more gets without returns
+ pool.get(HostAndPort(),
+ Milliseconds(2000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn2 = std::move(swConn.getValue());
+ });
+ pool.get(HostAndPort(),
+ Milliseconds(3000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(swConn.isOK());
+
+ conn3 = std::move(swConn.getValue());
+ });
+
+ ASSERT(conn2);
+ ASSERT(conn3);
+
+ reachedA = false;
+ reachedB = false;
+ reachedC = false;
+
+ ConnectionImpl::pushRefresh([&]() {
+ reachedA = true;
+ return Status::OK();
+ });
+ ConnectionImpl::pushRefresh([&]() {
+ reachedB = true;
+ return Status::OK();
+ });
+ ConnectionImpl::pushRefresh([&]() {
+ reachedC = true;
+ return Status::OK();
+ });
+
+ // Return each connection over 1, 2 and 3 ms
+ PoolImpl::setNow(now + Milliseconds(1));
+ conn1->indicateUsed();
+ conn1.reset();
+
+ PoolImpl::setNow(now + Milliseconds(2));
+ conn2->indicateUsed();
+ conn2.reset();
+
+ PoolImpl::setNow(now + Milliseconds(3));
+ conn3->indicateUsed();
+ conn3.reset();
+
+ // Jump ahead 5 seconds and verify that only two refreshes occurred
+ PoolImpl::setNow(now + Milliseconds(5000));
+
+ ASSERT(reachedA);
+ ASSERT(reachedB);
+ ASSERT(!reachedC);
+}
+
+
+/**
+ * Verify that the hostTimeout is respected. This implies that an idle
+ * hostAndPort drops its connections.
+ */
+TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
+ ConnectionPool::Options options;
+ options.refreshRequirement = Milliseconds(5000);
+ options.refreshTimeout = Milliseconds(5000);
+ options.hostTimeout = Milliseconds(1000);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ size_t connId = 0;
+
+ bool reachedA = false;
+ // Grab 1 connection and return it
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ connId = CONN2ID(swConn);
+ reachedA = true;
+ });
+
+ ASSERT(reachedA);
+
+ // Jump past the hostTimeout
+ PoolImpl::setNow(now + Milliseconds(1000));
+
+ bool reachedB = false;
+
+ // Verify that a new connection was spawned
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(connId, CONN2ID(swConn));
+ reachedB = true;
+ });
+
+ ASSERT(reachedB);
+}
+
+
+/**
+ * Verify that the hostTimeout happens, but that continued gets delay
+ * activation.
+ */
+TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
+ ConnectionPool::Options options;
+ options.refreshRequirement = Milliseconds(5000);
+ options.refreshTimeout = Milliseconds(5000);
+ options.hostTimeout = Milliseconds(1000);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ size_t connId = 0;
+
+ bool reachedA = false;
+
+ // Grab and return
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ connId = CONN2ID(swConn);
+ reachedA = true;
+ });
+ ASSERT(reachedA);
+
+ // Jump almost up to the hostTimeout
+ PoolImpl::setNow(now + Milliseconds(999));
+
+ bool reachedB = false;
+ // Same connection
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_EQ(connId, CONN2ID(swConn));
+ reachedB = true;
+ });
+ ASSERT(reachedB);
+
+ // Now we've timed out
+ PoolImpl::setNow(now + Milliseconds(2000));
+
+ bool reachedC = false;
+ // Different id
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(connId, CONN2ID(swConn));
+ reachedC = true;
+ });
+
+ ASSERT(reachedC);
+}
+
+
+/**
+ * Verify that the hostTimeout happens and that having a connection checked out
+ * delays the host from timing out
+ */
+TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
+ ConnectionPool::Options options;
+ options.refreshRequirement = Milliseconds(5000);
+ options.refreshTimeout = Milliseconds(5000);
+ options.hostTimeout = Milliseconds(1000);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+ auto now = Date_t::now();
+
+ PoolImpl::setNow(now);
+
+ ConnectionPool::ConnectionHandle conn1;
+ size_t conn1Id = 0;
+ size_t conn2Id = 0;
+
+ // save 1 connection
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn1Id = CONN2ID(swConn);
+ conn1 = std::move(swConn.getValue());
+ });
+
+ ASSERT(conn1Id);
+
+ // return the second
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(
+ HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+
+ ASSERT(conn2Id);
+
+ // hostTimeout has passed
+ PoolImpl::setNow(now + Milliseconds(1000));
+
+ bool reachedA = false;
+
+ // conn 2 is still there
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_EQ(conn2Id, CONN2ID(swConn));
+ reachedA = true;
+ });
+
+ ASSERT(reachedA);
+
+ // return conn 1
+ conn1.reset();
+
+ // expire the pool
+ PoolImpl::setNow(now + Milliseconds(2000));
+
+ bool reachedB = false;
+
+ // make sure that this is a new id
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(conn1Id, CONN2ID(swConn));
+ ASSERT_NE(conn2Id, CONN2ID(swConn));
+ reachedB = true;
+ });
+ ASSERT(reachedB);
+}
+
+/**
+ * Verify that drop connections works
+ */
+TEST_F(ConnectionPoolTest, dropConnections) {
+ ConnectionPool::Options options;
+
+ // ensure that only 1 connection is floating around
+ options.maxConnections = 1;
+ options.refreshRequirement = Seconds(1);
+ options.refreshTimeout = Seconds(2);
+ ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+ auto now = Date_t::now();
+ PoolImpl::setNow(now);
+
+ // Grab the first connection id
+ size_t conn1Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(
+ HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); });
+ ASSERT(conn1Id);
+
+ // Grab it and this time keep it out of the pool
+ ConnectionPool::ConnectionHandle handle;
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_EQ(CONN2ID(swConn), conn1Id);
+ handle = std::move(swConn.getValue());
+ });
+
+ ASSERT(handle);
+
+ bool reachedA = false;
+
+ // Queue up a request. This won't fire until we drop connections, then it
+ // will fail.
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT(!swConn.isOK());
+ reachedA = true;
+ });
+ ASSERT(!reachedA);
+
+ // fails the previous get
+ pool.dropConnections(HostAndPort());
+
+ ASSERT(reachedA);
+
+ // return the connection
+ handle.reset();
+
+ // Make sure that a new connection request properly disposes of the gen1
+ // connection
+ size_t conn2Id = 0;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ conn2Id = CONN2ID(swConn);
+
+ ASSERT_NE(conn2Id, conn1Id);
+ });
+ ASSERT(conn2Id);
+
+ // Push conn2 into refresh
+ PoolImpl::setNow(now + Milliseconds(1500));
+
+ // drop the connections
+ pool.dropConnections(HostAndPort());
+
+ // refresh still pending
+ PoolImpl::setNow(now + Milliseconds(2500));
+
+ // Verify that a new connection came out, despite the gen2 connection still
+ // being pending
+ bool reachedB = false;
+ ConnectionImpl::pushSetup(Status::OK());
+ pool.get(HostAndPort(),
+ Milliseconds(5000),
+ [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ ASSERT_NE(CONN2ID(swConn), conn2Id);
+ reachedB = true;
+ });
+
+ ASSERT(reachedB);
+}
+
+} // namespace connection_pool_test_details
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
new file mode 100644
index 00000000000..860275ebf39
--- /dev/null
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -0,0 +1,211 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_test_fixture.h"
+
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_test_details {
+
+TimerImpl::TimerImpl(PoolImpl* global) : _global(global) {}
+
+TimerImpl::~TimerImpl() {
+ cancelTimeout();
+}
+
+void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+ _cb = std::move(cb);
+ _expiration = _global->now() + timeout;
+
+ _timers.emplace(this);
+}
+
+void TimerImpl::cancelTimeout() {
+ _timers.erase(this);
+}
+
+void TimerImpl::clear() {
+ _timers.clear();
+}
+
+void TimerImpl::fireIfNecessary() {
+ auto now = PoolImpl().now();
+
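+ // Iterate over a copy: firing a callback may set or cancel timers, mutating _timers.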
+ auto timers = _timers;
+
+ for (auto&& x : timers) {
+ if (_timers.count(x) && (x->_expiration <= now)) {
+ x->_cb();
+ }
+ }
+}
+
+std::set<TimerImpl*> TimerImpl::_timers;
+
+ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global)
+ : _hostAndPort(hostAndPort),
+ _timer(global),
+ _global(global),
+ _id(_idCounter++),
+ _generation(generation) {}
+
+void ConnectionImpl::indicateUsed() {
+ _lastUsed = _global->now();
+}
+
+void ConnectionImpl::indicateFailed() {
+ _isFailed = true;
+}
+
+size_t ConnectionImpl::id() const {
+ return _id;
+}
+
+const HostAndPort& ConnectionImpl::getHostAndPort() const {
+ return _hostAndPort;
+}
+
+void ConnectionImpl::clear() {
+ _setupQueue.clear();
+ _refreshQueue.clear();
+ _pushSetupQueue.clear();
+ _pushRefreshQueue.clear();
+}
+
+void ConnectionImpl::pushSetup(PushSetupCallback status) {
+ _pushSetupQueue.push_back(status);
+
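+ // If a connection is already waiting in setup(), answer it immediately.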
+ if (_setupQueue.size()) {
+ _setupQueue.front()->_setupCallback(_setupQueue.front(), _pushSetupQueue.front()());
+ _setupQueue.pop_front();
+ _pushSetupQueue.pop_front();
+ }
+}
+
+void ConnectionImpl::pushSetup(Status status) {
+ pushSetup([status]() { return status; });
+}
+
+void ConnectionImpl::pushRefresh(PushRefreshCallback status) {
+ _pushRefreshQueue.push_back(status);
+
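+ // If a connection is already waiting in refresh(), answer it immediately.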
+ if (_refreshQueue.size()) {
+ _refreshQueue.front()->_refreshCallback(_refreshQueue.front(), _pushRefreshQueue.front()());
+ _refreshQueue.pop_front();
+ _pushRefreshQueue.pop_front();
+ }
+}
+
+void ConnectionImpl::pushRefresh(Status status) {
+ pushRefresh([status]() { return status; });
+}
+
+Date_t ConnectionImpl::getLastUsed() const {
+ return _lastUsed;
+}
+
+bool ConnectionImpl::isFailed() const {
+ return _isFailed;
+}
+
+void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+ _timer.setTimeout(timeout, cb);
+}
+
+void ConnectionImpl::cancelTimeout() {
+ _timer.cancelTimeout();
+}
+
+void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) {
+ _setupCallback = std::move(cb);
+
+ _timer.setTimeout(
+ timeout,
+ [this] { _setupCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); });
+
+ _setupQueue.push_back(this);
+
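+ // If a test already queued an answer via pushSetup(), complete setup inline.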
+ if (_pushSetupQueue.size()) {
+ _setupQueue.front()->_setupCallback(_setupQueue.front(), _pushSetupQueue.front()());
+ _setupQueue.pop_front();
+ _pushSetupQueue.pop_front();
+ }
+}
+
+void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) {
+ _refreshCallback = std::move(cb);
+
+ _timer.setTimeout(
+ timeout,
+ [this] { _refreshCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); });
+
+ _refreshQueue.push_back(this);
+
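+ // If a test already queued an answer via pushRefresh(), complete refresh inline.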
+ if (_pushRefreshQueue.size()) {
+ _refreshQueue.front()->_refreshCallback(_refreshQueue.front(), _pushRefreshQueue.front()());
+ _refreshQueue.pop_front();
+ _pushRefreshQueue.pop_front();
+ }
+}
+
+size_t ConnectionImpl::getGeneration() const {
+ return _generation;
+}
+
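+// Shared state across all mock connections; tests reset the queues via clear().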
+std::deque<ConnectionImpl::PushSetupCallback> ConnectionImpl::_pushSetupQueue;
+std::deque<ConnectionImpl::PushRefreshCallback> ConnectionImpl::_pushRefreshQueue;
+std::deque<ConnectionImpl*> ConnectionImpl::_setupQueue;
+std::deque<ConnectionImpl*> ConnectionImpl::_refreshQueue;
+size_t ConnectionImpl::_idCounter = 1;
+
+std::unique_ptr<ConnectionPool::ConnectionInterface> PoolImpl::makeConnection(
+ const HostAndPort& hostAndPort, size_t generation) {
+ return stdx::make_unique<ConnectionImpl>(hostAndPort, generation, this);
+}
+
+std::unique_ptr<ConnectionPool::TimerInterface> PoolImpl::makeTimer() {
+ return stdx::make_unique<TimerImpl>(this);
+}
+
+Date_t PoolImpl::now() {
+ return _now.get_value_or(Date_t::now());
+}
+
+void PoolImpl::setNow(Date_t now) {
+ _now = now;
+ TimerImpl::fireIfNecessary();
+}
+
+boost::optional<Date_t> PoolImpl::_now;
+
+} // namespace connection_pool_test_details
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
new file mode 100644
index 00000000000..684b458fd28
--- /dev/null
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include <deque>
+#include <memory>
+#include <set>
+
+#include "mongo/executor/connection_pool.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_test_details {
+
+class ConnectionPoolTest;
+class PoolImpl;
+
+/**
+ * Mock interface for the timer
+ */
+class TimerImpl final : public ConnectionPool::TimerInterface {
+public:
+ TimerImpl(PoolImpl* global);
+ ~TimerImpl() override;
+
+ void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+
+ void cancelTimeout() override;
+
+ // fires all timers whose expiration has passed now()
+ static void fireIfNecessary();
+
+ // dump all timers
+ static void clear();
+
+private:
+ static std::set<TimerImpl*> _timers;
+
+ TimeoutCallback _cb;
+ PoolImpl* _global;
+ Date_t _expiration;
+};
+
+/**
+ * Mock interface for the connections
+ *
+ * Answers can be queued up ahead of time via pushSetup() and pushRefresh()
+ * (in which case setup/refresh callbacks fire immediately), or setup/refresh
+ * calls queue up and complete as pushSetup() and pushRefresh() are later
+ * called.
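+ *
+ * As an illustrative sketch (not additional fixture API, just the pattern the
+ * tests in connection_pool_test.cpp use): pushing the answer first makes the
+ * get() callback fire inline,
+ *
+ *     ConnectionImpl::pushSetup(Status::OK());
+ *     pool.get(HostAndPort(),
+ *              Milliseconds(5000),
+ *              [](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+ *                  ASSERT(swConn.isOK());
+ *              });
+ *
+ * whereas issuing the get() first parks the request until the next
+ * pushSetup()/pushRefresh() supplies an answer (or the request times out).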
+ */
+class ConnectionImpl final : public ConnectionPool::ConnectionInterface {
+public:
+ using PushSetupCallback = stdx::function<Status()>;
+ using PushRefreshCallback = stdx::function<Status()>;
+
+ ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global);
+
+ size_t id() const;
+
+ void indicateUsed() override;
+
+ void indicateFailed() override;
+
+ const HostAndPort& getHostAndPort() const override;
+
+ // Dump all connection callbacks
+ static void clear();
+
+ // Push either a callback that returns the status for a setup, or just the Status
+ static void pushSetup(PushSetupCallback status);
+ static void pushSetup(Status status);
+
+ // Push either a callback that returns the status for a refresh, or just the Status
+ static void pushRefresh(PushRefreshCallback status);
+ static void pushRefresh(Status status);
+
+private:
+ Date_t getLastUsed() const override;
+
+ bool isFailed() const override;
+
+ void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+
+ void cancelTimeout() override;
+
+ void setup(Milliseconds timeout, SetupCallback cb) override;
+
+ void refresh(Milliseconds timeout, RefreshCallback cb) override;
+
+ size_t getGeneration() const override;
+
+ HostAndPort _hostAndPort;
+ Date_t _lastUsed;
+ bool _isFailed = false;
+ SetupCallback _setupCallback;
+ RefreshCallback _refreshCallback;
+ TimerImpl _timer;
+ PoolImpl* _global;
+ size_t _id;
+ size_t _generation;
+
+ // Answer queues: statuses queued ahead of time by pushSetup()/pushRefresh()
+ static std::deque<PushSetupCallback> _pushSetupQueue;
+ static std::deque<PushRefreshCallback> _pushRefreshQueue;
+
+ // Question queues: connections parked in setup()/refresh(), awaiting an answer
+ static std::deque<ConnectionImpl*> _setupQueue;
+ static std::deque<ConnectionImpl*> _refreshQueue;
+
+ static size_t _idCounter;
+};
+
+/**
+ * Mock for the pool implementation
+ */
+class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
+ friend class ConnectionImpl;
+
+public:
+ std::unique_ptr<ConnectionPool::ConnectionInterface> makeConnection(
+ const HostAndPort& hostAndPort, size_t generation) override;
+
+ std::unique_ptr<ConnectionPool::TimerInterface> makeTimer() override;
+
+ Date_t now() override;
+
+ /**
+ * setNow() advances the mock clock and fires all timers whose expiration has passed
+ */
+ static void setNow(Date_t now);
+
+private:
+ ConnectionPool* _pool = nullptr;
+
+ static boost::optional<Date_t> _now;
+};
+
+} // namespace connection_pool_test_details
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 3e43011c7b2..49e6e2475d8 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -36,6 +36,7 @@
#include "mongo/executor/async_stream_interface.h"
#include "mongo/executor/async_stream_factory.h"
+#include "mongo/executor/connection_pool_asio.h"
#include "mongo/stdx/chrono.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -47,18 +48,22 @@ namespace mongo {
namespace executor {
NetworkInterfaceASIO::NetworkInterfaceASIO(
- std::unique_ptr<AsyncStreamFactoryInterface> streamFactory)
- : NetworkInterfaceASIO(std::move(streamFactory), nullptr) {}
+ std::unique_ptr<AsyncStreamFactoryInterface> streamFactory, Options options)
+ : NetworkInterfaceASIO(std::move(streamFactory), nullptr, std::move(options)) {}
NetworkInterfaceASIO::NetworkInterfaceASIO(
std::unique_ptr<AsyncStreamFactoryInterface> streamFactory,
- std::unique_ptr<NetworkConnectionHook> networkConnectionHook)
- : _io_service(),
+ std::unique_ptr<NetworkConnectionHook> networkConnectionHook,
+ Options options)
+ : _options(std::move(options)),
+ _io_service(),
_hook(std::move(networkConnectionHook)),
_resolver(_io_service),
_state(State::kReady),
_streamFactory(std::move(streamFactory)),
- _isExecutorRunnable(false) {}
+ _isExecutorRunnable(false),
+ _connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this),
+ _options.connectionPoolOptions) {}
std::string NetworkInterfaceASIO::getDiagnosticString() {
str::stream output;
@@ -126,16 +131,62 @@ Date_t NetworkInterfaceASIO::now() {
void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish) {
- auto ownedOp = stdx::make_unique<AsyncOp>(this, cbHandle, request, onFinish, now());
-
- AsyncOp* op = ownedOp.get();
-
{
stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgress.emplace(op, std::move(ownedOp));
+ _inGetConnection.push_back(cbHandle);
}
- asio::post(_io_service, [this, op]() { _startCommand(op); });
+ auto startTime = now();
+
+ auto nextStep = [this, startTime, cbHandle, request, onFinish](
+ StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+
+ if (!swConn.isOK()) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+
+ auto& v = _inGetConnection;
+ auto iter = std::find(v.begin(), v.end(), cbHandle);
+ if (iter != v.end())
+ v.erase(iter);
+ }
+
+ onFinish(swConn.getStatus());
+ return;
+ }
+
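+ // The pooled connection holds a parked AsyncOp; take it over to run this
+ // command.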
+ auto conn = static_cast<connection_pool_asio::ASIOConnection*>(swConn.getValue().get());
+
+ auto ownedOp = conn->releaseAsyncOp();
+ AsyncOp* op = ownedOp.get();
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+
+ auto iter = std::find(_inGetConnection.begin(), _inGetConnection.end(), cbHandle);
+
+ // If we didn't find the request, we've been canceled
+ if (iter == _inGetConnection.end())
+ return;
+
+ _inGetConnection.erase(iter);
+ _inProgress.emplace(op, std::move(ownedOp));
+ }
+
+ op->_cbHandle = cbHandle;
+ op->_request = request;
+ op->_onFinish = onFinish;
+ op->_connectionPoolHandle = std::move(swConn.getValue());
+ op->_start = startTime;
+
+ _beginCommunication(op);
+ };
+
+ // TODO: thread some higher level timeout through, rather than 10 seconds,
+ // once we make timeouts pervasive in this api.
+ asio::post(
+ _io_service,
+ [this, request, nextStep] { _connectionPool.get(request.target, Seconds(10), nextStep); });
}
void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index f36cbf12e0d..c5679ed6be9 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -35,9 +35,11 @@
#include <string>
#include <system_error>
#include <unordered_map>
+#include <vector>
#include "mongo/base/status.h"
#include "mongo/base/system_error.h"
+#include "mongo/executor/connection_pool.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/remote_command_request.h"
@@ -53,6 +55,12 @@
namespace mongo {
namespace executor {
+namespace connection_pool_asio {
+class ASIOConnection;
+class ASIOTimer;
+class ASIOImpl;
+}
+
class AsyncStreamFactoryInterface;
class AsyncStreamInterface;
@@ -61,10 +69,20 @@ class AsyncStreamInterface;
* Kohlhoff's ASIO library instead of existing MongoDB networking primitives.
*/
class NetworkInterfaceASIO final : public NetworkInterface {
+ friend class connection_pool_asio::ASIOConnection;
+ friend class connection_pool_asio::ASIOTimer;
+ friend class connection_pool_asio::ASIOImpl;
+
public:
+ struct Options {
+ ConnectionPool::Options connectionPoolOptions;
+ };
+
+ NetworkInterfaceASIO(std::unique_ptr<AsyncStreamFactoryInterface> streamFactory,
+ std::unique_ptr<NetworkConnectionHook> networkConnectionHook,
+ Options = Options());
NetworkInterfaceASIO(std::unique_ptr<AsyncStreamFactoryInterface> streamFactory,
- std::unique_ptr<NetworkConnectionHook> networkConnectionHook);
- NetworkInterfaceASIO(std::unique_ptr<AsyncStreamFactoryInterface> streamFactory);
+ Options = Options());
std::string getDiagnosticString() override;
std::string getHostName() override;
void startup() override;
@@ -172,6 +190,8 @@ private:
* Helper object to manage individual network operations.
*/
class AsyncOp {
+ friend class NetworkInterfaceASIO;
+
public:
AsyncOp(NetworkInterfaceASIO* net,
const TaskExecutor::CallbackHandle& cbHandle,
@@ -215,6 +235,10 @@ private:
RemoteCommandRequest _request;
RemoteCommandCompletionFn _onFinish;
+ // An AsyncOp holds a handle to the pooled connection it runs on. While the
+ // connection sits in the pool, the connection in turn owns the AsyncOp.
+ ConnectionPool::ConnectionHandle _connectionPoolHandle;
+
/**
* The connection state used to service this request. We wrap it in an optional
* as it is instantiated at some point after the AsyncOp is created.
@@ -227,7 +251,7 @@ private:
*/
boost::optional<rpc::Protocol> _operationProtocol;
- const Date_t _start;
+ Date_t _start;
AtomicUInt64 _canceled;
@@ -237,6 +261,7 @@ private:
* representing its current running or next-to-be-run command, if there is one.
*/
boost::optional<AsyncCommand> _command;
+ bool _inSetup;
};
void _startCommand(AsyncOp* op);
@@ -280,6 +305,8 @@ private:
void _asyncRunCommand(AsyncCommand* cmd, NetworkOpHandler handler);
+ Options _options;
+
asio::io_service _io_service;
stdx::thread _serviceRunner;
@@ -293,10 +320,13 @@ private:
stdx::mutex _inProgressMutex;
std::unordered_map<AsyncOp*, std::unique_ptr<AsyncOp>> _inProgress;
+ std::vector<TaskExecutor::CallbackHandle> _inGetConnection;
stdx::mutex _executorMutex;
bool _isExecutorRunnable;
stdx::condition_variable _isExecutorRunnableCondition;
+
+ ConnectionPool _connectionPool;
};
template <typename T, typename R, typename... MethodArgs, typename... DeducedArgs>
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index 452b26a6aec..889ba5c11d5 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -35,6 +35,8 @@
#include <type_traits>
#include <utility>
+#include "mongo/executor/connection_pool_asio.h"
+#include "mongo/executor/async_stream_interface.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/jsobj.h"
#include "mongo/executor/async_stream_interface.h"
@@ -204,6 +206,19 @@ void NetworkInterfaceASIO::_startCommand(AsyncOp* op) {
}
void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
+ // We set up connections for the connection pool by starting the normal
+ // callback chain with connect(), but exiting it at the first
+ // _beginCommunication. That is, every AsyncOp starts with _inSetup == true
+ // and arrives here once it is connected and authed. At that point we flip
+ // _inSetup to false and return to the connection pool's get() callback, so
+ // user operations can proceed once they come back through this codepath.
+ if (op->_inSetup) {
+ op->_inSetup = false;
+ op->finish(op->command()->response(rpc::Protocol::kOpQuery, now()));
+ return;
+ }
+
auto beginStatus = op->beginCommand(op->request());
if (!beginStatus.isOK()) {
return _completeOperation(op, beginStatus);
@@ -237,13 +252,30 @@ void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_c
void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& resp) {
op->finish(resp);
+ std::unique_ptr<AsyncOp> ownedOp;
+
{
- // NOTE: op will be deleted in the call to erase() below.
- // It is invalid to reference op after this point.
stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgress.erase(op);
+
+ auto iter = _inProgress.find(op);
+
+ // The op isn't in _inProgress, so it's still in connection setup and
+ // owned by the pool; there's nothing to hand back here
+ if (iter == _inProgress.end()) {
+ return;
+ }
+
+ ownedOp = std::move(iter->second);
+ _inProgress.erase(iter);
}
+ invariant(ownedOp);
+
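+ // Park the finished AsyncOp back on its pooled connection; when the handle
+ // goes out of scope, the connection returns to the pool.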
+ auto conn = std::move(op->_connectionPoolHandle);
+ auto asioConn = static_cast<connection_pool_asio::ASIOConnection*>(conn.get());
+
+ asioConn->bindAsyncOp(std::move(ownedOp));
+ asioConn->indicateUsed();
+
signalWorkAvailable();
}
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index 069ed7f98bc..44be4846d5f 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -30,9 +30,12 @@
#include "mongo/platform/basic.h"
+#include "mongo/executor/network_interface_asio.h"
+
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/executor/async_stream_interface.h"
+#include "mongo/executor/connection_pool_asio.h"
#include "mongo/executor/downconvert_find_and_getmore_commands.h"
#include "mongo/executor/network_interface_asio.h"
#include "mongo/rpc/factory.h"
@@ -75,7 +78,8 @@ NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner,
_request(request),
_onFinish(onFinish),
_start(now),
- _canceled(0) {}
+ _canceled(0),
+ _inSetup(true) {}
void NetworkInterfaceASIO::AsyncOp::cancel() {
// An operation may be in mid-flight when it is canceled, so we