diff options
Diffstat (limited to 'src/mongo/s/client/shard_registry.cpp')
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 329 |
1 files changed, 156 insertions, 173 deletions
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 61ee14708a1..e0a5cc13476 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -64,12 +64,6 @@ namespace mongo { -using std::set; -using std::shared_ptr; -using std::string; -using std::unique_ptr; -using std::vector; - using executor::NetworkInterface; using executor::NetworkInterfaceThreadPool; using executor::TaskExecutor; @@ -90,18 +84,20 @@ ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, std::vector<ShardRemovalHook> shardRemovalHooks) : _shardFactory(std::move(shardFactory)), _initConfigServerCS(configServerCS), - _shardRemovalHooks(std::move(shardRemovalHooks)) {} + _shardRemovalHooks(std::move(shardRemovalHooks)) { + invariant(_initConfigServerCS.isValid()); +} ShardRegistry::~ShardRegistry() { shutdown(); } void ShardRegistry::shutdown() { - if (_executor && !_isShutdown) { + if (_executor && !_isShutdown.load()) { LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry"); _executor->shutdown(); _executor->join(); - _isShutdown = true; + _isShutdown.store(true); } } @@ -109,17 +105,17 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const { return getConfigShard()->getConnString(); } -StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, - const ShardId& shardId) { +StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, + const ShardId& shardId) { // If we know about the shard, return it. - auto shard = _data.findShard(shardId); + auto shard = getShardNoReload(shardId); if (shard) { return shard; } // If we can't find the shard, attempt to reload the ShardRegistry. bool didReload = reload(opCtx); - shard = _data.findShard(shardId); + shard = getShardNoReload(shardId); // If we found the shard, return it. if (shard) { @@ -135,7 +131,7 @@ StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, // If we did not perform the reload ourselves (because there was a concurrent reload), force a // reload again to ensure that we have seen data at least as up to date as our first reload. reload(opCtx); - shard = _data.findShard(shardId); + shard = getShardNoReload(shardId); if (shard) { return shard; @@ -144,35 +140,41 @@ StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; } -shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) { +std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) { + stdx::lock_guard<Latch> lk(_mutex); return _data.findShard(shardId); } -shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { +std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { + stdx::lock_guard<Latch> lk(_mutex); return _data.findByHostAndPort(host); } -shared_ptr<Shard> ShardRegistry::getConfigShard() const { - auto shard = _data.getConfigShard(); - invariant(shard); - return shard; +std::shared_ptr<Shard> ShardRegistry::getConfigShard() const { + stdx::lock_guard<Latch> lk(_mutex); + invariant(_configShard); + return _configShard; } -unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { +std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr); } -shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const { +std::shared_ptr<Shard> ShardRegistry::lookupRSName(const std::string& name) const { + stdx::lock_guard<Latch> lk(_mutex); return _data.findByRSName(name); } -void ShardRegistry::getAllShardIdsNoReload(vector<ShardId>* all) const { +void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const { std::set<ShardId> seen; - _data.getAllShardIds(seen); + { + stdx::lock_guard<Latch> lk(_mutex); + _data.getAllShardIds(seen); + } all->assign(seen.begin(), seen.end()); } -void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector<ShardId>* all) { +void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) { getAllShardIdsNoReload(all); if (all->empty()) { bool didReload = reload(opCtx); @@ -188,31 +190,44 @@ void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector<ShardId>* all int ShardRegistry::getNumShards() const { std::set<ShardId> seen; - _data.getAllShardIds(seen); + { + stdx::lock_guard<Latch> lk(_mutex); + _data.getAllShardIds(seen); + } return seen.size(); } -void ShardRegistry::toBSON(BSONObjBuilder* result) const { - _data.toBSON(result); -} - void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { invariant(newConnString.type() == ConnectionString::SET || newConnString.type() == ConnectionString::CUSTOM); // For dbtests // to prevent update config shard connection string during init - stdx::unique_lock<Latch> lock(_reloadMutex); - _data.rebuildShardIfExists(newConnString, _shardFactory.get()); + stdx::unique_lock<Latch> lock(_mutex); + + auto shard = _data.findByRSName(newConnString.getSetName()); + if (!shard) { + return; + } + + auto [data, updatedShard] = + ShardRegistryData::createFromExisting(_data, newConnString, _shardFactory.get()); + + if (updatedShard && updatedShard->isConfig()) { + _configShard = updatedShard; + } + + _data = data; } void ShardRegistry::init() { - stdx::unique_lock<Latch> reloadLock(_reloadMutex); - invariant(_initConfigServerCS.isValid()); - auto configShard = - _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS); - _data.addConfigShard(configShard); - // set to invalid so it cant be started more than once. - _initConfigServerCS = ConnectionString(); + invariant(!_isInitialized.load()); + { + stdx::unique_lock<Latch> lock(_mutex); + _configShard = + _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS); + _data = ShardRegistryData::createWithConfigShardOnly(_configShard); + } + _isInitialized.store(true); } void ShardRegistry::startup(OperationContext* opCtx) { @@ -291,8 +306,7 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) { } bool ShardRegistry::isUp() const { - stdx::unique_lock<Latch> reloadLock(_reloadMutex); - return _isUp; + return _isUp.load(); } bool ShardRegistry::reload(OperationContext* opCtx) { @@ -335,17 +349,20 @@ bool ShardRegistry::reload(OperationContext* opCtx) { _inReloadCV.notify_all(); }); - ShardRegistryData currData(opCtx, _shardFactory.get()); - currData.addConfigShard(_data.getConfigShard()); - _data.swapAndMerge(currData); + ShardRegistryData reloadedData = + ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get(), getConfigShard()); - // Remove RSMs that are not in the catalog any more. - std::set<ShardId> removedShardIds; - currData.getAllShardIds(removedShardIds); - _data.shardIdSetDifference(removedShardIds); + stdx::unique_lock<Latch> lock(_mutex); + + auto [mergedData, removedShards] = ShardRegistryData::mergeExisting(_data, reloadedData); + _data = std::move(mergedData); + + lock.unlock(); - for (auto& shardId : removedShardIds) { - auto shard = currData.findByShardId(shardId); + // Remove RSMs that are not in the catalog any more. + for (auto& pair : removedShards) { + auto& shardId = pair.first; + auto& shard = pair.second; invariant(shard); auto name = shard->getConnString().getSetName(); @@ -359,14 +376,14 @@ bool ShardRegistry::reload(OperationContext* opCtx) { nextReloadState = ReloadState::Idle; // first successful reload means that registry is up - _isUp = true; + _isUp.store(true); return true; } void ShardRegistry::clearEntries() { ShardRegistryData empty; - empty.addConfigShard(_data.getConfigShard()); - _data.swap(empty); + stdx::lock_guard<Latch> lk(_mutex); + _data = empty; } void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext, @@ -410,9 +427,38 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex } } +void ShardRegistry::toBSON(BSONObjBuilder* result) const { + std::vector<std::shared_ptr<Shard>> shards; + { + stdx::lock_guard<Latch> lk(_mutex); + _data.getAllShards(shards); + } + + std::sort(std::begin(shards), + std::end(shards), + [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) { + return lhs->getId() < rhs->getId(); + }); + + BSONObjBuilder mapBob(result->subobjStart("map")); + for (auto&& shard : shards) { + // Intentionally calling getConnString while not holding _mutex + // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote. + mapBob.append(shard->getId(), shard->getConnString().toString()); + } +} + ////////////// ShardRegistryData ////////////////// -ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory) { +ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<Shard> configShard) { + ShardRegistryData data; + data._addShard(configShard, true); + return data; +} + +ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* opCtx, + ShardFactory* shardFactory, + std::shared_ptr<Shard> configShard) { auto const catalogClient = Grid::get(opCtx)->catalogClient(); auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern; @@ -459,6 +505,8 @@ ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shar shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue())); } + ShardRegistryData data; + data._addShard(configShard, true); for (auto& shardInfo : shardsInfo) { if (std::get<0>(shardInfo) == "config") { continue; @@ -467,98 +515,88 @@ ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shar auto shard = shardFactory->createShard(std::move(std::get<0>(shardInfo)), std::move(std::get<1>(shardInfo))); - _addShard(WithLock::withoutLock(), std::move(shard), false); + data._addShard(std::move(shard), false); } + return data; } -void ShardRegistryData::swap(ShardRegistryData& other) { - stdx::lock_guard<Latch> lk(_mutex); - _shardIdLookup.swap(other._shardIdLookup); - _rsLookup.swap(other._rsLookup); - _hostLookup.swap(other._hostLookup); - _connStringLookup.swap(other._connStringLookup); - _configShard.swap(other._configShard); -} +std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mergeExisting( + const ShardRegistryData& alreadyCachedData, const ShardRegistryData& configServerData) { + ShardRegistryData mergedData(configServerData); -void ShardRegistryData::swapAndMerge(ShardRegistryData& other) { - stdx::lock_guard<Latch> lk(_mutex); - _rsLookup.swap(other._rsLookup); - _configShard.swap(other._configShard); - _shardIdLookup.swap(other._shardIdLookup); - - for (auto it = other._connStringLookup.begin(); it != other._connStringLookup.end(); ++it) { - auto res = _connStringLookup.find(it->first); - if (res == _connStringLookup.end()) { - _connStringLookup[it->first] = it->second; - } + // For connstrings and hosts, prefer values from alreadyCachedData to whatever might have been + // fetched from the configsvrs. + for (auto it = alreadyCachedData._connStringLookup.begin(); + it != alreadyCachedData._connStringLookup.end(); + ++it) { + mergedData._connStringLookup[it->first] = it->second; + } + for (auto it = alreadyCachedData._hostLookup.begin(); it != alreadyCachedData._hostLookup.end(); + ++it) { + mergedData._hostLookup[it->first] = it->second; } - for (auto it = other._hostLookup.begin(); it != other._hostLookup.end(); ++it) { - auto res = _hostLookup.find(it->first); - if (res == _hostLookup.end()) { - _hostLookup[it->first] = it->second; + // Find the shards that are no longer present. + ShardMap removedShards; + for (auto i = alreadyCachedData._shardIdLookup.begin(); + i != alreadyCachedData._shardIdLookup.end(); + ++i) { + invariant(i->second); + if (mergedData._shardIdLookup.find(i->second->getId()) == mergedData._shardIdLookup.end()) { + removedShards[i->second->getId()] = i->second; } } -} -shared_ptr<Shard> ShardRegistryData::getConfigShard() const { - stdx::lock_guard<Latch> lk(_mutex); - return _configShard; + return {mergedData, removedShards}; } -void ShardRegistryData::addConfigShard(std::shared_ptr<Shard> shard) { - stdx::lock_guard<Latch> lk(_mutex); - _configShard = shard; - _addShard(lk, shard, true); -} +std::pair<ShardRegistryData, std::shared_ptr<Shard>> ShardRegistryData::createFromExisting( + const ShardRegistryData& existingData, + const ConnectionString& newConnString, + ShardFactory* shardFactory) { + ShardRegistryData data(existingData); -shared_ptr<Shard> ShardRegistryData::findByRSName(const string& name) const { - stdx::lock_guard<Latch> lk(_mutex); - auto i = _rsLookup.find(name); - return (i != _rsLookup.end()) ? i->second : nullptr; -} + auto it = data._rsLookup.find(newConnString.getSetName()); + if (it == data._rsLookup.end()) { + return {data, nullptr}; + } + invariant(it->second); + auto updatedShard = shardFactory->createShard(it->second->getId(), newConnString); + data._addShard(updatedShard, true); -shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const { - stdx::lock_guard<Latch> lk(_mutex); - return _findByHostAndPort(lk, hostAndPort); + return {data, updatedShard}; } -shared_ptr<Shard> ShardRegistryData::findByShardId(const ShardId& shardId) const { - stdx::lock_guard<Latch> lk(_mutex); - return _findByShardId(lk, shardId); +std::shared_ptr<Shard> ShardRegistryData::findByRSName(const std::string& name) const { + auto i = _rsLookup.find(name); + return (i != _rsLookup.end()) ? i->second : nullptr; } -shared_ptr<Shard> ShardRegistryData::_findByConnectionString( - WithLock, const ConnectionString& connectionString) const { +std::shared_ptr<Shard> ShardRegistryData::_findByConnectionString( + const ConnectionString& connectionString) const { auto i = _connStringLookup.find(connectionString); return (i != _connStringLookup.end()) ? i->second : nullptr; } -shared_ptr<Shard> ShardRegistryData::_findByHostAndPort(WithLock, - const HostAndPort& hostAndPort) const { +std::shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const { auto i = _hostLookup.find(hostAndPort); return (i != _hostLookup.end()) ? i->second : nullptr; } -shared_ptr<Shard> ShardRegistryData::_findByShardId(WithLock, ShardId const& shardId) const { +std::shared_ptr<Shard> ShardRegistryData::_findByShardId(const ShardId& shardId) const { auto i = _shardIdLookup.find(shardId); return (i != _shardIdLookup.end()) ? i->second : nullptr; } -shared_ptr<Shard> ShardRegistryData::findShard(ShardId const& shardId) const { - stdx::lock_guard<Latch> lk(_mutex); - return _findShard(lk, shardId); -} - -shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shardId) const { - auto shard = _findByShardId(lk, shardId); +std::shared_ptr<Shard> ShardRegistryData::findShard(const ShardId& shardId) const { + auto shard = _findByShardId(shardId); if (shard) { return shard; } StatusWith<ConnectionString> swConnString = ConnectionString::parse(shardId.toString()); if (swConnString.isOK()) { - shard = _findByConnectionString(lk, swConnString.getValue()); + shard = _findByConnectionString(swConnString.getValue()); if (shard) { return shard; } @@ -566,7 +604,7 @@ shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shar StatusWith<HostAndPort> swHostAndPort = HostAndPort::parse(shardId.toString()); if (swHostAndPort.isOK()) { - shard = _findByHostAndPort(lk, swHostAndPort.getValue()); + shard = findByHostAndPort(swHostAndPort.getValue()); if (shard) { return shard; } @@ -575,32 +613,14 @@ shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shar return nullptr; } -void ShardRegistryData::toBSON(BSONObjBuilder* result) const { - std::vector<std::shared_ptr<Shard>> shards; - { - stdx::lock_guard<Latch> lk(_mutex); - shards.reserve(_shardIdLookup.size()); - for (auto&& shard : _shardIdLookup) { - shards.emplace_back(shard.second); - } - } - - std::sort(std::begin(shards), - std::end(shards), - [](const std::shared_ptr<Shard>& lhs, const std::shared_ptr<Shard>& rhs) { - return lhs->getId() < rhs->getId(); - }); - - BSONObjBuilder mapBob(result->subobjStart("map")); - for (auto&& shard : shards) { - // Intentionally calling getConnString while not holding ShardRegistryData::_mutex - // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote. - mapBob.append(shard->getId(), shard->getConnString().toString()); +void ShardRegistryData::getAllShards(std::vector<std::shared_ptr<Shard>>& result) const { + result.reserve(_shardIdLookup.size()); + for (auto&& shard : _shardIdLookup) { + result.emplace_back(shard.second); } } void ShardRegistryData::getAllShardIds(std::set<ShardId>& seen) const { - stdx::lock_guard<Latch> lk(_mutex); for (auto i = _shardIdLookup.begin(); i != _shardIdLookup.end(); ++i) { const auto& s = i->second; if (s->getId().toString() == "config") { @@ -610,50 +630,13 @@ void ShardRegistryData::getAllShardIds(std::set<ShardId>& seen) const { } } -void ShardRegistryData::shardIdSetDifference(std::set<ShardId>& diff) const { - stdx::lock_guard<Latch> lk(_mutex); - for (auto i = _shardIdLookup.begin(); i != _shardIdLookup.end(); ++i) { - invariant(i->second); - auto res = diff.find(i->second->getId()); - if (res != diff.end()) { - diff.erase(res); - } - } -} - -void ShardRegistryData::rebuildShardIfExists(const ConnectionString& newConnString, - ShardFactory* factory) { - stdx::unique_lock<Latch> updateConnStringLock(_mutex); - auto it = _rsLookup.find(newConnString.getSetName()); - if (it == _rsLookup.end()) { - return; - } - - _rebuildShard(updateConnStringLock, newConnString, factory); -} - - -void ShardRegistryData::_rebuildShard(WithLock lk, - ConnectionString const& newConnString, - ShardFactory* factory) { - auto it = _rsLookup.find(newConnString.getSetName()); - invariant(it->second); - auto shard = factory->createShard(it->second->getId(), newConnString); - _addShard(lk, shard, true); - if (shard->isConfig()) { - _configShard = shard; - } -} - -void ShardRegistryData::_addShard(WithLock lk, - std::shared_ptr<Shard> const& shard, - bool useOriginalCS) { +void ShardRegistryData::_addShard(std::shared_ptr<Shard> shard, bool useOriginalCS) { const ShardId shardId = shard->getId(); const ConnectionString connString = useOriginalCS ? shard->originalConnString() : shard->getConnString(); - auto currentShard = _findShard(lk, shardId); + auto currentShard = findShard(shardId); if (currentShard) { auto oldConnString = currentShard->originalConnString(); |