diff options
21 files changed, 2625 insertions, 21 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index d3cf2de6562..fc768259d24 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -144,6 +144,7 @@ error_code("SSLHandshakeFailed", 141) error_code("JSUncatchableError", 142) error_code("CursorInUse", 143) error_code("IncompatibleCatalogManager", 144) +error_code("PooledConnectionsDropped", 145) # Non-sequential error codes (for compatibility only) error_code("RecvStaleConfig", 9996) diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index c3d5f578552..2976a5bee6f 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -51,6 +51,39 @@ env.Library('network_interface_mock', 'task_executor_interface', ]) +env.Library( + target='connection_pool', + source=[ + 'connection_pool.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/util/net/hostandport', + ], +) + +env.CppUnitTest( + target='connection_pool_test', + source=[ + 'connection_pool_test.cpp', + 'connection_pool_test_fixture.cpp', + ], + LIBDEPS=[ + 'connection_pool', + ], +) + +env.CppIntegrationTest( + target='connection_pool_asio_integration_test', + source=[ + 'connection_pool_asio_integration_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/rpc/command_status', + 'network_interface_asio', + ], +) + env.CppUnitTest( target='network_interface_mock_test', source=[ @@ -76,6 +109,7 @@ env.Library( 'async_secure_stream_factory.cpp', 'async_stream.cpp', 'async_stream_factory.cpp', + 'connection_pool_asio.cpp', 'network_interface_asio.cpp', 'network_interface_asio_auth.cpp', 'network_interface_asio_command.cpp', @@ -88,6 +122,7 @@ env.Library( '$BUILD_DIR/mongo/db/auth/authcommon', '$BUILD_DIR/mongo/rpc/rpc', '$BUILD_DIR/third_party/shim_asio', + 'connection_pool', 'downconvert_find_and_getmore_commands', 'network_interface', 'task_executor_interface', diff --git a/src/mongo/executor/async_mock_stream_factory.cpp 
b/src/mongo/executor/async_mock_stream_factory.cpp index 37693d14047..80e289c2b8c 100644 --- a/src/mongo/executor/async_mock_stream_factory.cpp +++ b/src/mongo/executor/async_mock_stream_factory.cpp @@ -114,6 +114,14 @@ void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf, _io_service->post([writeHandler, size] { writeHandler(std::error_code(), size); }); } +void AsyncMockStreamFactory::MockStream::cancel() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + log() << "cancel() for: " << _target; + + // TODO: do we actually need to mock cancel? + // We may want a cancel queue the same way we have read and write queues +} + void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf, StreamHandler&& readHandler) { stdx::unique_lock<stdx::mutex> lk(_mutex); diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h index d5e1cc8722b..23d7fbe625c 100644 --- a/src/mongo/executor/async_mock_stream_factory.h +++ b/src/mongo/executor/async_mock_stream_factory.h @@ -82,6 +82,8 @@ public: StreamState waitUntilBlocked(); + void cancel() override; + std::vector<uint8_t> popWrite(); void pushRead(std::vector<uint8_t> toRead); diff --git a/src/mongo/executor/async_secure_stream.cpp b/src/mongo/executor/async_secure_stream.cpp index e49d6e22e1e..b06667c66c8 100644 --- a/src/mongo/executor/async_secure_stream.cpp +++ b/src/mongo/executor/async_secure_stream.cpp @@ -87,6 +87,10 @@ void AsyncSecureStream::_handleHandshake(std::error_code ec, const std::string& _userHandler(make_error_code(certStatus.getStatus().code())); } +void AsyncSecureStream::cancel() { + _stream.lowest_layer().cancel(); +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/async_secure_stream.h b/src/mongo/executor/async_secure_stream.h index b24b628d4a8..c74e2d72ee2 100644 --- a/src/mongo/executor/async_secure_stream.h +++ b/src/mongo/executor/async_secure_stream.h @@ -46,9 +46,12 @@ public: void 
connect(const asio::ip::tcp::resolver::iterator endpoints, ConnectHandler&& connectHandler) override; - void write(asio::const_buffer buffer, StreamHandler&& streamHandler); - void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler); + void write(asio::const_buffer buffer, StreamHandler&& streamHandler) override; + + void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) override; + + void cancel() override; private: void _handleConnect(asio::ip::tcp::resolver::iterator iter); diff --git a/src/mongo/executor/async_stream.cpp b/src/mongo/executor/async_stream.cpp index 454ac3f0f42..39e339c7e83 100644 --- a/src/mongo/executor/async_stream.cpp +++ b/src/mongo/executor/async_stream.cpp @@ -56,5 +56,9 @@ void AsyncStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandle asio::async_read(_stream, asio::buffer(buffer), std::move(streamHandler)); } +void AsyncStream::cancel() { + _stream.cancel(); +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/async_stream.h b/src/mongo/executor/async_stream.h index 686d7532e20..80c07810193 100644 --- a/src/mongo/executor/async_stream.h +++ b/src/mongo/executor/async_stream.h @@ -39,12 +39,14 @@ class AsyncStream final : public AsyncStreamInterface { public: AsyncStream(asio::io_service* io_service); - void connect(asio::ip::tcp::resolver::iterator iter, ConnectHandler&& connectHandler); + void connect(asio::ip::tcp::resolver::iterator iter, ConnectHandler&& connectHandler) override; void write(asio::const_buffer buffer, StreamHandler&& streamHandler) override; void read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) override; + void cancel() override; + private: asio::ip::tcp::socket _stream; }; diff --git a/src/mongo/executor/async_stream_interface.h b/src/mongo/executor/async_stream_interface.h index 7f81f9ce219..cc9373dfe0b 100644 --- a/src/mongo/executor/async_stream_interface.h +++ b/src/mongo/executor/async_stream_interface.h @@ -58,6 +58,8 @@ 
public: virtual void read(asio::mutable_buffer buf, StreamHandler&& readHandler) = 0; + virtual void cancel() = 0; + protected: AsyncStreamInterface() = default; }; diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp new file mode 100644 index 00000000000..a5e2d9e2e91 --- /dev/null +++ b/src/mongo/executor/connection_pool.cpp @@ -0,0 +1,552 @@ +/** * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/executor/connection_pool.h" + +#include "mongo/stdx/memory.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/scopeguard.h" + + +// One interesting implementation note herein concerns how setup() and +// refresh() are invoked outside of the global lock, but setTimeout is not. +// This implementation detail simplifies mocks, allowing them to return +// synchronously sometimes, whereas having timeouts fire instantly adds little +// value. In practice, dumping the locks is always safe (because we restrict +// ourselves to operations over the connection). + +namespace mongo { +namespace executor { + +/** + * A pool for a specific HostAndPort + * + * Pools come into existence the first time a connection is requested and + * go out of existence after hostTimeout passes without any of their + * connections being used. + */ +class ConnectionPool::SpecificPool { +public: + SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort); + ~SpecificPool(); + + /** + * Bumps the generation, clears the ready pool, moves in-flight + * (processing) connections to the dropped-processing pool and fails all + * pending requests with PooledConnectionsDropped. Sinks a unique_lock + * from the parent. + */ + void dropConnections(stdx::unique_lock<stdx::mutex> lk); + + /** + * Gets a connection from the specific pool. Sinks a unique_lock from the + * parent to preserve the lock on _mutex + */ + void getConnection(const HostAndPort& hostAndPort, + Milliseconds timeout, + stdx::unique_lock<stdx::mutex> lk, + GetConnectionCallback cb); + + /** + * Returns a connection to a specific pool. 
Sinks a unique_lock from the + * parent to preserve the lock on _mutex + */ + void returnConnection(ConnectionInterface* connection, stdx::unique_lock<stdx::mutex> lk); + +private: + using OwnedConnection = std::unique_ptr<ConnectionInterface>; + using OwnershipPool = std::unordered_map<ConnectionInterface*, OwnedConnection>; + using Request = std::pair<Date_t, GetConnectionCallback>; + struct RequestComparator { + bool operator()(const Request& a, const Request& b) { + return a.first > b.first; + } + }; + + void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn); + + void failAllRequests(const Status& status, stdx::unique_lock<stdx::mutex> lk); + + void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk); + + void spawnConnections(stdx::unique_lock<stdx::mutex>& lk, const HostAndPort& hostAndPort); + + void shutdown(); + + OwnedConnection takeFromPool(OwnershipPool& pool, ConnectionInterface* connection); + OwnedConnection takeFromProcessingPool(ConnectionInterface* connection); + + void updateStateInLock(); + +private: + ConnectionPool* const _parent; + + const HostAndPort _hostAndPort; + + OwnershipPool _readyPool; + OwnershipPool _processingPool; + OwnershipPool _droppedProcessingPool; + OwnershipPool _checkedOutPool; + + std::priority_queue<Request, std::vector<Request>, RequestComparator> _requests; + + std::unique_ptr<TimerInterface> _requestTimer; + Date_t _requestTimerExpiration; + size_t _generation; + bool _inFulfillRequests; + + /** + * The current state of the pool + * + * The pool begins in a running state. Moves to idle when no requests + * are pending and no connections are checked out. It finally enters + * shutdown after hostTimeout has passed (and waits there for current + * refreshes to process out). + * + * At any point a new request sets the state back to running and + * restarts all timers. 
+ */ + enum class State { + // The pool is active + kRunning, + + // No current activity, waiting for hostTimeout to pass + kIdle, + + // hostTimeout is passed, we're waiting for any processing + // connections to finish before shutting down + kInShutdown, + }; + + State _state; +}; + +// TODO: revisit these durations when we come up with a more pervasive solution +// for NetworkInterfaceASIO's timers +Milliseconds const ConnectionPool::kDefaultRefreshTimeout = Seconds(30); +Milliseconds const ConnectionPool::kDefaultRefreshRequirement = Minutes(1); +Milliseconds const ConnectionPool::kDefaultHostTimeout = Minutes(5); + +ConnectionPool::ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl, Options options) + : _options(std::move(options)), _factory(std::move(impl)) {} + +ConnectionPool::~ConnectionPool() = default; + +void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + auto iter = _pools.find(hostAndPort); + + if (iter == _pools.end()) + return; + + iter->second.get()->dropConnections(std::move(lk)); +} + +void ConnectionPool::get(const HostAndPort& hostAndPort, + Milliseconds timeout, + GetConnectionCallback cb) { + SpecificPool* pool; + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + auto iter = _pools.find(hostAndPort); + + if (iter == _pools.end()) { + auto handle = stdx::make_unique<SpecificPool>(this, hostAndPort); + pool = handle.get(); + _pools[hostAndPort] = std::move(handle); + } else { + pool = iter->second.get(); + } + + invariant(pool); + + pool->getConnection(hostAndPort, timeout, std::move(lk), std::move(cb)); +} + +void ConnectionPool::returnConnection(ConnectionInterface* conn) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + auto iter = _pools.find(conn->getHostAndPort()); + + invariant(iter != _pools.end()); + + iter->second.get()->returnConnection(conn, std::move(lk)); +} + +ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& 
hostAndPort) + : _parent(parent), + _hostAndPort(hostAndPort), + _requestTimer(parent->_factory->makeTimer()), + _generation(0), + _inFulfillRequests(false), + _state(State::kRunning) {} + +ConnectionPool::SpecificPool::~SpecificPool() { + DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();) +} + +void ConnectionPool::SpecificPool::dropConnections(stdx::unique_lock<stdx::mutex> lk) { + _generation++; + + _readyPool.clear(); + + for (auto&& x : _processingPool) { + _droppedProcessingPool[x.first] = std::move(x.second); + } + + _processingPool.clear(); + + failAllRequests(Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), + std::move(lk)); +} + +void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort, + Milliseconds timeout, + stdx::unique_lock<stdx::mutex> lk, + GetConnectionCallback cb) { + auto expiration = _parent->_factory->now() + timeout; + + _requests.push(make_pair(expiration, std::move(cb))); + + updateStateInLock(); + + spawnConnections(lk, hostAndPort); + fulfillRequests(lk); +} + +/** + * Hands a checked-out connection back to this pool. + * + * The connection is removed from _checkedOutPool and then either lapses (it + * failed, belongs to an older generation, or is stale while the pool already + * holds minConnections), is refreshed (stale and the pool is below + * minConnections), or goes straight back into the ready pool. Every path + * finishes by calling updateStateInLock() so the request/hostTimeout timer + * tracks the new pool contents. + */ +void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr, + stdx::unique_lock<stdx::mutex> lk) { + auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement; + auto now = _parent->_factory->now(); + + auto conn = takeFromPool(_checkedOutPool, connPtr); + + if (conn->isFailed() || conn->getGeneration() != _generation) { + // If the connection failed or has been dropped, simply let it lapse + // + // TODO: alert via some callback if the host is bad + } else if (needsRefreshTP <= now) { + // If we need to refresh this connection + + if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >= + _parent->_options.minConnections) { + // If we already have minConnections, just let the connection lapse. + // The checked-out set just shrank, so still re-evaluate the pool + // state; otherwise the pool could stay kRunning with no timer armed. + updateStateInLock(); + return; + } + + _processingPool[connPtr] = std::move(conn); + + // Unlock in case refresh can occur immediately + lk.unlock(); + 
connPtr->refresh(_parent->_options.refreshTimeout, + [this](ConnectionInterface* connPtr, Status status) { + connPtr->indicateUsed(); + + stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); + + auto conn = takeFromProcessingPool(connPtr); + + // If the host and port were dropped, let this lapse + if (conn->getGeneration() != _generation) + return; + + // If we're in shutdown, we don't need refreshed connections + if (_state == State::kInShutdown) + return; + + // If the connection refreshed successfully, throw it back in the ready + // pool + if (status.isOK()) { + addToReady(lk, std::move(conn)); + return; + } + + // Otherwise pass the failure on through + failAllRequests(status, std::move(lk)); + }); + lk.lock(); + } else { + // If it's fine as it is, just put it in the ready queue + addToReady(lk, std::move(conn)); + } + + updateStateInLock(); +} + +/** + * Adds a live connection to the ready pool + */ +void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk, + OwnedConnection conn) { + auto connPtr = conn.get(); + + _readyPool[connPtr] = std::move(conn); + + // Our strategy for refreshing connections is to check them out and + // immediately check them back in (which kicks off the refresh logic in + // returnConnection + connPtr->setTimeout(_parent->_options.refreshRequirement, + [this, connPtr]() { + OwnedConnection conn; + + stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); + + conn = takeFromPool(_readyPool, connPtr); + + // If we're in shutdown, we don't need to refresh connections + if (_state == State::kInShutdown) + return; + + _checkedOutPool[connPtr] = std::move(conn); + + returnConnection(connPtr, std::move(lk)); + }); + + fulfillRequests(lk); +} + +void ConnectionPool::SpecificPool::failAllRequests(const Status& status, + stdx::unique_lock<stdx::mutex> lk) { + // Move the requests out so they aren't visible + // in other threads + decltype(_requests) requestsToFail; + + { + using std::swap; + swap(requestsToFail, 
_requests); + } + + // Update state to reflect the lack of requests + updateStateInLock(); + + // Drop the lock and process all of the requests + // with the same failed status + lk.unlock(); + + while (requestsToFail.size()) { + requestsToFail.top().second(status); + requestsToFail.pop(); + } +} + +// fulfills as many outstanding requests as possible +void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex>& lk) { + // If some other thread (possibly this thread) is fulfilling requests, + // don't keep padding the callstack. + if (_inFulfillRequests) + return; + + _inFulfillRequests = true; + auto guard = MakeGuard([&] { _inFulfillRequests = false; }); + + while (_requests.size()) { + auto iter = _readyPool.begin(); + + if (iter == _readyPool.end()) + break; + + // Grab the connection and cancel its timeout + auto conn = std::move(iter->second); + _readyPool.erase(iter); + conn->cancelTimeout(); + + // Grab the request and callback + auto cb = std::move(_requests.top().second); + _requests.pop(); + + updateStateInLock(); + + auto connPtr = conn.get(); + + // check out the connection + _checkedOutPool[connPtr] = std::move(conn); + + // pass it to the user + lk.unlock(); + cb(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent))); + lk.lock(); + } +} + +// spawn enough connections to satisfy open requests and minpool, while +// honoring maxpool +void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mutex>& lk, + const HostAndPort& hostAndPort) { + // We want minConnections <= outstanding requests <= maxConnections + auto target = [&] { + return std::max( + _parent->_options.minConnections, + std::min(_requests.size() + _checkedOutPool.size(), _parent->_options.maxConnections)); + }; + + // While all of our inflight connections are less than our target + while (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) { + // make a new connection and put it in processing + auto handle = 
_parent->_factory->makeConnection(hostAndPort, _generation); + auto connPtr = handle.get(); + _processingPool[connPtr] = std::move(handle); + + // Run the setup callback + lk.unlock(); + connPtr->setup(_parent->_options.refreshTimeout, + [this](ConnectionInterface* connPtr, Status status) { + connPtr->indicateUsed(); + + stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); + + auto conn = takeFromProcessingPool(connPtr); + + if (conn->getGeneration() != _generation) { + // If the host and port was dropped, let the + // connection lapse + } else if (status.isOK()) { + addToReady(lk, std::move(conn)); + } else if (_requests.size()) { + // If the setup request failed, immediately fail + // all other pending requests. + + failAllRequests(status, std::move(lk)); + } + }); + // Note that this assumes that the refreshTimeout is sound for the + // setupTimeout + + lk.lock(); + } +} + +// Called every second after hostTimeout until all processing connections reap +void ConnectionPool::SpecificPool::shutdown() { + stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); + + _state = State::kInShutdown; + + // If we have processing connections, wait for them to finish or timeout + // before shutdown + if (_processingPool.size() || _droppedProcessingPool.size()) { + _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); }); + + return; + } + + invariant(_requests.empty()); + invariant(_checkedOutPool.empty()); + + _parent->_pools.erase(_hostAndPort); +} + +ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromPool( + OwnershipPool& pool, ConnectionInterface* connPtr) { + auto iter = pool.find(connPtr); + invariant(iter != pool.end()); + + auto conn = std::move(iter->second); + pool.erase(iter); + return conn; +} + +ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromProcessingPool( + ConnectionInterface* connPtr) { + if (_processingPool.count(connPtr)) + return takeFromPool(_processingPool, connPtr); + + return 
takeFromPool(_droppedProcessingPool, connPtr); +} + + +// Updates our state and manages the request timer. Picks kRunning while +// requests are pending or connections are checked out, and kIdle (arming the +// hostTimeout shutdown timer) otherwise. +void ConnectionPool::SpecificPool::updateStateInLock() { + if (_requests.size()) { + // We have some outstanding requests, we're live + + // If we were already running and the timer is the same as it was + // before, nothing to do + if (_state == State::kRunning && _requestTimerExpiration == _requests.top().first) + return; + + _state = State::kRunning; + + _requestTimer->cancelTimeout(); + + _requestTimerExpiration = _requests.top().first; + + auto timeout = _requests.top().first - _parent->_factory->now(); + + // We set a timer for the request that expires soonest (the top of + // _requests), then invoke the callback of each timed out request we + // couldn't service with ExceededTimeLimit + _requestTimer->setTimeout( + timeout, + [this]() { + stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); + + auto now = _parent->_factory->now(); + + while (_requests.size()) { + auto& x = _requests.top(); + + if (x.first <= now) { + auto cb = std::move(x.second); + _requests.pop(); + + lk.unlock(); + cb(Status(ErrorCodes::ExceededTimeLimit, + "Couldn't get a connection within the time limit")); + lk.lock(); + } else { + break; + } + } + + updateStateInLock(); + }); + } else if (_checkedOutPool.size()) { + // If we have no requests, but someone's using a connection, we just + // hang around until the next request or a return + + _requestTimer->cancelTimeout(); + _state = State::kRunning; + _requestTimerExpiration = _requestTimerExpiration.max(); + } else { + // If we don't have any live requests and no one has checked out connections + + // If we used to be idle, just bail + if (_state == State::kIdle) + return; + + _state = State::kIdle; + + _requestTimer->cancelTimeout(); + + _requestTimerExpiration = _parent->_factory->now() + _parent->_options.hostTimeout; + + auto timeout = _parent->_options.hostTimeout; + + // Set the shutdown timer + _requestTimer->setTimeout(timeout, [this]() { shutdown(); }); + } +} + +} // namespace executor +} // 
namespace mongo diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h new file mode 100644 index 00000000000..d5a0332b5cb --- /dev/null +++ b/src/mongo/executor/connection_pool.h @@ -0,0 +1,281 @@ +/** * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <memory> +#include <unordered_map> +#include <queue> + +#include "mongo/base/disallow_copying.h" +#include "mongo/stdx/chrono.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/net/hostandport.h" + +namespace mongo { +namespace executor { + +/** + * The actual user visible connection pool. + * + * This pool is constructed with a DependentTypeFactoryInterface which provides the tools it + * needs to generate connections and manage them over time. + * + * The overall workflow here is to manage separate pools for each unique + * HostAndPort. See comments on the various Options for how the pool operates. + */ +class ConnectionPool { + class ConnectionHandleDeleter; + class SpecificPool; + +public: + class ConnectionInterface; + class DependentTypeFactoryInterface; + class TimerInterface; + + using ConnectionHandle = std::unique_ptr<ConnectionInterface, ConnectionHandleDeleter>; + + using GetConnectionCallback = stdx::function<void(StatusWith<ConnectionHandle>)>; + + static const Milliseconds kDefaultRefreshTimeout; + static const Milliseconds kDefaultRefreshRequirement; + static const Milliseconds kDefaultHostTimeout; + + struct Options { + Options() {} + + /** + * The minimum number of connections to keep alive while the pool is in + * operation + */ + size_t minConnections = 1; + + /** + * The maximum number of connections to spawn for a host. This includes + * pending connections in setup and connections checked out of the pool + * as well as the obvious live connections in the pool. + */ + size_t maxConnections = std::numeric_limits<size_t>::max(); + + /** + * Amount of time to wait before timing out a refresh attempt + */ + Milliseconds refreshTimeout = kDefaultRefreshTimeout; + + /** + * Amount of time a connection may be idle before it cannot be returned + * for a user request and must instead be checked out and refreshed + * before handing to a user. 
+ */ + Milliseconds refreshRequirement = kDefaultRefreshRequirement; + + /** + * Amount of time to keep a specific pool around without any checked + * out connections or new requests + */ + Milliseconds hostTimeout = kDefaultHostTimeout; + }; + + explicit ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl, + Options options = Options{}); + + ~ConnectionPool(); + + void dropConnections(const HostAndPort& hostAndPort); + + void get(const HostAndPort& hostAndPort, Milliseconds timeout, GetConnectionCallback cb); + + /** + * TODO add a function returning connection pool stats + */ + +private: + void returnConnection(ConnectionInterface* connection); + + // Options are set at startup and never changed at run time, so these are + // accessed outside the lock + const Options _options; + + const std::unique_ptr<DependentTypeFactoryInterface> _factory; + + // The global mutex for specific pool access and the generation counter + stdx::mutex _mutex; + std::unordered_map<HostAndPort, std::unique_ptr<SpecificPool>> _pools; +}; + +class ConnectionPool::ConnectionHandleDeleter { +public: + ConnectionHandleDeleter() = default; + ConnectionHandleDeleter(ConnectionPool* pool) : _pool(pool) {} + + void operator()(ConnectionInterface* connection) { + if (_pool && connection) + _pool->returnConnection(connection); + } + +private: + ConnectionPool* _pool = nullptr; +}; + +/** + * Interface for a basic timer + * + * Minimal interface sets a timer with a callback and cancels the timer. + */ +class ConnectionPool::TimerInterface { + MONGO_DISALLOW_COPYING(TimerInterface); + +public: + TimerInterface() = default; + + using TimeoutCallback = stdx::function<void()>; + + virtual ~TimerInterface() = default; + + /** + * Sets the timeout for the timer. Setting an already set timer should + * override the previous timer. + */ + virtual void setTimeout(Milliseconds timeout, TimeoutCallback cb) = 0; + + /** + * It should be safe to cancel a previously canceled, or never set, timer. 
+ */ + virtual void cancelTimeout() = 0; +}; + +/** + * Interface for connection pool connections + * + * Provides a minimal interface to manipulate connections within the pool, + * specifically callbacks to set them up (connect + auth + whatever else), + * refresh them (issue some kind of ping) and manage a timer. + */ +class ConnectionPool::ConnectionInterface : public TimerInterface { + MONGO_DISALLOW_COPYING(ConnectionInterface); + + friend class ConnectionPool; + +public: + ConnectionInterface() = default; + + virtual ~ConnectionInterface() = default; + + /** + * Intended to be called whenever a socket is used in a way which indicates + * liveliness. I.e. if an operation is executed over the connection. + */ + virtual void indicateUsed() = 0; + + /** + * Indicates that a connection has failed. This will prevent the connection + * from re-entering the connection pool. + */ + virtual void indicateFailed() = 0; + + /** + * The HostAndPort for the connection. This should be the same as the + * HostAndPort passed to DependentTypeFactoryInterface::makeConnection. + */ + virtual const HostAndPort& getHostAndPort() const = 0; + +protected: + /** + * Making these protected makes the definitions available to override in + * children. + */ + using SetupCallback = stdx::function<void(ConnectionInterface*, Status)>; + using RefreshCallback = stdx::function<void(ConnectionInterface*, Status)>; + +private: + /** + * Returns the last used time point for the connection + */ + virtual Date_t getLastUsed() const = 0; + + /** + * Returns true if the connection is failed. This implies that it should + * not be returned to the pool. + */ + virtual bool isFailed() const = 0; + + /** + * Sets up the connection. This should include connection + auth + any + * other associated hooks. + */ + virtual void setup(Milliseconds timeout, SetupCallback cb) = 0; + + /** + * Refreshes the connection. 
This should involve a network round trip and + * should strongly imply an active connection + */ + virtual void refresh(Milliseconds timeout, RefreshCallback cb) = 0; + + /** + * Get the generation of the connection. This is used to track whether to + * continue using a connection after a call to dropConnections() by noting + * if the generation on the specific pool is the same as the generation on + * a connection (if not the connection is from a previous era and should + * not be re-used). + */ + virtual size_t getGeneration() const = 0; +}; + +/** + * Implementation interface for the connection pool + * + * This factory provides generators for connections, timers and a clock for the + * connection pool. + */ +class ConnectionPool::DependentTypeFactoryInterface { + MONGO_DISALLOW_COPYING(DependentTypeFactoryInterface); + +public: + DependentTypeFactoryInterface() = default; + + virtual ~DependentTypeFactoryInterface() = default; + + /** + * Makes a new connection given a host and port + */ + virtual std::unique_ptr<ConnectionInterface> makeConnection(const HostAndPort& hostAndPort, + size_t generation) = 0; + + /** + * Makes a new timer + */ + virtual std::unique_ptr<TimerInterface> makeTimer() = 0; + + /** + * Returns the current time point + */ + virtual Date_t now() = 0; +}; + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp new file mode 100644 index 00000000000..803d0096ddd --- /dev/null +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -0,0 +1,182 @@ +/** * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_asio.h"
+
+#include <asio.hpp>
+
+#include "mongo/executor/async_stream_factory_interface.h"
+
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/reply_interface.h"
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_asio {
+
+ASIOTimer::ASIOTimer(asio::io_service* io_service) : _io_service(io_service), _impl(*io_service) {}
+
+/**
+ * Arms the timer: cb runs once `timeout` elapses, unless the wait is cancelled
+ * or superseded by another setTimeout() first.
+ */
+void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+    _cb = std::move(cb);
+
+    cancelTimeout();
+    _impl.expires_after(timeout);
+    _impl.async_wait([this](const asio::error_code& error) {
+        // A cancelled wait completes (with operation_aborted) only after the
+        // setTimeout() that cancelled it has already stored its own callback
+        // in _cb. It must therefore leave _cb untouched, otherwise the newly
+        // armed timeout would fire with an empty callback.
+        if (error == asio::error::operation_aborted)
+            return;
+
+        // Move out before invoking so the callback may re-arm this timer.
+        auto cb = std::move(_cb);
+        cb();
+    });
+}
+
+/**
+ * Cancels any pending wait; its handler completes with operation_aborted and
+ * does not run the user callback.
+ */
+void ASIOTimer::cancelTimeout() {
+    _impl.cancel();
+}
+
+// _impl is initialized last: makeAsyncOp() reads _global and _hostAndPort
+// through `this`, so both must already be constructed.
+ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global)
+    : _global(global),
+      _timer(&global->_impl->_io_service),
+      _hostAndPort(hostAndPort),
+      _generation(generation),
+      _impl(makeAsyncOp(this)) {}
+
+void ASIOConnection::indicateUsed() {
+    _lastUsed = _global->now();
+}
+
+void ASIOConnection::indicateFailed() {
+    _isFailed = true;
+}
+
+const HostAndPort& ASIOConnection::getHostAndPort() const {
+    return _hostAndPort;
+}
+
+Date_t ASIOConnection::getLastUsed() const {
+    return _lastUsed;
+}
+
+bool ASIOConnection::isFailed() const {
+    return _isFailed;
+}
+
+size_t ASIOConnection::getGeneration() const {
+    return _generation;
+}
+
+/**
+ * Builds the AsyncOp used to connect `conn`. Its completion handler hands the
+ * result to the callback stored by setup().
+ */
+std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOConnection* conn) {
+    return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>(
+        conn->_global->_impl,
+        TaskExecutor::CallbackHandle(),
+        makeIsMasterRequest(conn),
+        [conn](const TaskExecutor::ResponseStatus& status) {
+            auto cb = std::move(conn->_setupCallback);
+            cb(conn, status.isOK() ? Status::OK() : status.getStatus());
+        },
+        conn->_global->now());
+}
+
+/**
+ * Builds an { isMaster: 1 } request against "admin" on the connection's host.
+ */
+RemoteCommandRequest ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) {
+    return {conn->_hostAndPort, std::string("admin"), BSON("isMaster" << 1), BSONObj()};
+}
+
+void ASIOConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+    _timer.setTimeout(timeout, std::move(cb));
+}
+
+void ASIOConnection::cancelTimeout() {
+    _timer.cancelTimeout();
+}
+
+/**
+ * Starts connecting; cb is invoked from the AsyncOp completion handler built
+ * in makeAsyncOp().
+ *
+ * NOTE(review): `timeout` is not applied here - presumably the connect path
+ * enforces its own deadline; confirm.
+ */
+void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) {
+    _setupCallback = std::move(cb);
+
+    _global->_impl->_connect(_impl.get());
+}
+
+/**
+ * Pings the remote with isMaster and reports the outcome through cb. If the
+ * round trip takes longer than `timeout`, the stream is cancelled, which
+ * surfaces as an error in the completion handler below.
+ */
+void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
+    auto op = _impl.get();
+
+    _refreshCallback = std::move(cb);
+
+    // Actually timeout refreshes
+    setTimeout(timeout,
+               [this]() {
+                   asio::post(_global->_impl->_io_service,
+                              [this] { _impl->connection().stream().cancel(); });
+               });
+
+    // Our pings are isMaster's
+    auto beginStatus = op->beginCommand(makeIsMasterRequest(this));
+    if (!beginStatus.isOK()) {
+        // Nothing was sent: disarm the timer, report the failure and stop.
+        // Falling through would run a command that was never begun and could
+        // invoke the (already moved-from) refresh callback a second time.
+        cancelTimeout();
+        auto cb = std::move(_refreshCallback);
+        cb(this, beginStatus);
+        return;
+    }
+
+    _global->_impl->_asyncRunCommand(
+        op->command(),
+        [this](std::error_code ec, size_t bytes) {
+            cancelTimeout();
+
+            // Move the callback out before invoking it on either path, so the
+            // callback is free to replace it (e.g. by starting a new refresh).
+            auto cb = std::move(_refreshCallback);
+
+            if (ec) {
+                return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
+            }
+
+            cb(this, Status::OK());
+        });
+}
+
+std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() {
+    return std::move(_impl);
+}
+
+void ASIOConnection::bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op) {
+    _impl = std::move(op);
+}
+
+ASIOImpl::ASIOImpl(NetworkInterfaceASIO* impl) : _impl(impl) {}
+
+Date_t ASIOImpl::now() {
+    return _impl->now();
+}
+
+std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() {
+    return stdx::make_unique<ASIOTimer>(&_impl->_io_service);
+}
+
+std::unique_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection(
+    const HostAndPort& hostAndPort, size_t generation) {
+    return stdx::make_unique<ASIOConnection>(hostAndPort, generation, this);
+}
+
+} // namespace connection_pool_asio
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h
new file mode 100644
index 00000000000..296a5d20859
--- /dev/null
+++ b/src/mongo/executor/connection_pool_asio.h
@@ -0,0 +1,119 @@
+/** * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/executor/connection_pool.h"
+#include "mongo/executor/network_interface_asio.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/executor/async_stream_interface.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_asio {
+
+/**
+ * Implements connection pool timers on top of asio
+ */
+class ASIOTimer final : public ConnectionPool::TimerInterface {
+public:
+    explicit ASIOTimer(asio::io_service* io_service);
+
+    void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+    void cancelTimeout() override;
+
+private:
+    TimeoutCallback _cb;
+    asio::io_service* const _io_service;
+    asio::steady_timer _impl;
+};
+
+/**
+ * Implements connection pool connections on top of asio
+ *
+ * Owns an async op when it's out of the pool
+ */
+class ASIOConnection final : public ConnectionPool::ConnectionInterface {
+public:
+    ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global);
+
+    void indicateUsed() override;
+    void indicateFailed() override;
+    const HostAndPort& getHostAndPort() const override;
+
+    /**
+     * Hands the owned AsyncOp to the caller / takes one back.
+     */
+    std::unique_ptr<NetworkInterfaceASIO::AsyncOp> releaseAsyncOp();
+    void bindAsyncOp(std::unique_ptr<NetworkInterfaceASIO::AsyncOp> op);
+
+private:
+    Date_t getLastUsed() const override;
+    bool isFailed() const override;
+
+    void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+    void cancelTimeout() override;
+
+    void setup(Milliseconds timeout, SetupCallback cb) override;
+    void refresh(Milliseconds timeout, RefreshCallback cb) override;
+
+    size_t getGeneration() const override;
+
+    static std::unique_ptr<NetworkInterfaceASIO::AsyncOp> makeAsyncOp(ASIOConnection* conn);
+    static RemoteCommandRequest makeIsMasterRequest(ASIOConnection* conn);
+
+private:
+    // Declaration order matters: the constructor builds _impl via
+    // makeAsyncOp(this), which reads _global and _hostAndPort, so _impl must
+    // stay declared after both of them.
+    SetupCallback _setupCallback;
+    RefreshCallback _refreshCallback;
+    ASIOImpl* const _global;
+    ASIOTimer _timer;
+    Date_t _lastUsed;
+    bool _isFailed = false;
+    HostAndPort _hostAndPort;
+    size_t _generation;
+    std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl;
+};
+
+/**
+ * Implements the connection pool's dependent type factory for asio: creates
+ * ASIOConnection and ASIOTimer instances and takes the current time from the
+ * owning NetworkInterfaceASIO.
+ */
+class ASIOImpl final : public ConnectionPool::DependentTypeFactoryInterface {
+    friend class ASIOConnection;
+
+public:
+    explicit ASIOImpl(NetworkInterfaceASIO* impl);
+
+    std::unique_ptr<ConnectionPool::ConnectionInterface> makeConnection(
+        const HostAndPort& hostAndPort, size_t generation) override;
+    std::unique_ptr<ConnectionPool::TimerInterface> makeTimer() override;
+
+    Date_t now() override;
+
+private:
+    NetworkInterfaceASIO* const _impl;
+};
+
+} // namespace connection_pool_asio
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp
new file mode 100644
index 00000000000..782e00aa28b
--- /dev/null
+++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library.
You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_asio.h"
+
+#include "mongo/client/connection_string.h"
+#include "mongo/executor/async_stream_factory.h"
+#include "mongo/executor/network_connection_hook.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/stdx/future.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/integration_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+/**
+ * Connection hook that counts how many times validateHost() is invoked. The
+ * counter is static and mutex-guarded, so it can be read after the
+ * NetworkInterfaceASIO has taken ownership of the hook instance.
+ */
+class MyNetworkConnectionHook : public NetworkConnectionHook {
+public:
+    Status validateHost(const HostAndPort& remoteHost,
+                        const RemoteCommandResponse& isMasterReply) override {
+        stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+        _count++;
+
+        return Status::OK();
+    }
+
+    // No extra request is issued on new connections, so handleReply() is
+    // never expected to run.
+    StatusWith<boost::optional<RemoteCommandRequest>> makeRequest(
+        const HostAndPort& remoteHost) override {
+        return StatusWith<boost::optional<RemoteCommandRequest>>(boost::none);
+    }
+
+    Status handleReply(const HostAndPort& remoteHost, RemoteCommandResponse&& response) override {
+        MONGO_UNREACHABLE;
+    }
+
+    /**
+     * Number of validateHost() calls observed so far.
+     */
+    static int count() {
+        stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+        return _count;
+    }
+
+private:
+    static stdx::mutex _mutex;
+    static int _count;
+};
+
+stdx::mutex MyNetworkConnectionHook::_mutex;
+int MyNetworkConnectionHook::_count = 0;
+
+/**
+ * Fires 500 concurrent pings through a pool capped at 10 connections and
+ * verifies that the connection hook ran exactly 10 times.
+ */
+TEST(ConnectionPoolASIO, TestPing) {
+    auto fixture = unittest::getFixtureConnectionString();
+
+    NetworkInterfaceASIO::Options options;
+    options.connectionPoolOptions.maxConnections = 10;
+
+    NetworkInterfaceASIO net{stdx::make_unique<AsyncStreamFactory>(),
+                             stdx::make_unique<MyNetworkConnectionHook>(),
+                             options};
+
+    net.startup();
+    auto guard = MakeGuard([&] { net.shutdown(); });
+
+    const int N = 500;
+
+    std::array<stdx::thread, N> threads;
+
+    for (auto& thread : threads) {
+        thread = stdx::thread([&net, &fixture]() {
+            auto status = Status::OK();
+            stdx::promise<void> result;
+
+            net.startCommand(TaskExecutor::CallbackHandle(),
+                             RemoteCommandRequest{
+                                 fixture.getServers()[0], "admin", BSON("ping" << 1), BSONObj()},
+                             [&result, &status](StatusWith<RemoteCommandResponse> resp) {
+                                 if (!resp.isOK()) {
+                                     // Plain copy: presumably getStatus() returns a
+                                     // const reference, making std::move a no-op.
+                                     status = resp.getStatus();
+                                 }
+                                 result.set_value();
+                             });
+
+            // Block until the callback above has run; only then is `status`
+            // safe to read from this thread.
+            result.get_future().get();
+
+            ASSERT_OK(status);
+        });
+    }
+
+    for (auto&& thread : threads) {
+        thread.join();
+    }
+
+    ASSERT_EQ(MyNetworkConnectionHook::count(), 10);
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
new file mode 100644
index 00000000000..be951ccbafc
--- /dev/null
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -0,0 +1,790 @@
+/** * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_test_fixture.h"
+
+#include "mongo/executor/connection_pool.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/future.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_test_details {
+
+/**
+ * Fixture for the connection pool tests. After every test it calls the
+ * class-level clear() hooks of ConnectionImpl and TimerImpl.
+ */
+class ConnectionPoolTest : public unittest::Test {
+public:
+protected:
+    void setUp() override {}
+
+    void tearDown() override {
+        ConnectionImpl::clear();
+        TimerImpl::clear();
+    }
+
+private:
+};
+
+/**
+ * Asserts that swConn holds a connection and evaluates to the id() of the
+ * ConnectionImpl behind the handle.
+ */
+#define CONN2ID(swConn)                                                     \
+    [](StatusWith<ConnectionPool::ConnectionHandle> & swConn) {             \
+        ASSERT(swConn.isOK());                                              \
+        return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \
+    }(swConn)
+
+/**
+ * Verify that we get the same connection if we grab one, return it and grab
+ * another.
+ */
+TEST_F(ConnectionPoolTest, SameConn) {
+    ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+    // Grab and stash an id for the first request (the handle is not moved out of the callback)
+    size_t conn1Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(
+        HostAndPort(),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); });
+
+    // Grab and stash an id for the second request
+    size_t conn2Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(
+        HostAndPort(),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+
+    // Verify that both callbacks ran, and that they saw the same connection
+    ASSERT(conn1Id);
+    ASSERT(conn2Id);
+    ASSERT_EQ(conn1Id, conn2Id);
+}
+
+/**
+ * Verify that a failed connection isn't returned to the pool
+ */
+TEST_F(ConnectionPoolTest, FailedConnDifferentConn) {
+    ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+    // Grab the first connection and indicate that it failed
+    size_t conn1Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 conn1Id = CONN2ID(swConn);
+                 swConn.getValue()->indicateFailed();
+             });
+
+    // Grab the second id
+    size_t conn2Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(
+        HostAndPort(),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+
+    // Verify that both callbacks ran, and that they saw different connections
+    ASSERT(conn1Id);
+    ASSERT(conn2Id);
+    ASSERT_NE(conn1Id, conn2Id);
+}
+
+/**
+ * Verify that providing different host and ports gives you different
+ * connections.
+ */
+TEST_F(ConnectionPoolTest, DifferentHostDifferentConn) {
+    ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+    // Conn 1 from port 30000
+    size_t conn1Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(
+        HostAndPort("localhost:30000"),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); });
+
+    // Conn 2 from port 30001
+    size_t conn2Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(
+        HostAndPort("localhost:30001"),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+
+    // Both callbacks ran and saw different connections
+    ASSERT(conn1Id);
+    ASSERT(conn2Id);
+    ASSERT_NE(conn1Id, conn2Id);
+}
+
+/**
+ * Verify that not returning handles to the pool spins up new connections.
+ */
+TEST_F(ConnectionPoolTest, DifferentConnWithoutReturn) {
+    ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+    // Get the first connection, move it out rather than letting it return
+    ConnectionPool::ConnectionHandle conn1;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+                 conn1 = std::move(swConn.getValue());
+             });
+
+    // Get the second connection, move it out rather than letting it return
+    ConnectionPool::ConnectionHandle conn2;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+                 conn2 = std::move(swConn.getValue());
+             });
+
+    // Verify that the two connections are different
+    ASSERT_NE(conn1.get(), conn2.get());
+}
+
+/**
+ * Verify that timing out on setup works as expected (a bad status is
+ * returned).
+ *
+ * Note that the lack of pushSetup() calls delays the get.
+ */
+TEST_F(ConnectionPoolTest, TimeoutOnSetup) {
+    ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+    bool notOk = false;
+
+    auto now = Date_t::now();
+
+    PoolImpl::setNow(now);
+
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { notOk = !swConn.isOK(); });
+
+    PoolImpl::setNow(now + Milliseconds(5000));
+
+    ASSERT(notOk);
+}
+
+/**
+ * Verify that refresh callbacks happen at the appropriate moments.
+ */
+TEST_F(ConnectionPoolTest, refreshHappens) {
+    bool refreshedA = false;
+    bool refreshedB = false;
+    ConnectionImpl::pushRefresh([&]() {
+        refreshedA = true;
+        return Status::OK();
+    });
+
+    ConnectionImpl::pushRefresh([&]() {
+        refreshedB = true;
+        return Status::OK();
+    });
+
+    ConnectionPool::Options options;
+    options.refreshRequirement = Milliseconds(1000);
+    ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+    auto now = Date_t::now();
+
+    PoolImpl::setNow(now);
+
+    // Get a connection
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { ASSERT(swConn.isOK()); });
+
+    // After 1 second, one refresh has occurred
+    PoolImpl::setNow(now + Milliseconds(1000));
+    ASSERT(refreshedA);
+    ASSERT(!refreshedB);
+
+    // After 1.5 seconds, the second refresh still hasn't triggered
+    PoolImpl::setNow(now + Milliseconds(1500));
+    ASSERT(!refreshedB);
+
+    // At 2 seconds, the second refresh has triggered
+    PoolImpl::setNow(now + Milliseconds(2000));
+    ASSERT(refreshedB);
+}
+
+/**
+ * Verify that refresh can timeout.
+ */
+TEST_F(ConnectionPoolTest, refreshTimeoutHappens) {
+    ConnectionPool::Options options;
+    options.refreshRequirement = Milliseconds(1000);
+    options.refreshTimeout = Milliseconds(2000);
+    ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+    auto now = Date_t::now();
+
+    PoolImpl::setNow(now);
+
+    size_t conn1Id = 0;
+
+    // Grab a connection and verify it's good
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(
+        HostAndPort(),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); });
+
+    PoolImpl::setNow(now + Milliseconds(500));
+
+    size_t conn2Id = 0;
+    // Make sure we still get the first one
+    pool.get(
+        HostAndPort(),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+    ASSERT_EQ(conn1Id, conn2Id);
+
+    // This should trigger a refresh, but not time it out. So now we have one
+    // connection sitting in refresh.
+    PoolImpl::setNow(now + Milliseconds(2000));
+    bool reachedA = false;
+
+    // This will wait because we have a refreshing connection, so it'll wait to
+    // see if that pans out. In this case, we'll get a failure on timeout.
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(10000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(!swConn.isOK());
+
+                 reachedA = true;
+             });
+    ASSERT(!reachedA);
+
+    // Let the refresh timeout
+    PoolImpl::setNow(now + Milliseconds(4000));
+
+    bool reachedB = false;
+
+    // Make sure we can get a new connection
+    pool.get(HostAndPort(),
+             Milliseconds(1000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT_NE(CONN2ID(swConn), conn1Id);
+                 reachedB = true;
+             });
+
+    ASSERT(reachedA);
+    ASSERT(reachedB);
+}
+
+/**
+ * Verify that requests are served in expiration order, not insertion order
+ */
+TEST_F(ConnectionPoolTest, requestsServedByUrgency) {
+    ConnectionPool pool(stdx::make_unique<PoolImpl>());
+
+    bool reachedA = false;
+    bool reachedB = false;
+
+    ConnectionPool::ConnectionHandle conn;
+
+    pool.get(HostAndPort(),
+             Milliseconds(2000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+
+                 reachedA = true;
+             });
+
+    pool.get(HostAndPort(),
+             Milliseconds(1000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+
+                 reachedB = true;
+
+                 conn = std::move(swConn.getValue());
+             });
+
+    ConnectionImpl::pushSetup(Status::OK());
+
+    // Note that we hit the 1 second request, but not the 2 second
+    ASSERT(reachedB);
+    ASSERT(!reachedA);
+
+    conn.reset();
+
+    // Now that we've returned the connection, we see the second has been
+    // called
+    ASSERT(reachedA);
+}
+
+/**
+ * Verify that we respect maxConnections
+ */
+TEST_F(ConnectionPoolTest, maxPoolRespected) {
+    ConnectionPool::Options options;
+    options.minConnections = 1;
+    options.maxConnections = 2;
+    ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+    ConnectionPool::ConnectionHandle conn1;
+    ConnectionPool::ConnectionHandle conn2;
+    ConnectionPool::ConnectionHandle conn3;
+
+    // Make 3 requests, each of which keeps its connection (doesn't return it
+    // to the pool)
+    pool.get(HostAndPort(),
+             Milliseconds(3000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+
+                 conn3 = std::move(swConn.getValue());
+             });
+    pool.get(HostAndPort(),
+             Milliseconds(2000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+
+                 conn2 = std::move(swConn.getValue());
+             });
+    pool.get(HostAndPort(),
+             Milliseconds(1000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+
+                 conn1 = std::move(swConn.getValue());
+             });
+
+    ConnectionImpl::pushSetup(Status::OK());
+    ConnectionImpl::pushSetup(Status::OK());
+    ConnectionImpl::pushSetup(Status::OK());
+
+    // Note that only two have run
+    ASSERT(conn1);
+    ASSERT(conn2);
+    ASSERT(!conn3);
+
+    // Return 1
+    ConnectionPool::ConnectionInterface* conn1Ptr = conn1.get();
+    conn1.reset();
+
+    // Verify that it's the one that pops out for request 3
+    ASSERT_EQ(conn1Ptr, conn3.get());
+}
+
+/**
+ * Verify that minConnections is respected
+ */
+TEST_F(ConnectionPoolTest, minPoolRespected) {
+    ConnectionPool::Options options;
+    options.minConnections = 2;
+    options.maxConnections = 3;
+    options.refreshRequirement = Milliseconds(1000);
+    options.refreshTimeout = Milliseconds(2000);
+    ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+    auto now = Date_t::now();
+
+    PoolImpl::setNow(now);
+
+    ConnectionPool::ConnectionHandle conn1;
+    ConnectionPool::ConnectionHandle conn2;
+    ConnectionPool::ConnectionHandle conn3;
+
+    // Grab one connection without returning it
+    pool.get(HostAndPort(),
+             Milliseconds(1000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+
+                 conn1 = std::move(swConn.getValue());
+             });
+
+    bool reachedA = false;
+    bool reachedB = false;
+    bool reachedC = false;
+
+    ConnectionImpl::pushSetup([&]() {
+        reachedA = true;
+        return Status::OK();
+    });
+    ConnectionImpl::pushSetup([&]() {
+        reachedB = true;
+        return Status::OK();
+    });
+    ConnectionImpl::pushSetup([&]() {
+        reachedC = true;
+        return Status::OK();
+    });
+
+    // Verify that two setups were invoked, even without two requests (the
+    // minConnections == 2)
+    ASSERT(reachedA);
+    ASSERT(reachedB);
+    ASSERT(!reachedC);
+
+    // Two more gets without returns
+    pool.get(HostAndPort(),
+             Milliseconds(2000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+
+                 conn2 = std::move(swConn.getValue());
+             });
+    pool.get(HostAndPort(),
+             Milliseconds(3000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(swConn.isOK());
+
+                 conn3 = std::move(swConn.getValue());
+             });
+
+    ASSERT(conn2);
+    ASSERT(conn3);
+
+    reachedA = false;
+    reachedB = false;
+    reachedC = false;
+
+    ConnectionImpl::pushRefresh([&]() {
+        reachedA = true;
+        return Status::OK();
+    });
+    ConnectionImpl::pushRefresh([&]() {
+        reachedB = true;
+        return Status::OK();
+    });
+    ConnectionImpl::pushRefresh([&]() {
+        reachedC = true;
+        return Status::OK();
+    });
+
+    // Return each connection over 1, 2 and 3 ms
+    PoolImpl::setNow(now + Milliseconds(1));
+    conn1->indicateUsed();
+    conn1.reset();
+
+    PoolImpl::setNow(now + Milliseconds(2));
+    conn2->indicateUsed();
+    conn2.reset();
+
+    PoolImpl::setNow(now + Milliseconds(3));
+    conn3->indicateUsed();
+    conn3.reset();
+
+    // Jump 5 seconds and verify that only two refreshes occurred
+    PoolImpl::setNow(now + Milliseconds(5000));
+
+    ASSERT(reachedA);
+    ASSERT(reachedB);
+    ASSERT(!reachedC);
+}
+
+
+/**
+ * Verify that the hostTimeout is respected. This implies that an idle
+ * hostAndPort drops its connections.
+ */
+TEST_F(ConnectionPoolTest, hostTimeoutHappens) {
+    ConnectionPool::Options options;
+    options.refreshRequirement = Milliseconds(5000);
+    options.refreshTimeout = Milliseconds(5000);
+    options.hostTimeout = Milliseconds(1000);
+    ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+    auto now = Date_t::now();
+
+    PoolImpl::setNow(now);
+
+    size_t connId = 0;
+
+    bool reachedA = false;
+    // Grab 1 connection and return it
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 connId = CONN2ID(swConn);
+                 reachedA = true;
+             });
+
+    ASSERT(reachedA);
+
+    // Jump past the hostTimeout
+    PoolImpl::setNow(now + Milliseconds(1000));
+
+    bool reachedB = false;
+
+    // Verify that a new connection was spawned
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT_NE(connId, CONN2ID(swConn));
+                 reachedB = true;
+             });
+
+    ASSERT(reachedB);
+}
+
+
+/**
+ * Verify that the hostTimeout happens, but that continued gets delay
+ * activation.
+ */
+TEST_F(ConnectionPoolTest, hostTimeoutHappensMoreGetsDelay) {
+    ConnectionPool::Options options;
+    options.refreshRequirement = Milliseconds(5000);
+    options.refreshTimeout = Milliseconds(5000);
+    options.hostTimeout = Milliseconds(1000);
+    ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+    auto now = Date_t::now();
+
+    PoolImpl::setNow(now);
+
+    size_t connId = 0;
+
+    bool reachedA = false;
+
+    // Grab and return
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 connId = CONN2ID(swConn);
+                 reachedA = true;
+             });
+    ASSERT(reachedA);
+
+    // Jump almost up to the hostTimeout
+    PoolImpl::setNow(now + Milliseconds(999));
+
+    bool reachedB = false;
+    // Same connection
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT_EQ(connId, CONN2ID(swConn));
+                 reachedB = true;
+             });
+    ASSERT(reachedB);
+
+    // Now we've timed out
+    PoolImpl::setNow(now + Milliseconds(2000));
+
+    bool reachedC = false;
+    // Different id
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT_NE(connId, CONN2ID(swConn));
+                 reachedC = true;
+             });
+
+    ASSERT(reachedC);
+}
+
+
+/**
+ * Verify that the hostTimeout happens and that having a connection checked out
+ * delays things
+ */
+TEST_F(ConnectionPoolTest, hostTimeoutHappensCheckoutDelays) {
+    ConnectionPool::Options options;
+    options.refreshRequirement = Milliseconds(5000);
+    options.refreshTimeout = Milliseconds(5000);
+    options.hostTimeout = Milliseconds(1000);
+    ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+    auto now = Date_t::now();
+
+    PoolImpl::setNow(now);
+
+    ConnectionPool::ConnectionHandle conn1;
+    size_t conn1Id = 0;
+    size_t conn2Id = 0;
+
+    // save 1 connection
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 conn1Id = CONN2ID(swConn);
+                 conn1 = std::move(swConn.getValue());
+             });
+
+    ASSERT(conn1Id);
+
+    // return the second
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(
+        HostAndPort(),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn2Id = CONN2ID(swConn); });
+
+    ASSERT(conn2Id);
+
+    // hostTimeout has passed
+    PoolImpl::setNow(now + Milliseconds(1000));
+
+    bool reachedA = false;
+
+    // conn 2 is still there
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT_EQ(conn2Id, CONN2ID(swConn));
+                 reachedA = true;
+             });
+
+    ASSERT(reachedA);
+
+    // return conn 1
+    conn1.reset();
+
+    // expire the pool
+    PoolImpl::setNow(now + Milliseconds(2000));
+
+    bool reachedB = false;
+
+    // make sure that this is a new id
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT_NE(conn1Id, CONN2ID(swConn));
+                 ASSERT_NE(conn2Id, CONN2ID(swConn));
+                 reachedB = true;
+             });
+    ASSERT(reachedB);
+}
+
+/**
+ * Verify that drop connections works
+ */
+TEST_F(ConnectionPoolTest, dropConnections) {
+    ConnectionPool::Options options;
+
+    // ensure that only 1 connection is floating around
+    options.maxConnections = 1;
+    options.refreshRequirement = Seconds(1);
+    options.refreshTimeout = Seconds(2);
+    ConnectionPool pool(stdx::make_unique<PoolImpl>(), options);
+
+    auto now = Date_t::now();
+    PoolImpl::setNow(now);
+
+    // Grab the first connection id
+    size_t conn1Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(
+        HostAndPort(),
+        Milliseconds(5000),
+        [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); });
+    ASSERT(conn1Id);
+
+    // Grab it and this time keep it out of the pool
+    ConnectionPool::ConnectionHandle handle;
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT_EQ(CONN2ID(swConn), conn1Id);
+                 handle = std::move(swConn.getValue());
+             });
+
+    ASSERT(handle);
+
+    bool reachedA = false;
+
+    // Queue up a request. This won't fire until we drop connections, then it
+    // will fail.
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT(!swConn.isOK());
+                 reachedA = true;
+             });
+    ASSERT(!reachedA);
+
+    // fails the previous get
+    pool.dropConnections(HostAndPort());
+
+    ASSERT(reachedA);
+
+    // return the connection
+    handle.reset();
+
+    // Make sure that a new connection request properly disposed of the gen1
+    // connection
+    size_t conn2Id = 0;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 conn2Id = CONN2ID(swConn);
+
+                 ASSERT_NE(conn2Id, conn1Id);
+             });
+    ASSERT(conn2Id);
+
+    // Push conn2 into refresh
+    PoolImpl::setNow(now + Milliseconds(1500));
+
+    // drop the connections
+    pool.dropConnections(HostAndPort());
+
+    // refresh still pending
+    PoolImpl::setNow(now + Milliseconds(2500));
+
+    // Verify that a new connection came out, despite the gen2 connection still
+    // being pending
+    bool reachedB = false;
+    ConnectionImpl::pushSetup(Status::OK());
+    pool.get(HostAndPort(),
+             Milliseconds(5000),
+             [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) {
+                 ASSERT_NE(CONN2ID(swConn), conn2Id);
+                 reachedB = true;
+             });
+
+    ASSERT(reachedB);
+}
+
+} // namespace connection_pool_test_details
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
new file mode 100644
index 00000000000..860275ebf39
--- /dev/null
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -0,0 +1,211 @@
+/** * Copyright (C) 2015 MongoDB Inc.
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/connection_pool_test_fixture.h"
+
+#include "mongo/stdx/memory.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_test_details {
+
+namespace {
+
+/**
+ * Pairs the oldest waiting connection in 'questions' with the oldest queued
+ * status producer in 'answers' and hands both to 'invoke'.
+ *
+ * Both entries are popped BEFORE 'invoke' runs. The pool callback may re-enter
+ * the mock (e.g. by spawning another connection); popping first guarantees the
+ * re-entrant call never sees the entries that are currently being answered.
+ *
+ * Both queues must be non-empty.
+ */
+template <typename Questions, typename Answers, typename Invoke>
+void answerOldest(Questions& questions, Answers& answers, Invoke invoke) {
+    auto conn = questions.front();
+    auto answer = std::move(answers.front());
+    questions.pop_front();
+    answers.pop_front();
+
+    invoke(conn, answer());
+}
+
+}  // namespace
+
+TimerImpl::TimerImpl(PoolImpl* global) : _global(global) {}
+
+// Deregisters the timer so fireIfNecessary() can never touch a dead object.
+TimerImpl::~TimerImpl() {
+    cancelTimeout();
+}
+
+// Arms the timer: stores the callback, computes the expiration from the mock
+// clock and registers the timer in the global set.
+void TimerImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+    _cb = std::move(cb);
+    _expiration = _global->now() + timeout;
+
+    _timers.emplace(this);
+}
+
+// Removes this timer from the global set; a no-op if it is not registered.
+void TimerImpl::cancelTimeout() {
+    _timers.erase(this);
+}
+
+// Forgets every registered timer.
+void TimerImpl::clear() {
+    _timers.clear();
+}
+
+// Runs the callback of every registered timer whose expiration is <= now().
+// A fired timer is not removed from _timers here; it stays registered until
+// cancelTimeout() or the destructor removes it.
+void TimerImpl::fireIfNecessary() {
+    // PoolImpl::now() reads only the static mock clock, so a temporary works.
+    auto now = PoolImpl().now();
+
+    // Iterate over a snapshot: callbacks may add or remove timers.
+    auto timers = _timers;
+
+    for (auto&& x : timers) {
+        // Re-check membership, an earlier callback may have cancelled x.
+        if (_timers.count(x) && (x->_expiration <= now)) {
+            // Invoke a copy: the callback may re-arm this timer, which
+            // reassigns _cb and would destroy the closure while it runs.
+            auto cb = x->_cb;
+            cb();
+        }
+    }
+}
+
+std::set<TimerImpl*> TimerImpl::_timers;
+
+ConnectionImpl::ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global)
+    : _hostAndPort(hostAndPort),
+      _timer(global),
+      _global(global),
+      _id(_idCounter++),
+      _generation(generation) {}
+
+// Stamps the connection with the current mock time.
+void ConnectionImpl::indicateUsed() {
+    _lastUsed = _global->now();
+}
+
+void ConnectionImpl::indicateFailed() {
+    _isFailed = true;
+}
+
+size_t ConnectionImpl::id() const {
+    return _id;
+}
+
+const HostAndPort& ConnectionImpl::getHostAndPort() const {
+    return _hostAndPort;
+}
+
+// Empties all four static question/answer queues.
+void ConnectionImpl::clear() {
+    _setupQueue.clear();
+    _refreshQueue.clear();
+    _pushSetupQueue.clear();
+    _pushRefreshQueue.clear();
+}
+
+// Queues an answer for a setup(); if a connection is already waiting in
+// setup(), the oldest one is answered immediately.
+void ConnectionImpl::pushSetup(PushSetupCallback status) {
+    _pushSetupQueue.push_back(std::move(status));
+
+    if (_setupQueue.size()) {
+        answerOldest(_setupQueue, _pushSetupQueue, [](ConnectionImpl* conn, Status result) {
+            // Invoke a copy: the pool may replace or destroy the connection
+            // (and with it _setupCallback) from inside the callback.
+            auto cb = conn->_setupCallback;
+            cb(conn, std::move(result));
+        });
+    }
+}
+
+void ConnectionImpl::pushSetup(Status status) {
+    pushSetup([status]() { return status; });
+}
+
+// Queues an answer for a refresh(); if a connection is already waiting in
+// refresh(), the oldest one is answered immediately.
+void ConnectionImpl::pushRefresh(PushRefreshCallback status) {
+    _pushRefreshQueue.push_back(std::move(status));
+
+    if (_refreshQueue.size()) {
+        answerOldest(_refreshQueue, _pushRefreshQueue, [](ConnectionImpl* conn, Status result) {
+            // Same copy-before-invoke rule as in pushSetup().
+            auto cb = conn->_refreshCallback;
+            cb(conn, std::move(result));
+        });
+    }
+}
+
+void ConnectionImpl::pushRefresh(Status status) {
+    pushRefresh([status]() { return status; });
+}
+
+Date_t ConnectionImpl::getLastUsed() const {
+    return _lastUsed;
+}
+
+bool ConnectionImpl::isFailed() const {
+    return _isFailed;
+}
+
+void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
+    _timer.setTimeout(timeout, std::move(cb));
+}
+
+void ConnectionImpl::cancelTimeout() {
+    _timer.cancelTimeout();
+}
+
+// Starts a mock setup: arms a timer that reports ExceededTimeLimit, queues the
+// connection as waiting, and answers right away if a status was pre-pushed.
+void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) {
+    _setupCallback = std::move(cb);
+
+    _timer.setTimeout(
+        timeout,
+        [this] { _setupCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); });
+
+    _setupQueue.push_back(this);
+
+    if (_pushSetupQueue.size()) {
+        answerOldest(_setupQueue, _pushSetupQueue, [](ConnectionImpl* conn, Status result) {
+            auto callback = conn->_setupCallback;
+            callback(conn, std::move(result));
+        });
+    }
+}
+
+// Starts a mock refresh; mirrors setup() using the refresh queues.
+void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) {
+    _refreshCallback = std::move(cb);
+
+    _timer.setTimeout(
+        timeout,
+        [this] { _refreshCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); });
+
+    _refreshQueue.push_back(this);
+
+    if (_pushRefreshQueue.size()) {
+        answerOldest(_refreshQueue, _pushRefreshQueue, [](ConnectionImpl* conn, Status result) {
+            auto callback = conn->_refreshCallback;
+            callback(conn, std::move(result));
+        });
+    }
+}
+
+size_t ConnectionImpl::getGeneration() const {
+    return _generation;
+}
+
+std::deque<ConnectionImpl::PushSetupCallback> ConnectionImpl::_pushSetupQueue;
+std::deque<ConnectionImpl::PushRefreshCallback> ConnectionImpl::_pushRefreshQueue;
+std::deque<ConnectionImpl*> ConnectionImpl::_setupQueue;
+std::deque<ConnectionImpl*> ConnectionImpl::_refreshQueue;
+// Ids start at 1, so tests can treat 0 as "no connection seen yet".
+size_t ConnectionImpl::_idCounter = 1;
+
+std::unique_ptr<ConnectionPool::ConnectionInterface> PoolImpl::makeConnection(
+    const HostAndPort& hostAndPort, size_t generation) {
+    return stdx::make_unique<ConnectionImpl>(hostAndPort, generation, this);
+}
+
+std::unique_ptr<ConnectionPool::TimerInterface> PoolImpl::makeTimer() {
+    return stdx::make_unique<TimerImpl>(this);
+}
+
+// Returns the mock time if setNow() has been called, the real time otherwise.
+Date_t PoolImpl::now() {
+    return _now.get_value_or(Date_t::now());
+}
+
+// Moves the mock clock and fires every timer that has expired by then.
+void PoolImpl::setNow(Date_t now) {
+    _now = now;
+    TimerImpl::fireIfNecessary();
+}
+
+boost::optional<Date_t> PoolImpl::_now;
+
+}  // namespace connection_pool_test_details
+}  // namespace executor
+}  // namespace mongo
diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h
new file mode 100644
index 00000000000..684b458fd28
--- /dev/null
+++ b/src/mongo/executor/connection_pool_test_fixture.h
@@ -0,0 +1,163 @@
+/** * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein.
If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <deque>
+#include <memory>
+#include <set>
+
+#include "mongo/executor/connection_pool.h"
+#include "mongo/stdx/functional.h"
+
+namespace mongo {
+namespace executor {
+namespace connection_pool_test_details {
+
+class ConnectionPoolTest;
+class PoolImpl;
+
+/**
+ * Mock interface for the timer
+ *
+ * Every armed timer registers itself in a static set; fireIfNecessary() walks
+ * that set and runs the callbacks of the timers that have expired.
+ */
+class TimerImpl final : public ConnectionPool::TimerInterface {
+public:
+    explicit TimerImpl(PoolImpl* global);
+    ~TimerImpl() override;
+
+    // Stores 'cb', sets the expiration to now() + timeout and registers the timer.
+    void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+
+    // Removes this timer from the set of registered timers.
+    void cancelTimeout() override;
+
+    // Runs the callback of every registered timer whose expiration is <= now().
+    static void fireIfNecessary();
+
+    // Drops all registered timers.
+    static void clear();
+
+private:
+    // All currently registered timers, shared by every TimerImpl.
+    static std::set<TimerImpl*> _timers;
+
+    TimeoutCallback _cb;
+    PoolImpl* _global;
+    Date_t _expiration;
+};
+
+/**
+ * Mock interface for the connections
+ *
+ * pushSetup() and pushRefresh() calls can be queued up ahead of time (in which
+ * case callbacks immediately fire), or calls queue up and pushSetup() and
+ * pushRefresh() fire as they're called.
+ */
+class ConnectionImpl final : public ConnectionPool::ConnectionInterface {
+public:
+    using PushSetupCallback = stdx::function<Status()>;
+    using PushRefreshCallback = stdx::function<Status()>;
+
+    ConnectionImpl(const HostAndPort& hostAndPort, size_t generation, PoolImpl* global);
+
+    // Id of this mock connection, taken from a static counter at construction.
+    size_t id() const;
+
+    void indicateUsed() override;
+
+    void indicateFailed() override;
+
+    const HostAndPort& getHostAndPort() const override;
+
+    // Dump all connection callbacks
+    static void clear();
+
+    // Push either a callback that returns the status for a setup, or just the Status
+    static void pushSetup(PushSetupCallback status);
+    static void pushSetup(Status status);
+
+    // Push either a callback that returns the status for a refresh, or just the Status
+    static void pushRefresh(PushRefreshCallback status);
+    static void pushRefresh(Status status);
+
+private:
+    Date_t getLastUsed() const override;
+
+    bool isFailed() const override;
+
+    void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
+
+    void cancelTimeout() override;
+
+    void setup(Milliseconds timeout, SetupCallback cb) override;
+
+    void refresh(Milliseconds timeout, RefreshCallback cb) override;
+
+    size_t getGeneration() const override;
+
+    HostAndPort _hostAndPort;
+    Date_t _lastUsed;
+    bool _isFailed = false;
+    SetupCallback _setupCallback;
+    RefreshCallback _refreshCallback;
+    TimerImpl _timer;
+    PoolImpl* _global;
+    size_t _id;
+    size_t _generation;
+
+    // Answer queues
+    static std::deque<PushSetupCallback> _pushSetupQueue;
+    static std::deque<PushRefreshCallback> _pushRefreshQueue;
+
+    // Question queues
+    static std::deque<ConnectionImpl*> _setupQueue;
+    static std::deque<ConnectionImpl*> _refreshQueue;
+
+    static size_t _idCounter;
+};
+
+/**
+ * Mock for the pool implementation
+ *
+ * The clock is a static optional shared by all instances: until setNow() is
+ * called, now() falls back to the real time.
+ */
+class PoolImpl final : public ConnectionPool::DependentTypeFactoryInterface {
+    friend class ConnectionImpl;
+
+public:
+    std::unique_ptr<ConnectionPool::ConnectionInterface> makeConnection(
+        const HostAndPort& hostAndPort, size_t generation) override;
+
+    std::unique_ptr<ConnectionPool::TimerInterface> makeTimer() override;
+
+    Date_t now() override;
+
+    /**
+     * setNow() can be used to fire all timers that have passed a point in time
+     */
+    static void setNow(Date_t now);
+
+private:
+    ConnectionPool* _pool = nullptr;
+
+    static boost::optional<Date_t> _now;
+};
+
+}  // namespace connection_pool_test_details
+}  // namespace executor
+}  // namespace mongo
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 3e43011c7b2..49e6e2475d8 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -36,6 +36,7 @@ #include "mongo/executor/async_stream_interface.h" #include "mongo/executor/async_stream_factory.h" +#include "mongo/executor/connection_pool_asio.h" #include "mongo/stdx/chrono.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -47,18 +48,22 @@ namespace mongo { namespace executor { NetworkInterfaceASIO::NetworkInterfaceASIO( - std::unique_ptr<AsyncStreamFactoryInterface> streamFactory) - : NetworkInterfaceASIO(std::move(streamFactory), nullptr) {} + std::unique_ptr<AsyncStreamFactoryInterface> streamFactory, Options options) + : NetworkInterfaceASIO(std::move(streamFactory), nullptr, std::move(options)) {} NetworkInterfaceASIO::NetworkInterfaceASIO( std::unique_ptr<AsyncStreamFactoryInterface> streamFactory, - std::unique_ptr<NetworkConnectionHook> networkConnectionHook) - : _io_service(), + std::unique_ptr<NetworkConnectionHook> networkConnectionHook, + Options options) + : _options(std::move(options)), + _io_service(), _hook(std::move(networkConnectionHook)), _resolver(_io_service), _state(State::kReady), _streamFactory(std::move(streamFactory)), - _isExecutorRunnable(false), + _connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this), + _options.connectionPoolOptions) {} std::string
NetworkInterfaceASIO::getDiagnosticString() { str::stream output; @@ -126,16 +131,62 @@ Date_t NetworkInterfaceASIO::now() { void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { - auto ownedOp = stdx::make_unique<AsyncOp>(this, cbHandle, request, onFinish, now()); - - AsyncOp* op = ownedOp.get(); - { stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - _inProgress.emplace(op, std::move(ownedOp)); + _inGetConnection.push_back(cbHandle); } - asio::post(_io_service, [this, op]() { _startCommand(op); }); + auto startTime = now(); + + auto nextStep = [this, startTime, cbHandle, request, onFinish]( + StatusWith<ConnectionPool::ConnectionHandle> swConn) { + + if (!swConn.isOK()) { + { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + + auto& v = _inGetConnection; + auto iter = std::find(v.begin(), v.end(), cbHandle); + if (iter != v.end()) + v.erase(iter); + } + + onFinish(swConn.getStatus()); + return; + } + + auto conn = static_cast<connection_pool_asio::ASIOConnection*>(swConn.getValue().get()); + + auto ownedOp = conn->releaseAsyncOp(); + AsyncOp* op = ownedOp.get(); + + { + stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); + + auto iter = std::find(_inGetConnection.begin(), _inGetConnection.end(), cbHandle); + + // If we didn't find the request, we've been canceled + if (iter == _inGetConnection.end()) + return; + + _inGetConnection.erase(iter); + _inProgress.emplace(op, std::move(ownedOp)); + } + + op->_cbHandle = cbHandle; + op->_request = request; + op->_onFinish = onFinish; + op->_connectionPoolHandle = std::move(swConn.getValue()); + op->_start = startTime; + + _beginCommunication(op); + }; + + // TODO: thread some higher level timeout through, rather than 10 seconds, + // once we make timeouts pervasive in this api. 
+ asio::post( + _io_service, + [this, request, nextStep] { _connectionPool.get(request.target, Seconds(10), nextStep); }); } void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index f36cbf12e0d..c5679ed6be9 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -35,9 +35,11 @@ #include <string> #include <system_error> #include <unordered_map> +#include <vector> #include "mongo/base/status.h" #include "mongo/base/system_error.h" +#include "mongo/executor/connection_pool.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/remote_command_request.h" @@ -53,6 +55,12 @@ namespace mongo { namespace executor { +namespace connection_pool_asio { +class ASIOConnection; +class ASIOTimer; +class ASIOImpl; +} + class AsyncStreamFactoryInterface; class AsyncStreamInterface; @@ -61,10 +69,20 @@ class AsyncStreamInterface; * Kohlhoff's ASIO library instead of existing MongoDB networking primitives. 
*/ class NetworkInterfaceASIO final : public NetworkInterface { + friend class connection_pool_asio::ASIOConnection; + friend class connection_pool_asio::ASIOTimer; + friend class connection_pool_asio::ASIOImpl; + public: + struct Options { + ConnectionPool::Options connectionPoolOptions; + }; + + NetworkInterfaceASIO(std::unique_ptr<AsyncStreamFactoryInterface> streamFactory, + std::unique_ptr<NetworkConnectionHook> networkConnectionHook, + Options = Options()); NetworkInterfaceASIO(std::unique_ptr<AsyncStreamFactoryInterface> streamFactory, - std::unique_ptr<NetworkConnectionHook> networkConnectionHook); - NetworkInterfaceASIO(std::unique_ptr<AsyncStreamFactoryInterface> streamFactory); + Options = Options()); std::string getDiagnosticString() override; std::string getHostName() override; void startup() override; @@ -172,6 +190,8 @@ private: * Helper object to manage individual network operations. */ class AsyncOp { + friend class NetworkInterfaceASIO; + public: AsyncOp(NetworkInterfaceASIO* net, const TaskExecutor::CallbackHandle& cbHandle, @@ -215,6 +235,10 @@ private: RemoteCommandRequest _request; RemoteCommandCompletionFn _onFinish; + // AsyncOp's have a handle to their connection pool handle. They are + // also owned by it when they're in the pool + ConnectionPool::ConnectionHandle _connectionPoolHandle; + /** * The connection state used to service this request. We wrap it in an optional * as it is instantiated at some point after the AsyncOp is created. @@ -227,7 +251,7 @@ private: */ boost::optional<rpc::Protocol> _operationProtocol; - const Date_t _start; + Date_t _start; AtomicUInt64 _canceled; @@ -237,6 +261,7 @@ private: * representing its current running or next-to-be-run command, if there is one. 
*/ boost::optional<AsyncCommand> _command; + bool _inSetup; }; void _startCommand(AsyncOp* op); @@ -280,6 +305,8 @@ private: void _asyncRunCommand(AsyncCommand* cmd, NetworkOpHandler handler); + Options _options; + asio::io_service _io_service; stdx::thread _serviceRunner; @@ -293,10 +320,13 @@ private: stdx::mutex _inProgressMutex; std::unordered_map<AsyncOp*, std::unique_ptr<AsyncOp>> _inProgress; + std::vector<TaskExecutor::CallbackHandle> _inGetConnection; stdx::mutex _executorMutex; bool _isExecutorRunnable; stdx::condition_variable _isExecutorRunnableCondition; + + ConnectionPool _connectionPool; }; template <typename T, typename R, typename... MethodArgs, typename... DeducedArgs> diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index 452b26a6aec..889ba5c11d5 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -35,6 +35,8 @@ #include <type_traits> #include <utility> +#include "mongo/executor/connection_pool_asio.h" +#include "mongo/executor/async_stream_interface.h" #include "mongo/db/dbmessage.h" #include "mongo/db/jsobj.h" #include "mongo/executor/async_stream_interface.h" @@ -204,6 +206,19 @@ void NetworkInterfaceASIO::_startCommand(AsyncOp* op) { } void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { + // The way that we connect connections for the connection pool is by + // starting the callback chain with connect(), but getting off at the first + // _beginCommunication. I.e. all AsyncOp's start off with _inSetup == true + // and arrive here as they're connected and authed. Once they hit here, we + // return to the connection pool's get() callback with _inSetup == false, + // so we can proceed with user operations after they return to this + // codepath. 
+ if (op->_inSetup) { + op->_inSetup = false; + op->finish(op->command()->response(rpc::Protocol::kOpQuery, now())); + return; + } + auto beginStatus = op->beginCommand(op->request()); if (!beginStatus.isOK()) { return _completeOperation(op, beginStatus); @@ -237,13 +252,30 @@ void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_c void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& resp) { op->finish(resp); + std::unique_ptr<AsyncOp> ownedOp; + { - // NOTE: op will be deleted in the call to erase() below. - // It is invalid to reference op after this point. stdx::lock_guard<stdx::mutex> lk(_inProgressMutex); - _inProgress.erase(op); + + auto iter = _inProgress.find(op); + + // We're in connection start + if (iter == _inProgress.end()) { + return; + } + + ownedOp = std::move(iter->second); + _inProgress.erase(iter); } + invariant(ownedOp); + + auto conn = std::move(op->_connectionPoolHandle); + auto asioConn = static_cast<connection_pool_asio::ASIOConnection*>(conn.get()); + + asioConn->bindAsyncOp(std::move(ownedOp)); + asioConn->indicateUsed(); + signalWorkAvailable(); } diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index 069ed7f98bc..44be4846d5f 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -30,9 +30,12 @@ #include "mongo/platform/basic.h" +#include "mongo/executor/network_interface_asio.h" + #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/executor/async_stream_interface.h" +#include "mongo/executor/connection_pool_asio.h" #include "mongo/executor/downconvert_find_and_getmore_commands.h" #include "mongo/executor/network_interface_asio.h" #include "mongo/rpc/factory.h" @@ -75,7 +78,8 @@ NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner, _request(request), 
_onFinish(onFinish), _start(now), - _canceled(0) {} + _canceled(0), + _inSetup(true) {} void NetworkInterfaceASIO::AsyncOp::cancel() { // An operation may be in mid-flight when it is canceled, so we |