diff options
-rw-r--r-- | src/mongo/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 329 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.h | 143 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry_data_test.cpp | 92 |
4 files changed, 235 insertions, 331 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 14f85e390a1..70400e57ed0 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -258,7 +258,6 @@ env.Library( '$BUILD_DIR/mongo/db/logical_time_metadata_hook', '$BUILD_DIR/mongo/util/concurrency/thread_pool', '$BUILD_DIR/mongo/executor/task_executor_pool', - '$BUILD_DIR/mongo/util/caching', 'client/shard_interface', 'query/cluster_cursor_manager', 'sharding_routing_table', @@ -585,7 +584,6 @@ env.CppUnitTest( 'chunk_test.cpp', 'chunk_version_test.cpp', 'chunk_writes_tracker_test.cpp', - 'client/shard_registry_data_test.cpp', 'client/shard_remote_test.cpp', 'cluster_identity_loader_test.cpp', 'cluster_last_error_info_test.cpp', diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 61ee14708a1..e0a5cc13476 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -64,12 +64,6 @@ namespace mongo { -using std::set; -using std::shared_ptr; -using std::string; -using std::unique_ptr; -using std::vector; - using executor::NetworkInterface; using executor::NetworkInterfaceThreadPool; using executor::TaskExecutor; @@ -90,18 +84,20 @@ ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, std::vector<ShardRemovalHook> shardRemovalHooks) : _shardFactory(std::move(shardFactory)), _initConfigServerCS(configServerCS), - _shardRemovalHooks(std::move(shardRemovalHooks)) {} + _shardRemovalHooks(std::move(shardRemovalHooks)) { + invariant(_initConfigServerCS.isValid()); +} ShardRegistry::~ShardRegistry() { shutdown(); } void ShardRegistry::shutdown() { - if (_executor && !_isShutdown) { + if (_executor && !_isShutdown.load()) { LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry"); _executor->shutdown(); _executor->join(); - _isShutdown = true; + _isShutdown.store(true); } } @@ -109,17 +105,17 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const { return getConfigShard()->getConnString(); } -StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, - const ShardId& shardId) { +StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, + const ShardId& shardId) { // If we know about the shard, return it. - auto shard = _data.findShard(shardId); + auto shard = getShardNoReload(shardId); if (shard) { return shard; } // If we can't find the shard, attempt to reload the ShardRegistry. bool didReload = reload(opCtx); - shard = _data.findShard(shardId); + shard = getShardNoReload(shardId); // If we found the shard, return it. if (shard) { @@ -135,7 +131,7 @@ StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, // If we did not perform the reload ourselves (because there was a concurrent reload), force a // reload again to ensure that we have seen data at least as up to date as our first reload. reload(opCtx); - shard = _data.findShard(shardId); + shard = getShardNoReload(shardId); if (shard) { return shard; @@ -144,35 +140,41 @@ StatusWith<shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; } -shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) { +std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) { + stdx::lock_guard<Latch> lk(_mutex); return _data.findShard(shardId); } -shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { +std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { + stdx::lock_guard<Latch> lk(_mutex); return _data.findByHostAndPort(host); } -shared_ptr<Shard> ShardRegistry::getConfigShard() const { - auto shard = _data.getConfigShard(); - invariant(shard); - return shard; +std::shared_ptr<Shard> ShardRegistry::getConfigShard() const { + stdx::lock_guard<Latch> lk(_mutex); + invariant(_configShard); + return _configShard; } -unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { +std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr); } -shared_ptr<Shard> ShardRegistry::lookupRSName(const string& name) const { +std::shared_ptr<Shard> ShardRegistry::lookupRSName(const std::string& name) const { + stdx::lock_guard<Latch> lk(_mutex); return _data.findByRSName(name); } -void ShardRegistry::getAllShardIdsNoReload(vector<ShardId>* all) const { +void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const { std::set<ShardId> seen; - _data.getAllShardIds(seen); + { + stdx::lock_guard<Latch> lk(_mutex); + _data.getAllShardIds(seen); + } all->assign(seen.begin(), seen.end()); } -void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector<ShardId>* all) { +void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) { getAllShardIdsNoReload(all); if (all->empty()) { bool didReload = reload(opCtx); @@ -188,31 +190,44 @@ void ShardRegistry::getAllShardIds(OperationContext* opCtx, vector<ShardId>* all int ShardRegistry::getNumShards() const { std::set<ShardId> seen; - _data.getAllShardIds(seen); + { + stdx::lock_guard<Latch> lk(_mutex); + _data.getAllShardIds(seen); + } return seen.size(); } -void ShardRegistry::toBSON(BSONObjBuilder* result) const { - _data.toBSON(result); -} - void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { invariant(newConnString.type() == ConnectionString::SET || newConnString.type() == ConnectionString::CUSTOM); // For dbtests // to prevent update config shard connection string during init - stdx::unique_lock<Latch> lock(_reloadMutex); - _data.rebuildShardIfExists(newConnString, _shardFactory.get()); + stdx::unique_lock<Latch> lock(_mutex); + + auto shard = _data.findByRSName(newConnString.getSetName()); + if (!shard) { + return; + } + + auto [data, updatedShard] = + ShardRegistryData::createFromExisting(_data, newConnString, _shardFactory.get()); + + if (updatedShard && updatedShard->isConfig()) { + _configShard = updatedShard; + } + + _data = data; } void ShardRegistry::init() { - stdx::unique_lock<Latch> reloadLock(_reloadMutex); - invariant(_initConfigServerCS.isValid()); - auto configShard = - _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS); - _data.addConfigShard(configShard); - // set to invalid so it cant be started more than once. - _initConfigServerCS = ConnectionString(); + invariant(!_isInitialized.load()); + { + stdx::unique_lock<Latch> lock(_mutex); + _configShard = + _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS); + _data = ShardRegistryData::createWithConfigShardOnly(_configShard); + } + _isInitialized.store(true); } void ShardRegistry::startup(OperationContext* opCtx) { @@ -291,8 +306,7 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) { } bool ShardRegistry::isUp() const { - stdx::unique_lock<Latch> reloadLock(_reloadMutex); - return _isUp; + return _isUp.load(); } bool ShardRegistry::reload(OperationContext* opCtx) { @@ -335,17 +349,20 @@ bool ShardRegistry::reload(OperationContext* opCtx) { _inReloadCV.notify_all(); }); - ShardRegistryData currData(opCtx, _shardFactory.get()); - currData.addConfigShard(_data.getConfigShard()); - _data.swapAndMerge(currData); + ShardRegistryData reloadedData = + ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get(), getConfigShard()); - // Remove RSMs that are not in the catalog any more. - std::set<ShardId> removedShardIds; - currData.getAllShardIds(removedShardIds); - _data.shardIdSetDifference(removedShardIds); + stdx::unique_lock<Latch> lock(_mutex); + + auto [mergedData, removedShards] = ShardRegistryData::mergeExisting(_data, reloadedData); + _data = std::move(mergedData); + + lock.unlock(); - for (auto& shardId : removedShardIds) { - auto shard = currData.findByShardId(shardId); + // Remove RSMs that are not in the catalog any more. + for (auto& pair : removedShards) { + auto& shardId = pair.first; + auto& shard = pair.second; invariant(shard); auto name = shard->getConnString().getSetName(); @@ -359,14 +376,14 @@ bool ShardRegistry::reload(OperationContext* opCtx) { nextReloadState = ReloadState::Idle; // first successful reload means that registry is up - _isUp = true; + _isUp.store(true); return true; } void ShardRegistry::clearEntries() { ShardRegistryData empty; - empty.addConfigShard(_data.getConfigShard()); - _data.swap(empty); + stdx::lock_guard<Latch> lk(_mutex); + _data = empty; } void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext, @@ -410,9 +427,38 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex } } +void ShardRegistry::toBSON(BSONObjBuilder* result) const { + std::vector<std::shared_ptr<Shard>> shards; + { + stdx::lock_guard<Latch> lk(_mutex); + _data.getAllShards(shards); + } + + std::sort(std::begin(shards), + std::end(shards), + [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) { + return lhs->getId() < rhs->getId(); + }); + + BSONObjBuilder mapBob(result->subobjStart("map")); + for (auto&& shard : shards) { + // Intentionally calling getConnString while not holding _mutex + // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote. + mapBob.append(shard->getId(), shard->getConnString().toString()); + } +} + ////////////// ShardRegistryData ////////////////// -ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory) { +ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<Shard> configShard) { + ShardRegistryData data; + data._addShard(configShard, true); + return data; +} + +ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* opCtx, + ShardFactory* shardFactory, + std::shared_ptr<Shard> configShard) { auto const catalogClient = Grid::get(opCtx)->catalogClient(); auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern; @@ -459,6 +505,8 @@ ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shar shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue())); } + ShardRegistryData data; + data._addShard(configShard, true); for (auto& shardInfo : shardsInfo) { if (std::get<0>(shardInfo) == "config") { continue; @@ -467,98 +515,88 @@ ShardRegistryData::ShardRegistryData(OperationContext* opCtx, ShardFactory* shar auto shard = shardFactory->createShard(std::move(std::get<0>(shardInfo)), std::move(std::get<1>(shardInfo))); - _addShard(WithLock::withoutLock(), std::move(shard), false); + data._addShard(std::move(shard), false); } + return data; } -void ShardRegistryData::swap(ShardRegistryData& other) { - stdx::lock_guard<Latch> lk(_mutex); - _shardIdLookup.swap(other._shardIdLookup); - _rsLookup.swap(other._rsLookup); - _hostLookup.swap(other._hostLookup); - _connStringLookup.swap(other._connStringLookup); - _configShard.swap(other._configShard); -} +std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mergeExisting( + const ShardRegistryData& alreadyCachedData, const ShardRegistryData& configServerData) { + ShardRegistryData mergedData(configServerData); -void ShardRegistryData::swapAndMerge(ShardRegistryData& other) { - stdx::lock_guard<Latch> lk(_mutex); - _rsLookup.swap(other._rsLookup); - _configShard.swap(other._configShard); - _shardIdLookup.swap(other._shardIdLookup); - - for (auto it = other._connStringLookup.begin(); it != other._connStringLookup.end(); ++it) { - auto res = _connStringLookup.find(it->first); - if (res == _connStringLookup.end()) { - _connStringLookup[it->first] = it->second; - } + // For connstrings and hosts, prefer values from alreadyCachedData to whatever might have been + // fetched from the configsvrs. + for (auto it = alreadyCachedData._connStringLookup.begin(); + it != alreadyCachedData._connStringLookup.end(); + ++it) { + mergedData._connStringLookup[it->first] = it->second; + } + for (auto it = alreadyCachedData._hostLookup.begin(); it != alreadyCachedData._hostLookup.end(); + ++it) { + mergedData._hostLookup[it->first] = it->second; } - for (auto it = other._hostLookup.begin(); it != other._hostLookup.end(); ++it) { - auto res = _hostLookup.find(it->first); - if (res == _hostLookup.end()) { - _hostLookup[it->first] = it->second; + // Find the shards that are no longer present. + ShardMap removedShards; + for (auto i = alreadyCachedData._shardIdLookup.begin(); + i != alreadyCachedData._shardIdLookup.end(); + ++i) { + invariant(i->second); + if (mergedData._shardIdLookup.find(i->second->getId()) == mergedData._shardIdLookup.end()) { + removedShards[i->second->getId()] = i->second; } } -} -shared_ptr<Shard> ShardRegistryData::getConfigShard() const { - stdx::lock_guard<Latch> lk(_mutex); - return _configShard; + return {mergedData, removedShards}; } -void ShardRegistryData::addConfigShard(std::shared_ptr<Shard> shard) { - stdx::lock_guard<Latch> lk(_mutex); - _configShard = shard; - _addShard(lk, shard, true); -} +std::pair<ShardRegistryData, std::shared_ptr<Shard>> ShardRegistryData::createFromExisting( + const ShardRegistryData& existingData, + const ConnectionString& newConnString, + ShardFactory* shardFactory) { + ShardRegistryData data(existingData); -shared_ptr<Shard> ShardRegistryData::findByRSName(const string& name) const { - stdx::lock_guard<Latch> lk(_mutex); - auto i = _rsLookup.find(name); - return (i != _rsLookup.end()) ? i->second : nullptr; -} + auto it = data._rsLookup.find(newConnString.getSetName()); + if (it == data._rsLookup.end()) { + return {data, nullptr}; + } + invariant(it->second); + auto updatedShard = shardFactory->createShard(it->second->getId(), newConnString); + data._addShard(updatedShard, true); -shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const { - stdx::lock_guard<Latch> lk(_mutex); - return _findByHostAndPort(lk, hostAndPort); + return {data, updatedShard}; } -shared_ptr<Shard> ShardRegistryData::findByShardId(const ShardId& shardId) const { - stdx::lock_guard<Latch> lk(_mutex); - return _findByShardId(lk, shardId); +std::shared_ptr<Shard> ShardRegistryData::findByRSName(const std::string& name) const { + auto i = _rsLookup.find(name); + return (i != _rsLookup.end()) ? i->second : nullptr; } -shared_ptr<Shard> ShardRegistryData::_findByConnectionString( - WithLock, const ConnectionString& connectionString) const { +std::shared_ptr<Shard> ShardRegistryData::_findByConnectionString( + const ConnectionString& connectionString) const { auto i = _connStringLookup.find(connectionString); return (i != _connStringLookup.end()) ? i->second : nullptr; } -shared_ptr<Shard> ShardRegistryData::_findByHostAndPort(WithLock, - const HostAndPort& hostAndPort) const { +std::shared_ptr<Shard> ShardRegistryData::findByHostAndPort(const HostAndPort& hostAndPort) const { auto i = _hostLookup.find(hostAndPort); return (i != _hostLookup.end()) ? i->second : nullptr; } -shared_ptr<Shard> ShardRegistryData::_findByShardId(WithLock, ShardId const& shardId) const { +std::shared_ptr<Shard> ShardRegistryData::_findByShardId(const ShardId& shardId) const { auto i = _shardIdLookup.find(shardId); return (i != _shardIdLookup.end()) ? i->second : nullptr; } -shared_ptr<Shard> ShardRegistryData::findShard(ShardId const& shardId) const { - stdx::lock_guard<Latch> lk(_mutex); - return _findShard(lk, shardId); -} - -shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shardId) const { - auto shard = _findByShardId(lk, shardId); +std::shared_ptr<Shard> ShardRegistryData::findShard(const ShardId& shardId) const { + auto shard = _findByShardId(shardId); if (shard) { return shard; } StatusWith<ConnectionString> swConnString = ConnectionString::parse(shardId.toString()); if (swConnString.isOK()) { - shard = _findByConnectionString(lk, swConnString.getValue()); + shard = _findByConnectionString(swConnString.getValue()); if (shard) { return shard; } @@ -566,7 +604,7 @@ shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shar StatusWith<HostAndPort> swHostAndPort = HostAndPort::parse(shardId.toString()); if (swHostAndPort.isOK()) { - shard = _findByHostAndPort(lk, swHostAndPort.getValue()); + shard = findByHostAndPort(swHostAndPort.getValue()); if (shard) { return shard; } @@ -575,32 +613,14 @@ shared_ptr<Shard> ShardRegistryData::_findShard(WithLock lk, ShardId const& shar return nullptr; } -void ShardRegistryData::toBSON(BSONObjBuilder* result) const { - std::vector<std::shared_ptr<Shard>> shards; - { - stdx::lock_guard<Latch> lk(_mutex); - shards.reserve(_shardIdLookup.size()); - for (auto&& shard : _shardIdLookup) { - shards.emplace_back(shard.second); - } - } - - std::sort(std::begin(shards), - std::end(shards), - [](const std::shared_ptr<Shard>& lhs, const std::shared_ptr<Shard>& rhs) { - return lhs->getId() < rhs->getId(); - }); - - BSONObjBuilder mapBob(result->subobjStart("map")); - for (auto&& shard : shards) { - // Intentionally calling getConnString while not holding ShardRegistryData::_mutex - // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote. - mapBob.append(shard->getId(), shard->getConnString().toString()); +void ShardRegistryData::getAllShards(std::vector<std::shared_ptr<Shard>>& result) const { + result.reserve(_shardIdLookup.size()); + for (auto&& shard : _shardIdLookup) { + result.emplace_back(shard.second); } } void ShardRegistryData::getAllShardIds(std::set<ShardId>& seen) const { - stdx::lock_guard<Latch> lk(_mutex); for (auto i = _shardIdLookup.begin(); i != _shardIdLookup.end(); ++i) { const auto& s = i->second; if (s->getId().toString() == "config") { @@ -610,50 +630,13 @@ void ShardRegistryData::getAllShardIds(std::set<ShardId>& seen) const { } } -void ShardRegistryData::shardIdSetDifference(std::set<ShardId>& diff) const { - stdx::lock_guard<Latch> lk(_mutex); - for (auto i = _shardIdLookup.begin(); i != _shardIdLookup.end(); ++i) { - invariant(i->second); - auto res = diff.find(i->second->getId()); - if (res != diff.end()) { - diff.erase(res); - } - } -} - -void ShardRegistryData::rebuildShardIfExists(const ConnectionString& newConnString, - ShardFactory* factory) { - stdx::unique_lock<Latch> updateConnStringLock(_mutex); - auto it = _rsLookup.find(newConnString.getSetName()); - if (it == _rsLookup.end()) { - return; - } - - _rebuildShard(updateConnStringLock, newConnString, factory); -} - - -void ShardRegistryData::_rebuildShard(WithLock lk, - ConnectionString const& newConnString, - ShardFactory* factory) { - auto it = _rsLookup.find(newConnString.getSetName()); - invariant(it->second); - auto shard = factory->createShard(it->second->getId(), newConnString); - _addShard(lk, shard, true); - if (shard->isConfig()) { - _configShard = shard; - } -} - -void ShardRegistryData::_addShard(WithLock lk, - std::shared_ptr<Shard> const& shard, - bool useOriginalCS) { +void ShardRegistryData::_addShard(std::shared_ptr<Shard> shard, bool useOriginalCS) { const ShardId shardId = shard->getId(); const ConnectionString connString = useOriginalCS ? shard->originalConnString() : shard->getConnString(); - auto currentShard = _findShard(lk, shardId); + auto currentShard = findShard(shardId); if (currentShard) { auto oldConnString = currentShard->originalConnString(); diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index c998a897d4f..4f68d1a8c4c 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -55,92 +55,98 @@ class ShardType; class ShardRegistryData { public: + using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>; + /** - * Reads shards docs from the catalog client and fills in maps. + * Creates a basic ShardRegistryData, that only contains the config shard. Needed during + * initialization, when the config servers are contacted for the first time (ie. the first time + * createFromCatalogClient() is called). */ - ShardRegistryData(OperationContext* opCtx, ShardFactory* shardFactory); - ShardRegistryData() = default; - ~ShardRegistryData() = default; + static ShardRegistryData createWithConfigShardOnly(std::shared_ptr<Shard> configShard); - void swap(ShardRegistryData& other); + /** + * Reads shards docs from the catalog client and fills in maps. + */ + static ShardRegistryData createFromCatalogClient(OperationContext* opCtx, + ShardFactory* shardFactory, + std::shared_ptr<Shard> configShard); - /* - * Swaps _shardIdLookup, _rsLookup, and _configShard with other. Merges _hostLookup and - * _connStringLookup without overwriting existing entries in either map. + /** + * Merges alreadyCachedData and configServerData into a new ShardRegistryData. + * + * The merged data is the same as configServerData, except that for the host and connection + * string based lookups, any values from alreadyCachedData will take precedence over those from + * configServerData. + * + * Returns the merged data, as well as the shards that have been removed (ie. that are present + * in alreadyCachedData but not configServerData) as a mapping from ShardId to + * std::shared_ptr<Shard>. * * Called when reloading the shard registry. It is important to merge _hostLookup because * reloading the shard registry can interleave with updates to the shard registry passed by the * RSM. */ - void swapAndMerge(ShardRegistryData& other); + static std::pair<ShardRegistryData, ShardMap> mergeExisting( + const ShardRegistryData& alreadyCachedData, const ShardRegistryData& configServerData); + + /** + * Create a duplicate of existingData, but additionally updates the shard for newConnString. + * Used when notified by the RSM of a new connection string from a shard. + */ + static std::pair<ShardRegistryData, std::shared_ptr<Shard>> createFromExisting( + const ShardRegistryData& existingData, + const ConnectionString& newConnString, + ShardFactory* shardFactory); /** - * Returns a shared pointer the shard object with the given shard id. + * Returns the shard with the given shard id, connection string, or host and port. * * Callers might pass in the connection string or HostAndPort rather than ShardId, so this * method will first look for the shard by ShardId, then connection string, then HostAndPort * stopping once it finds the shard. */ - std::shared_ptr<Shard> findShard(ShardId const& shardId) const; + std::shared_ptr<Shard> findShard(const ShardId& shardId) const; /** - * Lookup shard by replica set name. Returns nullptr if the name can't be found. + * Returns the shard with the given replica set name, or nullptr if no such shard. */ - std::shared_ptr<Shard> findByRSName(const std::string& rsName) const; + std::shared_ptr<Shard> findByRSName(const std::string& name) const; /** - * Returns a shared pointer to the shard object with the given shard id. - */ - std::shared_ptr<Shard> findByShardId(const ShardId&) const; - - /** - * Finds the shard that the mongod listening at this HostAndPort is a member of. + * Returns the shard which contains a mongod with the given host and port, or nullptr if no such + * shard. */ std::shared_ptr<Shard> findByHostAndPort(const HostAndPort&) const; /** - * Returns config shard. + * Returns the set of all known shard ids. */ - std::shared_ptr<Shard> getConfigShard() const; + void getAllShardIds(std::set<ShardId>& result) const; /** - * Adds config shard. + * Returns the set of all known shard objects. */ - void addConfigShard(std::shared_ptr<Shard>); - - void getAllShardIds(std::set<ShardId>& result) const; + void getAllShards(std::vector<std::shared_ptr<Shard>>& result) const; +private: /** - * Erases known by this shardIds from the diff argument. + * Returns the shard with the given shard id, or nullptr if no such shard. */ - void shardIdSetDifference(std::set<ShardId>& diff) const; - void toBSON(BSONObjBuilder* result) const; + std::shared_ptr<Shard> _findByShardId(const ShardId&) const; + /** - * If the shard with same replica set name as in the newConnString already exists then replace - * it with the shard built for the newConnString. + * Returns the shard with the given connection string, or nullptr if no such shard. */ - void rebuildShardIfExists(const ConnectionString& newConnString, ShardFactory* factory); + std::shared_ptr<Shard> _findByConnectionString(const ConnectionString& connectionString) const; -private: /** - * Creates a shard based on the specified information and puts it into the lookup maps. - * if useOriginalCS = true it will use the ConnectionSring used for shard creation to update + * Puts the given shard object into the lookup maps. + * + * If useOriginalCS = true it will use the ConnectionSring used for shard creation to update * lookup maps. Otherwise the current connection string from the Shard's RemoteCommandTargeter - * will be used. + * will be used. Only called during ShardRegistryData construction. */ - void _addShard(WithLock, std::shared_ptr<Shard> const&, bool useOriginalCS); - auto _findByShardId(WithLock, ShardId const&) const -> std::shared_ptr<Shard>; - auto _findByHostAndPort(WithLock, const HostAndPort& hostAndPort) const - -> std::shared_ptr<Shard>; - auto _findByConnectionString(WithLock, const ConnectionString& connectionString) const - -> std::shared_ptr<Shard>; - auto _findShard(WithLock lk, ShardId const& shardId) const -> std::shared_ptr<Shard>; - void _rebuildShard(WithLock, ConnectionString const& newConnString, ShardFactory* factory); - - // Protects the lookup maps below. - mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardRegistryData::_mutex"); - - using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>; + void _addShard(std::shared_ptr<Shard>, bool useOriginalCS); // Map of shardName -> Shard ShardMap _shardIdLookup; @@ -153,9 +159,6 @@ private: // Map of connection string to Shard std::map<ConnectionString, std::shared_ptr<Shard>> _connStringLookup; - - // store configShard separately to always have a reference - std::shared_ptr<Shard> _configShard; }; /** @@ -193,6 +196,7 @@ public: std::vector<ShardRemovalHook> shardRemovalHooks = {}); ~ShardRegistry(); + /** * Starts ReplicaSetMonitor by adding a config shard. */ @@ -309,27 +313,45 @@ public: const ConnectionString& connStr) noexcept; private: + void _internalReload(const executor::TaskExecutor::CallbackArgs& cbArgs); + /** * Factory to create shards. Never changed after startup so safe to access outside of _mutex. */ const std::unique_ptr<ShardFactory> _shardFactory; /** - * Specified in the ShardRegistry c-tor. It's used only in startup() to initialize the config - * shard + * Specified in the ShardRegistry c-tor. It's used only in init() to initialize the config + * shard. */ - ConnectionString _initConfigServerCS; + const ConnectionString _initConfigServerCS; + + AtomicWord<bool> _isInitialized{false}; /** * A list of callbacks to be called asynchronously when it has been discovered that a shard was * removed. */ - std::vector<ShardRemovalHook> _shardRemovalHooks; + const std::vector<ShardRemovalHook> _shardRemovalHooks; + + // Protects the ShardRegistryData lookup maps in _data, and _configShard. + mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardRegistry::_mutex"); - void _internalReload(const executor::TaskExecutor::CallbackArgs& cbArgs); ShardRegistryData _data; - // Protects the _reloadState and _initConfigServerCS during startup. + // Store a separate reference to the configShard. + std::shared_ptr<Shard> _configShard; + + // Executor for reloading. + std::unique_ptr<executor::TaskExecutor> _executor{}; + + // The ShardRegistry is "up" once there has been a successful refresh. + AtomicWord<bool> _isUp{false}; + + // Set to true in shutdown call to prevent calling it twice. + AtomicWord<bool> _isShutdown{false}; + + // Protects the _reloadState during startup and refresh. mutable Mutex _reloadMutex = MONGO_MAKE_LATCH("ShardRegistry::_reloadMutex"); stdx::condition_variable _inReloadCV; @@ -340,13 +362,6 @@ private: }; ReloadState _reloadState{ReloadState::Idle}; - bool _isUp{false}; - - // Executor for reloading. - std::unique_ptr<executor::TaskExecutor> _executor{}; - - // Set to true in shutdown call to prevent calling it twice. - bool _isShutdown{false}; }; } // namespace mongo diff --git a/src/mongo/s/client/shard_registry_data_test.cpp b/src/mongo/s/client/shard_registry_data_test.cpp deleted file mode 100644 index 2fcfed25e63..00000000000 --- a/src/mongo/s/client/shard_registry_data_test.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/bson/json.h" -#include "mongo/client/remote_command_targeter_factory_mock.h" -#include "mongo/client/remote_command_targeter_mock.h" -#include "mongo/s/client/shard_factory.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/client/shard_remote.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/time_support.h" - -namespace mongo { -namespace { - -class ShardRegistryDataTest : public mongo::unittest::Test { -public: - ShardFactory* shardFactory() { - return _shardFactory.get(); - } - -private: - void setUp() override { - auto targeterFactory = std::make_unique<RemoteCommandTargeterFactoryMock>(); - auto targeterFactoryPtr = targeterFactory.get(); - - ShardFactory::BuilderCallable setBuilder = - [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) { - return std::make_unique<ShardRemote>( - shardId, connStr, targeterFactoryPtr->create(connStr)); - }; - - ShardFactory::BuilderCallable masterBuilder = - [targeterFactoryPtr](const ShardId& shardId, const ConnectionString& connStr) { - return std::make_unique<ShardRemote>( - shardId, connStr, targeterFactoryPtr->create(connStr)); - }; - - ShardFactory::BuildersMap buildersMap{ - {ConnectionString::SET, std::move(setBuilder)}, - {ConnectionString::MASTER, std::move(masterBuilder)}, - }; - - _shardFactory = - std::make_unique<ShardFactory>(std::move(buildersMap), std::move(targeterFactory)); - } - - void tearDown() override {} - - std::unique_ptr<ShardFactory> _shardFactory; -}; - -TEST_F(ShardRegistryDataTest, AddConfigShard) { - ConnectionString configCS("rs/dummy1:1234,dummy2:2345,dummy3:3456", ConnectionString::SET); - auto configShard = shardFactory()->createShard(ShardRegistry::kConfigServerShardId, configCS); - - ShardRegistryData data; - data.addConfigShard(configShard); - - ASSERT_EQUALS(configCS.toString(), data.getConfigShard()->originalConnString().toString()); -} - -} // namespace -} // namespace mongo |