summary | refs | log | tree | commit | diff
path: root/src/mongo/s/client/shard_registry.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/client/shard_registry.cpp')
-rw-r--r-- src/mongo/s/client/shard_registry.cpp 329
1 files changed, 156 insertions, 173 deletions
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 61ee14708a1..e0a5cc13476 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -64,12 +64,6 @@
namespace mongo {
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-using std::vector;
-
using executor::NetworkInterface;
using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutor;
@@ -90,18 +84,20 @@ ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
std::vector<ShardRemovalHook> shardRemovalHooks)
: _shardFactory(std::move(shardFactory)),
_initConfigServerCS(configServerCS),
- _shardRemovalHooks(std::move(shardRemovalHooks)) {}
+ _shardRemovalHooks(std::move(shardRemovalHooks)) {
+ invariant(_initConfigServerCS.isValid());
+}
ShardRegistry::~ShardRegistry() {
shutdown();
}
void ShardRegistry::shutdown() {
- if (_executor && !_isShutdown) {
+ if (_executor && !_isShutdown.load()) {
LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry");
_executor->shutdown();
_executor->join();
- _isShutdown = true;
+ _isShutdown.store(true);
}
}
@@ -109,17 +105,17 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const {
return getConfigShard()->getConnString();
}
-StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
- const ShardId& shardId) {
+StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
+ const ShardId& shardId) {
// If we know about the shard, return it.
- auto shard = _data.findShard(shardId);
+ auto shard = getShardNoReload(shardId);
if (shard) {
return shard;
}
// If we can't find the shard, attempt to reload the ShardRegistry.
bool didReload = reload(opCtx);
- shard = _data.findShard(shardId);
+ shard = getShardNoReload(shardId);
// If we found the shard, return it.
if (shard) {
@@ -135,7 +131,7 @@ StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
// If we did not perform the reload ourselves (because there was a concurrent reload), force a
// reload again to ensure that we have seen data at least as up to date as our first reload.
reload(opCtx);
- shard = _data.findShard(shardId);
+ shard = getShardNoReload(shardId);
if (shard) {
return shard;
@@ -144,35 +140,41 @@ StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
}
-shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) {
+std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) {
+ stdx::lock_guard<Latch> lk(_mutex);
return _data.findShard(shardId);
}
-shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) {
+std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) {
+ stdx::lock_guard<Latch> lk(_mutex);
return _data.findByHostAndPort(host);
}
-shared_ptr<Shard> ShardRegistry::getConfigShard() const {
- auto shard = _data.getConfigShard();
- invariant(shard);
- return shard;
+std::shared_ptr<Shard> ShardRegistry::getConfigShard() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ invariant(_configShard);
+ return _configShard;
}
-unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
+std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr);
}
-shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const {
+std::shared_ptr<Shard> ShardRegistry::lookupRSName(const std::string& name) const {
+ stdx::lock_guard<Latch> lk(_mutex);
return _data.findByRSName(name);
}
-void ShardRegistry::getAllShardIdsNoReload(vector<ShardId>* all) const {
+void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const {
std::set<ShardId> seen;
- _data.getAllShardIds(seen);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data.getAllShardIds(seen);
+ }
all->assign(seen.begin(), seen.end());
}
-void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector<ShardId>* all) {
+void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) {
getAllShardIdsNoReload(all);
if (all->empty()) {
bool didReload = reload(opCtx);
@@ -188,31 +190,44 @@ void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector<ShardId>* all
int ShardRegistry::getNumShards() const {
std::set<ShardId> seen;
- _data.getAllShardIds(seen);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data.getAllShardIds(seen);
+ }
return seen.size();
}
-void ShardRegistry::toBSON(BSONObjBuilder* result) const {
- _data.toBSON(result);
-}
-
void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
invariant(newConnString.type() == ConnectionString::SET ||
newConnString.type() == ConnectionString::CUSTOM); // For dbtests
// to prevent update config shard connection string during init
- stdx::unique_lock<Latch> lock(_reloadMutex);
- _data.rebuildShardIfExists(newConnString, _shardFactory.get());
+ stdx::unique_lock<Latch> lock(_mutex);
+
+ auto shard = _data.findByRSName(newConnString.getSetName());
+ if (!shard) {
+ return;
+ }
+
+ auto [data, updatedShard] =
+ ShardRegistryData::createFromExisting(_data, newConnString, _shardFactory.get());
+
+ if (updatedShard && updatedShard->isConfig()) {
+ _configShard = updatedShard;
+ }
+
+ _data = data;
}
void ShardRegistry::init() {
- stdx::unique_lock<Latch> reloadLock(_reloadMutex);
- invariant(_initConfigServerCS.isValid());
- auto configShard =
- _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS);
- _data.addConfigShard(configShard);
- // set to invalid so it cant be started more than once.
- _initConfigServerCS = ConnectionString();
+ invariant(!_isInitialized.load());
+ {
+ stdx::unique_lock<Latch> lock(_mutex);
+ _configShard =
+ _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS);
+ _data = ShardRegistryData::createWithConfigShardOnly(_configShard);
+ }
+ _isInitialized.store(true);
}
void ShardRegistry::startup(OperationContext* opCtx) {
@@ -291,8 +306,7 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) {
}
bool ShardRegistry::isUp() const {
- stdx::unique_lock<Latch> reloadLock(_reloadMutex);
- return _isUp;
+ return _isUp.load();
}
bool ShardRegistry::reload(OperationContext* opCtx) {
@@ -335,17 +349,20 @@ bool ShardRegistry::reload(OperationContext* opCtx) {
_inReloadCV.notify_all();
});
- ShardRegistryData currData(opCtx, _shardFactory.get());
- currData.addConfigShard(_data.getConfigShard());
- _data.swapAndMerge(currData);
+ ShardRegistryData reloadedData =
+ ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get(), getConfigShard());
- // Remove RSMs that are not in the catalog any more.
- std::set<ShardId> removedShardIds;
- currData.getAllShardIds(removedShardIds);
- _data.shardIdSetDifference(removedShardIds);
+ stdx::unique_lock<Latch> lock(_mutex);
+
+ auto [mergedData, removedShards] = ShardRegistryData::mergeExisting(_data, reloadedData);
+ _data = std::move(mergedData);
+
+ lock.unlock();
- for (auto& shardId : removedShardIds) {
- auto shard = currData.findByShardId(shardId);
+ // Remove RSMs that are not in the catalog any more.
+ for (auto& pair : removedShards) {
+ auto& shardId = pair.first;
+ auto& shard = pair.second;
invariant(shard);
auto name = shard->getConnString().getSetName();
@@ -359,14 +376,14 @@ bool ShardRegistry::reload(OperationContext* opCtx) {
nextReloadState = ReloadState::Idle;
// first successful reload means that registry is up
- _isUp = true;
+ _isUp.store(true);
return true;
}
void ShardRegistry::clearEntries() {
ShardRegistryData empty;
- empty.addConfigShard(_data.getConfigShard());
- _data.swap(empty);
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data = empty;
}
void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext,
@@ -410,9 +427,38 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex
}
}
+void ShardRegistry::toBSON(BSONObjBuilder* result) const {
+ std::vector<std::shared_ptr<Shard>> shards;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data.getAllShards(shards);
+ }
+
+ std::sort(std::begin(shards),
+ std::end(shards),
+ [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) {
+ return lhs->getId() < rhs->getId();
+ });
+
+ BSONObjBuilder mapBob(result->subobjStart("map"));
+ for (auto&& shard : shards) {
+ // Intentionally calling getConnString while not holding _mutex
+ // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote.
+ mapBob.append(shard->getId(), shard->getConnString().toString());
+ }
+}
+
////////////// ShardRegistryData //////////////////
-ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory) {
+ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<Shard> configShard) {
+ ShardRegistryData data;
+ data._addShard(configShard, true);
+ return data;
+}
+
+ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* opCtx,
+ ShardFactory* shardFactory,
+ std::shared_ptr<Shard> configShard) {
auto const catalogClient = Grid::get(opCtx)->catalogClient();
auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern;
@@ -459,6 +505,8 @@ ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shar
shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue()));
}
+ ShardRegistryData data;
+ data._addShard(configShard, true);
for (auto& shardInfo : shardsInfo) {
if (std::get<0>(shardInfo) == "config") {
continue;
@@ -467,98 +515,88 @@ ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shar
auto shard = shardFactory->createShard(std::move(std::get<0>(shardInfo)),
std::move(std::get<1>(shardInfo)));
- _addShard(WithLock::withoutLock(), std::move(shard), false);
+ data._addShard(std::move(shard), false);
}
+ return data;
}
-void ShardRegistryData::swap(ShardRegistryData& other) {
- stdx::lock_guard<Latch> lk(_mutex);
- _shardIdLookup.swap(other._shardIdLookup);
- _rsLookup.swap(other._rsLookup);
- _hostLookup.swap(other._hostLookup);
- _connStringLookup.swap(other._connStringLookup);
- _configShard.swap(other._configShard);
-}
+std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mergeExisting(
+ const ShardRegistryData& alreadyCachedData, const ShardRegistryData& configServerData) {
+ ShardRegistryData mergedData(configServerData);
-void ShardRegistryData::swapAndMerge(ShardRegistryData& other) {
- stdx::lock_guard<Latch> lk(_mutex);
- _rsLookup.swap(other._rsLookup);
- _configShard.swap(other._configShard);
- _shardIdLookup.swap(other._shardIdLookup);
-
- for (auto it = other._connStringLookup.begin(); it != other._connStringLookup.end(); ++it) {
- auto res = _connStringLookup.find(it->first);
- if (res == _connStringLookup.end()) {
- _connStringLookup[it->first] = it->second;
- }
+ // For connstrings and hosts, prefer values from alreadyCachedData to whatever might have been
+ // fetched from the configsvrs.
+ for (auto it = alreadyCachedData._connStringLookup.begin();
+ it != alreadyCachedData._connStringLookup.end();
+ ++it) {
+ mergedData._connStringLookup[it->first] = it->second;
+ }
+ for (auto it = alreadyCachedData._hostLookup.begin(); it != alreadyCachedData._hostLookup.end();
+ ++it) {
+ mergedData._hostLookup[it->first] = it->second;
}
- for (auto it = other._hostLookup.begin(); it != other._hostLookup.end(); ++it) {
- auto res = _hostLookup.find(it->first);
- if (res == _hostLookup.end()) {
- _hostLookup[it->first] = it->second;
+ // Find the shards that are no longer present.
+ ShardMap removedShards;
+ for (auto i = alreadyCachedData._shardIdLookup.begin();
+ i != alreadyCachedData._shardIdLookup.end();
+ ++i) {
+ invariant(i->second);
+ if (mergedData._shardIdLookup.find(i->second->getId()) == mergedData._shardIdLookup.end()) {
+ removedShards[i->second->getId()] = i->second;
}
}
-}
-shared_ptr<Shard> ShardRegistryData::getConfigShard() const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _configShard;
+ return {mergedData, removedShards};
}
-void ShardRegistryData::addConfigShard(std::shared_ptr<Shard> shard) {
- stdx::lock_guard<Latch> lk(_mutex);
- _configShard = shard;
- _addShard(lk, shard, true);
-}
+std::pair<ShardRegistryData, std::shared_ptr<Shard>> ShardRegistryData::createFromExisting(
+ const ShardRegistryData& existingData,
+ const ConnectionString& newConnString,
+ ShardFactory* shardFactory) {
+ ShardRegistryData data(existingData);
-shared_ptr<Shard> ShardRegistryData::findByRSName(const string& name) const {
- stdx::lock_guard<Latch> lk(_mutex);
- auto i = _rsLookup.find(name);
- return (i != _rsLookup.end()) ? i->second : nullptr;
-}
+ auto it = data._rsLookup.find(newConnString.getSetName());
+ if (it == data._rsLookup.end()) {
+ return {data, nullptr};
+ }
+ invariant(it->second);
+ auto updatedShard = shardFactory->createShard(it->second->getId(), newConnString);
+ data._addShard(updatedShard, true);
-shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _findByHostAndPort(lk, hostAndPort);
+ return {data, updatedShard};
}
-shared_ptr<Shard> ShardRegistryData::findByShardId(const ShardId& shardId) const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _findByShardId(lk, shardId);
+std::shared_ptr<Shard> ShardRegistryData::findByRSName(const std::string& name) const {
+ auto i = _rsLookup.find(name);
+ return (i != _rsLookup.end()) ? i->second : nullptr;
}
-shared_ptr<Shard> ShardRegistryData::_findByConnectionString(
- WithLock, const ConnectionString& connectionString) const {
+std::shared_ptr<Shard> ShardRegistryData::_findByConnectionString(
+ const ConnectionString& connectionString) const {
auto i = _connStringLookup.find(connectionString);
return (i != _connStringLookup.end()) ? i->second : nullptr;
}
-shared_ptr<Shard> ShardRegistryData::_findByHostAndPort(WithLock,
- const HostAndPort& hostAndPort) const {
+std::shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const {
auto i = _hostLookup.find(hostAndPort);
return (i != _hostLookup.end()) ? i->second : nullptr;
}
-shared_ptr<Shard> ShardRegistryData::_findByShardId(WithLock, ShardId const& shardId) const {
+std::shared_ptr<Shard> ShardRegistryData::_findByShardId(const ShardId& shardId) const {
auto i = _shardIdLookup.find(shardId);
return (i != _shardIdLookup.end()) ? i->second : nullptr;
}
-shared_ptr<Shard> ShardRegistryData::findShard(ShardId const& shardId) const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _findShard(lk, shardId);
-}
-
-shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shardId) const {
- auto shard = _findByShardId(lk, shardId);
+std::shared_ptr<Shard> ShardRegistryData::findShard(const ShardId& shardId) const {
+ auto shard = _findByShardId(shardId);
if (shard) {
return shard;
}
StatusWith<ConnectionString> swConnString = ConnectionString::parse(shardId.toString());
if (swConnString.isOK()) {
- shard = _findByConnectionString(lk, swConnString.getValue());
+ shard = _findByConnectionString(swConnString.getValue());
if (shard) {
return shard;
}
@@ -566,7 +604,7 @@ shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shar
StatusWith<HostAndPort> swHostAndPort = HostAndPort::parse(shardId.toString());
if (swHostAndPort.isOK()) {
- shard = _findByHostAndPort(lk, swHostAndPort.getValue());
+ shard = findByHostAndPort(swHostAndPort.getValue());
if (shard) {
return shard;
}
@@ -575,32 +613,14 @@ shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shar
return nullptr;
}
-void ShardRegistryData::toBSON(BSONObjBuilder* result) const {
- std::vector<std::shared_ptr<Shard>> shards;
- {
- stdx::lock_guard<Latch> lk(_mutex);
- shards.reserve(_shardIdLookup.size());
- for (auto&& shard : _shardIdLookup) {
- shards.emplace_back(shard.second);
- }
- }
-
- std::sort(std::begin(shards),
- std::end(shards),
- [](const std::shared_ptr<Shard>& lhs, const std::shared_ptr<Shard>& rhs) {
- return lhs->getId() < rhs->getId();
- });
-
- BSONObjBuilder mapBob(result->subobjStart("map"));
- for (auto&& shard : shards) {
- // Intentionally calling getConnString while not holding ShardRegistryData::_mutex
- // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote.
- mapBob.append(shard->getId(), shard->getConnString().toString());
+void ShardRegistryData::getAllShards(std::vector<std::shared_ptr<Shard>>& result) const {
+ result.reserve(_shardIdLookup.size());
+ for (auto&& shard : _shardIdLookup) {
+ result.emplace_back(shard.second);
}
}
void ShardRegistryData::getAllShardIds(std::set<ShardId>& seen) const {
- stdx::lock_guard<Latch> lk(_mutex);
for (auto i = _shardIdLookup.begin(); i != _shardIdLookup.end(); ++i) {
const auto& s = i->second;
if (s->getId().toString() == "config") {
@@ -610,50 +630,13 @@ void ShardRegistryData::getAllShardIds(std::set<ShardId>& seen) const {
}
}
-void ShardRegistryData::shardIdSetDifference(std::set<ShardId>& diff) const {
- stdx::lock_guard<Latch> lk(_mutex);
- for (auto i = _shardIdLookup.begin(); i != _shardIdLookup.end(); ++i) {
- invariant(i->second);
- auto res = diff.find(i->second->getId());
- if (res != diff.end()) {
- diff.erase(res);
- }
- }
-}
-
-void ShardRegistryData::rebuildShardIfExists(const ConnectionString& newConnString,
- ShardFactory* factory) {
- stdx::unique_lock<Latch> updateConnStringLock(_mutex);
- auto it = _rsLookup.find(newConnString.getSetName());
- if (it == _rsLookup.end()) {
- return;
- }
-
- _rebuildShard(updateConnStringLock, newConnString, factory);
-}
-
-
-void ShardRegistryData::_rebuildShard(WithLock lk,
- ConnectionString const& newConnString,
- ShardFactory* factory) {
- auto it = _rsLookup.find(newConnString.getSetName());
- invariant(it->second);
- auto shard = factory->createShard(it->second->getId(), newConnString);
- _addShard(lk, shard, true);
- if (shard->isConfig()) {
- _configShard = shard;
- }
-}
-
-void ShardRegistryData::_addShard(WithLock lk,
- std::shared_ptr<Shard> const& shard,
- bool useOriginalCS) {
+void ShardRegistryData::_addShard(std::shared_ptr<Shard> shard, bool useOriginalCS) {
const ShardId shardId = shard->getId();
const ConnectionString connString =
useOriginalCS ? shard->originalConnString() : shard->getConnString();
- auto currentShard = _findShard(lk, shardId);
+ auto currentShard = findShard(shardId);
if (currentShard) {
auto oldConnString = currentShard->originalConnString();