diff options
author | Jason Carey <jcarey@argv.me> | 2015-09-02 16:19:04 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2015-09-09 13:28:53 -0400 |
commit | 08557952c61958166265817ca8bd0c991c01dd59 (patch) | |
tree | 5095403149b1f1109bd20977429a2c3afb954968 /src/mongo | |
parent | 80eff9c7d945b0c447261de8733a0fd154e32d27 (diff) | |
download | mongo-08557952c61958166265817ca8bd0c991c01dd59.tar.gz |
SERVER-19769 Fixups for Connection Pooling
Various fixes for connection pooling
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 84 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 8 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_asio.cpp | 52 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_asio.h | 10 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_asio_integration_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.cpp | 8 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool_test_fixture.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio.cpp | 9 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio.h | 4 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_asio_command.cpp | 8 |
11 files changed, 116 insertions, 77 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index a5e2d9e2e91..7afa474a5e4 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -56,8 +56,6 @@ public: SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort); ~SpecificPool(); - void dropConnections(stdx::unique_lock<stdx::mutex> lk); - /** * Get's a connection from the specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex @@ -68,6 +66,13 @@ public: GetConnectionCallback cb); /** + * Cascade a failure across existing connections and requests. Invoking + * this function drops all current connections and fails all current + * requests with the passed status. + */ + void processFailure(const Status& status, stdx::unique_lock<stdx::mutex> lk); + + /** * Returns a connection to a specific pool. Sinks a unique_lock from the * parent to preserve the lock on _mutex */ @@ -85,8 +90,6 @@ private: void addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn); - void failAllRequests(const Status& status, stdx::unique_lock<stdx::mutex> lk); - void fulfillRequests(stdx::unique_lock<stdx::mutex>& lk); void spawnConnections(stdx::unique_lock<stdx::mutex>& lk, const HostAndPort& hostAndPort); @@ -143,8 +146,8 @@ private: // TODO: revisit these durations when we come up with a more pervasive solution // for NetworkInterfaceASIO's timers -Milliseconds const ConnectionPool::kDefaultRefreshTimeout = Seconds(30); -Milliseconds const ConnectionPool::kDefaultRefreshRequirement = Minutes(1); +Milliseconds const ConnectionPool::kDefaultRefreshTimeout = Minutes(5); +Milliseconds const ConnectionPool::kDefaultRefreshRequirement = Minutes(5); Milliseconds const ConnectionPool::kDefaultHostTimeout = Minutes(5); ConnectionPool::ConnectionPool(std::unique_ptr<DependentTypeFactoryInterface> impl, Options options) @@ -160,7 +163,8 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) { if (iter == _pools.end()) return; - iter->second.get()->dropConnections(std::move(lk)); + iter->second.get()->processFailure( + Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), std::move(lk)); } void ConnectionPool::get(const HostAndPort& hostAndPort, @@ -207,21 +211,6 @@ ConnectionPool::SpecificPool::~SpecificPool() { DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();) } -void ConnectionPool::SpecificPool::dropConnections(stdx::unique_lock<stdx::mutex> lk) { - _generation++; - - _readyPool.clear(); - - for (auto&& x : _processingPool) { - _droppedProcessingPool[x.first] = std::move(x.second); - } - - _processingPool.clear(); - - failAllRequests(Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"), - std::move(lk)); -} - void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort, Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk, @@ -239,15 +228,25 @@ void ConnectionPool::SpecificPool::getConnection(const HostAndPort& hostAndPort, void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr, stdx::unique_lock<stdx::mutex> lk) { auto needsRefreshTP = connPtr->getLastUsed() + _parent->_options.refreshRequirement; - auto now = _parent->_factory->now(); auto conn = takeFromPool(_checkedOutPool, connPtr); - if (conn->isFailed() || conn->getGeneration() != _generation) { - // If the connection failed or has been dropped, simply let it lapse + updateStateInLock(); + + if (conn->getGeneration() != _generation) { + // If the connection is from an older generation, just return. + return; + } + + if (!conn->getStatus().isOK()) { + // If the connection failed cascade to all callers. // // TODO: alert via some callback if the host is bad - } else if (needsRefreshTP <= now) { + return processFailure(conn->getStatus(), std::move(lk)); + } + + auto now = _parent->_factory->now(); + if (needsRefreshTP <= now) { // If we need to refresh this connection if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >= @@ -284,7 +283,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr } // Otherwise pass the failure on through - failAllRequests(status, std::move(lk)); + processFailure(status, std::move(lk)); }); lk.lock(); } else { @@ -295,9 +294,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr updateStateInLock(); } -/** - * Adds a live connection to the ready pool - */ +// Adds a live connection to the ready pool void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk, OwnedConnection conn) { auto connPtr = conn.get(); @@ -327,12 +324,25 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk fulfillRequests(lk); } -void ConnectionPool::SpecificPool::failAllRequests(const Status& status, - stdx::unique_lock<stdx::mutex> lk) { +// Drop connections and fail all requests +void ConnectionPool::SpecificPool::processFailure(const Status& status, + stdx::unique_lock<stdx::mutex> lk) { + // Bump the generation so we don't reuse any pending or checked out + // connections + _generation++; + + // Drop ready connections + _readyPool.clear(); + + // Migrate processing connections to the dropped pool + for (auto&& x : _processingPool) { + _droppedProcessingPool[x.first] = std::move(x.second); + } + _processingPool.clear(); + // Move the requests out so they aren't visible // in other threads decltype(_requests) requestsToFail; - { using std::swap; swap(requestsToFail, _requests); @@ -423,11 +433,9 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute // connection lapse } else if (status.isOK()) { addToReady(lk, std::move(conn)); - } else if (_requests.size()) { - // If the setup request failed, immediately fail - // all other pending requests. - - failAllRequests(status, std::move(lk)); + } else { + // If the setup failed, cascade the failure edge + processFailure(status, std::move(lk)); } }); // Note that this assumes that the refreshTimeout is sound for the diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index d5a0332b5cb..d46b18b9812 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -196,7 +196,7 @@ public: * Indicates that a connection has failed. This will prevent the connection * from re-entering the connection pool. */ - virtual void indicateFailed() = 0; + virtual void indicateFailed(Status status) = 0; /** * The HostAndPort for the connection. This should be the same as the @@ -219,10 +219,10 @@ private: virtual Date_t getLastUsed() const = 0; /** - * Returns true if the connection is failed. This implies that it should - * not be returned to the pool. + * Returns the status associated with the connection. If the status is not + * OK, the connection will not be returned to the pool. */ - virtual bool isFailed() const = 0; + virtual const Status& getStatus() const = 0; /** * Sets up the connection. This should include connection + auth + any diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index 803d0096ddd..62a21cd40d6 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -34,6 +34,7 @@ #include "mongo/executor/async_stream_factory_interface.h" #include "mongo/rpc/factory.h" +#include "mongo/rpc/legacy_request_builder.h" #include "mongo/rpc/reply_interface.h" #include "mongo/stdx/memory.h" @@ -41,22 +42,37 @@ namespace mongo { namespace executor { namespace connection_pool_asio { -ASIOTimer::ASIOTimer(asio::io_service* io_service) : _io_service(io_service), _impl(*io_service) {} +ASIOTimer::ASIOTimer(asio::io_service* io_service) + : _io_service(io_service), _impl(*io_service), _canceled(true) {} + +ASIOTimer::~ASIOTimer() { + cancelTimeout(); +} void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) { _cb = std::move(cb); cancelTimeout(); _impl.expires_after(timeout); + _canceled = false; + _impl.async_wait([this](const asio::error_code& error) { - auto cb = std::move(_cb); + if (error == asio::error::operation_aborted) { + return; + } + + bool wasCanceled = _canceled; + _canceled = true; - if (error != asio::error::operation_aborted) + if (!wasCanceled) { + auto cb = std::move(_cb); cb(); + } }); } void ASIOTimer::cancelTimeout() { + _canceled = true; _impl.cancel(); } @@ -71,8 +87,8 @@ void ASIOConnection::indicateUsed() { _lastUsed = _global->now(); } -void ASIOConnection::indicateFailed() { - _isFailed = true; +void ASIOConnection::indicateFailed(Status status) { + _status = std::move(status); } const HostAndPort& ASIOConnection::getHostAndPort() const { @@ -83,8 +99,8 @@ Date_t ASIOConnection::getLastUsed() const { return _lastUsed; } -bool ASIOConnection::isFailed() const { - return _isFailed; +const Status& ASIOConnection::getStatus() const { + return _status; } size_t ASIOConnection::getGeneration() const { @@ -95,7 +111,8 @@ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOC return stdx::make_unique<NetworkInterfaceASIO::AsyncOp>( conn->_global->_impl, TaskExecutor::CallbackHandle(), - makeIsMasterRequest(conn), + RemoteCommandRequest{ + conn->getHostAndPort(), std::string("admin"), BSON("isMaster" << 1), BSONObj()}, [conn](const TaskExecutor::ResponseStatus& status) { auto cb = std::move(conn->_setupCallback); cb(conn, status.isOK() ? Status::OK() : status.getStatus()); @@ -103,8 +120,14 @@ std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::makeAsyncOp(ASIOC conn->_global->now()); } -RemoteCommandRequest ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) { - return {conn->_hostAndPort, std::string("admin"), BSON("isMaster" << 1), BSONObj()}; +Message ASIOConnection::makeIsMasterRequest(ASIOConnection* conn) { + rpc::LegacyRequestBuilder requestBuilder{}; + requestBuilder.setDatabase("admin"); + requestBuilder.setCommandName("isMaster"); + requestBuilder.setMetadata(rpc::makeEmptyMetadata()); + requestBuilder.setCommandArgs(BSON("isMaster" << 1)); + + return std::move(*(requestBuilder.done())); } void ASIOConnection::setTimeout(Milliseconds timeout, TimeoutCallback cb) { @@ -138,6 +161,7 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { if (!beginStatus.isOK()) { auto cb = std::move(_refreshCallback); cb(this, beginStatus); + return; } _global->_impl->_asyncRunCommand( @@ -145,11 +169,11 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { [this, op](std::error_code ec, size_t bytes) { cancelTimeout(); - if (ec) { - return _refreshCallback(this, Status(ErrorCodes::HostUnreachable, ec.message())); - } - auto cb = std::move(_refreshCallback); + + if (ec) + return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); + cb(this, Status::OK()); }); } diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h index 296a5d20859..b99db55aa31 100644 --- a/src/mongo/executor/connection_pool_asio.h +++ b/src/mongo/executor/connection_pool_asio.h @@ -42,6 +42,7 @@ namespace connection_pool_asio { class ASIOTimer final : public ConnectionPool::TimerInterface { public: ASIOTimer(asio::io_service* service); + ~ASIOTimer(); void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; void cancelTimeout() override; @@ -50,6 +51,7 @@ private: TimeoutCallback _cb; asio::io_service* const _io_service; asio::steady_timer _impl; + bool _canceled; }; /** @@ -62,7 +64,7 @@ public: ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global); void indicateUsed() override; - void indicateFailed() override; + void indicateFailed(Status status) override; const HostAndPort& getHostAndPort() const override; std::unique_ptr<NetworkInterfaceASIO::AsyncOp> releaseAsyncOp(); @@ -70,7 +72,7 @@ public: private: Date_t getLastUsed() const override; - bool isFailed() const override; + const Status& getStatus() const override; void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; void cancelTimeout() override; @@ -81,7 +83,7 @@ private: size_t getGeneration() const override; static std::unique_ptr<NetworkInterfaceASIO::AsyncOp> makeAsyncOp(ASIOConnection* conn); - static RemoteCommandRequest makeIsMasterRequest(ASIOConnection* conn); + static Message makeIsMasterRequest(ASIOConnection* conn); private: SetupCallback _setupCallback; @@ -89,7 +91,7 @@ private: ASIOImpl* const _global; ASIOTimer _timer; Date_t _lastUsed; - bool _isFailed = false; + Status _status = Status::OK(); HostAndPort _hostAndPort; size_t _generation; std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl; diff --git a/src/mongo/executor/connection_pool_asio_integration_test.cpp b/src/mongo/executor/connection_pool_asio_integration_test.cpp index 782e00aa28b..9c82a7e13e4 100644 --- a/src/mongo/executor/connection_pool_asio_integration_test.cpp +++ b/src/mongo/executor/connection_pool_asio_integration_test.cpp @@ -91,7 +91,7 @@ TEST(ConnectionPoolASIO, TestPing) { net.startup(); auto guard = MakeGuard([&] { net.shutdown(); }); - const int N = 500; + const int N = 50; std::array<stdx::thread, N> threads; diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index be951ccbafc..12f59572dd2 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -99,7 +99,7 @@ TEST_F(ConnectionPoolTest, FailedConnDifferentConn) { Milliseconds(5000), [&](StatusWith<ConnectionPool::ConnectionHandle> swConn) { conn1Id = CONN2ID(swConn); - swConn.getValue()->indicateFailed(); + swConn.getValue()->indicateFailed(Status(ErrorCodes::BadValue, "error")); }); // Grab the second id diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index 860275ebf39..e7c9703984f 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -81,8 +81,8 @@ void ConnectionImpl::indicateUsed() { _lastUsed = _global->now(); } -void ConnectionImpl::indicateFailed() { - _isFailed = true; +void ConnectionImpl::indicateFailed(Status status) { + _status = std::move(status); } size_t ConnectionImpl::id() const { @@ -132,8 +132,8 @@ Date_t ConnectionImpl::getLastUsed() const { return _lastUsed; } -bool ConnectionImpl::isFailed() const { - return _isFailed; +const Status& ConnectionImpl::getStatus() const { + return _status; } void ConnectionImpl::setTimeout(Milliseconds timeout, TimeoutCallback cb) { diff --git a/src/mongo/executor/connection_pool_test_fixture.h b/src/mongo/executor/connection_pool_test_fixture.h index 684b458fd28..edc83c4c401 100644 --- a/src/mongo/executor/connection_pool_test_fixture.h +++ b/src/mongo/executor/connection_pool_test_fixture.h @@ -82,7 +82,7 @@ public: void indicateUsed() override; - void indicateFailed() override; + void indicateFailed(Status status) override; const HostAndPort& getHostAndPort() const override; @@ -100,7 +100,7 @@ public: private: Date_t getLastUsed() const override; - bool isFailed() const override; + const Status& getStatus() const override; void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; @@ -114,7 +114,7 @@ private: HostAndPort _hostAndPort; Date_t _lastUsed; - bool _isFailed = false; + Status _status = Status::OK(); SetupCallback _setupCallback; RefreshCallback _refreshCallback; TimerImpl _timer; diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index c4c31596049..01b36857900 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -61,9 +61,9 @@ NetworkInterfaceASIO::NetworkInterfaceASIO( _resolver(_io_service), _state(State::kReady), _streamFactory(std::move(streamFactory)), - _isExecutorRunnable(false), _connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this), - _options.connectionPoolOptions) {} + _options.connectionPoolOptions), + _isExecutorRunnable(false) {} std::string NetworkInterfaceASIO::getDiagnosticString() { str::stream output; @@ -152,6 +152,7 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa } onFinish(swConn.getStatus()); + signalWorkAvailable(); return; } @@ -182,11 +183,11 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa _beginCommunication(op); }; - // TODO: thread some higher level timeout through, rather than 10 seconds, + // TODO: thread some higher level timeout through, rather than 5 minutes, // once we make timeouts pervasive in this api. asio::post( _io_service, - [this, request, nextStep] { _connectionPool.get(request.target, Seconds(10), nextStep); }); + [this, request, nextStep] { _connectionPool.get(request.target, Minutes(5), nextStep); }); } void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index c2f01806b10..0577728e741 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -319,6 +319,8 @@ private: std::unique_ptr<AsyncStreamFactoryInterface> _streamFactory; + ConnectionPool _connectionPool; + stdx::mutex _inProgressMutex; std::unordered_map<AsyncOp*, std::unique_ptr<AsyncOp>> _inProgress; std::vector<TaskExecutor::CallbackHandle> _inGetConnection; @@ -326,8 +328,6 @@ private: stdx::mutex _executorMutex; bool _isExecutorRunnable; stdx::condition_variable _isExecutorRunnableCondition; - - ConnectionPool _connectionPool; }; template <typename T, typename R, typename... MethodArgs, typename... DeducedArgs> diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index 889ba5c11d5..8d20c9ac62e 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -215,7 +215,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { // codepath. if (op->_inSetup) { op->_inSetup = false; - op->finish(op->command()->response(rpc::Protocol::kOpQuery, now())); + op->finish(RemoteCommandResponse()); return; } @@ -274,7 +274,11 @@ void NetworkInterfaceASIO::_completeOperation(AsyncOp* op, const ResponseStatus& auto asioConn = static_cast<connection_pool_asio::ASIOConnection*>(conn.get()); asioConn->bindAsyncOp(std::move(ownedOp)); - asioConn->indicateUsed(); + if (!resp.isOK()) { + asioConn->indicateFailed(resp.getStatus()); + } else { + asioConn->indicateUsed(); + } signalWorkAvailable(); } |