diff options
author | Pavithra Vetriselvan <pavithra.vetriselvan@mongodb.com> | 2019-09-23 16:06:14 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-09-23 16:06:14 +0000 |
commit | 28a50ae351071acdf18745c0f3bb54c84485d3ba (patch) | |
tree | daec7dff7a6b447086db4de074448d1715a4e0b0 /src/mongo | |
parent | 35ba7ee8716fb1ece51d28097777ecd8f688568a (diff) | |
download | mongo-28a50ae351071acdf18745c0f3bb54c84485d3ba.tar.gz |
Revert "SERVER-42790 SERVER-42930 ConnectionPool controller updates must batch across hosts"
This reverts commit e9f5360ba5fba5598e2556816a9d7d818ab586c7.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/executor/connection_pool.cpp | 127 | ||||
-rw-r--r-- | src/mongo/executor/connection_pool.h | 33 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor_pool_controller.cpp | 12 |
3 files changed, 58 insertions, 114 deletions
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index 27d856444dd..c2705ae9e1d 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -33,6 +33,8 @@ #include "mongo/executor/connection_pool.h" +#include <fmt/format.h> + #include "mongo/bson/bsonobjbuilder.h" #include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/remote_command_request.h" @@ -147,8 +149,7 @@ public: data.target = maxConns; } - auto fate = stats.health.isExpired ? HostFate::kShouldDie : HostFate::kShouldLive; - return {{std::make_pair(data.host, fate)}}; + return {{data.host}, stats.health.isExpired}; } void removeHost(PoolId id) override { stdx::lock_guard lk(_mutex); @@ -198,6 +199,7 @@ protected: */ class ConnectionPool::SpecificPool final : public std::enable_shared_from_this<ConnectionPool::SpecificPool> { + static constexpr int kDiagnosticLogLevel = 3; public: /** @@ -317,10 +319,6 @@ public: _tags = mutateFunc(_tags); } - auto sslMode() const { - return _sslMode; - } - void fassertSSLModeIs(transport::ConnectSSLMode desired) const { if (desired != _sslMode) { severe() << "Mixing ssl modes for a single host is not supported"; @@ -328,14 +326,6 @@ public: } } - // Update the controller and potentially change the controls - HostGroupState updateController(); - - /** - * Establishes connections until the ControllerInterface's target is met. - */ - void spawnConnections(); - private: using OwnedConnection = std::shared_ptr<ConnectionInterface>; using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>; @@ -349,6 +339,11 @@ private: ConnectionHandle makeHandle(ConnectionInterface* connection); + /** + * Establishes connections until the ControllerInterface's target is met. + */ + void spawnConnections(); + void finishRefresh(ConnectionInterface* connPtr, Status status); void addToReady(OwnedConnection conn); @@ -373,6 +368,9 @@ private: // Update the event timer for this host pool void updateEventTimer(); + // Update the controller and potentially change the controls + void updateController(); + private: const std::shared_ptr<ConnectionPool> _parent; @@ -397,6 +395,13 @@ private: // It increases when we process a failure. If a connection is from a previous generation, // it will be discarded on return/refresh. size_t _generation = 0; + + // When the pool needs to potentially die or spawn connections, updateController() is scheduled + // onto the executor and this flag is set. When updateController() finishes running, this flag + // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once + // it is closer to steady state. + bool _updateScheduled = false; + size_t _created = 0; transport::Session::TagMask _tags = transport::Session::kPending; @@ -517,7 +522,6 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort, .thenRunOn(_factory->getExecutor()) .getAsync(std::move(cb)); }; - LOG(kDiagnosticLogLevel) << "Scheduling get for " << hostAndPort << " with timeout " << timeout; _factory->getExecutor()->schedule(std::move(getConnectionFunc)); } @@ -1043,9 +1047,9 @@ void ConnectionPool::SpecificPool::updateEventTimer() { _eventTimer->setTimeout(timeout, std::move(deferredStateUpdateFunc)); } -auto ConnectionPool::SpecificPool::updateController() -> HostGroupState { +void ConnectionPool::SpecificPool::updateController() { if (_health.isShutdown) { - return {}; + return; } auto& controller = *_parent->_controller; @@ -1060,66 +1064,41 @@ auto ConnectionPool::SpecificPool::updateController() -> HostGroupState { }; LOG(kDiagnosticLogLevel) << "Updating controller for " << _hostAndPort << " with State: " << state; - return controller.updateHost(_id, std::move(state)); -} - -void ConnectionPool::_updateController() { - stdx::lock_guard lk(_mutex); - _shouldUpdateController = false; + auto hostGroup = controller.updateHost(_id, std::move(state)); - // First we call updateController() on each pool that needs it and cache the results - struct Command { - HostFate fate; - transport::ConnectSSLMode sslMode; - size_t updateId = 0; - }; - stdx::unordered_map<HostAndPort, Command> hosts; - for (auto& [pool, updateId] : _poolsToUpdate) { - for (auto [otherHost, fate] : pool->updateController().fates) { - auto& cmd = hosts[otherHost]; - if (cmd.updateId > updateId) { + // If we can shutdown, then do so + if (hostGroup.canShutdown) { + for (const auto& host : hostGroup.hosts) { + auto it = _parent->_pools.find(host); + if (it == _parent->_pools.end()) { continue; } - cmd.fate = fate; - cmd.sslMode = pool->sslMode(); + auto& pool = it->second; + + // At the moment, controllers will never mark for shutdown a pool with active + // connections or pending requests. isExpired is never true if these invariants are + // false. That's not to say that it's a terrible idea, but if this happens then we + // should review what it means to be expired. + + invariant(pool->_checkedOutPool.empty()); + invariant(pool->_requests.empty()); + + pool->triggerShutdown(Status(ErrorCodes::ShutdownInProgress, + str::stream() << "Pool for " << host << " has expired.")); } + return; } - _poolsToUpdate.clear(); - - // Then we go through each cached result host by host and address the HostFate we were given - for (auto& [host, cmd] : hosts) { - switch (cmd.fate) { - case HostFate::kShouldLive: { - LOG(kDiagnosticLogLevel) << "Vivifying pool for " << host; - auto& pool = _pools[host]; - if (!pool) { - pool = SpecificPool::make(shared_from_this(), host, cmd.sslMode); - } - pool->spawnConnections(); - } break; - case HostFate::kShouldDie: { - auto it = _pools.find(host); - if (it == _pools.end()) { - continue; - } - - LOG(kDiagnosticLogLevel) << "Killing pool for " << host; - auto& pool = it->second; - - // At the moment, controllers will never mark for shutdown a pool with active - // connections or pending requests. isExpired is never true if these invariants are - // false. That's not to say that it's a terrible idea, but if this happens then we - // should review what it means to be expired. - dassert(pool->inUseConnections() == 0); - dassert(pool->requestsPending() == 0); - - pool->triggerShutdown( - Status(ErrorCodes::ShutdownInProgress, - str::stream() << "Pool for " << host << " has expired.")); - } break; + + + // Make sure all related hosts exist + for (const auto& host : hostGroup.hosts) { + if (auto& pool = _parent->_pools[host]; !pool) { + pool = SpecificPool::make(_parent, host, _sslMode); } } + + spawnConnections(); } // Updates our state and manages the request timer @@ -1133,17 +1112,17 @@ void ConnectionPool::SpecificPool::updateState() { updateEventTimer(); updateHealth(); - _parent->_poolsToUpdate[shared_from_this()] = ++_parent->_lastUpdateId; - if (std::exchange(_parent->_shouldUpdateController, true)) { + if (std::exchange(_updateScheduled, true)) { return; } - LOG(kDiagnosticLogLevel) << "Scheduling controller update"; ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) // - .getAsync([parent = _parent](Status&& status) mutable { + .getAsync([this, anchor = shared_from_this()](Status&& status) mutable { invariant(status); - parent->_updateController(); + stdx::lock_guard lk(_parent->_mutex); + _updateScheduled = false; + updateController(); }); } diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index 7f43941f8cb..70ee3c652c5 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -90,8 +90,6 @@ public: static const Status kConnectionStateUnknown; - static constexpr int kDiagnosticLogLevel = 4; - struct Options { Options() {} @@ -212,27 +210,13 @@ public: }; /** - * A HostFate is a HostAndPort specific signal from a Controller to the ConnectionPool - * - * - kShouldLive implies that if the SpecificPool doesn't already exist, it should be created - * - kShouldDie implies that if the SpecificPool does exist, it should shutdown - */ - enum class HostFate { - kShouldLive, - kShouldDie, - }; - - /** - * A set of (HostAndPort, HostFate) pairs representing the HostGroup + * A set of hosts and a flag canShutdown for if the group can shutdown * * This should only be constructed by a ControllerInterface */ struct HostGroupState { - // While this is a list of pairs, the two controllers in use today each have a predictable - // pattern: - // * A single host with a single fate - // * A list of hosts (i.e. a replica set) all with the same fate - std::vector<std::pair<HostAndPort, HostFate>> fates; + std::vector<HostAndPort> hosts; + bool canShutdown = false; }; explicit ConnectionPool(std::shared_ptr<DependentTypeFactoryInterface> impl, @@ -263,8 +247,6 @@ public: size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const; private: - void _updateController(); - std::string _name; const std::shared_ptr<DependentTypeFactoryInterface> _factory; @@ -277,14 +259,6 @@ private: PoolId _nextPoolId = 0; stdx::unordered_map<HostAndPort, std::shared_ptr<SpecificPool>> _pools; - // When the pool needs to potentially die or spawn connections, _updateController() is scheduled - // onto the executor and this flag is set. When _updateController() finishes running, this flag - // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once - // it is closer to steady state. - bool _shouldUpdateController = false; - size_t _lastUpdateId = 0; - stdx::unordered_map<std::shared_ptr<SpecificPool>, size_t> _poolsToUpdate; - EgressTagCloserManager* _manager; }; @@ -439,7 +413,6 @@ public: using HostState = typename ConnectionPool::HostState; using ConnectionControls = typename ConnectionPool::ConnectionControls; using HostGroupState = typename ConnectionPool::HostGroupState; - using HostFate = typename ConnectionPool::HostFate; using PoolId = typename ConnectionPool::PoolId; virtual ~ControllerInterface() = default; diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp index fc252fcba91..871293699ea 100644 --- a/src/mongo/s/sharding_task_executor_pool_controller.cpp +++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp @@ -222,8 +222,7 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState& // If the pool isn't in a groupData, we can return now auto groupData = poolData.groupData.lock(); if (!groupData || groupData->state.passives.count(poolData.host)) { - auto fate = poolData.isAbleToShutdown ? HostFate::kShouldDie : HostFate::kShouldLive; - return {{std::make_pair(poolData.host, fate)}}; + return {{poolData.host}, poolData.isAbleToShutdown}; } switch (gParameters.matchingStrategy.load()) { @@ -255,14 +254,7 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState& std::all_of(groupData->poolIds.begin(), groupData->poolIds.end(), [&](auto otherId) { return getOrInvariant(_poolDatas, otherId).isAbleToShutdown; }); - - HostGroupState hostGroupState; - hostGroupState.fates.reserve(groupData->state.connStr.getServers().size()); - auto fate = shouldShutdown ? HostFate::kShouldDie : HostFate::kShouldLive; - for (const auto& host : groupData->state.connStr.getServers()) { - hostGroupState.fates.emplace_back(host, fate); - } - return {hostGroupState}; + return {groupData->state.connStr.getServers(), shouldShutdown}; } void ShardingTaskExecutorPoolController::removeHost(PoolId id) { |