author | Ben Caimano <ben.caimano@mongodb.com> | 2019-09-19 16:10:51 +0000
---|---|---
committer | evergreen <evergreen@mongodb.com> | 2019-09-19 16:10:51 +0000
commit | e9f5360ba5fba5598e2556816a9d7d818ab586c7 (patch) |
tree | 6a5c89191b1c3ffdc68501c9be1e620004ec583f /src/mongo |
parent | 7a2417228adc2ee34925fcf6084a6bfc93469567 (diff) |
download | mongo-e9f5360ba5fba5598e2556816a9d7d818ab586c7.tar.gz |
SERVER-42790 SERVER-42930 ConnectionPool controller updates must batch across hosts
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 127
-rw-r--r-- | src/mongo/executor/connection_pool.h | 33
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool_controller.cpp | 12
3 files changed, 114 insertions, 58 deletions
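
To make the shape of the change concrete before reading the diff: controllers used to return a host list plus a single `canShutdown` flag, and now return one explicit fate per host, which is what lets the pool batch decisions across hosts. Below is a minimal sketch of that contract, not the MongoDB code itself: `HostAndPort` is stood in by `std::string`, and the free function `updateHost` merely mirrors the controller method shown in the diff.

```cpp
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins; the real types live in mongo/executor/connection_pool.h.
enum class HostFate {
    kShouldLive,  // create the SpecificPool if it does not exist yet
    kShouldDie,   // shut the SpecificPool down if it does exist
};

struct HostGroupState {
    // Before this commit: std::vector<HostAndPort> hosts; bool canShutdown;
    // After: one explicit fate per host, so decisions for many hosts can be
    // collected and applied together in a single pass.
    std::vector<std::pair<std::string, HostFate>> fates;
};

// Mirrors the single-host controller path: expiry maps directly onto a fate.
HostGroupState updateHost(const std::string& host, bool isExpired) {
    auto fate = isExpired ? HostFate::kShouldDie : HostFate::kShouldLive;
    return {{std::make_pair(host, fate)}};
}
```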
```diff
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index c2705ae9e1d..27d856444dd 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -33,8 +33,6 @@
 
 #include "mongo/executor/connection_pool.h"
 
-#include <fmt/format.h>
-
 #include "mongo/bson/bsonobjbuilder.h"
 #include "mongo/executor/connection_pool_stats.h"
 #include "mongo/executor/remote_command_request.h"
@@ -149,7 +147,8 @@ public:
             data.target = maxConns;
         }
 
-        return {{data.host}, stats.health.isExpired};
+        auto fate = stats.health.isExpired ? HostFate::kShouldDie : HostFate::kShouldLive;
+        return {{std::make_pair(data.host, fate)}};
     }
     void removeHost(PoolId id) override {
         stdx::lock_guard lk(_mutex);
@@ -199,7 +198,6 @@ protected:
  */
 class ConnectionPool::SpecificPool final
     : public std::enable_shared_from_this<ConnectionPool::SpecificPool> {
-    static constexpr int kDiagnosticLogLevel = 3;
 
 public:
     /**
@@ -319,6 +317,10 @@ public:
         _tags = mutateFunc(_tags);
     }
 
+    auto sslMode() const {
+        return _sslMode;
+    }
+
     void fassertSSLModeIs(transport::ConnectSSLMode desired) const {
         if (desired != _sslMode) {
             severe() << "Mixing ssl modes for a single host is not supported";
@@ -326,6 +328,14 @@ public:
         }
     }
 
+    // Update the controller and potentially change the controls
+    HostGroupState updateController();
+
+    /**
+     * Establishes connections until the ControllerInterface's target is met.
+     */
+    void spawnConnections();
+
 private:
     using OwnedConnection = std::shared_ptr<ConnectionInterface>;
     using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
@@ -339,11 +349,6 @@ private:
 
     ConnectionHandle makeHandle(ConnectionInterface* connection);
 
-    /**
-     * Establishes connections until the ControllerInterface's target is met.
-     */
-    void spawnConnections();
-
     void finishRefresh(ConnectionInterface* connPtr, Status status);
 
     void addToReady(OwnedConnection conn);
@@ -368,9 +373,6 @@ private:
     // Update the event timer for this host pool
     void updateEventTimer();
 
-    // Update the controller and potentially change the controls
-    void updateController();
-
 private:
     const std::shared_ptr<ConnectionPool> _parent;
 
@@ -395,13 +397,6 @@ private:
     // It increases when we process a failure. If a connection is from a previous generation,
     // it will be discarded on return/refresh.
     size_t _generation = 0;
 
-    // When the pool needs to potentially die or spawn connections, updateController() is scheduled
-    // onto the executor and this flag is set. When updateController() finishes running, this flag
-    // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once
-    // it is closer to steady state.
-    bool _updateScheduled = false;
-
     size_t _created = 0;
 
     transport::Session::TagMask _tags = transport::Session::kPending;
@@ -522,6 +517,7 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
                 .thenRunOn(_factory->getExecutor())
                 .getAsync(std::move(cb));
     };
+    LOG(kDiagnosticLogLevel) << "Scheduling get for " << hostAndPort << " with timeout " << timeout;
     _factory->getExecutor()->schedule(std::move(getConnectionFunc));
 }
 
@@ -1047,9 +1043,9 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
     _eventTimer->setTimeout(timeout, std::move(deferredStateUpdateFunc));
 }
 
-void ConnectionPool::SpecificPool::updateController() {
+auto ConnectionPool::SpecificPool::updateController() -> HostGroupState {
     if (_health.isShutdown) {
-        return;
+        return {};
     }
 
     auto& controller = *_parent->_controller;
@@ -1064,41 +1060,66 @@ void ConnectionPool::SpecificPool::updateController() {
     };
     LOG(kDiagnosticLogLevel) << "Updating controller for " << _hostAndPort
                              << " with State: " << state;
-    auto hostGroup = controller.updateHost(_id, std::move(state));
+    return controller.updateHost(_id, std::move(state));
+}
+
+void ConnectionPool::_updateController() {
+    stdx::lock_guard lk(_mutex);
+    _shouldUpdateController = false;
 
-    // If we can shutdown, then do so
-    if (hostGroup.canShutdown) {
-        for (const auto& host : hostGroup.hosts) {
-            auto it = _parent->_pools.find(host);
-            if (it == _parent->_pools.end()) {
+    // First we call updateController() on each pool that needs it and cache the results
+    struct Command {
+        HostFate fate;
+        transport::ConnectSSLMode sslMode;
+        size_t updateId = 0;
+    };
+    stdx::unordered_map<HostAndPort, Command> hosts;
+    for (auto& [pool, updateId] : _poolsToUpdate) {
+        for (auto [otherHost, fate] : pool->updateController().fates) {
+            auto& cmd = hosts[otherHost];
+            if (cmd.updateId > updateId) {
                 continue;
             }
-            auto& pool = it->second;
-
-            // At the moment, controllers will never mark for shutdown a pool with active
-            // connections or pending requests. isExpired is never true if these invariants are
-            // false. That's not to say that it's a terrible idea, but if this happens then we
-            // should review what it means to be expired.
-
-            invariant(pool->_checkedOutPool.empty());
-            invariant(pool->_requests.empty());
-
-            pool->triggerShutdown(Status(ErrorCodes::ShutdownInProgress,
-                                         str::stream() << "Pool for " << host << " has expired."));
+            cmd.fate = fate;
+            cmd.sslMode = pool->sslMode();
         }
-        return;
     }
-
-
-    // Make sure all related hosts exist
-    for (const auto& host : hostGroup.hosts) {
-        if (auto& pool = _parent->_pools[host]; !pool) {
-            pool = SpecificPool::make(_parent, host, _sslMode);
+    _poolsToUpdate.clear();
+
+    // Then we go through each cached result host by host and address the HostFate we were given
+    for (auto& [host, cmd] : hosts) {
+        switch (cmd.fate) {
+            case HostFate::kShouldLive: {
+                LOG(kDiagnosticLogLevel) << "Vivifying pool for " << host;
+                auto& pool = _pools[host];
+                if (!pool) {
+                    pool = SpecificPool::make(shared_from_this(), host, cmd.sslMode);
+                }
+                pool->spawnConnections();
+            } break;
+            case HostFate::kShouldDie: {
+                auto it = _pools.find(host);
+                if (it == _pools.end()) {
+                    continue;
+                }
+
+                LOG(kDiagnosticLogLevel) << "Killing pool for " << host;
+                auto& pool = it->second;
+
+                // At the moment, controllers will never mark for shutdown a pool with active
+                // connections or pending requests. isExpired is never true if these invariants are
+                // false. That's not to say that it's a terrible idea, but if this happens then we
+                // should review what it means to be expired.
+                dassert(pool->inUseConnections() == 0);
+                dassert(pool->requestsPending() == 0);
+
+                pool->triggerShutdown(
+                    Status(ErrorCodes::ShutdownInProgress,
+                           str::stream() << "Pool for " << host << " has expired."));
+            } break;
         }
     }
-
-    spawnConnections();
 }
 
 // Updates our state and manages the request timer
@@ -1112,17 +1133,17 @@ void ConnectionPool::SpecificPool::updateState() {
     updateEventTimer();
     updateHealth();
 
-    if (std::exchange(_updateScheduled, true)) {
+    _parent->_poolsToUpdate[shared_from_this()] = ++_parent->_lastUpdateId;
+    if (std::exchange(_parent->_shouldUpdateController, true)) {
         return;
     }
 
+    LOG(kDiagnosticLogLevel) << "Scheduling controller update";
     ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor()))  //
-        .getAsync([this, anchor = shared_from_this()](Status&& status) mutable {
+        .getAsync([parent = _parent](Status&& status) mutable {
             invariant(status);
-            stdx::lock_guard lk(_parent->_mutex);
-            _updateScheduled = false;
-            updateController();
+            parent->_updateController();
         });
 }
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 70ee3c652c5..7f43941f8cb 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -90,6 +90,8 @@ public:
 
     static const Status kConnectionStateUnknown;
 
+    static constexpr int kDiagnosticLogLevel = 4;
+
     struct Options {
         Options() {}
 
@@ -210,13 +212,27 @@ public:
     };
 
     /**
-     * A set of hosts and a flag canShutdown for if the group can shutdown
+     * A HostFate is a HostAndPort specific signal from a Controller to the ConnectionPool
+     *
+     * - kShouldLive implies that if the SpecificPool doesn't already exist, it should be created
+     * - kShouldDie implies that if the SpecificPool does exist, it should shutdown
+     */
+    enum class HostFate {
+        kShouldLive,
+        kShouldDie,
+    };
+
+    /**
+     * A set of (HostAndPort, HostFate) pairs representing the HostGroup
      *
      * This should only be constructed by a ControllerInterface
     */
     struct HostGroupState {
-        std::vector<HostAndPort> hosts;
-        bool canShutdown = false;
+        // While this is a list of pairs, the two controllers in use today each have a predictable
+        // pattern:
+        // * A single host with a single fate
+        // * A list of hosts (i.e. a replica set) all with the same fate
+        std::vector<std::pair<HostAndPort, HostFate>> fates;
     };
 
     explicit ConnectionPool(std::shared_ptr<DependentTypeFactoryInterface> impl,
@@ -247,6 +263,8 @@ public:
     size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
 
 private:
+    void _updateController();
+
     std::string _name;
 
     const std::shared_ptr<DependentTypeFactoryInterface> _factory;
@@ -259,6 +277,14 @@ private:
     PoolId _nextPoolId = 0;
     stdx::unordered_map<HostAndPort, std::shared_ptr<SpecificPool>> _pools;
 
+    // When the pool needs to potentially die or spawn connections, _updateController() is scheduled
+    // onto the executor and this flag is set. When _updateController() finishes running, this flag
+    // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once
+    // it is closer to steady state.
+    bool _shouldUpdateController = false;
+    size_t _lastUpdateId = 0;
+    stdx::unordered_map<std::shared_ptr<SpecificPool>, size_t> _poolsToUpdate;
+
     EgressTagCloserManager* _manager;
 };
 
@@ -413,6 +439,7 @@ public:
     using HostState = typename ConnectionPool::HostState;
     using ConnectionControls = typename ConnectionPool::ConnectionControls;
     using HostGroupState = typename ConnectionPool::HostGroupState;
+    using HostFate = typename ConnectionPool::HostFate;
     using PoolId = typename ConnectionPool::PoolId;
 
     virtual ~ControllerInterface() = default;
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
index 871293699ea..fc252fcba91 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.cpp
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -222,7 +222,8 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
     // If the pool isn't in a groupData, we can return now
     auto groupData = poolData.groupData.lock();
     if (!groupData || groupData->state.passives.count(poolData.host)) {
-        return {{poolData.host}, poolData.isAbleToShutdown};
+        auto fate = poolData.isAbleToShutdown ? HostFate::kShouldDie : HostFate::kShouldLive;
+        return {{std::make_pair(poolData.host, fate)}};
     }
 
     switch (gParameters.matchingStrategy.load()) {
@@ -254,7 +255,14 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
         std::all_of(groupData->poolIds.begin(), groupData->poolIds.end(), [&](auto otherId) {
             return getOrInvariant(_poolDatas, otherId).isAbleToShutdown;
         });
-    return {groupData->state.connStr.getServers(), shouldShutdown};
+
+    HostGroupState hostGroupState;
+    hostGroupState.fates.reserve(groupData->state.connStr.getServers().size());
+    auto fate = shouldShutdown ? HostFate::kShouldDie : HostFate::kShouldLive;
+    for (const auto& host : groupData->state.connStr.getServers()) {
+        hostGroupState.fates.emplace_back(host, fate);
+    }
+    return {hostGroupState};
 }
 
 void ShardingTaskExecutorPoolController::removeHost(PoolId id) {
```
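
The batching mechanism added in `ConnectionPool::_updateController()` above can be summarized as: every pool state change enqueues the pool under a fresh, monotonically increasing update id; a single executor task drains the whole queue; and for each host only the fate from the newest update is applied. Below is a rough, self-contained model of that pattern. The types and names (`UpdateBatcher`, `markDirty`, `drain`) are invented for illustration and are not from the source; real scheduling, locking, and pool creation are elided.

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

enum class HostFate { kShouldLive, kShouldDie };

// Hypothetical stand-in for the ConnectionPool's new batching state:
// _lastUpdateId, _shouldUpdateController, and _poolsToUpdate in the diff.
struct UpdateBatcher {
    std::size_t lastUpdateId = 0;
    bool drainScheduled = false;
    // host -> (fate, id of the update that produced it)
    std::unordered_map<std::string, std::pair<HostFate, std::size_t>> pending;

    // Called on every pool state change; cheap, it only records intent.
    // This mirrors the _poolsToUpdate[shared_from_this()] = ++_lastUpdateId step.
    void markDirty(const std::string& host, HostFate fate) {
        auto id = ++lastUpdateId;
        auto& slot = pending[host];
        if (id >= slot.second) {  // only the freshest fate per host survives
            slot = {fate, id};
        }
        drainScheduled = true;  // the real code schedules one executor task here
    }

    // Runs once per batch instead of once per pool update, which is the point:
    // expensive spawn/shutdown work is amortized across many state changes.
    void drain() {
        drainScheduled = false;
        for (const auto& [host, slot] : pending) {
            std::cout << host
                      << (slot.first == HostFate::kShouldDie ? " -> shutdown\n"
                                                             : " -> keep/spawn\n");
        }
        pending.clear();
    }
};

int main() {
    UpdateBatcher batcher;
    batcher.markDirty("rs0-a:27017", HostFate::kShouldLive);
    batcher.markDirty("rs0-b:27017", HostFate::kShouldDie);
    batcher.markDirty("rs0-b:27017", HostFate::kShouldLive);  // newer fate wins
    batcher.drain();  // one pass handles every dirty host
}
```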