author | Tommaso Tocci <tommaso.tocci@mongodb.com> | 2020-09-16 00:11:16 +0200
---|---|---
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-15 23:32:25 +0000
commit | 945ca4f1112f8846e793b40ff47610927b6fc298 | (patch)
tree | 809ba4107d38f25570b0688aa99c3b1c171df6f3 | /src/mongo/s/client
parent | f9f7f4832d8e94c233e7281790cbeb9b70129382 | (diff)
download | mongo-945ca4f1112f8846e793b40ff47610927b6fc298.tar.gz |
Revert "SERVER-46202 Implement ShardRegistry on top of ReadThroughCache"
This reverts commit a8913858697363a26b06996f0821045b550bea27.
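For orientation before reading the diff: the reverted SERVER-46202 change had rebuilt ShardRegistry on top of ReadThroughCache, with lookups driven by a causally consistent Time (topologyTime plus force-reload and RSM increments). This revert restores the earlier design, in which reload() itself serializes concurrent refreshes through a mutex, a condition variable, and an Idle/Reloading/Failed state (see _reloadState at the end of the header diff). Below is a minimal sketch of that single-flight pattern, using toy names (Registry, _fetchFromConfigServers) standing in for the real classes, not the actual MongoDB code:

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Simplified stand-in for ShardRegistry's reload coordination: one thread
// performs the refresh while concurrent callers wait on a condition variable.
class Registry {
public:
    // Returns true if this call performed the reload, false if it only
    // waited for a concurrent reload started by another thread.
    bool reload() {
        std::unique_lock<std::mutex> lk(_mutex);
        if (_state == State::kReloading) {
            _cv.wait(lk, [&] { return _state != State::kReloading; });
            if (_state == State::kIdle) {
                return false;  // Someone else reloaded successfully.
            }
            // _state == State::kFailed: fall through and retry ourselves.
        }
        _state = State::kReloading;
        lk.unlock();

        auto next = State::kFailed;
        try {
            _fetchFromConfigServers();  // May throw on network error.
            next = State::kIdle;
        } catch (...) {
            // next stays kFailed; waiters will retry the reload themselves.
        }

        lk.lock();
        _state = next;
        _cv.notify_all();
        if (next == State::kFailed) {
            throw std::runtime_error("reload failed");
        }
        return true;
    }

private:
    void _fetchFromConfigServers() { /* placeholder for the real fetch */ }

    enum class State { kIdle, kReloading, kFailed };
    std::mutex _mutex;
    std::condition_variable _cv;
    State _state{State::kIdle};
};

int main() {
    Registry r;
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back([&] { std::cout << r.reload() << '\n'; });
    }
    for (auto& t : threads) t.join();
}
```

A thread that arrives while a refresh is in flight blocks on the condition variable and returns false once the refresher succeeds; if the refresher failed, the waiter retries the refresh itself, which matches the restored reload() behavior.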
Diffstat (limited to 'src/mongo/s/client')
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 684
-rw-r--r-- | src/mongo/s/client/shard_registry.h | 310
2 files changed, 330 insertions, 664 deletions
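The restored header documents a subtle contract for reload(): it returns true when the calling thread actually performed the refresh, and false when it only waited on a refresh started by another thread, so a single call does not guarantee an up-to-date view. The restored getShard() and getAllShardIds() both apply the documented remedy, reloading a second time when the first call returned false. A sketch of that caller-side idiom (ensureFreshRegistry is a hypothetical helper, not a function in the tree; it assumes the server tree's headers):

```cpp
#include "mongo/db/operation_context.h"
#include "mongo/s/client/shard_registry.h"

namespace mongo {

// Hypothetical helper showing the "strict reload" idiom from the restored
// shard_registry.h documentation.
void ensureFreshRegistry(OperationContext* opCtx, ShardRegistry* registry) {
    if (!registry->reload(opCtx)) {
        // The first call only waited on a concurrent reload that may have
        // begun before we needed fresh data; reload once more to see a view
        // at least as fresh as our entry into this function.
        registry->reload(opCtx);
    }
}

}  // namespace mongo
```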
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 842da344bec..e0a5cc13476 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -33,41 +33,50 @@
 #include "mongo/s/client/shard_registry.h"
 
+#include <memory>
+#include <set>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/connection_string.h"
 #include "mongo/client/replica_set_monitor.h"
 #include "mongo/db/client.h"
 #include "mongo/db/logical_time_metadata_hook.h"
-#include "mongo/db/vector_clock.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/server_options.h"
+#include "mongo/executor/network_connection_hook.h"
 #include "mongo/executor/network_interface_factory.h"
 #include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/task_executor.h"
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/executor/thread_pool_task_executor.h"
 #include "mongo/logv2/log.h"
+#include "mongo/platform/mutex.h"
 #include "mongo/rpc/metadata/egress_metadata_hook_list.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
 #include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_factory.h"
 #include "mongo/s/grid.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/scopeguard.h"
 #include "mongo/util/str.h"
 
 namespace mongo {
 
-namespace {
+using executor::NetworkInterface;
+using executor::NetworkInterfaceThreadPool;
+using executor::TaskExecutor;
+using executor::TaskExecutorPool;
+using executor::ThreadPoolTaskExecutor;
+using CallbackArgs = TaskExecutor::CallbackArgs;
+using CallbackHandle = TaskExecutor::CallbackHandle;
 
-const Seconds kRefreshPeriod(30);
-
-/**
- * Whether or not the actual topologyTime should be used. When this is false, the
- * topologyTime part of the cache's Time will stay fixed and not advance.
- */ -bool useActualTopologyTime() { - return serverGlobalParams.featureCompatibility.isVersionInitialized() && - serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo( - ServerGlobalParams::FeatureCompatibility::Version::kVersion47); -} +namespace { +const Seconds kRefreshPeriod(30); } // namespace -using CallbackArgs = executor::TaskExecutor::CallbackArgs; - const ShardId ShardRegistry::kConfigServerShardId = ShardId("config"); ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, @@ -75,145 +84,154 @@ ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, std::vector<ShardRemovalHook> shardRemovalHooks) : _shardFactory(std::move(shardFactory)), _initConfigServerCS(configServerCS), - _shardRemovalHooks(std::move(shardRemovalHooks)), - _threadPool([] { - ThreadPool::Options options; - options.poolName = "ShardRegistry"; - options.minThreads = 0; - options.maxThreads = 1; - return options; - }()) { + _shardRemovalHooks(std::move(shardRemovalHooks)) { invariant(_initConfigServerCS.isValid()); - _threadPool.startup(); } ShardRegistry::~ShardRegistry() { shutdown(); } -void ShardRegistry::init(ServiceContext* service) { - invariant(!_isInitialized.load()); +void ShardRegistry::shutdown() { + if (_executor && !_isShutdown.load()) { + LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry"); + _executor->shutdown(); + _executor->join(); + _isShutdown.store(true); + } +} - invariant(!_service); - _service = service; +ConnectionString ShardRegistry::getConfigServerConnectionString() const { + return getConfigShard()->getConnString(); +} - auto lookupFn = [this](OperationContext* opCtx, - const Singleton& key, - const Cache::ValueHandle& cachedData, - const Time& timeInStore) { - return _lookup(opCtx, key, cachedData, timeInStore); - }; +StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, + const ShardId& shardId) { + // If we know about the shard, return it. + auto shard = getShardNoReload(shardId); + if (shard) { + return shard; + } - _cache = - std::make_unique<Cache>(_cacheMutex, _service, _threadPool, lookupFn, 1 /* cacheSize */); + // If we can't find the shard, attempt to reload the ShardRegistry. + bool didReload = reload(opCtx); + shard = getShardNoReload(shardId); - { - stdx::lock_guard<Latch> lk(_mutex); - _configShardData = ShardRegistryData::createWithConfigShardOnly( - _shardFactory->createShard(kConfigServerShardId, _initConfigServerCS)); + // If we found the shard, return it. + if (shard) { + return shard; } - _isInitialized.store(true); + // If we did not find the shard but performed the reload + // ourselves, return, because it means the shard does not exist. + if (didReload) { + return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; + } + + // If we did not perform the reload ourselves (because there was a concurrent reload), force a + // reload again to ensure that we have seen data at least as up to date as our first reload. 
+ reload(opCtx); + shard = getShardNoReload(shardId); + + if (shard) { + return shard; + } + + return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; } -ShardRegistry::Cache::LookupResult ShardRegistry::_lookup(OperationContext* opCtx, - const Singleton& key, - const Cache::ValueHandle& cachedData, - const Time& timeInStore) { - invariant(key == _kSingleton); - invariant(cachedData, "ShardRegistry::_lookup called but the cache is empty"); - - LOGV2_DEBUG(4620250, - 2, - "Starting ShardRegistry::_lookup", - "cachedData"_attr = cachedData->toBSON(), - "cachedData.getTime()"_attr = cachedData.getTime().toBSON(), - "timeInStore"_attr = timeInStore.toBSON()); - - // Check if we need to refresh from the configsvrs. If so, then do that and get the results, - // otherwise (this is a lookup only to incorporate updated connection strings from the RSM), - // then get the equivalent values from the previously cached data. - auto [returnData, - returnTopologyTime, - returnForceReloadIncrement, - removedShards, - fetchedFromConfigServers] = [&]() - -> std::tuple<ShardRegistryData, Timestamp, Increment, ShardRegistryData::ShardMap, bool> { - if (timeInStore.topologyTime > cachedData.getTime().topologyTime || - timeInStore.forceReloadIncrement > cachedData.getTime().forceReloadIncrement) { - auto [reloadedData, maxTopologyTime] = - ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get()); - if (!useActualTopologyTime()) { - // If not using the actual topology time, then just use the topologyTime currently - // in the cache, instead of the maximum topologyTime value from config.shards. This - // is necessary during upgrade/downgrade when topologyTime might not be gossiped by - // all nodes (and so isn't being used). - maxTopologyTime = cachedData.getTime().topologyTime; - } - - auto [mergedData, removedShards] = - ShardRegistryData::mergeExisting(*cachedData, reloadedData); - - return { - mergedData, maxTopologyTime, timeInStore.forceReloadIncrement, removedShards, true}; - } else { - return {*cachedData, - cachedData.getTime().topologyTime, - cachedData.getTime().forceReloadIncrement, - {}, - false}; - } - }(); +std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) { + stdx::lock_guard<Latch> lk(_mutex); + return _data.findShard(shardId); +} - // Always apply the latest conn strings. - auto [latestConnStrings, rsmIncrementForConnStrings] = _getLatestConnStrings(); +std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { + stdx::lock_guard<Latch> lk(_mutex); + return _data.findByHostAndPort(host); +} - for (const auto& latestConnString : latestConnStrings) { - // TODO SERVER-50909: Optimise by only doing this work if the latest conn string differs. 
+std::shared_ptr<Shard> ShardRegistry::getConfigShard() const { + stdx::lock_guard<Latch> lk(_mutex); + invariant(_configShard); + return _configShard; +} - auto shard = returnData.findByRSName(latestConnString.first.toString()); - if (!shard) { - continue; +std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { + return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr); +} + +std::shared_ptr<Shard> ShardRegistry::lookupRSName(const std::string& name) const { + stdx::lock_guard<Latch> lk(_mutex); + return _data.findByRSName(name); +} + +void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const { + std::set<ShardId> seen; + { + stdx::lock_guard<Latch> lk(_mutex); + _data.getAllShardIds(seen); + } + all->assign(seen.begin(), seen.end()); +} + +void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) { + getAllShardIdsNoReload(all); + if (all->empty()) { + bool didReload = reload(opCtx); + getAllShardIdsNoReload(all); + // If we didn't do the reload ourselves, we should retry to ensure + // that the reload is actually initiated while we're executing this + if (!didReload && all->empty()) { + reload(opCtx); + getAllShardIdsNoReload(all); } + } +} - auto newData = ShardRegistryData::createFromExisting( - returnData, latestConnString.second, _shardFactory.get()); - returnData = newData; +int ShardRegistry::getNumShards() const { + std::set<ShardId> seen; + { + stdx::lock_guard<Latch> lk(_mutex); + _data.getAllShardIds(seen); } + return seen.size(); +} - // Remove RSMs that are not in the catalog any more. - for (auto& pair : removedShards) { - auto& shardId = pair.first; - auto& shard = pair.second; - invariant(shard); +void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { + invariant(newConnString.type() == ConnectionString::SET || + newConnString.type() == ConnectionString::CUSTOM); // For dbtests - auto name = shard->getConnString().getSetName(); - ReplicaSetMonitor::remove(name); - for (auto& callback : _shardRemovalHooks) { - // Run callbacks asynchronously. - // TODO SERVER-50906: Consider running these callbacks synchronously. - ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()) - .getAsync([=](const Status&) { callback(shardId); }); - } + // to prevent update config shard connection string during init + stdx::unique_lock<Latch> lock(_mutex); + + auto shard = _data.findByRSName(newConnString.getSetName()); + if (!shard) { + return; } - // The registry is "up" once there has been a successful lookup from the config servers. 
- if (fetchedFromConfigServers) { - _isUp.store(true); + auto [data, updatedShard] = + ShardRegistryData::createFromExisting(_data, newConnString, _shardFactory.get()); + + if (updatedShard && updatedShard->isConfig()) { + _configShard = updatedShard; } - Time returnTime{returnTopologyTime, rsmIncrementForConnStrings, returnForceReloadIncrement}; - LOGV2_DEBUG(4620251, - 2, - "Finished ShardRegistry::_lookup", - "returnData"_attr = returnData.toBSON(), - "returnTime"_attr = returnTime); - return Cache::LookupResult{returnData, returnTime}; + _data = data; } -void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) { - invariant(_isInitialized.load()); - // startupPeriodicReloader() must be called only once +void ShardRegistry::init() { + invariant(!_isInitialized.load()); + { + stdx::unique_lock<Latch> lock(_mutex); + _configShard = + _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS); + _data = ShardRegistryData::createWithConfigShardOnly(_configShard); + } + _isInitialized.store(true); +} + +void ShardRegistry::startup(OperationContext* opCtx) { + // startup() must be called only once invariant(!_executor); auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); @@ -222,17 +240,16 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) { // construct task executor auto net = executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hookList)); auto netPtr = net.get(); - _executor = std::make_unique<executor::ThreadPoolTaskExecutor>( - std::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net)); + _executor = std::make_unique<ThreadPoolTaskExecutor>( + std::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); LOGV2_DEBUG(22724, 1, "Starting up task executor for periodic reloading of ShardRegistry"); _executor->startup(); auto status = - _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); }); + _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _internalReload(cbArgs); }); if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOGV2_DEBUG( - 22725, 1, "Can't schedule Shard Registry reload. Executor shutdown in progress"); + LOGV2_DEBUG(22725, 1, "Cant schedule Shard Registry reload. 
Executor shutdown in progress"); return; } @@ -244,27 +261,7 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) { } } -void ShardRegistry::shutdownPeriodicReloader() { - if (_executor) { - LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry"); - _executor->shutdown(); - _executor->join(); - _executor.reset(); - } -} - -void ShardRegistry::shutdown() { - shutdownPeriodicReloader(); - - if (!_isShutdown.load()) { - LOGV2_DEBUG(4620235, 1, "Shutting down shard registry"); - _threadPool.shutdown(); - _threadPool.join(); - _isShutdown.store(true); - } -} - -void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) { +void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) { LOGV2_DEBUG(22726, 1, "Reloading shardRegistry"); if (!cbArgs.status.isOK()) { LOGV2_WARNING(22734, @@ -278,26 +275,21 @@ void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) { auto opCtx = tc->makeOperationContext(); - auto refreshPeriod = kRefreshPeriod; - try { reload(opCtx.get()); } catch (const DBException& e) { - if (e.code() == ErrorCodes::ReadConcernMajorityNotAvailableYet) { - refreshPeriod = Seconds(1); - } LOGV2(22727, "Error running periodic reload of shard registry caused by {error}; will retry after " "{shardRegistryReloadInterval}", "Error running periodic reload of shard registry", "error"_attr = redact(e), - "shardRegistryReloadInterval"_attr = refreshPeriod); + "shardRegistryReloadInterval"_attr = kRefreshPeriod); } // reschedule itself auto status = - _executor->scheduleWorkAt(_executor->now() + refreshPeriod, - [this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); }); + _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod, + [this](const CallbackArgs& cbArgs) { _internalReload(cbArgs); }); if (status.getStatus() == ErrorCodes::ShutdownInProgress) { LOGV2_DEBUG( @@ -313,155 +305,85 @@ void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) { } } -ConnectionString ShardRegistry::getConfigServerConnectionString() const { - return getConfigShard()->getConnString(); -} - -std::shared_ptr<Shard> ShardRegistry::getConfigShard() const { - stdx::lock_guard<Latch> lk(_mutex); - return _configShardData.findShard(kConfigServerShardId); +bool ShardRegistry::isUp() const { + return _isUp.load(); } -StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, - const ShardId& shardId) { - // First check if this is a config shard lookup. - { - stdx::lock_guard<Latch> lk(_mutex); - if (auto shard = _configShardData.findShard(shardId)) { - return shard; +bool ShardRegistry::reload(OperationContext* opCtx) { + stdx::unique_lock<Latch> reloadLock(_reloadMutex); + + if (_reloadState == ReloadState::Reloading) { + // Another thread is already in the process of reloading so no need to do duplicate work. + // There is also an issue if multiple threads are allowed to call getAllShards() + // simultaneously because there is no good way to determine which of the threads has the + // more recent version of the data. 
+ try { + opCtx->waitForConditionOrInterrupt( + _inReloadCV, reloadLock, [&] { return _reloadState != ReloadState::Reloading; }); + } catch (const DBException& e) { + LOGV2_DEBUG(22729, + 1, + "Error reloading shard registry caused by {error}", + "Error reloading shard registry", + "error"_attr = redact(e)); + return false; } - } - if (auto shard = _getData(opCtx)->findShard(shardId)) { - return shard; + if (_reloadState == ReloadState::Idle) { + return false; + } + // else proceed to reload since an error occured on the last reload attempt. + invariant(_reloadState == ReloadState::Failed); } - // Reload and try again if the shard was not in the registry - reload(opCtx); - if (auto shard = _getData(opCtx)->findShard(shardId)) { - return shard; - } + _reloadState = ReloadState::Reloading; + reloadLock.unlock(); - return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; -} + auto nextReloadState = ReloadState::Failed; -void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) { - std::set<ShardId> seen; - auto data = _getData(opCtx); - data->getAllShardIds(seen); - if (seen.empty()) { - reload(opCtx); - data = _getData(opCtx); - data->getAllShardIds(seen); - } - all->assign(seen.begin(), seen.end()); -} + auto failGuard = makeGuard([&] { + if (!reloadLock.owns_lock()) { + reloadLock.lock(); + } + _reloadState = nextReloadState; + _inReloadCV.notify_all(); + }); -int ShardRegistry::getNumShards(OperationContext* opCtx) { - std::set<ShardId> seen; - auto data = _getData(opCtx); - data->getAllShardIds(seen); - return seen.size(); -} + ShardRegistryData reloadedData = + ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get(), getConfigShard()); -std::pair<std::vector<ShardRegistry::LatestConnStrings::value_type>, ShardRegistry::Increment> -ShardRegistry::_getLatestConnStrings() const { stdx::unique_lock<Latch> lock(_mutex); - return {{_latestConnStrings.begin(), _latestConnStrings.end()}, _rsmIncrement.load()}; -} - -void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { - invariant(newConnString.type() == ConnectionString::SET || - newConnString.type() == ConnectionString::CUSTOM); // For dbtests - - stdx::lock_guard<Latch> lk(_mutex); - if (auto shard = _configShardData.findByRSName(newConnString.getSetName())) { - auto newData = ShardRegistryData::createFromExisting( - _configShardData, newConnString, _shardFactory.get()); - _configShardData = newData; - - } else { - // Stash the new connection string and bump the RSM increment. - _latestConnStrings[newConnString.getSetName()] = newConnString; - auto value = _rsmIncrement.addAndFetch(1); - LOGV2_DEBUG(4620252, - 2, - "ShardRegistry stashed new connection string", - "newConnString"_attr = newConnString, - "newRSMIncrement"_attr = value); - } - - // Schedule a lookup, to incorporate the new connection string. - // TODO SERVER-50910: To avoid needing to use a separate thread to schedule the lookup, make - // _getData() async. 
- auto status = Grid::get(_service)->getExecutorPool()->getFixedExecutor()->scheduleWork( - [this](const CallbackArgs& cbArgs) { - ThreadClient tc("shard-registry-rsm-reload", _service); - - auto opCtx = tc->makeOperationContext(); - - try { - _getData(opCtx.get()); - } catch (const DBException& e) { - LOGV2(4620201, - "Error running reload of ShardRegistry for RSM update, caused by {error}", - "Error running reload of ShardRegistry for RSM update", - "error"_attr = redact(e)); - } - }); - - if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOGV2_DEBUG( - 4620202, - 1, - "Can't schedule ShardRegistry reload for RSM update, executor shutdown in progress"); - return; - } - if (!status.isOK()) { - LOGV2_FATAL(4620203, - "Error scheduling ShardRegistry reload for RSM update, caused by {error}", - "Error scheduling ShardRegistry reload for RSM update", - "error"_attr = redact(status.getStatus())); - } -} + auto [mergedData, removedShards] = ShardRegistryData::mergeExisting(_data, reloadedData); + _data = std::move(mergedData); -std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { - return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr); -} + lock.unlock(); -bool ShardRegistry::isUp() const { - return _isUp.load(); -} + // Remove RSMs that are not in the catalog any more. + for (auto& pair : removedShards) { + auto& shardId = pair.first; + auto& shard = pair.second; + invariant(shard); -void ShardRegistry::toBSON(BSONObjBuilder* result) const { - BSONObjBuilder map; - BSONObjBuilder hosts; - BSONObjBuilder connStrings; - auto data = _getCachedData(); - data->toBSON(&map, &hosts, &connStrings); - { - stdx::lock_guard<Latch> lk(_mutex); - _configShardData.toBSON(&map, &hosts, &connStrings); + auto name = shard->getConnString().getSetName(); + ReplicaSetMonitor::remove(name); + for (auto& callback : _shardRemovalHooks) { + // Run callbacks asynchronously. + ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()) + .getAsync([=](const Status&) { callback(shardId); }); + } } - result->append("map", map.obj()); - result->append("hosts", hosts.obj()); - result->append("connStrings", connStrings.obj()); -} - -bool ShardRegistry::reload(OperationContext* opCtx) { - // Make the next acquire do a lookup. - auto value = _forceReloadIncrement.addAndFetch(1); - LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value); - - // Force it to actually happen now. - _getData(opCtx); + nextReloadState = ReloadState::Idle; + // first successful reload means that registry is up + _isUp.store(true); return true; } void ShardRegistry::clearEntries() { - _cache->invalidateAll(); + ShardRegistryData empty; + stdx::lock_guard<Latch> lk(_mutex); + _data = empty; } void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext, @@ -471,8 +393,7 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex auto opCtx = tc->makeOperationContext(); auto const grid = Grid::get(opCtx.get()); - std::shared_ptr<Shard> s = - grid->shardRegistry()->_getShardForRSNameNoReload(connStr.getSetName()); + std::shared_ptr<Shard> s = grid->shardRegistry()->lookupRSName(connStr.getSetName()); if (!s) { LOGV2_DEBUG(22730, 1, @@ -506,93 +427,25 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex } } -// Inserts the initial empty ShardRegistryData into the cache, if the cache is empty. 
-void ShardRegistry::_initializeCacheIfNecessary() const { - if (!_cache->peekLatestCached(_kSingleton)) { - stdx::lock_guard<Latch> lk(_mutex); - if (!_cache->peekLatestCached(_kSingleton)) { - _cache->insertOrAssign(_kSingleton, {}, Date_t::now(), Time()); - } - } -} - -ShardRegistry::Cache::ValueHandle ShardRegistry::_getData(OperationContext* opCtx) { - _initializeCacheIfNecessary(); - - // If the forceReloadIncrement is 0, then we've never done a lookup, so we should be sure to do - // one now. - Increment uninitializedIncrement{0}; - _forceReloadIncrement.compareAndSwap(&uninitializedIncrement, 1); - - // Update the time the cache should be aiming for. - auto now = VectorClock::get(opCtx)->getTime(); - // The topologyTime should be advanced to either the actual topologyTime (if it is being - // gossiped), or else the previously cached topologyTime value (so that this part of the cache's - // time doesn't advance, if topologyTime isn't being gossiped). - Timestamp topologyTime = useActualTopologyTime() - ? now.topologyTime().asTimestamp() - : _cache->peekLatestCached(_kSingleton).getTime().topologyTime; - _cache->advanceTimeInStore( - _kSingleton, Time(topologyTime, _rsmIncrement.load(), _forceReloadIncrement.load())); - - return _cache->acquire(opCtx, _kSingleton, CacheCausalConsistency::kLatestKnown); -} - -// TODO SERVER-50206: Remove usage of these non-causally consistent accessors. - -ShardRegistry::Cache::ValueHandle ShardRegistry::_getCachedData() const { - _initializeCacheIfNecessary(); - return _cache->peekLatestCached(_kSingleton); -} - -std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) const { - // First check if this is a config shard lookup. - { - stdx::lock_guard<Latch> lk(_mutex); - if (auto shard = _configShardData.findShard(shardId)) { - return shard; - } - } - auto data = _getCachedData(); - return data->findShard(shardId); -} - -std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) const { - // First check if this is a config shard lookup. +void ShardRegistry::toBSON(BSONObjBuilder* result) const { + std::vector<std::shared_ptr<Shard>> shards; { stdx::lock_guard<Latch> lk(_mutex); - if (auto shard = _configShardData.findByHostAndPort(host)) { - return shard; - } + _data.getAllShards(shards); } - auto data = _getCachedData(); - return data->findByHostAndPort(host); -} -void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const { - std::set<ShardId> seen; - auto data = _getCachedData(); - data->getAllShardIds(seen); - all->assign(seen.begin(), seen.end()); -} - -int ShardRegistry::getNumShardsNoReload() const { - std::set<ShardId> seen; - auto data = _getCachedData(); - data->getAllShardIds(seen); - return seen.size(); -} + std::sort(std::begin(shards), + std::end(shards), + [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) { + return lhs->getId() < rhs->getId(); + }); -std::shared_ptr<Shard> ShardRegistry::_getShardForRSNameNoReload(const std::string& name) const { - // First check if this is a config shard lookup. - { - stdx::lock_guard<Latch> lk(_mutex); - if (auto shard = _configShardData.findByRSName(name)) { - return shard; - } + BSONObjBuilder mapBob(result->subobjStart("map")); + for (auto&& shard : shards) { + // Intentionally calling getConnString while not holding _mutex + // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote. 
+ mapBob.append(shard->getId(), shard->getConnString().toString()); } - auto data = _getCachedData(); - return data->findByRSName(name); } ////////////// ShardRegistryData ////////////////// @@ -603,8 +456,9 @@ ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<S return data; } -std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClient( - OperationContext* opCtx, ShardFactory* shardFactory) { +ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* opCtx, + ShardFactory* shardFactory, + std::shared_ptr<Shard> configShard) { auto const catalogClient = Grid::get(opCtx)->catalogClient(); auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern; @@ -635,7 +489,6 @@ std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClie // Do this before re-taking the mutex to avoid deadlock with the ReplicaSetMonitor updating // hosts for a given shard. std::vector<std::tuple<std::string, ConnectionString>> shardsInfo; - Timestamp maxTopologyTime; for (const auto& shardType : shards) { // This validation should ideally go inside the ShardType::validate call. However, doing // it there would prevent us from loading previously faulty shard hosts, which might have @@ -649,15 +502,11 @@ std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClie continue; } - if (auto thisTopologyTime = shardType.getTopologyTime(); - maxTopologyTime < thisTopologyTime) { - maxTopologyTime = thisTopologyTime; - } - shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue())); } ShardRegistryData data; + data._addShard(configShard, true); for (auto& shardInfo : shardsInfo) { if (std::get<0>(shardInfo) == "config") { continue; @@ -668,7 +517,7 @@ std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClie data._addShard(std::move(shard), false); } - return {data, maxTopologyTime}; + return data; } std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mergeExisting( @@ -701,20 +550,21 @@ std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mer return {mergedData, removedShards}; } -ShardRegistryData ShardRegistryData::createFromExisting(const ShardRegistryData& existingData, - const ConnectionString& newConnString, - ShardFactory* shardFactory) { +std::pair<ShardRegistryData, std::shared_ptr<Shard>> ShardRegistryData::createFromExisting( + const ShardRegistryData& existingData, + const ConnectionString& newConnString, + ShardFactory* shardFactory) { ShardRegistryData data(existingData); auto it = data._rsLookup.find(newConnString.getSetName()); if (it == data._rsLookup.end()) { - return data; + return {data, nullptr}; } invariant(it->second); auto updatedShard = shardFactory->createShard(it->second->getId(), newConnString); data._addShard(updatedShard, true); - return data; + return {data, updatedShard}; } std::shared_ptr<Shard> ShardRegistryData::findByRSName(const std::string& name) const { @@ -832,70 +682,4 @@ void ShardRegistryData::_addShard(std::shared_ptr<Shard> shard, bool useOriginal } } -void ShardRegistryData::toBSON(BSONObjBuilder* map, - BSONObjBuilder* hosts, - BSONObjBuilder* connStrings) const { - std::vector<std::shared_ptr<Shard>> shards; - getAllShards(shards); - - std::sort(std::begin(shards), - std::end(shards), - [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) { - return lhs->getId() < rhs->getId(); - }); - - if (map) { - for (auto&& shard : shards) { - 
map->append(shard->getId(), shard->getConnString().toString()); - } - } - - if (hosts) { - for (const auto& hostIt : _hostLookup) { - hosts->append(hostIt.first.toString(), hostIt.second->getId()); - } - } - - if (connStrings) { - for (const auto& connStringIt : _connStringLookup) { - connStrings->append(connStringIt.first.toString(), connStringIt.second->getId()); - } - } -} - -void ShardRegistryData::toBSON(BSONObjBuilder* result) const { - std::vector<std::shared_ptr<Shard>> shards; - getAllShards(shards); - - std::sort(std::begin(shards), - std::end(shards), - [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) { - return lhs->getId() < rhs->getId(); - }); - - BSONObjBuilder mapBob(result->subobjStart("map")); - for (auto&& shard : shards) { - mapBob.append(shard->getId(), shard->getConnString().toString()); - } - mapBob.done(); - - BSONObjBuilder hostsBob(result->subobjStart("hosts")); - for (const auto& hostIt : _hostLookup) { - hostsBob.append(hostIt.first.toString(), hostIt.second->getId()); - } - hostsBob.done(); - - BSONObjBuilder connStringsBob(result->subobjStart("connStrings")); - for (const auto& connStringIt : _connStringLookup) { - connStringsBob.append(connStringIt.first.toString(), connStringIt.second->getId()); - } - connStringsBob.done(); -} - -BSONObj ShardRegistryData::toBSON() const { - BSONObjBuilder bob; - toBSON(&bob); - return bob.obj(); -} - } // namespace mongo diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 711c8e0d34b..486788c57dd 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -29,24 +29,30 @@ #pragma once +#include <memory> +#include <set> #include <string> #include <vector> -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/client/connection_string.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/server_options.h" +#include "mongo/db/jsobj.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/s/client/shard.h" -#include "mongo/s/client/shard_factory.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/unordered_map.h" -#include "mongo/util/concurrency/thread_pool.h" -#include "mongo/util/net/hostandport.h" -#include "mongo/util/read_through_cache.h" +#include "mongo/util/concurrency/with_lock.h" namespace mongo { +class BSONObjBuilder; +struct HostAndPort; +class NamespaceString; +class OperationContext; +class ServiceContext; +class ShardFactory; +class Shard; +class ShardType; + class ShardRegistryData { public: using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>; @@ -61,8 +67,9 @@ public: /** * Reads shards docs from the catalog client and fills in maps. */ - static std::pair<ShardRegistryData, Timestamp> createFromCatalogClient( - OperationContext* opCtx, ShardFactory* shardFactory); + static ShardRegistryData createFromCatalogClient(OperationContext* opCtx, + ShardFactory* shardFactory, + std::shared_ptr<Shard> configShard); /** * Merges alreadyCachedData and configServerData into a new ShardRegistryData. @@ -86,9 +93,10 @@ public: * Create a duplicate of existingData, but additionally updates the shard for newConnString. * Used when notified by the RSM of a new connection string from a shard. 
*/ - static ShardRegistryData createFromExisting(const ShardRegistryData& existingData, - const ConnectionString& newConnString, - ShardFactory* shardFactory); + static std::pair<ShardRegistryData, std::shared_ptr<Shard>> createFromExisting( + const ShardRegistryData& existingData, + const ConnectionString& newConnString, + ShardFactory* shardFactory); /** * Returns the shard with the given shard id, connection string, or host and port. @@ -120,10 +128,6 @@ public: */ void getAllShards(std::vector<std::shared_ptr<Shard>>& result) const; - void toBSON(BSONObjBuilder* result) const; - void toBSON(BSONObjBuilder* map, BSONObjBuilder* hosts, BSONObjBuilder* connStrings) const; - BSONObj toBSON() const; - private: /** * Returns the shard with the given shard id, or nullptr if no such shard. @@ -194,27 +198,9 @@ public: ~ShardRegistry(); /** - * Initializes ShardRegistry with config shard. Must be called outside c-tor to avoid calls on - * this while its still not fully constructed. - */ - void init(ServiceContext* service); - - /** - * Startup the periodic reloader of the ShardRegistry. - * Can be called only after ShardRegistry::init() + * Starts ReplicaSetMonitor by adding a config shard. */ - void startupPeriodicReloader(OperationContext* opCtx); - - /** - * Shutdown the periodic reloader of the ShardRegistry. - */ - void shutdownPeriodicReloader(); - - /** - * Shuts down the threadPool. Needs to be called explicitly because ShardRegistry is never - * destroyed as it's owned by the static grid object. - */ - void shutdown(); + void startup(OperationContext* opCtx); /** * This is invalid to use on the config server and will hit an invariant if it is done. @@ -226,37 +212,56 @@ public: ConnectionString getConfigServerConnectionString() const; /** - * Returns shared pointer to the shard object representing the config servers. - * - * The config shard is always known, so this function never blocks. + * Reloads the ShardRegistry based on the contents of the config server's config.shards + * collection. Returns true if this call performed a reload and false if this call only waited + * for another thread to perform the reload and did not actually reload. Because of this, it is + * possible that calling reload once may not result in the most up to date view. If strict + * reloading is required, the caller should call this method one more time if the first call + * returned false. */ - std::shared_ptr<Shard> getConfigShard() const; + bool reload(OperationContext* opCtx); + + /** + * Clears all entries from the shard registry entries, which will force the registry to do a + * reload on next access. + */ + void clearEntries(); + + /** + * Takes a connection string describing either a shard or config server replica set, looks + * up the corresponding Shard object based on the replica set name, then updates the + * ShardRegistry's notion of what hosts make up that shard. + */ + void updateReplSetHosts(const ConnectionString& newConnString); /** * Returns a shared pointer to the shard object with the given shard id, or ShardNotFound error * otherwise. * * May refresh the shard registry if there's no cached information about the shard. The shardId - * parameter can actually be the shard name or the HostAndPort for any server in the shard. + * parameter can actually be the shard name or the HostAndPort for any + * server in the shard. */ StatusWith<std::shared_ptr<Shard>> getShard(OperationContext* opCtx, const ShardId& shardId); /** - * Populates all known shard ids into the given vector. 
+ * Returns a shared pointer to the shard object with the given shard id. The shardId parameter + * can actually be the shard name or the HostAndPort for any server in the shard. Will not + * refresh the shard registry or otherwise perform any network traffic. This means that if the + * shard was recently added it may not be found. USE WITH CAUTION. */ - void getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all); + std::shared_ptr<Shard> getShardNoReload(const ShardId& shardId); /** - * Returns the number of shards. + * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not + * refresh the shard registry or otherwise perform any network traffic. */ - int getNumShards(OperationContext* opCtx); + std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost); /** - * Takes a connection string describing either a shard or config server replica set, looks - * up the corresponding Shard object based on the replica set name, then updates the - * ShardRegistry's notion of what hosts make up that shard. + * Returns shared pointer to the shard object representing the config servers. */ - void updateReplSetHosts(const ConnectionString& newConnString); + std::shared_ptr<Shard> getConfigShard() const; /** * Instantiates a new detached shard connection, which does not appear in the list of shards @@ -269,28 +274,36 @@ public: std::unique_ptr<Shard> createConnection(const ConnectionString& connStr) const; /** - * The ShardRegistry is "up" once a successful lookup from the config servers has been - * completed. + * Lookup shard by replica set name. Returns nullptr if the name can't be found. + * Note: this doesn't refresh the table if the name isn't found, so it's possible that a + * newly added shard/Replica Set may not be found. */ - bool isUp() const; + std::shared_ptr<Shard> lookupRSName(const std::string& name) const; + + void getAllShardIdsNoReload(std::vector<ShardId>* all) const; + + /** + * Like getAllShardIdsNoReload(), but does a reload internally in the case that + * getAllShardIdsNoReload() comes back empty + */ + void getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all); + + int getNumShards() const; void toBSON(BSONObjBuilder* result) const; + bool isUp() const; /** - * Reloads the ShardRegistry based on the contents of the config server's config.shards - * collection. Returns true if this call performed a reload and false if this call only waited - * for another thread to perform the reload and did not actually reload. Because of this, it is - * possible that calling reload once may not result in the most up to date view. If strict - * reloading is required, the caller should call this method one more time if the first call - * returned false. + * Initializes ShardRegistry with config shard. Must be called outside c-tor to avoid calls on + * this while its still not fully constructed. */ - bool reload(OperationContext* opCtx); + void init(); /** - * Clears all entries from the shard registry entries, which will force the registry to do a - * reload on next access. + * Shuts down _executor. Needs to be called explicitly because ShardRegistry is never destroyed + * as it's owned by the static grid object. 
*/ - void clearEntries(); + void shutdown(); /** * For use in mongos which needs notifications about changes to shard replset membership to @@ -299,136 +312,8 @@ public: static void updateReplicaSetOnConfigServer(ServiceContext* serviceContex, const ConnectionString& connStr) noexcept; - // TODO SERVER-50206: Remove usage of these non-causally consistent accessors. - // - // Their most important current users are dispatching requests to hosts, and processing - // responses from hosts. These contexts need to know the shard that the host is associated - // with, but usually have no access to any associated opCtx (if there even is one), and also - // cannot tolerate waiting for further network activity (if the cache is stale and needs to be - // refreshed via _lookup()). - - /** - * Returns a shared pointer to the shard object with the given shard id. The shardId parameter - * can actually be the shard name or the HostAndPort for any server in the shard. Will not - * refresh the shard registry or otherwise perform any network traffic. This means that if the - * shard was recently added it may not be found. USE WITH CAUTION. - */ - std::shared_ptr<Shard> getShardNoReload(const ShardId& shardId) const; - - /** - * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not - * refresh the shard registry or otherwise perform any network traffic. - */ - std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost) const; - - void getAllShardIdsNoReload(std::vector<ShardId>* all) const; - - int getNumShardsNoReload() const; - private: - /** - * The ShardRegistry uses the ReadThroughCache to handle refreshing itself. The cache stores - * a single entry, with key of Singleton, value of ShardRegistryData, and causal-consistency - * time which is primarily Timestamp (based on the TopologyTime), but with additional - * "increment"s that are used to flag additional refresh criteria. - */ - - using Increment = int64_t; - - struct Time { - explicit Time() {} - - explicit Time(Timestamp topologyTime, - Increment rsmIncrement, - Increment forceReloadIncrement) - : topologyTime(topologyTime), - rsmIncrement(rsmIncrement), - forceReloadIncrement(forceReloadIncrement) {} - - bool operator==(const Time& other) const { - return topologyTime == other.topologyTime && rsmIncrement == other.rsmIncrement && - forceReloadIncrement == other.forceReloadIncrement; - } - bool operator!=(const Time& other) const { - return !(*this == other); - } - bool operator>(const Time& other) const { - return topologyTime > other.topologyTime || rsmIncrement > other.rsmIncrement || - forceReloadIncrement > other.forceReloadIncrement; - } - bool operator>=(const Time& other) const { - return (*this > other) || (*this == other); - } - bool operator<(const Time& other) const { - return !(*this >= other); - } - bool operator<=(const Time& other) const { - return !(*this > other); - } - - BSONObj toBSON() const { - BSONObjBuilder bob; - bob.append("topologyTime", topologyTime); - bob.append("rsmIncrement", rsmIncrement); - bob.append("forceReloadIncrement", forceReloadIncrement); - return bob.obj(); - } - - Timestamp topologyTime; - - // The increments are used locally to trigger the lookup function. - // - // The rsmIncrement is used to indicate that that there are stashed RSM updates that need to - // be incorporated. - // - // The forceReloadIncrement is used to indicate that the latest data should be fetched from - // the configsvrs (ie. when the topologyTime can't be used for this, eg. 
in the first - // lookup, and in contexts like unittests where topologyTime isn't gossipped but the - // ShardRegistry still needs to be reloaded). This is how reload() is able to force a - // refresh from the config servers - incrementing the forceReloadIncrement causes the cache - // to call _lookup() (rather than having reload() attempt to do a synchronous refresh). - Increment rsmIncrement{0}; - Increment forceReloadIncrement{0}; - }; - - enum class Singleton { Only }; - static constexpr auto _kSingleton = Singleton::Only; - - using Cache = ReadThroughCache<Singleton, ShardRegistryData, Time>; - - Cache::LookupResult _lookup(OperationContext* opCtx, - const Singleton& key, - const Cache::ValueHandle& cachedData, - const Time& timeInStore); - - /** - * Gets a causally-consistent (ie. latest-known) copy of the ShardRegistryData, refreshing from - * the config servers if necessary. - */ - Cache::ValueHandle _getData(OperationContext* opCtx); - - /** - * Gets the latest-cached copy of the ShardRegistryData. Never fetches from the config servers. - * Only used by the "NoReload" accessors. - * TODO SERVER-50206: Remove usage of this non-causally consistent accessor. - */ - Cache::ValueHandle _getCachedData() const; - - /** - * Lookup shard by replica set name. Returns nullptr if the name can't be found. - * Note: this doesn't refresh the table if the name isn't found, so it's possible that a - * newly added shard/Replica Set may not be found. - * TODO SERVER-50206: Remove usage of this non-causally consistent accessor. - */ - std::shared_ptr<Shard> _getShardForRSNameNoReload(const std::string& name) const; - - using LatestConnStrings = stdx::unordered_map<ShardId, ConnectionString, ShardId::Hasher>; - - std::pair<std::vector<LatestConnStrings::value_type>, Increment> _getLatestConnStrings() const; - - void _initializeCacheIfNecessary() const; - - void _periodicReload(const executor::TaskExecutor::CallbackArgs& cbArgs); + void _internalReload(const executor::TaskExecutor::CallbackArgs& cbArgs); /** * Factory to create shards. Never changed after startup so safe to access outside of _mutex. @@ -441,37 +326,24 @@ private: */ const ConnectionString _initConfigServerCS; + AtomicWord<bool> _isInitialized{false}; + /** * A list of callbacks to be called asynchronously when it has been discovered that a shard was * removed. */ const std::vector<ShardRemovalHook> _shardRemovalHooks; - // Thread pool used when looking up new values for the cache (ie. in which _lookup() runs). - ThreadPool _threadPool; - - // Executor for periodically reloading the registry (ie. in which _periodicReload() runs). - std::unique_ptr<executor::TaskExecutor> _executor{}; - - mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("ShardRegistry::_cacheMutex"); - std::unique_ptr<Cache> _cache; - - // Counters for incrementing the rsmIncrement and forceReloadIncrement fields of the Time used - // by the _cache. See the comments for these fields in the Time class above for an explanation - // of their purpose. - AtomicWord<Increment> _rsmIncrement{0}; - AtomicWord<Increment> _forceReloadIncrement{0}; - - // Protects _configShardData, and _latestNewConnStrings. + // Protects the ShardRegistryData lookup maps in _data, and _configShard. mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardRegistry::_mutex"); - // Store a reference to the configShard. - ShardRegistryData _configShardData; + ShardRegistryData _data; - // The key is replset name (the type is ShardId just to take advantage of its hasher). 
-    LatestConnStrings _latestConnStrings;
+    // Store a separate reference to the configShard.
+    std::shared_ptr<Shard> _configShard;
 
-    AtomicWord<bool> _isInitialized{false};
+    // Executor for reloading.
+    std::unique_ptr<executor::TaskExecutor> _executor{};
 
     // The ShardRegistry is "up" once there has been a successful refresh.
     AtomicWord<bool> _isUp{false};
@@ -479,7 +351,17 @@ private:
     // Set to true in shutdown call to prevent calling it twice.
     AtomicWord<bool> _isShutdown{false};
 
-    ServiceContext* _service{nullptr};
+    // Protects the _reloadState during startup and refresh.
+    mutable Mutex _reloadMutex = MONGO_MAKE_LATCH("ShardRegistry::_reloadMutex");
+    stdx::condition_variable _inReloadCV;
+
+    enum class ReloadState {
+        Idle,       // no other thread is loading data from config server in reload().
+        Reloading,  // another thread is loading data from the config server in reload().
+        Failed,     // last call to reload() caused an error when contacting the config server.
+    };
+
+    ReloadState _reloadState{ReloadState::Idle};
 };
 
 }  // namespace mongo
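A closing note on the reverted design visible in the header hunks above: ShardRegistry::Time compared times component-wise, reporting "greater" whenever any one of topologyTime, rsmIncrement, or forceReloadIncrement advanced. Two times can therefore each compare greater than the other; that suffices for the cache's only question, whether the time in store is newer than the cached time. A self-contained sketch of just that comparison, with topologyTime reduced to an integer for illustration:

```cpp
#include <cassert>
#include <cstdint>

// Reduced copy of the comparison logic from the reverted ShardRegistry::Time
// (topologyTime simplified to an integer). A time counts as "greater" when
// ANY component advanced, which lets an RSM update, a forced reload, or a
// topologyTime bump each mark the cached value as stale.
struct Time {
    std::int64_t topologyTime{0};
    std::int64_t rsmIncrement{0};
    std::int64_t forceReloadIncrement{0};

    bool operator>(const Time& o) const {
        return topologyTime > o.topologyTime || rsmIncrement > o.rsmIncrement ||
            forceReloadIncrement > o.forceReloadIncrement;
    }
};

int main() {
    Time a{1, 0, 0};  // topologyTime advanced
    Time b{0, 1, 0};  // rsmIncrement advanced
    // Each compares greater than the other: not a total order, but enough
    // for "is the time in store newer than the cached time" checks.
    assert(a > b);
    assert(b > a);
    return 0;
}
```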