author     Ben Caimano <ben.caimano@10gen.com>    2019-06-07 13:23:20 -0400
committer  Ben Caimano <ben.caimano@10gen.com>    2019-06-19 14:40:45 -0400
commit     8ab1b17cbcc7f5389a933e9864511ce1565fc68e (patch)
tree       18eb96ba2db277008e3d93f0147dd405b1ca1eb6
parent     bd334968e09e4ef71c030f2bfc19ed935dcce97e (diff)
download   mongo-8ab1b17cbcc7f5389a933e9864511ce1565fc68e.tar.gz
SERVER-41602 Activate followBusiestNode in ShardingTaskExecutor
-rw-r--r--  src/mongo/executor/connection_pool.cpp                    75
-rw-r--r--  src/mongo/executor/connection_pool.h                      26
-rw-r--r--  src/mongo/executor/connection_pool_test.cpp               12
-rw-r--r--  src/mongo/s/sharding_task_executor_pool_controller.cpp   180
-rw-r--r--  src/mongo/s/sharding_task_executor_pool_controller.h      77
5 files changed, 258 insertions, 112 deletions
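This commit re-enables the matchBusiestNode matching strategy and reworks the controller API so pools are keyed by an integer PoolId rather than a SpecificPool pointer. As a quick reference, the following is a minimal standalone sketch of the strategy names the controller accepts after this change, mirroring onUpdateMatchingStrategy() below; std::optional stands in for Status, and the "disabled" spelling is an assumption since that branch's condition falls outside the hunk shown.

// Minimal sketch of the accepted strategy names, mirroring
// onUpdateMatchingStrategy(). std::optional stands in for Status; the
// "disabled" spelling is an assumption, since that branch's condition is
// outside the hunk shown in this diff.
#include <iostream>
#include <optional>
#include <string>

enum class MatchingStrategy { kDisabled, kMatchPrimaryNode, kMatchBusiestNode };

std::optional<MatchingStrategy> parseMatchingStrategy(const std::string& str) {
    if (str == "disabled") {                 // assumed spelling
        return MatchingStrategy::kDisabled;
    } else if (str == "matchPrimaryNode") {
        return MatchingStrategy::kMatchPrimaryNode;
    } else if (str == "matchBusiestNode") {  // re-enabled by this commit
        return MatchingStrategy::kMatchBusiestNode;
    }
    return std::nullopt;  // maps to ErrorCodes::BadValue in the real code
}

int main() {
    std::cout << std::boolalpha
              << parseMatchingStrategy("matchBusiestNode").has_value() << '\n';  // true
    std::cout << parseMatchingStrategy("bogus").has_value() << '\n';             // false
}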
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index 99d73ec76f5..276f86509f5 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -54,6 +54,25 @@ using namespace fmt::literals;
// ourselves to operations over the connection).
namespace mongo {
+
+namespace {
+
+template <typename Map, typename Key>
+auto& getOrInvariant(Map&& map, const Key& key) noexcept {
+ auto it = std::forward<Map>(map).find(key);
+ invariant(it != std::forward<Map>(map).end(), "Unable to find key in map");
+
+ return it->second;
+}
+
+template <typename Map, typename... Args>
+void emplaceOrInvariant(Map&& map, Args&&... args) noexcept {
+ auto ret = std::forward<Map>(map).emplace(std::forward<Args>(args)...);
+ invariant(ret.second, "Element already existed in map/set");
+}
+
+} // anonymous
+
namespace executor {
void ConnectionPool::ConnectionInterface::indicateUsed() {
@@ -109,11 +128,16 @@ std::string ConnectionPool::HostState::toString() const {
*/
class ConnectionPool::LimitController final : public ConnectionPool::ControllerInterface {
public:
- HostGroup updateHost(const SpecificPool* pool,
- const HostAndPort& host,
- const HostState& stats) override {
+ void addHost(PoolId id, const HostAndPort& host) override {
+ stdx::lock_guard lk(_mutex);
+ PoolData poolData;
+ poolData.host = host;
+
+ emplaceOrInvariant(_poolData, id, std::move(poolData));
+ }
+ HostGroupState updateHost(PoolId id, const HostState& stats) override {
stdx::lock_guard lk(_mutex);
- auto& data = _poolData[pool];
+ auto& data = getOrInvariant(_poolData, id);
const auto minConns = getPool()->_options.minConnections;
const auto maxConns = getPool()->_options.maxConnections;
@@ -125,16 +149,16 @@ public:
data.target = maxConns;
}
- return {{host}, stats.health.isExpired};
+ return {{data.host}, stats.health.isExpired};
}
- void removeHost(const SpecificPool* pool) override {
+ void removeHost(PoolId id) override {
stdx::lock_guard lk(_mutex);
- _poolData.erase(pool);
+ invariant(_poolData.erase(id));
}
- ConnectionControls getControls(const SpecificPool* pool) override {
+ ConnectionControls getControls(PoolId id) override {
stdx::lock_guard lk(_mutex);
- const auto& data = _poolData[pool];
+ const auto& data = getOrInvariant(_poolData, id);
return {
getPool()->_options.maxConnecting, data.target,
@@ -156,12 +180,13 @@ public:
}
protected:
- struct Data {
+ struct PoolData {
+ HostAndPort host;
size_t target = 0;
};
stdx::mutex _mutex;
- stdx::unordered_map<const SpecificPool*, Data> _poolData;
+ stdx::unordered_map<PoolId, PoolData> _poolData;
};
/**
@@ -361,6 +386,8 @@ private:
const transport::ConnectSSLMode _sslMode;
const HostAndPort _hostAndPort;
+ const PoolId _id;
+
LRUOwnershipPool _readyPool;
OwnershipPool _processingPool;
OwnershipPool _droppedProcessingPool;
@@ -391,7 +418,14 @@ private:
auto ConnectionPool::SpecificPool::make(std::shared_ptr<ConnectionPool> parent,
const HostAndPort& hostAndPort,
transport::ConnectSSLMode sslMode) {
+ auto& controller = *parent->_controller;
+
auto pool = std::make_shared<SpecificPool>(std::move(parent), hostAndPort, sslMode);
+
+ // Inform the controller that we exist
+ controller.addHost(pool->_id, hostAndPort);
+
+ // Set our timers and health
pool->updateEventTimer();
pool->updateHealth();
return pool;
@@ -553,6 +587,7 @@ ConnectionPool::SpecificPool::SpecificPool(std::shared_ptr<ConnectionPool> paren
: _parent(std::move(parent)),
_sslMode(sslMode),
_hostAndPort(hostAndPort),
+ _id(_parent->_nextPoolId++),
_readyPool(std::numeric_limits<size_t>::max()) {
invariant(_parent);
_eventTimer = _parent->_factory->makeTimer();
@@ -716,6 +751,11 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
auto conn = takeFromPool(_checkedOutPool, connPtr);
invariant(conn);
+ if (_health.isShutdown) {
+ // If we're in shutdown, then we don't care
+ return;
+ }
+
if (conn->getGeneration() != _generation) {
// If the connection is from an older generation, just return.
return;
@@ -733,7 +773,7 @@ void ConnectionPool::SpecificPool::returnConnection(ConnectionInterface* connPtr
if (needsRefreshTP <= now) {
// If we need to refresh this connection
- auto controls = _parent->_controller->getControls(this);
+ auto controls = _parent->_controller->getControls(_id);
if (_readyPool.size() + _processingPool.size() + _checkedOutPool.size() >=
controls.targetConnections) {
// If we already have minConnections, just let the connection lapse
@@ -797,7 +837,7 @@ void ConnectionPool::SpecificPool::triggerShutdown(const Status& status) {
_health.isShutdown = true;
LOG(2) << "Delisting connection pool for " << _hostAndPort;
- _parent->_controller->removeHost(this);
+ _parent->_controller->removeHost(_id);
_parent->_pools.erase(_hostAndPort);
processFailure(status);
@@ -881,6 +921,11 @@ void ConnectionPool::SpecificPool::fulfillRequests() {
// spawn enough connections to satisfy open requests and minpool, while honoring maxpool
void ConnectionPool::SpecificPool::spawnConnections() {
+ if (_health.isShutdown) {
+ // Dead pools spawn no conns
+ return;
+ }
+
if (_health.isFailed) {
LOG(kDiagnosticLogLevel)
<< "Pool for " << _hostAndPort
@@ -888,7 +933,7 @@ void ConnectionPool::SpecificPool::spawnConnections() {
return;
}
- auto controls = _parent->_controller->getControls(this);
+ auto controls = _parent->_controller->getControls(_id);
LOG(kDiagnosticLogLevel) << "Comparing connection state for " << _hostAndPort
<< " to Controls: " << controls;
@@ -1035,7 +1080,7 @@ void ConnectionPool::SpecificPool::updateController() {
};
LOG(kDiagnosticLogLevel) << "Updating controller for " << _hostAndPort
<< " with State: " << state;
- auto hostGroup = controller.updateHost(this, _hostAndPort, std::move(state));
+ auto hostGroup = controller.updateHost(_id, std::move(state));
// If we can shutdown, then do so
if (hostGroup.canShutdown) {
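For reference, a standalone restatement of the getOrInvariant()/emplaceOrInvariant() helpers introduced above, with std::unordered_map in place of stdx::unordered_map and assert() standing in for mongo's invariant(); the usage in main() is illustrative only.

// Standalone illustration of the map helpers added in this file. assert()
// stands in for invariant(), so this only makes sense in asserting builds.
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

template <typename Map, typename Key>
auto& getOrInvariant(Map&& map, const Key& key) noexcept {
    auto it = map.find(key);
    assert(it != map.end() && "Unable to find key in map");
    return it->second;
}

template <typename Map, typename... Args>
void emplaceOrInvariant(Map&& map, Args&&... args) noexcept {
    auto ret = map.emplace(std::forward<Args>(args)...);
    assert(ret.second && "Element already existed in map/set");
    (void)ret;
}

int main() {
    std::unordered_map<uint64_t, std::string> poolNames;
    emplaceOrInvariant(poolNames, 1, "shard01");  // first insert succeeds
    auto& name = getOrInvariant(poolNames, 1);    // lookup must succeed
    (void)name;
    // emplaceOrInvariant(poolNames, 1, "dup");   // would trip the assert
    // getOrInvariant(poolNames, 2);              // would trip the assert
}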
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 6eca977ca1e..70ee3c652c5 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -78,6 +78,8 @@ public:
using GetConnectionCallback = unique_function<void(StatusWith<ConnectionHandle>)>;
+ using PoolId = uint64_t;
+
static constexpr size_t kDefaultMaxConns = std::numeric_limits<size_t>::max();
static constexpr size_t kDefaultMinConns = 1;
static constexpr size_t kDefaultMaxConnecting = 2;
@@ -212,7 +214,7 @@ public:
*
* This should only be constructed by a ControllerInterface
*/
- struct HostGroup {
+ struct HostGroupState {
std::vector<HostAndPort> hosts;
bool canShutdown = false;
};
@@ -254,6 +256,7 @@ private:
// The global mutex for specific pool access and the generation counter
mutable stdx::mutex _mutex;
+ PoolId _nextPoolId = 0;
stdx::unordered_map<HostAndPort, std::shared_ptr<SpecificPool>> _pools;
EgressTagCloserManager* _manager;
@@ -409,7 +412,8 @@ public:
using SpecificPool = typename ConnectionPool::SpecificPool;
using HostState = typename ConnectionPool::HostState;
using ConnectionControls = typename ConnectionPool::ConnectionControls;
- using HostGroup = typename ConnectionPool::HostGroup;
+ using HostGroupState = typename ConnectionPool::HostGroupState;
+ using PoolId = typename ConnectionPool::PoolId;
virtual ~ControllerInterface() = default;
@@ -421,24 +425,26 @@ public:
virtual void init(ConnectionPool* parent);
/**
- * Inform this Controller of a new State for a pool/host
+ * Inform this Controller that a pool should be tracked
+ */
+ virtual void addHost(PoolId id, const HostAndPort& host) = 0;
+
+ /**
+ * Inform this Controller of a new State for a pool
*
- * This function returns information about all hosts tied to this one. This function is also
- * expected to handle never-before-seen pools.
+ * This function returns the state of the group of hosts to which this host belongs.
*/
- virtual HostGroup updateHost(const SpecificPool* pool,
- const HostAndPort& host,
- const HostState& stats) = 0;
+ virtual HostGroupState updateHost(PoolId id, const HostState& stats) = 0;
/**
* Inform this Controller that a pool is no longer tracked
*/
- virtual void removeHost(const SpecificPool* pool) = 0;
+ virtual void removeHost(PoolId id) = 0;
/**
* Get controls for the given pool
*/
- virtual ConnectionControls getControls(const SpecificPool* pool) = 0;
+ virtual ConnectionControls getControls(PoolId id) = 0;
/**
* Get the various timeouts that this controller suggests
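To make the revised contract concrete, here is a minimal sketch of a controller written against the new PoolId-keyed interface shape (addHost/updateHost/removeHost/getControls), in the spirit of the LimitController in connection_pool.cpp. HostAndPort, HostState, HostGroupState, and ConnectionControls are simplified stand-ins for the real ConnectionPool nested types, and the clamping mirrors the min/max connection bounds.

// Minimal sketch of a limit-style controller keyed by PoolId. All types are
// simplified stand-ins; this does not derive from the real ControllerInterface.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

using PoolId = uint64_t;
using HostAndPort = std::string;  // stand-in

struct HostState {
    size_t requests = 0;
    size_t active = 0;
    bool isExpired = false;  // stand-in for health.isExpired
};

struct HostGroupState {
    std::vector<HostAndPort> hosts;
    bool canShutdown = false;
};

struct ConnectionControls {
    size_t maxPendingConnections = 0;
    size_t targetConnections = 0;
};

class SimpleLimitController {
public:
    SimpleLimitController(size_t minConns, size_t maxConns, size_t maxConnecting)
        : _minConns(minConns), _maxConns(maxConns), _maxConnecting(maxConnecting) {}

    // Called once when a SpecificPool is constructed.
    void addHost(PoolId id, const HostAndPort& host) {
        std::lock_guard<std::mutex> lk(_mutex);
        _poolData.emplace(id, PoolData{host, 0});
    }

    // Called whenever a pool's state changes; returns only the pool's own
    // host, since a limit-style controller has no grouping.
    HostGroupState updateHost(PoolId id, const HostState& stats) {
        std::lock_guard<std::mutex> lk(_mutex);
        auto& data = _poolData.at(id);  // addHost() must have run first
        data.target = std::clamp(stats.requests + stats.active, _minConns, _maxConns);
        return {{data.host}, stats.isExpired};
    }

    // Called when a pool shuts down and is delisted.
    void removeHost(PoolId id) {
        std::lock_guard<std::mutex> lk(_mutex);
        _poolData.erase(id);
    }

    ConnectionControls getControls(PoolId id) {
        std::lock_guard<std::mutex> lk(_mutex);
        return {_maxConnecting, _poolData.at(id).target};
    }

private:
    struct PoolData {
        HostAndPort host;
        size_t target = 0;
    };

    const size_t _minConns;
    const size_t _maxConns;
    const size_t _maxConnecting;

    std::mutex _mutex;
    std::unordered_map<PoolId, PoolData> _poolData;
};

int main() {
    SimpleLimitController controller(/*minConns=*/1, /*maxConns=*/8, /*maxConnecting=*/2);
    controller.addHost(0, "shardA:27017");
    auto group = controller.updateHost(0, {/*requests=*/5, /*active=*/1, /*isExpired=*/false});
    auto controls = controller.getControls(0);  // targetConnections == 6
    controller.removeHost(0);
    (void)group;
    (void)controls;
}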
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index c2220cd95f6..cb405fe4e4e 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -1501,6 +1501,18 @@ TEST_F(ConnectionPoolTest, AsyncGet) {
}
}
+TEST_F(ConnectionPoolTest, ReturnAfterShutdown) {
+ auto pool = makePool();
+
+ // Grab a connection and hold it to end of scope
+ auto connFuture = pool->get(HostAndPort(), transport::kGlobalSSLMode, Seconds(1));
+ ConnectionImpl::pushSetup(Status::OK());
+ auto conn = std::move(connFuture).get();
+ doneWith(conn);
+
+ pool->shutdown();
+}
+
} // namespace connection_pool_test_details
} // namespace executor
} // namespace mongo
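The new ReturnAfterShutdown test and the _health.isShutdown guards in returnConnection()/spawnConnections() go together: once triggerShutdown() has delisted the pool via removeHost(_id), a late return must not reach getControls(_id), or the new getOrInvariant() lookup would fail. A simplified stand-in model of that interaction:

// Simplified model of the shutdown guard; types are stand-ins and assert()
// stands in for invariant().
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>

using PoolId = uint64_t;

struct Controller {
    std::unordered_map<PoolId, size_t> targets;  // stand-in for per-pool state

    size_t getControls(PoolId id) {
        auto it = targets.find(id);
        assert(it != targets.end() && "Unable to find key in map");
        return it->second;
    }

    void removeHost(PoolId id) {
        targets.erase(id);
    }
};

struct SpecificPool {
    PoolId id;
    Controller* controller;
    bool isShutdown = false;

    void triggerShutdown() {
        isShutdown = true;
        controller->removeHost(id);  // pool is delisted from the controller
    }

    void returnConnection() {
        if (isShutdown) {
            // Without this early return, the lookup below would fire the
            // invariant for an already-removed id.
            return;
        }
        (void)controller->getControls(id);
    }
};

int main() {
    Controller controller;
    controller.targets.emplace(1, 4);

    SpecificPool pool{1, &controller};
    pool.returnConnection();  // normal path: consults the controller
    pool.triggerShutdown();
    pool.returnConnection();  // late return after shutdown: safe no-op
}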
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.cpp b/src/mongo/s/sharding_task_executor_pool_controller.cpp
index 77d694e2f16..a077700b6cd 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.cpp
+++ b/src/mongo/s/sharding_task_executor_pool_controller.cpp
@@ -37,6 +37,24 @@
namespace mongo {
+namespace {
+
+template <typename Map, typename Key>
+auto& getOrInvariant(Map&& map, const Key& key) noexcept {
+ auto it = std::forward<Map>(map).find(key);
+ invariant(it != std::forward<Map>(map).end(), "Unable to find key in map");
+
+ return it->second;
+}
+
+template <typename Map, typename... Args>
+void emplaceOrInvariant(Map&& map, Args&&... args) noexcept {
+ auto ret = std::forward<Map>(map).emplace(std::forward<Args>(args)...);
+ invariant(ret.second, "Element already existed in map/set");
+}
+
+} // anonymous
+
Status ShardingTaskExecutorPoolController::validateHostTimeout(const int& hostTimeoutMS) {
auto toRefreshTimeoutMS = gParameters.toRefreshTimeoutMS.load();
auto pendingTimeoutMS = gParameters.pendingTimeoutMS.load();
@@ -69,9 +87,8 @@ Status ShardingTaskExecutorPoolController::onUpdateMatchingStrategy(const std::s
gParameters.matchingStrategy.store(MatchingStrategy::kDisabled);
} else if (str == "matchPrimaryNode") {
gParameters.matchingStrategy.store(MatchingStrategy::kMatchPrimaryNode);
- // TODO Reactive once the prediction pattern is fixed in SERVER-41602
- //} else if (str == "matchBusiestNode") {
- // gParameters.matchingStrategy.store(MatchingStrategy::kMatchBusiestNode);
+ } else if (str == "matchBusiestNode") {
+ gParameters.matchingStrategy.store(MatchingStrategy::kMatchBusiestNode);
} else {
return Status{ErrorCodes::BadValue,
str::stream() << "Unrecognized matching strategy '" << str << "'"};
@@ -82,28 +99,46 @@ Status ShardingTaskExecutorPoolController::onUpdateMatchingStrategy(const std::s
void ShardingTaskExecutorPoolController::_addGroup(WithLock,
const ReplicaSetChangeNotifier::State& state) {
- // Replace the last group
- auto& group = _hostGroups[state.connStr.getSetName()];
- group = std::make_shared<HostGroupData>();
- group->state = state;
+ auto groupData = std::make_shared<GroupData>(state);
- // Mark each host with this group
+ // Mark each host with this groupData
for (auto& host : state.connStr.getServers()) {
- _hostGroupsByHost[host] = group;
+ auto& groupAndId = _groupAndIds[host];
+
+ invariant(!groupAndId.groupData);
+ groupAndId.groupData = groupData;
+
+ if (groupAndId.maybeId) {
+ // There is already a pool registered to this host
+ // This group needs to include its id in the list of members and pass its pointer
+ auto id = *groupAndId.maybeId;
+ getOrInvariant(_poolDatas, id).groupData = groupData;
+ emplaceOrInvariant(groupData->poolIds, id);
+ }
}
+
+ emplaceOrInvariant(_groupDatas, state.connStr.getSetName(), std::move(groupData));
}
void ShardingTaskExecutorPoolController::_removeGroup(WithLock, const std::string& name) {
- auto it = _hostGroups.find(name);
- if (it == _hostGroups.end()) {
+ auto it = _groupDatas.find(name);
+ if (it == _groupDatas.end()) {
return;
}
- auto& hostGroup = it->second;
- for (auto& host : hostGroup->state.connStr.getServers()) {
- _hostGroupsByHost.erase(host);
+ auto& groupData = it->second;
+ for (auto& host : groupData->state.connStr.getServers()) {
+ auto& groupAndId = getOrInvariant(_groupAndIds, host);
+ groupAndId.groupData.reset();
+ if (groupAndId.maybeId) {
+ // There is still a pool registered to this host, reset its pointer
+ getOrInvariant(_poolDatas, *groupAndId.maybeId).groupData.reset();
+ } else {
+ invariant(_groupAndIds.erase(host));
+ }
}
- _hostGroups.erase(it);
+
+ _groupDatas.erase(it);
}
class ShardingTaskExecutorPoolController::ReplicaSetChangeListener final
@@ -142,95 +177,118 @@ void ShardingTaskExecutorPoolController::init(ConnectionPool* parent) {
_listener = ReplicaSetMonitor::getNotifier().makeListener<ReplicaSetChangeListener>(this);
}
-auto ShardingTaskExecutorPoolController::updateHost(const SpecificPool* pool,
- const HostAndPort& host,
- const HostState& stats) -> HostGroup {
+void ShardingTaskExecutorPoolController::addHost(PoolId id, const HostAndPort& host) {
stdx::lock_guard lk(_mutex);
- auto& data = _poolData[pool];
+ PoolData poolData;
+ poolData.host = host;
+
+ // Set up the GroupAndId
+ auto& groupAndId = _groupAndIds[host];
+
+ invariant(!groupAndId.maybeId);
+ groupAndId.maybeId = id;
+
+ if (groupAndId.groupData) {
+ // If there is already a GroupData, then get its pointer and add the PoolId to its list
+ poolData.groupData = groupAndId.groupData;
+
+ emplaceOrInvariant(groupAndId.groupData->poolIds, id);
+ }
+
+ // Add this PoolData to the set
+ emplaceOrInvariant(_poolDatas, id, std::move(poolData));
+}
+auto ShardingTaskExecutorPoolController::updateHost(PoolId id, const HostState& stats)
+ -> HostGroupState {
+ stdx::lock_guard lk(_mutex);
+
+ auto& poolData = getOrInvariant(_poolDatas, id);
const size_t minConns = gParameters.minConnections.load();
const size_t maxConns = gParameters.maxConnections.load();
// Update the target for just the pool first
- data.target = stats.requests + stats.active;
+ poolData.target = stats.requests + stats.active;
- if (data.target < minConns) {
- data.target = minConns;
- } else if (data.target > maxConns) {
- data.target = maxConns;
+ if (poolData.target < minConns) {
+ poolData.target = minConns;
+ } else if (poolData.target > maxConns) {
+ poolData.target = maxConns;
}
- data.isAbleToShutdown = stats.health.isExpired;
+ poolData.isAbleToShutdown = stats.health.isExpired;
- // If the pool isn't in a group, we can return now
- auto it = _hostGroupsByHost.find(host);
- if (it == _hostGroupsByHost.end()) {
- return {{host}, data.isAbleToShutdown};
+ // If the pool isn't in a groupData, we can return now
+ auto groupData = poolData.groupData.lock();
+ if (!groupData) {
+ return {{poolData.host}, poolData.isAbleToShutdown};
}
- // If the pool has a group, then update the group
- auto& hostGroup = it->second;
- data.hostGroup = hostGroup;
-
- // Make sure we're part of the group
- hostGroup->pools.insert(pool);
-
switch (gParameters.matchingStrategy.load()) {
case MatchingStrategy::kMatchPrimaryNode: {
- if (hostGroup->state.primary == host) {
- hostGroup->target = data.target;
+ if (groupData->state.primary == poolData.host) {
+ groupData->target = poolData.target;
}
} break;
case MatchingStrategy::kMatchBusiestNode: {
- hostGroup->target = std::max(hostGroup->target, data.target);
+ groupData->target = 0;
+ for (auto otherId : groupData->poolIds) {
+ groupData->target =
+ std::max(groupData->target, getOrInvariant(_poolDatas, otherId).target);
+ }
} break;
case MatchingStrategy::kDisabled: {
// Nothing
} break;
};
- if (hostGroup->target < minConns) {
- hostGroup->target = minConns;
- } else if (hostGroup->target > maxConns) {
- hostGroup->target = maxConns;
+ if (groupData->target < minConns) {
+ groupData->target = minConns;
+ } else if (groupData->target > maxConns) {
+ groupData->target = maxConns;
}
- auto shouldShutdown = data.isAbleToShutdown &&
- std::all_of(hostGroup->pools.begin(), hostGroup->pools.end(), [&](auto otherPool) {
- return _poolData[otherPool].isAbleToShutdown;
+ invariant(!groupData->poolIds.empty());
+ auto shouldShutdown = poolData.isAbleToShutdown &&
+ std::all_of(groupData->poolIds.begin(), groupData->poolIds.end(), [&](auto otherId) {
+ return getOrInvariant(_poolDatas, otherId).isAbleToShutdown;
});
- return {hostGroup->state.connStr.getServers(), shouldShutdown};
+ return {groupData->state.connStr.getServers(), shouldShutdown};
}
-void ShardingTaskExecutorPoolController::removeHost(const SpecificPool* pool) {
+void ShardingTaskExecutorPoolController::removeHost(PoolId id) {
stdx::lock_guard lk(_mutex);
- auto it = _poolData.find(pool);
- if (it == _poolData.end()) {
+ auto it = _poolDatas.find(id);
+ if (it == _poolDatas.end()) {
// It's possible that a host immediately needs to go before it updates even once
return;
}
- auto& data = it->second;
- if (auto hostGroup = data.hostGroup.lock()) {
- hostGroup->pools.erase(pool);
+ auto& poolData = it->second;
+ auto& groupAndId = getOrInvariant(_groupAndIds, poolData.host);
+ groupAndId.maybeId.reset();
+ if (groupAndId.groupData) {
+ invariant(groupAndId.groupData->poolIds.erase(id));
+ } else {
+ invariant(_groupAndIds.erase(poolData.host));
}
- _poolData.erase(it);
+
+ _poolDatas.erase(it);
}
-auto ShardingTaskExecutorPoolController::getControls(const SpecificPool* pool)
- -> ConnectionControls {
+auto ShardingTaskExecutorPoolController::getControls(PoolId id) -> ConnectionControls {
stdx::lock_guard lk(_mutex);
- auto& data = _poolData[pool];
+ auto& poolData = getOrInvariant(_poolDatas, id);
const size_t maxPending = gParameters.maxConnecting.load();
- auto hostGroup = data.hostGroup.lock();
- if (!hostGroup || gParameters.matchingStrategy.load() == MatchingStrategy::kDisabled) {
- return {maxPending, data.target};
+ auto groupData = poolData.groupData.lock();
+ if (!groupData || gParameters.matchingStrategy.load() == MatchingStrategy::kDisabled) {
+ return {maxPending, poolData.target};
}
- auto target = std::max(data.target, hostGroup->target);
+ auto target = std::max(poolData.target, groupData->target);
return {maxPending, target};
}
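A standalone sketch of the per-strategy group target computation in updateHost() above; note that kMatchBusiestNode now recomputes the maximum over all member pools each time, so the group target can fall as well as rise. All types are simplified stand-ins, and the real code performs these steps under the controller's mutex while indexing _poolDatas by PoolId.

// Sketch of the group target update per MatchingStrategy; stand-in types.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>

using PoolId = uint64_t;

enum class MatchingStrategy { kDisabled, kMatchPrimaryNode, kMatchBusiestNode };

struct PoolData {
    std::string host;
    size_t target = 0;
};

struct GroupData {
    std::string primary;                 // stand-in for state.primary
    std::unordered_set<PoolId> poolIds;  // every pool currently in the set
    size_t target = 0;
};

void updateGroupTarget(GroupData& group,
                       const PoolData& updatedPool,
                       const std::unordered_map<PoolId, PoolData>& poolDatas,
                       MatchingStrategy strategy,
                       size_t minConns,
                       size_t maxConns) {
    switch (strategy) {
        case MatchingStrategy::kMatchPrimaryNode:
            // Only the primary's demand drives the whole group.
            if (group.primary == updatedPool.host) {
                group.target = updatedPool.target;
            }
            break;
        case MatchingStrategy::kMatchBusiestNode:
            // Recompute the max across all member pools so the group target
            // can decrease when load subsides.
            group.target = 0;
            for (auto id : group.poolIds) {
                group.target = std::max(group.target, poolDatas.at(id).target);
            }
            break;
        case MatchingStrategy::kDisabled:
            break;
    }
    group.target = std::clamp(group.target, minConns, maxConns);
}

int main() {
    std::unordered_map<PoolId, PoolData> poolDatas;
    poolDatas[1] = {"a:27017", 7};
    poolDatas[2] = {"b:27017", 3};

    GroupData group;
    group.primary = "a:27017";
    group.poolIds = {1, 2};

    updateGroupTarget(group, poolDatas.at(1), poolDatas,
                      MatchingStrategy::kMatchBusiestNode,
                      /*minConns=*/1, /*maxConns=*/100);
    // group.target == 7: the busiest member's target, clamped to [1, 100].
}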
diff --git a/src/mongo/s/sharding_task_executor_pool_controller.h b/src/mongo/s/sharding_task_executor_pool_controller.h
index d61f707de73..c077578892f 100644
--- a/src/mongo/s/sharding_task_executor_pool_controller.h
+++ b/src/mongo/s/sharding_task_executor_pool_controller.h
@@ -29,6 +29,8 @@
#pragma once
+#include <boost/optional.hpp>
+
#include "mongo/base/status.h"
#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/executor/connection_pool.h"
@@ -113,12 +115,11 @@ public:
void init(ConnectionPool* parent) override;
- HostGroup updateHost(const SpecificPool* pool,
- const HostAndPort& host,
- const HostState& stats) override;
- void removeHost(const SpecificPool* pool) override;
+ void addHost(PoolId id, const HostAndPort& host) override;
+ HostGroupState updateHost(PoolId id, const HostState& stats) override;
+ void removeHost(PoolId id) override;
- ConnectionControls getControls(const SpecificPool* pool) override;
+ ConnectionControls getControls(PoolId id) override;
Milliseconds hostTimeout() const override;
Milliseconds pendingTimeout() const override;
@@ -133,40 +134,44 @@ private:
void _removeGroup(WithLock, const std::string& key);
/**
- * HostGroup is a shared state for a set of hosts (a replica set).
+ * GroupData is a shared state for a set of hosts (a replica set).
*
* When the ReplicaSetChangeListener is informed of a change to a replica set,
- * it creates a new HostGroup and fills it into _hostGroups[setName] and
- * _hostGroupsByHost[memberHost]. This does not immediately affect the results of getControls.
+ * it creates a new GroupData and fills it into _groupDatas[setName] and
+ * _groupAndIds[memberHost].
*
- * When a SpecificPool calls updateHost, it checks _hostGroupsByHost to see if it belongs to
- * any group and pushes itself into hostData for that group. It then will update target for its
- * group according to the MatchingStrategy. It will also set shouldShutdown to true if every
- * member of the group has shouldShutdown at true.
+ * When a SpecificPool calls updateHost, it then will update target for its group according to
+ * the MatchingStrategy. It will also postpone shutdown until all members of its group are ready
+ * to shut down.
*
- * Note that a HostData can find itself orphaned from its HostGroup during a reconfig.
+ * Note that a PoolData can find itself orphaned from its GroupData during a reconfig.
*/
- struct HostGroupData {
+ struct GroupData {
+ explicit GroupData(const ReplicaSetChangeNotifier::State& state_) : state{state_} {}
+
// The ReplicaSet state for this set
ReplicaSetChangeNotifier::State state;
- // Pointer index for each pool in the set
- stdx::unordered_set<const SpecificPool*> pools;
+ // Id for each pool in the set
+ stdx::unordered_set<PoolId> poolIds;
- // The number of connections that all hosts in the group should maintain
+ // The number of connections that all pools in the group should maintain
size_t target = 0;
};
/**
- * HostData represents the current state for a specific HostAndPort/SpecificPool.
+ * PoolData represents the current state for a SpecificPool.
*
- * It is mutated by updateHost/removeHost and used along with Parameters to form Controls
- * for getControls.
+ * It is mutated by addHost/updateHost/removeHost and used along with Parameters to form
+ * Controls for getControls.
*/
- struct HostData {
- // The HostGroup associated with this pool.
+ struct PoolData {
+ // The host associated with this pool
+ HostAndPort host;
+
+ // The GroupData associated with this pool.
// Note that this will be invalid if there was a replica set change
- std::weak_ptr<HostGroupData> hostGroup;
+ std::weak_ptr<GroupData> groupData;
// The number of connections the host should maintain
size_t target = 0;
@@ -175,12 +180,32 @@ private:
bool isAbleToShutdown = false;
};
+ /**
+ * A GroupAndId allows incoming GroupData and PoolData to find each other
+ *
+ * Note that each side of the pair initializes independently. The side that is ctor'd last adds
+ * the id to the GroupData's poolIds and a GroupData ptr to the PoolData for maybeId. Likewise,
+ * the side that is dtor'd last removes the GroupAndId.
+ */
+ struct GroupAndId {
+ std::shared_ptr<GroupData> groupData;
+ boost::optional<PoolId> maybeId;
+ };
+
ReplicaSetChangeListenerHandle _listener;
stdx::mutex _mutex;
- stdx::unordered_map<const SpecificPool*, HostData> _poolData;
- stdx::unordered_map<std::string, std::shared_ptr<HostGroupData>> _hostGroups;
- stdx::unordered_map<HostAndPort, std::shared_ptr<HostGroupData>> _hostGroupsByHost;
+ // Entries to _poolDatas are added by addHost() and removed by removeHost()
+ stdx::unordered_map<PoolId, PoolData> _poolDatas;
+
+ // Entries to _groupDatas are added by _addGroup() and removed by _removeGroup()
+ stdx::unordered_map<std::string, std::shared_ptr<GroupData>> _groupDatas;
+
+ // Entries to _groupAndIds are added by the first caller of either addHost() or _addGroup() and
+ // removed by the last caller of either removeHost() or _removeGroup(). This map exists to tie
+ // together a pool and a group based on a HostAndPort. It is hopefully used only once, because a
+ // PoolId is much cheaper to index than a HostAndPort.
+ stdx::unordered_map<HostAndPort, GroupAndId> _groupAndIds;
};
} // namespace mongo
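Finally, a simplified standalone model of the GroupAndId pairing described in the header comments above: whichever side arrives first (a pool via addHost(), or a replica set via _addGroup()) parks its half in the per-host entry, and the second arrival links the two; the last side to leave erases the entry. Locking, shutdown, and error handling are omitted, and all types are stand-ins.

// Simplified model of the GroupAndId pairing; stand-in types only.
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>

using PoolId = uint64_t;
using HostAndPort = std::string;

struct GroupData {
    std::unordered_set<PoolId> poolIds;
};

struct PoolData {
    HostAndPort host;
    std::weak_ptr<GroupData> groupData;
};

struct GroupAndId {
    std::shared_ptr<GroupData> groupData;  // set by the replica-set side
    std::optional<PoolId> maybeId;         // set by the pool side
};

struct PairingTable {
    std::unordered_map<HostAndPort, GroupAndId> groupAndIds;
    std::unordered_map<PoolId, PoolData> poolDatas;

    // Pool side: mirrors ShardingTaskExecutorPoolController::addHost().
    void addHost(PoolId id, const HostAndPort& host) {
        PoolData poolData{host, {}};
        auto& entry = groupAndIds[host];
        entry.maybeId = id;
        if (entry.groupData) {
            // The group registered this host first: link both directions.
            poolData.groupData = entry.groupData;
            entry.groupData->poolIds.insert(id);
        }
        poolDatas.emplace(id, std::move(poolData));
    }

    // Replica-set side: mirrors _addGroup() for a single member host.
    void addGroupMember(const std::shared_ptr<GroupData>& group, const HostAndPort& host) {
        auto& entry = groupAndIds[host];
        entry.groupData = group;
        if (entry.maybeId) {
            // A pool already registered for this host: link both directions.
            poolDatas.at(*entry.maybeId).groupData = group;
            group->poolIds.insert(*entry.maybeId);
        }
    }
};

int main() {
    PairingTable table;
    auto group = std::make_shared<GroupData>();

    // Either ordering produces the same linked state.
    table.addHost(/*id=*/7, "rs0-node1:27017");
    table.addGroupMember(group, "rs0-node1:27017");
    // table.poolDatas.at(7).groupData.lock() == group, and group->poolIds contains 7.
}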