author     Ben Caimano <ben.caimano@mongodb.com>    2019-09-19 16:10:51 +0000
committer  evergreen <evergreen@mongodb.com>        2019-09-19 16:10:51 +0000
commit     e9f5360ba5fba5598e2556816a9d7d818ab586c7 (patch)
tree       6a5c89191b1c3ffdc68501c9be1e620004ec583f /src/mongo
parent     7a2417228adc2ee34925fcf6084a6bfc93469567 (diff)
download   mongo-e9f5360ba5fba5598e2556816a9d7d818ab586c7.tar.gz
SERVER-42790 SERVER-42930 ConnectionPool controller updates must batch across hosts
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/executor/connection_pool.cpp                  | 127
-rw-r--r--  src/mongo/executor/connection_pool.h                    |  33
-rw-r--r--  src/mongo/s/sharding_task_executor_pool_controller.cpp  |  12
3 files changed, 114 insertions(+), 58 deletions(-)
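
The gist of the change: instead of each SpecificPool scheduling its own controller pass, pools now enqueue themselves on the parent ConnectionPool and a single scheduled task drains the whole batch. A minimal standalone sketch of that batching pattern follows (hypothetical Pool and Batcher names, plain std types; the real code's mutex and executor plumbing are omitted):

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

struct Pool {
    std::string host;
    void applyControllerUpdate() {
        std::cout << "updating controller for " << host << "\n";
    }
};

class Batcher {
public:
    using ScheduleFn = std::function<void(std::function<void()>)>;

    // Called from each pool's state update; only the first caller actually
    // schedules a drain, later callers just ride along with it.
    void requestUpdate(std::shared_ptr<Pool> pool, const ScheduleFn& schedule) {
        // The id is what the real patch uses to resolve conflicting per-host
        // fates (see the aggregation sketch after connection_pool.cpp below).
        _pending[pool] = ++_lastUpdateId;
        if (std::exchange(_drainScheduled, true)) {
            return;
        }
        schedule([this] { drain(); });
    }

private:
    void drain() {
        _drainScheduled = false;
        auto batch = std::exchange(_pending, {});
        for (auto& [pool, updateId] : batch) {
            pool->applyControllerUpdate();  // one pass over every pool that asked
        }
    }

    bool _drainScheduled = false;
    std::size_t _lastUpdateId = 0;
    std::unordered_map<std::shared_ptr<Pool>, std::size_t> _pending;
};

int main() {
    Batcher batcher;
    std::vector<std::function<void()>> executorQueue;  // stand-in for the real executor
    auto schedule = [&](std::function<void()> task) { executorQueue.push_back(std::move(task)); };

    auto a = std::make_shared<Pool>(Pool{"shard-a:27017"});
    auto b = std::make_shared<Pool>(Pool{"shard-b:27017"});
    batcher.requestUpdate(a, schedule);
    batcher.requestUpdate(b, schedule);  // coalesced: no second task is queued

    for (auto& task : executorQueue) {
        task();  // runs once and updates both pools
    }
    return 0;
}
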
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index c2705ae9e1d..27d856444dd 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -33,8 +33,6 @@
#include "mongo/executor/connection_pool.h"
-#include <fmt/format.h>
-
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
@@ -149,7 +147,8 @@ public:
data.target = maxConns;
}
- return {{data.host}, stats.health.isExpired};
+ auto fate = stats.health.isExpired ? HostFate::kShouldDie : HostFate::kShouldLive;
+ return {{std::make_pair(data.host, fate)}};
}
void removeHost(PoolId id) override {
stdx::lock_guard lk(_mutex);
@@ -199,7 +198,6 @@ protected:
*/
class ConnectionPool::SpecificPool final
: public std::enable_shared_from_this<ConnectionPool::SpecificPool> {
- static constexpr int kDiagnosticLogLevel = 3;
public:
/**
@@ -319,6 +317,10 @@ public:
_tags = mutateFunc(_tags);
}
+ auto sslMode() const {
+ return _sslMode;
+ }
+
void fassertSSLModeIs(transport::ConnectSSLMode desired) const {
if (desired != _sslMode) {
severe() << "Mixing ssl modes for a single host is not supported";
@@ -326,6 +328,14 @@ public:
}
}
+ // Update the controller and potentially change the controls
+ HostGroupState updateController();
+
+ /**
+ * Establishes connections until the ControllerInterface's target is met.
+ */
+ void spawnConnections();
+
private:
using OwnedConnection = std::shared_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
@@ -339,11 +349,6 @@ private:
ConnectionHandle makeHandle(ConnectionInterface* connection);
- /**
- * Establishes connections until the ControllerInterface's target is met.
- */
- void spawnConnections();
-
void finishRefresh(ConnectionInterface* connPtr, Status status);
void addToReady(OwnedConnection conn);
@@ -368,9 +373,6 @@ private:
// Update the event timer for this host pool
void updateEventTimer();
- // Update the controller and potentially change the controls
- void updateController();
-
private:
const std::shared_ptr<ConnectionPool> _parent;
@@ -395,13 +397,6 @@ private:
// It increases when we process a failure. If a connection is from a previous generation,
// it will be discarded on return/refresh.
size_t _generation = 0;
-
- // When the pool needs to potentially die or spawn connections, updateController() is scheduled
- // onto the executor and this flag is set. When updateController() finishes running, this flag
- // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once
- // it is closer to steady state.
- bool _updateScheduled = false;
-
size_t _created = 0;
transport::Session::TagMask _tags = transport::Session::kPending;
@@ -522,6 +517,7 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
.thenRunOn(_factory->getExecutor())
.getAsync(std::move(cb));
};
+ LOG(kDiagnosticLogLevel) << "Scheduling get for " << hostAndPort << " with timeout " << timeout;
_factory->getExecutor()->schedule(std::move(getConnectionFunc));
}
@@ -1047,9 +1043,9 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
_eventTimer->setTimeout(timeout, std::move(deferredStateUpdateFunc));
}
-void ConnectionPool::SpecificPool::updateController() {
+auto ConnectionPool::SpecificPool::updateController() -> HostGroupState {
if (_health.isShutdown) {
- return;
+ return {};
}
auto& controller = *_parent->_controller;
@@ -1064,41 +1060,66 @@ void ConnectionPool::SpecificPool::updateController() {
};
LOG(kDiagnosticLogLevel) << "Updating controller for " << _hostAndPort
<< " with State: " << state;
- auto hostGroup = controller.updateHost(_id, std::move(state));
+ return controller.updateHost(_id, std::move(state));
+}
+
+void ConnectionPool::_updateController() {
+ stdx::lock_guard lk(_mutex);
+ _shouldUpdateController = false;
- // If we can shutdown, then do so
- if (hostGroup.canShutdown) {
- for (const auto& host : hostGroup.hosts) {
- auto it = _parent->_pools.find(host);
- if (it == _parent->_pools.end()) {
+ // First we call updateController() on each pool that needs it and cache the results
+ struct Command {
+ HostFate fate;
+ transport::ConnectSSLMode sslMode;
+ size_t updateId = 0;
+ };
+ stdx::unordered_map<HostAndPort, Command> hosts;
+ for (auto& [pool, updateId] : _poolsToUpdate) {
+ for (auto [otherHost, fate] : pool->updateController().fates) {
+ auto& cmd = hosts[otherHost];
+ if (cmd.updateId > updateId) {
continue;
}
- auto& pool = it->second;
-
- // At the moment, controllers will never mark for shutdown a pool with active
- // connections or pending requests. isExpired is never true if these invariants are
- // false. That's not to say that it's a terrible idea, but if this happens then we
- // should review what it means to be expired.
-
- invariant(pool->_checkedOutPool.empty());
- invariant(pool->_requests.empty());
-
- pool->triggerShutdown(Status(ErrorCodes::ShutdownInProgress,
- str::stream() << "Pool for " << host << " has expired."));
+ cmd.fate = fate;
+ cmd.sslMode = pool->sslMode();
}
- return;
}
-
-
- // Make sure all related hosts exist
- for (const auto& host : hostGroup.hosts) {
- if (auto& pool = _parent->_pools[host]; !pool) {
- pool = SpecificPool::make(_parent, host, _sslMode);
+ _poolsToUpdate.clear();
+
+ // Then we go through each cached result host by host and address the HostFate we were given
+ for (auto& [host, cmd] : hosts) {
+ switch (cmd.fate) {
+ case HostFate::kShouldLive: {
+ LOG(kDiagnosticLogLevel) << "Vivifying pool for " << host;
+ auto& pool = _pools[host];
+ if (!pool) {
+ pool = SpecificPool::make(shared_from_this(), host, cmd.sslMode);
+ }
+ pool->spawnConnections();
+ } break;
+ case HostFate::kShouldDie: {
+ auto it = _pools.find(host);
+ if (it == _pools.end()) {
+ continue;
+ }
+
+ LOG(kDiagnosticLogLevel) << "Killing pool for " << host;
+ auto& pool = it->second;
+
+ // At the moment, controllers will never mark for shutdown a pool with active
+ // connections or pending requests. isExpired is never true if these invariants are
+ // false. That's not to say that it's a terrible idea, but if this happens then we
+ // should review what it means to be expired.
+ dassert(pool->inUseConnections() == 0);
+ dassert(pool->requestsPending() == 0);
+
+ pool->triggerShutdown(
+ Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Pool for " << host << " has expired."));
+ } break;
}
}
-
- spawnConnections();
}
// Updates our state and manages the request timer
@@ -1112,17 +1133,17 @@ void ConnectionPool::SpecificPool::updateState() {
updateEventTimer();
updateHealth();
- if (std::exchange(_updateScheduled, true)) {
+ _parent->_poolsToUpdate[shared_from_this()] = ++_parent->_lastUpdateId;
+ if (std::exchange(_parent->_shouldUpdateController, true)) {
return;
}
+ LOG(kDiagnosticLogLevel) << "Scheduling controller update";
ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) //
- .getAsync([this, anchor = shared_from_this()](Status&& status) mutable {
+ .getAsync([parent = _parent](Status&& status) mutable {
invariant(status);
- stdx::lock_guard lk(_parent->_mutex);
- _updateScheduled = false;
- updateController();
+ parent->_updateController();
});
}
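
For the aggregation step in _updateController() above, here is a simplified sketch (plain std types, hosts as strings, no locking) of how fates reported by several pools collapse to one command per host, with the newest report winning, before each host is handled exactly once:

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

enum class HostFate { kShouldLive, kShouldDie };

int main() {
    // Each report carries (host, fate, updateId). The same host may be reported
    // by several pools in one batch; the report with the newest updateId wins.
    struct Report {
        std::string host;
        HostFate fate;
        std::size_t updateId;
    };
    std::vector<Report> reports = {
        {"rs0-a:27017", HostFate::kShouldDie, 1},
        {"rs0-a:27017", HostFate::kShouldLive, 2},  // newer, overrides the first
        {"rs0-b:27017", HostFate::kShouldDie, 2},
    };

    struct Command {
        HostFate fate = HostFate::kShouldLive;
        std::size_t updateId = 0;
    };
    std::unordered_map<std::string, Command> hosts;
    for (const auto& r : reports) {
        auto& cmd = hosts[r.host];
        if (cmd.updateId > r.updateId) {
            continue;  // a newer fate for this host is already recorded
        }
        cmd.fate = r.fate;
        cmd.updateId = r.updateId;
    }

    // Each host is then acted on once, no matter how many pools mentioned it.
    for (const auto& [host, cmd] : hosts) {
        if (cmd.fate == HostFate::kShouldLive) {
            std::cout << "vivify pool for " << host << " and spawn connections\n";
        } else {
            std::cout << "kill pool for " << host << "\n";
        }
    }
    return 0;
}
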
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 70ee3c652c5..7f43941f8cb 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -90,6 +90,8 @@ public:
static const Status kConnectionStateUnknown;
+ static constexpr int kDiagnosticLogLevel = 4;
+
struct Options {
Options() {}
@@ -210,13 +212,27 @@ public:
};
/**
- * A set of hosts and a flag canShutdown for if the group can shutdown
+ * A HostFate is a HostAndPort specific signal from a Controller to the ConnectionPool
+ *
+ * - kShouldLive implies that if the SpecificPool doesn't already exist, it should be created
+ * - kShouldDie implies that if the SpecificPool does exist, it should shutdown
+ */
+ enum class HostFate {
+ kShouldLive,
+ kShouldDie,
+ };
+
+ /**
+ * A set of (HostAndPort, HostFate) pairs representing the HostGroup
*
* This should only be constructed by a ControllerInterface
*/
struct HostGroupState {
- std::vector<HostAndPort> hosts;
- bool canShutdown = false;
+ // While this is a list of pairs, the two controllers in use today each have a predictable
+ // pattern:
+ // * A single host with a single fate
+ // * A list of hosts (i.e. a replica set) all with the same fate
+ std::vector<std::pair<HostAndPort, HostFate>> fates;
};
explicit ConnectionPool(std::shared_ptr<DependentTypeFactoryInterface> impl,
@@ -247,6 +263,8 @@ public:
size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
private:
+ void _updateController();
+
std::string _name;
const std::shared_ptr<DependentTypeFactoryInterface> _factory;
@@ -259,6 +277,14 @@ private:
PoolId _nextPoolId = 0;
stdx::unordered_map<HostAndPort, std::shared_ptr<SpecificPool>> _pools;
+ // When the pool needs to potentially die or spawn connections, _updateController() is scheduled
+ // onto the executor and this flag is set. When _updateController() finishes running, this flag
+ // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once
+ // it is closer to steady state.
+ bool _shouldUpdateController = false;
+ size_t _lastUpdateId = 0;
+ stdx::unordered_map<std::shared_ptr<SpecificPool>, size_t> _poolsToUpdate;
+
EgressTagCloserManager* _manager;
};
@@ -413,6 +439,7 @@ public:
using HostState = typename ConnectionPool::HostState;
using ConnectionControls = typename ConnectionPool::ConnectionControls;
using HostGroupState = typename ConnectionPool::HostGroupState;
+ using HostFate = typename ConnectionPool::HostFate;
using PoolId = typename ConnectionPool::PoolId;
virtual ~ControllerInterface() = default;
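
The header comment above notes the two shapes a HostGroupState takes in practice. A small standalone sketch of the single-host shape (HostAndPort reduced to a string; the replica-set shape is sketched after the controller diff below):

#include <string>
#include <utility>
#include <vector>

enum class HostFate { kShouldLive, kShouldDie };

struct HostGroupState {
    std::vector<std::pair<std::string, HostFate>> fates;
};

// Single-host pattern: one (host, fate) pair, with the fate derived from the
// pool's expiry in the same spirit as the LimitController change earlier in this patch.
HostGroupState updateSingleHost(const std::string& host, bool isExpired) {
    auto fate = isExpired ? HostFate::kShouldDie : HostFate::kShouldLive;
    return {{std::make_pair(host, fate)}};
}

int main() {
    auto state = updateSingleHost("shard-a:27017", /*isExpired=*/true);
    // The pool reads this as: if a SpecificPool for shard-a exists, shut it down.
    return state.fates.front().second == HostFate::kShouldDie ? 0 : 1;
}
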
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
index 871293699ea..fc252fcba91 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.cpp
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -222,7 +222,8 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
// If the pool isn't in a groupData, we can return now
auto groupData = poolData.groupData.lock();
if (!groupData || groupData->state.passives.count(poolData.host)) {
- return {{poolData.host}, poolData.isAbleToShutdown};
+ auto fate = poolData.isAbleToShutdown ? HostFate::kShouldDie : HostFate::kShouldLive;
+ return {{std::make_pair(poolData.host, fate)}};
}
switch (gParameters.matchingStrategy.load()) {
@@ -254,7 +255,14 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
std::all_of(groupData->poolIds.begin(), groupData->poolIds.end(), [&](auto otherId) {
return getOrInvariant(_poolDatas, otherId).isAbleToShutdown;
});
- return {groupData->state.connStr.getServers(), shouldShutdown};
+
+ HostGroupState hostGroupState;
+ hostGroupState.fates.reserve(groupData->state.connStr.getServers().size());
+ auto fate = shouldShutdown ? HostFate::kShouldDie : HostFate::kShouldLive;
+ for (const auto& host : groupData->state.connStr.getServers()) {
+ hostGroupState.fates.emplace_back(host, fate);
+ }
+ return {hostGroupState};
}
void ShardingTaskExecutorPoolController::removeHost(PoolId id) {
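
And the whole-group shape from updateHost() above, sketched standalone (plain std types, a hypothetical PoolData holding only what this decision needs): a replica set is only handed kShouldDie once every member pool reports it is able to shut down, and then every member gets the same fate:

#include <algorithm>
#include <string>
#include <utility>
#include <vector>

enum class HostFate { kShouldLive, kShouldDie };

struct PoolData {
    std::string host;
    bool isAbleToShutdown = false;
};

std::vector<std::pair<std::string, HostFate>> groupFates(const std::vector<PoolData>& members) {
    bool shouldShutdown = std::all_of(members.begin(), members.end(),
                                      [](const PoolData& p) { return p.isAbleToShutdown; });
    auto fate = shouldShutdown ? HostFate::kShouldDie : HostFate::kShouldLive;

    std::vector<std::pair<std::string, HostFate>> fates;
    fates.reserve(members.size());
    for (const auto& p : members) {
        fates.emplace_back(p.host, fate);  // the whole group shares one fate
    }
    return fates;
}

int main() {
    std::vector<PoolData> rs = {{"rs0-a:27017", true}, {"rs0-b:27017", false}};
    auto fates = groupFates(rs);  // one member is still busy, so every member stays alive
    bool allLive = std::all_of(fates.begin(), fates.end(), [](const auto& f) {
        return f.second == HostFate::kShouldLive;
    });
    return allLive ? 0 : 1;
}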