author     Pavithra Vetriselvan <pavithra.vetriselvan@mongodb.com>  2019-09-23 16:06:14 +0000
committer  evergreen <evergreen@mongodb.com>  2019-09-23 16:06:14 +0000
commit     28a50ae351071acdf18745c0f3bb54c84485d3ba (patch)
tree       daec7dff7a6b447086db4de074448d1715a4e0b0 /src/mongo
parent     35ba7ee8716fb1ece51d28097777ecd8f688568a (diff)
download   mongo-28a50ae351071acdf18745c0f3bb54c84485d3ba.tar.gz
Revert "SERVER-42790 SERVER-42930 ConnectionPool controller updates must batch across hosts"
This reverts commit e9f5360ba5fba5598e2556816a9d7d818ab586c7.
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/executor/connection_pool.cpp                 | 127
-rw-r--r--  src/mongo/executor/connection_pool.h                   |  33
-rw-r--r--  src/mongo/s/sharding_task_executor_pool_controller.cpp |  12
3 files changed, 58 insertions(+), 114 deletions(-)
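The substance of the revert is the shape of the controller's verdict. As orientation for the hunks below, here is a hedged sketch of the two shapes being swapped, with HostAndPort reduced to a string stand-in for the real mongo type:

#include <string>
#include <utility>
#include <vector>

using HostAndPort = std::string;  // stand-in for mongo::HostAndPort

// Reverted shape: each host carries its own fate, and the parent pool
// batches these (host, fate) commands across controller updates.
enum class HostFate { kShouldLive, kShouldDie };
struct BatchedHostGroupState {
    std::vector<std::pair<HostAndPort, HostFate>> fates;
};

// Restored shape: one host list plus a single group-wide flag.
struct HostGroupState {
    std::vector<HostAndPort> hosts;
    bool canShutdown = false;
};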
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 27d856444dd..c2705ae9e1d 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -33,6 +33,8 @@
#include "mongo/executor/connection_pool.h"
+#include <fmt/format.h>
+
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
@@ -147,8 +149,7 @@ public:
data.target = maxConns;
}
- auto fate = stats.health.isExpired ? HostFate::kShouldDie : HostFate::kShouldLive;
- return {{std::make_pair(data.host, fate)}};
+ return {{data.host}, stats.health.isExpired};
}
void removeHost(PoolId id) override {
stdx::lock_guard lk(_mutex);
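The one-line replacement in this hunk leans on aggregate initialization. A minimal sketch of what return {{data.host}, stats.health.isExpired}; expands to, reusing the stand-in types from the sketch above:

#include <string>
#include <vector>

using HostAndPort = std::string;  // stand-in for mongo::HostAndPort

struct HostGroupState {
    std::vector<HostAndPort> hosts;
    bool canShutdown = false;
};

HostGroupState updateHostSketch(const HostAndPort& host, bool isExpired) {
    // The inner braces build the one-element hosts vector;
    // the trailing expression fills canShutdown.
    return {{host}, isExpired};
}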
@@ -198,6 +199,7 @@ protected:
*/
class ConnectionPool::SpecificPool final
: public std::enable_shared_from_this<ConnectionPool::SpecificPool> {
+ static constexpr int kDiagnosticLogLevel = 3;
public:
/**
@@ -317,10 +319,6 @@ public:
_tags = mutateFunc(_tags);
}
- auto sslMode() const {
- return _sslMode;
- }
-
void fassertSSLModeIs(transport::ConnectSSLMode desired) const {
if (desired != _sslMode) {
severe() << "Mixing ssl modes for a single host is not supported";
@@ -328,14 +326,6 @@ public:
}
}
- // Update the controller and potentially change the controls
- HostGroupState updateController();
-
- /**
- * Establishes connections until the ControllerInterface's target is met.
- */
- void spawnConnections();
-
private:
using OwnedConnection = std::shared_ptr<ConnectionInterface>;
using OwnershipPool = stdx::unordered_map<ConnectionInterface*, OwnedConnection>;
@@ -349,6 +339,11 @@ private:
ConnectionHandle makeHandle(ConnectionInterface* connection);
+ /**
+ * Establishes connections until the ControllerInterface's target is met.
+ */
+ void spawnConnections();
+
void finishRefresh(ConnectionInterface* connPtr, Status status);
void addToReady(OwnedConnection conn);
@@ -373,6 +368,9 @@ private:
// Update the event timer for this host pool
void updateEventTimer();
+ // Update the controller and potentially change the controls
+ void updateController();
+
private:
const std::shared_ptr<ConnectionPool> _parent;
@@ -397,6 +395,13 @@ private:
// It increases when we process a failure. If a connection is from a previous generation,
// it will be discarded on return/refresh.
size_t _generation = 0;
+
+ // When the pool needs to potentially die or spawn connections, updateController() is scheduled
+ // onto the executor and this flag is set. When updateController() finishes running, this flag
+ // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once
+ // it is closer to steady state.
+ bool _updateScheduled = false;
+
size_t _created = 0;
transport::Session::TagMask _tags = transport::Session::kPending;
@@ -517,7 +522,6 @@ void ConnectionPool::get_forTest(const HostAndPort& hostAndPort,
.thenRunOn(_factory->getExecutor())
.getAsync(std::move(cb));
};
- LOG(kDiagnosticLogLevel) << "Scheduling get for " << hostAndPort << " with timeout " << timeout;
_factory->getExecutor()->schedule(std::move(getConnectionFunc));
}
@@ -1043,9 +1047,9 @@ void ConnectionPool::SpecificPool::updateEventTimer() {
_eventTimer->setTimeout(timeout, std::move(deferredStateUpdateFunc));
}
-auto ConnectionPool::SpecificPool::updateController() -> HostGroupState {
+void ConnectionPool::SpecificPool::updateController() {
if (_health.isShutdown) {
- return {};
+ return;
}
auto& controller = *_parent->_controller;
@@ -1060,66 +1064,41 @@ auto ConnectionPool::SpecificPool::updateController() -> HostGroupState {
};
LOG(kDiagnosticLogLevel) << "Updating controller for " << _hostAndPort
<< " with State: " << state;
- return controller.updateHost(_id, std::move(state));
-}
-
-void ConnectionPool::_updateController() {
- stdx::lock_guard lk(_mutex);
- _shouldUpdateController = false;
+ auto hostGroup = controller.updateHost(_id, std::move(state));
- // First we call updateController() on each pool that needs it and cache the results
- struct Command {
- HostFate fate;
- transport::ConnectSSLMode sslMode;
- size_t updateId = 0;
- };
- stdx::unordered_map<HostAndPort, Command> hosts;
- for (auto& [pool, updateId] : _poolsToUpdate) {
- for (auto [otherHost, fate] : pool->updateController().fates) {
- auto& cmd = hosts[otherHost];
- if (cmd.updateId > updateId) {
+ // If we can shutdown, then do so
+ if (hostGroup.canShutdown) {
+ for (const auto& host : hostGroup.hosts) {
+ auto it = _parent->_pools.find(host);
+ if (it == _parent->_pools.end()) {
continue;
}
- cmd.fate = fate;
- cmd.sslMode = pool->sslMode();
+ auto& pool = it->second;
+
+ // At the moment, controllers will never mark for shutdown a pool with active
+ // connections or pending requests. isExpired is never true if these invariants are
+ // false. That's not to say that it's a terrible idea, but if this happens then we
+ // should review what it means to be expired.
+
+ invariant(pool->_checkedOutPool.empty());
+ invariant(pool->_requests.empty());
+
+ pool->triggerShutdown(Status(ErrorCodes::ShutdownInProgress,
+ str::stream() << "Pool for " << host << " has expired."));
}
+ return;
}
- _poolsToUpdate.clear();
-
- // Then we go through each cached result host by host and address the HostFate we were given
- for (auto& [host, cmd] : hosts) {
- switch (cmd.fate) {
- case HostFate::kShouldLive: {
- LOG(kDiagnosticLogLevel) << "Vivifying pool for " << host;
- auto& pool = _pools[host];
- if (!pool) {
- pool = SpecificPool::make(shared_from_this(), host, cmd.sslMode);
- }
- pool->spawnConnections();
- } break;
- case HostFate::kShouldDie: {
- auto it = _pools.find(host);
- if (it == _pools.end()) {
- continue;
- }
-
- LOG(kDiagnosticLogLevel) << "Killing pool for " << host;
- auto& pool = it->second;
-
- // At the moment, controllers will never mark for shutdown a pool with active
- // connections or pending requests. isExpired is never true if these invariants are
- // false. That's not to say that it's a terrible idea, but if this happens then we
- // should review what it means to be expired.
- dassert(pool->inUseConnections() == 0);
- dassert(pool->requestsPending() == 0);
-
- pool->triggerShutdown(
- Status(ErrorCodes::ShutdownInProgress,
- str::stream() << "Pool for " << host << " has expired."));
- } break;
+
+
+ // Make sure all related hosts exist
+ for (const auto& host : hostGroup.hosts) {
+ if (auto& pool = _parent->_pools[host]; !pool) {
+ pool = SpecificPool::make(_parent, host, _sslMode);
}
}
+
+ spawnConnections();
}
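Two details of the restored body deserve a note. First, the empty-pool preconditions move from debug-only dassert checks to always-enforced invariant calls. Second, if (auto& pool = _parent->_pools[host]; !pool) is a C++17 if-with-initializer: map operator[] default-constructs a null shared_ptr for a missing host, and the pool object is built only when that pointer is empty. A standalone sketch of the idiom, with all names hypothetical:

#include <iostream>
#include <map>
#include <memory>
#include <string>

struct SpecificPool {
    static std::shared_ptr<SpecificPool> make() {
        return std::make_shared<SpecificPool>();
    }
};

int main() {
    std::map<std::string, std::shared_ptr<SpecificPool>> pools;
    for (const char* host : {"a:27017", "b:27017", "a:27017"}) {
        // operator[] inserts a default (null) shared_ptr on first access;
        // the init-statement keeps `pool` scoped to this if.
        if (auto& pool = pools[host]; !pool) {
            pool = SpecificPool::make();
            std::cout << "created pool for " << host << "\n";
        }
    }
    // Prints twice: the second visit to a:27017 finds a live pool.
}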
// Updates our state and manages the request timer
@@ -1133,17 +1112,17 @@ void ConnectionPool::SpecificPool::updateState() {
updateEventTimer();
updateHealth();
- _parent->_poolsToUpdate[shared_from_this()] = ++_parent->_lastUpdateId;
- if (std::exchange(_parent->_shouldUpdateController, true)) {
+ if (std::exchange(_updateScheduled, true)) {
return;
}
- LOG(kDiagnosticLogLevel) << "Scheduling controller update";
ExecutorFuture(ExecutorPtr(_parent->_factory->getExecutor())) //
- .getAsync([parent = _parent](Status&& status) mutable {
+ .getAsync([this, anchor = shared_from_this()](Status&& status) mutable {
invariant(status);
- parent->_updateController();
+ stdx::lock_guard lk(_parent->_mutex);
+ _updateScheduled = false;
+ updateController();
});
}
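The replacement scheduling is a coalescing test-and-set: std::exchange(_updateScheduled, true) returns the prior value, so only the first updateState() call after a completed update enqueues an updateController() run, and every later call piggybacks on the pending task. A self-contained sketch of the pattern, with a plain task queue standing in for the executor and all names hypothetical:

#include <functional>
#include <iostream>
#include <utility>
#include <vector>

struct Coalescer {
    bool _updateScheduled = false;
    std::vector<std::function<void()>> executor;  // stand-in task queue

    void updateState() {
        // If an update is already queued, piggyback on it.
        if (std::exchange(_updateScheduled, true))
            return;
        executor.push_back([this] {
            _updateScheduled = false;  // let the next batch schedule again
            std::cout << "updateController runs once\n";
        });
    }
};

int main() {
    Coalescer c;
    c.updateState();
    c.updateState();  // coalesced: no second task is queued
    for (auto& task : c.executor) task();  // drains exactly one task
}

Note the design shift here: the flag now lives on each SpecificPool rather than globally on the parent, so updates coalesce per host instead of batching across hosts.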
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 7f43941f8cb..70ee3c652c5 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -90,8 +90,6 @@ public:
static const Status kConnectionStateUnknown;
- static constexpr int kDiagnosticLogLevel = 4;
-
struct Options {
Options() {}
@@ -212,27 +210,13 @@ public:
};
/**
- * A HostFate is a HostAndPort specific signal from a Controller to the ConnectionPool
- *
- * - kShouldLive implies that if the SpecificPool doesn't already exist, it should be created
- * - kShouldDie implies that if the SpecificPool does exist, it should shutdown
- */
- enum class HostFate {
- kShouldLive,
- kShouldDie,
- };
-
- /**
- * A set of (HostAndPort, HostFate) pairs representing the HostGroup
+ * A set of hosts and a canShutdown flag indicating whether the group may shut down
*
* This should only be constructed by a ControllerInterface
*/
struct HostGroupState {
- // While this is a list of pairs, the two controllers in use today each have a predictable
- // pattern:
- // * A single host with a single fate
- // * A list of hosts (i.e. a replica set) all with the same fate
- std::vector<std::pair<HostAndPort, HostFate>> fates;
+ std::vector<HostAndPort> hosts;
+ bool canShutdown = false;
};
explicit ConnectionPool(std::shared_ptr<DependentTypeFactoryInterface> impl,
@@ -263,8 +247,6 @@ public:
size_t getNumConnectionsPerHost(const HostAndPort& hostAndPort) const;
private:
- void _updateController();
-
std::string _name;
const std::shared_ptr<DependentTypeFactoryInterface> _factory;
@@ -277,14 +259,6 @@ private:
PoolId _nextPoolId = 0;
stdx::unordered_map<HostAndPort, std::shared_ptr<SpecificPool>> _pools;
- // When the pool needs to potentially die or spawn connections, _updateController() is scheduled
- // onto the executor and this flag is set. When _updateController() finishes running, this flag
- // is unset. This allows the pool to amortize the expensive spawning and hopefully do work once
- // it is closer to steady state.
- bool _shouldUpdateController = false;
- size_t _lastUpdateId = 0;
- stdx::unordered_map<std::shared_ptr<SpecificPool>, size_t> _poolsToUpdate;
-
EgressTagCloserManager* _manager;
};
@@ -439,7 +413,6 @@ public:
using HostState = typename ConnectionPool::HostState;
using ConnectionControls = typename ConnectionPool::ConnectionControls;
using HostGroupState = typename ConnectionPool::HostGroupState;
- using HostFate = typename ConnectionPool::HostFate;
using PoolId = typename ConnectionPool::PoolId;
virtual ~ControllerInterface() = default;
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
index fc252fcba91..871293699ea 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.cpp
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -222,8 +222,7 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
// If the pool isn't in a groupData, we can return now
auto groupData = poolData.groupData.lock();
if (!groupData || groupData->state.passives.count(poolData.host)) {
- auto fate = poolData.isAbleToShutdown ? HostFate::kShouldDie : HostFate::kShouldLive;
- return {{std::make_pair(poolData.host, fate)}};
+ return {{poolData.host}, poolData.isAbleToShutdown};
}
switch (gParameters.matchingStrategy.load()) {
@@ -255,14 +254,7 @@ auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState&
std::all_of(groupData->poolIds.begin(), groupData->poolIds.end(), [&](auto otherId) {
return getOrInvariant(_poolDatas, otherId).isAbleToShutdown;
});
-
- HostGroupState hostGroupState;
- hostGroupState.fates.reserve(groupData->state.connStr.getServers().size());
- auto fate = shouldShutdown ? HostFate::kShouldDie : HostFate::kShouldLive;
- for (const auto& host : groupData->state.connStr.getServers()) {
- hostGroupState.fates.emplace_back(host, fate);
- }
- return {hostGroupState};
+ return {groupData->state.connStr.getServers(), shouldShutdown};
}
void ShardingTaskExecutorPoolController::removeHost(PoolId id) {
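In the sharding controller, the replica-set decision reduces to a single std::all_of consensus: the group may shut down only when every member pool reports isAbleToShutdown, and one (hosts, flag) pair replaces the reverted per-host fate list. A hedged sketch of that reduction, with simplified types and hypothetical names:

#include <algorithm>
#include <string>
#include <vector>

struct HostGroupState {
    std::vector<std::string> hosts;
    bool canShutdown = false;
};

HostGroupState updateGroup(const std::vector<std::string>& servers,
                           const std::vector<bool>& ableToShutdown) {
    const bool shouldShutdown = std::all_of(
        ableToShutdown.begin(), ableToShutdown.end(), [](bool ok) { return ok; });
    // One verdict for the whole group: either every member pool may die,
    // or the entire replica set stays alive.
    return {servers, shouldShutdown};
}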