Diffstat (limited to 'src/mongo/executor/connection_pool.cpp')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 412 |
1 file changed, 185 insertions, 227 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 337e0b52c49..581ec8e3194 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -63,9 +63,9 @@ class ConnectionPool::SpecificPool final
     : public std::enable_shared_from_this<ConnectionPool::SpecificPool> {
 public:
     /**
-     * These active client methods must be used whenever entering a specific pool outside of the
-     * shutdown background task. The presence of an active client will bump a counter on the
-     * specific pool which will prevent the shutdown thread from deleting it.
+     * Whenever a function enters a specific pool, the function needs to be guarded.
+     * The presence of one of these guards will bump a counter on the specific pool
+     * which will prevent the pool from removing itself from the map of pools.
      *
      * The complexity comes from the need to hold a lock when writing to the
      * _activeClients param on the specific pool. Because the code beneath the client needs to lock
@@ -73,32 +73,28 @@ public:
      * lock acquired, move it into the client, then re-acquire to decrement the counter on the way
      * out.
      *
-     * It's used like:
+     * This callback also (perhaps overly aggressively) binds a shared pointer to the guard.
+     * It is *always* safe to reference the original specific pool in the guarded function object.
      *
-     * pool.runWithActiveClient([](stdx::unique_lock<stdx::mutex> lk){ codeToBeProtected(); });
+     * For a function object of signature:
+     * R riskyBusiness(stdx::unique_lock<stdx::mutex>, ArgTypes...);
+     *
+     * It returns a function object of signature:
+     * R safeCallback(ArgTypes...);
      */
     template <typename Callback>
-    auto runWithActiveClient(Callback&& cb) {
-        return runWithActiveClient(stdx::unique_lock<stdx::mutex>(_parent->_mutex),
-                                   std::forward<Callback>(cb));
-    }
-
-    template <typename Callback>
-    auto runWithActiveClient(stdx::unique_lock<stdx::mutex> lk, Callback&& cb) {
-        invariant(lk.owns_lock());
-
-        _activeClients++;
-
-        const auto guard = MakeGuard([&] {
-            invariant(!lk.owns_lock());
-            stdx::lock_guard<stdx::mutex> lk(_parent->_mutex);
-            _activeClients--;
-        });
+    auto guardCallback(Callback&& cb) {
+        return [ cb = std::forward<Callback>(cb), anchor = shared_from_this() ](auto&&... args) {
+            stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
+            ++(anchor->_activeClients);
+
+            ON_BLOCK_EXIT([anchor]() {
+                stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
+                --(anchor->_activeClients);
+            });
 
-        {
-            decltype(lk) localLk(std::move(lk));
-            return cb(std::move(localLk));
-        }
+            return cb(std::move(lk), std::forward<decltype(args)>(args)...);
+        };
     }
 
     SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort);
@@ -113,6 +109,14 @@ public:
                        stdx::unique_lock<stdx::mutex> lk);
 
     /**
+     * Triggers the shutdown procedure. This function marks the state as kInShutdown
+     * and calls processFailure below with the status provided. This may not immediately
+     * delist or destruct this pool. However, both will happen eventually as ConnectionHandles
+     * are deleted.
+     */
+    void triggerShutdown(const Status& status, stdx::unique_lock<stdx::mutex> lk);
+
+    /**
      * Cascades a failure across existing connections and requests. Invoking
      * this function drops all current connections and fails all current
     * requests with the passed status.
@@ -169,18 +173,6 @@ public:
         _tags = mutateFunc(_tags);
     }
 
-    /**
-     * See runWithActiveClient for what this controls, and be very very careful to manage the
-     * refcount correctly.
-     */
-    void incActiveClients(const stdx::unique_lock<stdx::mutex>& lk) {
-        _activeClients++;
-    }
-
-    void decActiveClients(const stdx::unique_lock<stdx::mutex>& lk) {
-        _activeClients--;
-    }
-
 private:
     using OwnedConnection = std::shared_ptr<ConnectionInterface>;
     using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
@@ -198,8 +190,6 @@ private:
 
     void spawnConnections(stdx::unique_lock<stdx::mutex>& lk);
 
-    void shutdown();
-
     template <typename OwnershipPoolType>
     typename OwnershipPoolType::mapped_type takeFromPool(
         OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr);
@@ -292,33 +282,15 @@ ConnectionPool::~ConnectionPool() {
 void ConnectionPool::shutdown() {
     _factory->shutdown();
 
-    std::vector<SpecificPool*> pools;
-
-    // Ensure we decrement active clients for all pools that we inc on (because we intend to process
-    // failures)
-    const auto guard = MakeGuard([&] {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
-
-        for (const auto& pool : pools) {
-            pool->decActiveClients(lk);
-        }
-    });
-
     // Grab all current pools (under the lock)
-    {
+    auto pools = [&] {
         stdx::unique_lock<stdx::mutex> lk(_mutex);
+        return _pools;
+    }();
 
-        for (auto& pair : _pools) {
-            pools.push_back(pair.second.get());
-            pair.second->incActiveClients(lk);
-        }
-    }
-
-    // Reacquire the lock per pool and process failures. We'll dec active clients when we're all
-    // through in the guard
-    for (const auto& pool : pools) {
+    for (const auto& pair : pools) {
         stdx::unique_lock<stdx::mutex> lk(_mutex);
-        pool->processFailure(
+        pair.second->triggerShutdown(
             Status(ErrorCodes::ShutdownInProgress, "Shutting down the connection pool"),
             std::move(lk));
     }
@@ -332,42 +304,25 @@ void ConnectionPool::dropConnections(const HostAndPort& hostAndPort) {
     if (iter == _pools.end())
         return;
 
-    iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
-        iter->second->processFailure(
-            Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
-            std::move(lk));
-    });
+    auto pool = iter->second;
+    pool->processFailure(Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
+                         std::move(lk));
 }
 
 void ConnectionPool::dropConnections(transport::Session::TagMask tags) {
-    std::vector<SpecificPool*> pools;
-
-    // Ensure we decrement active clients for all pools that we inc on (because we intend to process
-    // failures)
-    const auto guard = MakeGuard([&] {
+    // Grab all current pools (under the lock)
+    auto pools = [&] {
         stdx::unique_lock<stdx::mutex> lk(_mutex);
+        return _pools;
+    }();
 
-        for (const auto& pool : pools) {
-            pool->decActiveClients(lk);
-        }
-    });
+    for (const auto& pair : pools) {
+        auto& pool = pair.second;
 
-    // Grab all current pools that don't match tags (under the lock)
-    {
         stdx::unique_lock<stdx::mutex> lk(_mutex);
+        if (pool->matchesTags(lk, tags))
+            continue;
 
-        for (auto& pair : _pools) {
-            if (!pair.second->matchesTags(lk, tags)) {
-                pools.push_back(pair.second.get());
-                pair.second->incActiveClients(lk);
-            }
-        }
-    }
-
-    // Reacquire the lock per pool and process failures. We'll dec active clients when we're all
-    // through in the guard
-    for (const auto& pool : pools) {
-        stdx::unique_lock<stdx::mutex> lk(_mutex);
         pool->processFailure(
             Status(ErrorCodes::PooledConnectionsDropped, "Pooled connections dropped"),
             std::move(lk));
     }
@@ -384,7 +339,8 @@ void ConnectionPool::mutateTags(
     if (iter == _pools.end())
         return;
 
-    iter->second->mutateTags(lk, mutateFunc);
+    auto pool = iter->second;
+    pool->mutateTags(lk, mutateFunc);
 }
 
 void ConnectionPool::get(const HostAndPort& hostAndPort,
@@ -395,25 +351,22 @@ void ConnectionPool::get(const HostAndPort& hostAndPort,
 
 Future<ConnectionPool::ConnectionHandle> ConnectionPool::get(const HostAndPort& hostAndPort,
                                                              Milliseconds timeout) {
-    SpecificPool* pool;
+    std::shared_ptr<SpecificPool> pool;
 
     stdx::unique_lock<stdx::mutex> lk(_mutex);
 
     auto iter = _pools.find(hostAndPort);
 
     if (iter == _pools.end()) {
-        auto handle = stdx::make_unique<SpecificPool>(this, hostAndPort);
-        pool = handle.get();
-        _pools[hostAndPort] = std::move(handle);
+        pool = stdx::make_unique<SpecificPool>(this, hostAndPort);
+        _pools[hostAndPort] = pool;
     } else {
-        pool = iter->second.get();
+        pool = iter->second;
     }
 
     invariant(pool);
 
-    return pool->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
-        return pool->getConnection(hostAndPort, timeout, std::move(lk));
-    });
+    return pool->getConnection(hostAndPort, timeout, std::move(lk));
 }
 
 void ConnectionPool::appendConnectionStats(ConnectionPoolStats* stats) const {
@@ -450,9 +403,8 @@ void ConnectionPool::returnConnection(ConnectionInterface* conn) {
               str::stream() << "Tried to return connection but no pool found for "
                             << conn->getHostAndPort());
 
-    iter->second->runWithActiveClient(std::move(lk), [&](decltype(lk) lk) {
-        iter->second->returnConnection(conn, std::move(lk));
-    });
+    auto pool = iter->second;
+    pool->returnConnection(conn, std::move(lk));
 }
 
 ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAndPort& hostAndPort)
@@ -469,6 +421,9 @@ ConnectionPool::SpecificPool::SpecificPool(ConnectionPool* parent, const HostAnd
 
 ConnectionPool::SpecificPool::~SpecificPool() {
     DESTRUCTOR_GUARD(_requestTimer->cancelTimeout();)
+
+    invariant(_requests.empty());
+    invariant(_checkedOutPool.empty());
 }
 
 size_t ConnectionPool::SpecificPool::inUseConnections(const stdx::unique_lock<stdx::mutex>& lk) {
@@ -495,6 +450,8 @@ size_t ConnectionPool::SpecificPool::openConnections(const stdx::unique_lock<std
 
 Future<ConnectionPool::ConnectionHandle> ConnectionPool::SpecificPool::getConnection(
     const HostAndPort& hostAndPort, Milliseconds timeout, stdx::unique_lock<stdx::mutex> lk) {
+    invariant(_state != State::kInShutdown);
+
     if (timeout < Milliseconds(0) || timeout > _parent->_options.refreshTimeout) {
         timeout = _parent->_options.refreshTimeout;
     }
@@ -555,46 +512,43 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
 
         // Unlock in case refresh can occur immediately
         lk.unlock();
-        connPtr->refresh(
-            _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
-                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
-                    auto conn = takeFromProcessingPool(connPtr);
-
-                    // If the host and port were dropped, let this lapse
-                    if (conn->getGeneration() != _generation) {
-                        spawnConnections(lk);
-                        return;
-                    }
-
-                    // If we're in shutdown, we don't need refreshed connections
-                    if (_state == State::kInShutdown)
-                        return;
-
-                    // If the connection refreshed successfully, throw it back in
-                    // the ready pool
-                    if (status.isOK()) {
-                        addToReady(lk, std::move(conn));
-                        spawnConnections(lk);
-                        return;
-                    }
-
-                    // If we've exceeded the time limit, start a new connect,
-                    // rather than failing all operations. We do this because the
-                    // various callers have their own time limit which is unrelated
-                    // to our internal one.
-                    if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
-                        log() << "Pending connection to host " << _hostAndPort
-                              << " did not complete within the connection timeout,"
-                              << " retrying with a new connection;" << openConnections(lk)
-                              << " connections to that host remain open";
-                        spawnConnections(lk);
-                        return;
-                    }
-
-                    // Otherwise pass the failure on through
-                    processFailure(status, std::move(lk));
-                });
-            });
+        connPtr->refresh(_parent->_options.refreshTimeout,
+                         guardCallback([this](stdx::unique_lock<stdx::mutex> lk,
+                                              ConnectionInterface* connPtr,
+                                              Status status) {
+                             auto conn = takeFromProcessingPool(connPtr);
+
+                             // If we're in shutdown, we don't need refreshed connections
+                             if (_state == State::kInShutdown)
+                                 return;
+
+                             // If the connection refreshed successfully, throw it back in
+                             // the ready pool
+                             if (status.isOK()) {
+                                 // If the host and port were dropped, let this lapse
+                                 if (conn->getGeneration() == _generation) {
+                                     addToReady(lk, std::move(conn));
+                                 }
+                                 spawnConnections(lk);
+                                 return;
+                             }
+
+                             // If we've exceeded the time limit, start a new connect,
+                             // rather than failing all operations. We do this because the
+                             // various callers have their own time limit which is unrelated
+                             // to our internal one.
+                             if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+                                 log() << "Pending connection to host " << _hostAndPort
+                                       << " did not complete within the connection timeout,"
+                                       << " retrying with a new connection;" << openConnections(lk)
+                                       << " connections to that host remain open";
+                                 spawnConnections(lk);
+                                 return;
+                             }
+
+                             // Otherwise pass the failure on through
+                             processFailure(status, std::move(lk));
+                         }));
         lk.lock();
     } else {
         // If it's fine as it is, just put it in the ready queue
@@ -616,30 +570,36 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
     // immediately check them back in (which kicks off the refresh logic in
     // returnConnection
     connPtr->setTimeout(_parent->_options.refreshRequirement,
-                        [ this, connPtr, anchor = shared_from_this() ]() {
-                            runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
-                                auto conn = takeFromPool(_readyPool, connPtr);
+                        guardCallback([this, connPtr](stdx::unique_lock<stdx::mutex> lk) {
+                            auto conn = takeFromPool(_readyPool, connPtr);
 
-                                // We've already been checked out. We don't need to refresh
-                                // ourselves.
-                                if (!conn)
-                                    return;
+                            // We've already been checked out. We don't need to refresh
+                            // ourselves.
+                            if (!conn)
+                                return;
 
-                                // If we're in shutdown, we don't need to refresh connections
-                                if (_state == State::kInShutdown)
-                                    return;
+                            // If we're in shutdown, we don't need to refresh connections
+                            if (_state == State::kInShutdown)
+                                return;
 
-                                _checkedOutPool[connPtr] = std::move(conn);
+                            _checkedOutPool[connPtr] = std::move(conn);
 
-                                connPtr->indicateSuccess();
+                            connPtr->indicateSuccess();
 
-                                returnConnection(connPtr, std::move(lk));
-                            });
-                        });
+                            returnConnection(connPtr, std::move(lk));
+                        }));
 
     fulfillRequests(lk);
 }
 
+// Sets state to shutdown and kicks off the failure protocol to tank existing connections
+void ConnectionPool::SpecificPool::triggerShutdown(const Status& status,
+                                                   stdx::unique_lock<stdx::mutex> lk) {
+    _state = State::kInShutdown;
+    _droppedProcessingPool.clear();
+    processFailure(status, std::move(lk));
+}
+
 // Drop connections and fail all requests
 void ConnectionPool::SpecificPool::processFailure(const Status& status,
                                                   stdx::unique_lock<stdx::mutex> lk) {
@@ -647,7 +607,11 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
     // connections
     _generation++;
 
-    // Drop ready connections
+    // When a connection enters the ready pool, its timer is set to eventually refresh the
+    // connection. This requires a lifetime extension of the specific pool because the connection
+    // timer is tied to the lifetime of the connection, not the pool. That said, we can destruct
+    // all of the connections and thus timers of which we have ownership.
+    // In short, clearing the ready pool helps the SpecificPool drain.
     _readyPool.clear();
 
     // Log something helpful
@@ -655,7 +619,10 @@ void ConnectionPool::SpecificPool::processFailure(const Status& status,
 
     // Migrate processing connections to the dropped pool
     for (auto&& x : _processingPool) {
-        _droppedProcessingPool[x.first] = std::move(x.second);
+        if (_state != State::kInShutdown) {
+            // If we're just dropping the pool, we can reuse them later
+            _droppedProcessingPool[x.first] = std::move(x.second);
+        }
     }
     _processingPool.clear();
@@ -731,7 +698,12 @@ void ConnectionPool::SpecificPool::fulfillRequests(stdx::unique_lock<stdx::mutex
         // pass it to the user
         connPtr->resetToUnknown();
         lk.unlock();
-        promise.emplaceValue(ConnectionHandle(connPtr, ConnectionHandleDeleter(_parent)));
+        ConnectionHandle handle(connPtr,
+                                guardCallback([this](stdx::unique_lock<stdx::mutex> localLk,
+                                                     ConnectionPool::ConnectionInterface* conn) {
+                                    returnConnection(conn, std::move(localLk));
+                                }));
+        promise.emplaceValue(std::move(handle));
         lk.lock();
     }
 }
@@ -755,8 +727,10 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
     };
 
     // While all of our inflight connections are less than our target
-    while ((_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) &&
+    while ((_state != State::kInShutdown) &&
+           (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() < target()) &&
            (_processingPool.size() < _parent->_options.maxConnecting)) {
+
         OwnedConnection handle;
         try {
             // make a new connection and put it in processing
@@ -773,28 +747,31 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
         // Run the setup callback
         lk.unlock();
         handle->setup(
-            _parent->_options.refreshTimeout, [this](ConnectionInterface* connPtr, Status status) {
-                runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
-                    auto conn = takeFromProcessingPool(connPtr);
-
-                    if (conn->getGeneration() != _generation) {
-                        // If the host and port was dropped, let the
-                        // connection lapse
-                        spawnConnections(lk);
-                    } else if (status.isOK()) {
+            _parent->_options.refreshTimeout,
+            guardCallback([this](
+                stdx::unique_lock<stdx::mutex> lk, ConnectionInterface* connPtr, Status status) {
+                auto conn = takeFromProcessingPool(connPtr);
+
+                // If we're in shutdown, we don't need this conn
+                if (_state == State::kInShutdown)
+                    return;
+
+                if (status.isOK()) {
+                    // If the host and port was dropped, let the connection lapse
+                    if (conn->getGeneration() == _generation) {
                         addToReady(lk, std::move(conn));
-                        spawnConnections(lk);
-                    } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
-                        // If we've exceeded the time limit, restart the connect, rather than
-                        // failing all operations. We do this because the various callers
-                        // have their own time limit which is unrelated to our internal one.
-                        spawnConnections(lk);
-                    } else {
-                        // If the setup failed, cascade the failure edge
-                        processFailure(status, std::move(lk));
                     }
-                });
-            });
+                    spawnConnections(lk);
+                } else if (status.code() == ErrorCodes::NetworkInterfaceExceededTimeLimit) {
+                    // If we've exceeded the time limit, restart the connect, rather than
+                    // failing all operations. We do this because the various callers
+                    // have their own time limit which is unrelated to our internal one.
+                    spawnConnections(lk);
+                } else {
+                    // If the setup failed, cascade the failure edge
+                    processFailure(status, std::move(lk));
+                }
+            }));
 
         // Note that this assumes that the refreshTimeout is sound for the
         // setupTimeout
@@ -802,45 +779,6 @@ void ConnectionPool::SpecificPool::spawnConnections(stdx::unique_lock<stdx::mute
     }
 }
 
-// Called every second after hostTimeout until all processing connections reap
-void ConnectionPool::SpecificPool::shutdown() {
-    stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
-    // We're racing:
-    //
-    // Thread A (this thread)
-    //   * Fired the shutdown timer
-    //   * Came into shutdown() and blocked
-    //
-    // Thread B (some new consumer)
-    //   * Requested a new connection
-    //   * Beat thread A to the mutex
-    //   * Cancelled timer (but thread A already made it in)
-    //   * Set state to running
-    //   * released the mutex
-    //
-    // So we end up in shutdown, but with kRunning. If we're here we raced and
-    // we should just bail.
-    if (_state == State::kRunning) {
-        return;
-    }
-
-    _state = State::kInShutdown;
-
-    // If we have processing connections, wait for them to finish or timeout
-    // before shutdown
-    if (_processingPool.size() || _droppedProcessingPool.size() || _activeClients) {
-        _requestTimer->setTimeout(Seconds(1), [this]() { shutdown(); });
-
-        return;
-    }
-
-    invariant(_requests.empty());
-    invariant(_checkedOutPool.empty());
-
-    _parent->_pools.erase(_hostAndPort);
-}
-
 template <typename OwnershipPoolType>
 typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPool(
     OwnershipPoolType& pool, typename OwnershipPoolType::key_type connPtr) {
@@ -856,8 +794,10 @@ typename OwnershipPoolType::mapped_type ConnectionPool::SpecificPool::takeFromPo
 ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::takeFromProcessingPool(
     ConnectionInterface* connPtr) {
     auto conn = takeFromPool(_processingPool, connPtr);
-    if (conn)
+    if (conn) {
+        invariant(_state != State::kInShutdown);
         return conn;
+    }
 
     return takeFromPool(_droppedProcessingPool, connPtr);
 }
@@ -865,6 +805,16 @@ ConnectionPool::SpecificPool::OwnedConnection ConnectionPool::SpecificPool::take
 
 // Updates our state and manages the request timer
 void ConnectionPool::SpecificPool::updateStateInLock() {
+    if (_state == State::kInShutdown) {
+        // If we're in shutdown, there is nothing to update. Our clients are all gone.
+        if (_processingPool.empty() && !_activeClients) {
+            // If we have no more clients that require access to us, delist from the parent pool
+            LOG(2) << "Delisting connection pool for " << _hostAndPort;
+            _parent->_pools.erase(_hostAndPort);
+        }
+        return;
+    }
+
     if (_requests.size()) {
         // We have some outstanding requests, we're live
@@ -883,8 +833,8 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
 
         // We set a timer for the most recent request, then invoke each timed
        // out request we couldn't service
-        _requestTimer->setTimeout(timeout, [this]() {
-            runWithActiveClient([&](stdx::unique_lock<stdx::mutex> lk) {
+        _requestTimer->setTimeout(
+            timeout, guardCallback([this](stdx::unique_lock<stdx::mutex> lk) {
                 auto now = _parent->_factory->now();
 
                 while (_requests.size()) {
@@ -905,8 +855,7 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
                 }
 
                 updateStateInLock();
-            });
-        });
+            }));
     } else if (_checkedOutPool.size()) {
         // If we have no requests, but someone's using a connection, we just
         // hang around until the next request or a return
@@ -929,8 +878,17 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
 
         auto timeout = _parent->_options.hostTimeout;
 
-        // Set the shutdown timer
-        _requestTimer->setTimeout(timeout, [this]() { shutdown(); });
+        // Set the shutdown timer, this gets reset on any request
+        _requestTimer->setTimeout(timeout, [ this, anchor = shared_from_this() ]() {
+            stdx::unique_lock<stdx::mutex> lk(anchor->_parent->_mutex);
+            if (_state != State::kIdle)
+                return;
+
+            triggerShutdown(
+                Status(ErrorCodes::NetworkInterfaceExceededTimeLimit,
+                       "Connection pool has been idle for longer than the host timeout"),
+                std::move(lk));
+        });
     }
 }
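
The new guardCallback comment in this diff describes the wrapper contract: given a callable of signature R f(stdx::unique_lock<stdx::mutex>, ArgTypes...), it returns a callable of signature R f(ArgTypes...) that keeps the pool alive, counts itself in _activeClients, and hands the callback an owned lock. Below is a minimal, standalone sketch of that pattern for illustration only; it assumes plain std:: types in place of MongoDB's stdx:: aliases, uses a hand-rolled scope guard instead of ON_BLOCK_EXIT, and DemoPool and its members are hypothetical names rather than the real SpecificPool API.

// Sketch only: std:: stand-ins for stdx::, ON_BLOCK_EXIT replaced by a local scope guard.
#include <cassert>
#include <iostream>
#include <memory>
#include <mutex>
#include <utility>

class DemoPool : public std::enable_shared_from_this<DemoPool> {
public:
    // Returns a callable that (1) keeps this pool alive via a shared_ptr anchor,
    // (2) counts itself as an active client for the duration of the call, and
    // (3) prepends an owned lock on the pool mutex to the callback's arguments.
    template <typename Callback>
    auto guardCallback(Callback&& cb) {
        return [cb = std::forward<Callback>(cb), anchor = shared_from_this()](auto&&... args) {
            std::unique_lock<std::mutex> lk(anchor->_mutex);
            ++anchor->_activeClients;

            // Decrement on every exit path (stand-in for ON_BLOCK_EXIT). Runs after the
            // callback's lock parameter has been destroyed, so re-locking is safe.
            struct Decrement {
                std::shared_ptr<DemoPool> pool;
                ~Decrement() {
                    std::lock_guard<std::mutex> lg(pool->_mutex);
                    --pool->_activeClients;
                }
            } decrementOnExit{anchor};

            return cb(std::move(lk), std::forward<decltype(args)>(args)...);
        };
    }

    int activeClients() {
        std::lock_guard<std::mutex> lg(_mutex);
        return _activeClients;
    }

private:
    std::mutex _mutex;
    int _activeClients = 0;
};

int main() {
    auto pool = std::make_shared<DemoPool>();

    // A "risky" callback of signature void(std::unique_lock<std::mutex>, int)
    // becomes a safe callable of signature void(int).
    auto safe = pool->guardCallback([](std::unique_lock<std::mutex> lk, int x) {
        assert(lk.owns_lock());  // the guard already locked the pool mutex
        std::cout << "guarded callback ran with arg " << x << "\n";
    });

    safe(42);
    std::cout << "active clients after the call: " << pool->activeClients() << "\n";  // 0
}

Calling safe(42) locks the mutex, bumps the counter, runs the callback with the lock, and decrements on the way out; that is the invariant the guarded refresh, setup, and timeout callbacks in this diff rely on so the pool cannot delist itself while a callback is in flight.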
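Both the new ConnectionPool::shutdown() and dropConnections(tags) use the same idiom: snapshot the map of pools inside an immediately-invoked lambda while holding the mutex, then re-acquire the lock once per pool. Because the snapshot copies shared_ptrs, each pool stays alive even if it delists itself from the live map in the meantime (as updateStateInLock() now does). The sketch below illustrates the idiom with hypothetical stand-in types (PoolLike, a plain std::map, std::mutex) rather than the real SpecificPool and stdx:: primitives.

#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

struct PoolLike {
    explicit PoolLike(std::string h) : host(std::move(h)) {}
    std::string host;

    // Mirrors the shape of SpecificPool::processFailure: the caller hands over an owned lock,
    // which is released when it goes out of scope at the end of this call.
    void fail(std::unique_lock<std::mutex> lk) {
        std::cout << "failing pool for " << host << "\n";
    }
};

int main() {
    std::mutex mutex;
    std::map<std::string, std::shared_ptr<PoolLike>> pools{
        {"a:27017", std::make_shared<PoolLike>("a:27017")},
        {"b:27017", std::make_shared<PoolLike>("b:27017")},
    };

    // Snapshot the map under the lock via an immediately-invoked lambda.
    auto snapshot = [&] {
        std::unique_lock<std::mutex> lk(mutex);
        return pools;
    }();

    // Re-acquire the lock per pool. Entries erased from `pools` after the snapshot
    // would still be reachable, and alive, through the copied shared_ptrs.
    for (const auto& pair : snapshot) {
        std::unique_lock<std::mutex> lk(mutex);
        pair.second->fail(std::move(lk));
    }
}

Working from the copy is what lets triggerShutdown/processFailure run for each pool without holding the parent mutex continuously across all pools, while the shared_ptr ownership guarantees no pool is destroyed mid-iteration.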