Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/migration_coordinator.cpp | 41
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 19
-rw-r--r-- | src/mongo/db/s/sessions_collection_config_server.cpp | 2
-rw-r--r-- | src/mongo/db/s/sharding_mongod_test_fixture.cpp | 4
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 684
-rw-r--r-- | src/mongo/s/client/shard_registry.h | 310
-rw-r--r-- | src/mongo/s/grid.cpp | 2
-rw-r--r-- | src/mongo/s/sharding_initialization.cpp | 2
-rw-r--r-- | src/mongo/util/invalidating_lru_cache.h | 56
-rw-r--r-- | src/mongo/util/read_through_cache.h | 44
10 files changed, 787 insertions, 377 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index 22102845062..d72311a3369 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -235,21 +235,32 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* 23899, 2, "Making abort decision durable", "migrationId"_attr = _migrationInfo.getId()); migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId()); - LOGV2_DEBUG( - 23900, - 2, - "Bumping transaction number with lsid {lsid} and current txnNumber {currentTxnNumber} on " - "recipient shard {recipientShardId} for abort of collection {nss}", - "Bumping transaction number on recipient shard for abort", - "namespace"_attr = _migrationInfo.getNss(), - "recipientShardId"_attr = _migrationInfo.getRecipientShardId(), - "lsid"_attr = _migrationInfo.getLsid(), - "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(), - "migrationId"_attr = _migrationInfo.getId()); - migrationutil::advanceTransactionOnRecipient(opCtx, - _migrationInfo.getRecipientShardId(), - _migrationInfo.getLsid(), - _migrationInfo.getTxnNumber()); + try { + LOGV2_DEBUG(23900, + 2, + "Bumping transaction number with lsid {lsid} and current txnNumber " + "{currentTxnNumber} on " + "recipient shard {recipientShardId} for abort of collection {nss}", + "Bumping transaction number on recipient shard for abort", + "namespace"_attr = _migrationInfo.getNss(), + "recipientShardId"_attr = _migrationInfo.getRecipientShardId(), + "lsid"_attr = _migrationInfo.getLsid(), + "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(), + "migrationId"_attr = _migrationInfo.getId()); + migrationutil::advanceTransactionOnRecipient(opCtx, + _migrationInfo.getRecipientShardId(), + _migrationInfo.getLsid(), + _migrationInfo.getTxnNumber()); + } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) { + LOGV2_DEBUG(4620231, + 1, + "Failed to advance transaction number on recipient shard for abort", + "namespace"_attr = _migrationInfo.getNss(), + "migrationId"_attr = _migrationInfo.getId(), + "recipientShardId"_attr = _migrationInfo.getRecipientShardId(), + "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(), + "error"_attr = exShardNotFound); + } hangBeforeSendingAbortDecision.pauseWhileSet(); diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index f1c7cec016a..7d289fb979e 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -679,11 +679,20 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx, opCtx, "ready remote range deletion", [&](OperationContext* newOpCtx) { hangInReadyRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx); - sendToRecipient( - newOpCtx, - recipientId, - updateOp, - BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + try { + sendToRecipient( + newOpCtx, + recipientId, + updateOp, + BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) { + LOGV2_DEBUG(4620232, + 1, + "Failed to mark range deletion task on recipient shard as ready", + "migrationId"_attr = migrationId, + "error"_attr = exShardNotFound); + return; + } if (hangInReadyRangeDeletionOnRecipientThenSimulateErrorUninterruptible.shouldFail()) { hangInReadyRangeDeletionOnRecipientThenSimulateErrorUninterruptible.pauseWhileSet( diff --git a/src/mongo/db/s/sessions_collection_config_server.cpp 
b/src/mongo/db/s/sessions_collection_config_server.cpp index 95ee087e5ae..99eb2277d02 100644 --- a/src/mongo/db/s/sessions_collection_config_server.cpp +++ b/src/mongo/db/s/sessions_collection_config_server.cpp @@ -61,7 +61,7 @@ void SessionsCollectionConfigServer::_shardCollectionIfNeeded(OperationContext* uassert(ErrorCodes::ShardNotFound, str::stream() << "Failed to create " << NamespaceString::kLogicalSessionsNamespace << ": cannot create the collection until there are shards", - Grid::get(opCtx)->shardRegistry()->getNumShards() != 0); + Grid::get(opCtx)->shardRegistry()->getNumShardsNoReload() != 0); // First, shard the sessions collection to create it. ConfigsvrShardCollectionRequest shardCollection; diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp index a78fea13156..78bd0f861ae 100644 --- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp +++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp @@ -266,10 +266,6 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest( std::move(executorPoolPtr), _mockNetwork); - // NOTE: ShardRegistry::startup() is not called because it starts a task executor with a - // self-rescheduling task to reload the ShardRegistry over the network. - // grid->shardRegistry()->startup(); - if (grid->catalogClient()) { grid->catalogClient()->startup(); } diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index e0a5cc13476..842da344bec 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -33,50 +33,41 @@ #include "mongo/s/client/shard_registry.h" -#include <memory> -#include <set> - -#include "mongo/bson/bsonobj.h" -#include "mongo/bson/util/bson_extract.h" -#include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/client.h" #include "mongo/db/logical_time_metadata_hook.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/server_options.h" -#include "mongo/executor/network_connection_hook.h" +#include "mongo/db/vector_clock.h" #include "mongo/executor/network_interface_factory.h" #include "mongo/executor/network_interface_thread_pool.h" -#include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" -#include "mongo/platform/mutex.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_shard.h" -#include "mongo/s/client/shard.h" -#include "mongo/s/client/shard_factory.h" #include "mongo/s/grid.h" -#include "mongo/util/concurrency/with_lock.h" -#include "mongo/util/scopeguard.h" #include "mongo/util/str.h" namespace mongo { -using executor::NetworkInterface; -using executor::NetworkInterfaceThreadPool; -using executor::TaskExecutor; -using executor::TaskExecutorPool; -using executor::ThreadPoolTaskExecutor; -using CallbackArgs = TaskExecutor::CallbackArgs; -using CallbackHandle = TaskExecutor::CallbackHandle; - - namespace { + const Seconds kRefreshPeriod(30); + +/** + * Whether or not the actual topologyTime should be used. When this is false, the + * topologyTime part of the cache's Time will stay fixed and not advance. 
+ */ +bool useActualTopologyTime() { + return serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo( + ServerGlobalParams::FeatureCompatibility::Version::kVersion47); +} + } // namespace +using CallbackArgs = executor::TaskExecutor::CallbackArgs; + const ShardId ShardRegistry::kConfigServerShardId = ShardId("config"); ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, @@ -84,154 +75,145 @@ ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory, std::vector<ShardRemovalHook> shardRemovalHooks) : _shardFactory(std::move(shardFactory)), _initConfigServerCS(configServerCS), - _shardRemovalHooks(std::move(shardRemovalHooks)) { + _shardRemovalHooks(std::move(shardRemovalHooks)), + _threadPool([] { + ThreadPool::Options options; + options.poolName = "ShardRegistry"; + options.minThreads = 0; + options.maxThreads = 1; + return options; + }()) { invariant(_initConfigServerCS.isValid()); + _threadPool.startup(); } ShardRegistry::~ShardRegistry() { shutdown(); } -void ShardRegistry::shutdown() { - if (_executor && !_isShutdown.load()) { - LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry"); - _executor->shutdown(); - _executor->join(); - _isShutdown.store(true); - } -} - -ConnectionString ShardRegistry::getConfigServerConnectionString() const { - return getConfigShard()->getConnString(); -} - -StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, - const ShardId& shardId) { - // If we know about the shard, return it. - auto shard = getShardNoReload(shardId); - if (shard) { - return shard; - } - - // If we can't find the shard, attempt to reload the ShardRegistry. - bool didReload = reload(opCtx); - shard = getShardNoReload(shardId); - - // If we found the shard, return it. - if (shard) { - return shard; - } - - // If we did not find the shard but performed the reload - // ourselves, return, because it means the shard does not exist. - if (didReload) { - return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; - } - - // If we did not perform the reload ourselves (because there was a concurrent reload), force a - // reload again to ensure that we have seen data at least as up to date as our first reload. 
- reload(opCtx); - shard = getShardNoReload(shardId); - - if (shard) { - return shard; - } - - return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; -} - -std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) { - stdx::lock_guard<Latch> lk(_mutex); - return _data.findShard(shardId); -} - -std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) { - stdx::lock_guard<Latch> lk(_mutex); - return _data.findByHostAndPort(host); -} +void ShardRegistry::init(ServiceContext* service) { + invariant(!_isInitialized.load()); -std::shared_ptr<Shard> ShardRegistry::getConfigShard() const { - stdx::lock_guard<Latch> lk(_mutex); - invariant(_configShard); - return _configShard; -} + invariant(!_service); + _service = service; -std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { - return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr); -} + auto lookupFn = [this](OperationContext* opCtx, + const Singleton& key, + const Cache::ValueHandle& cachedData, + const Time& timeInStore) { + return _lookup(opCtx, key, cachedData, timeInStore); + }; -std::shared_ptr<Shard> ShardRegistry::lookupRSName(const std::string& name) const { - stdx::lock_guard<Latch> lk(_mutex); - return _data.findByRSName(name); -} + _cache = + std::make_unique<Cache>(_cacheMutex, _service, _threadPool, lookupFn, 1 /* cacheSize */); -void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const { - std::set<ShardId> seen; { stdx::lock_guard<Latch> lk(_mutex); - _data.getAllShardIds(seen); + _configShardData = ShardRegistryData::createWithConfigShardOnly( + _shardFactory->createShard(kConfigServerShardId, _initConfigServerCS)); } - all->assign(seen.begin(), seen.end()); + + _isInitialized.store(true); } -void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) { - getAllShardIdsNoReload(all); - if (all->empty()) { - bool didReload = reload(opCtx); - getAllShardIdsNoReload(all); - // If we didn't do the reload ourselves, we should retry to ensure - // that the reload is actually initiated while we're executing this - if (!didReload && all->empty()) { - reload(opCtx); - getAllShardIdsNoReload(all); +ShardRegistry::Cache::LookupResult ShardRegistry::_lookup(OperationContext* opCtx, + const Singleton& key, + const Cache::ValueHandle& cachedData, + const Time& timeInStore) { + invariant(key == _kSingleton); + invariant(cachedData, "ShardRegistry::_lookup called but the cache is empty"); + + LOGV2_DEBUG(4620250, + 2, + "Starting ShardRegistry::_lookup", + "cachedData"_attr = cachedData->toBSON(), + "cachedData.getTime()"_attr = cachedData.getTime().toBSON(), + "timeInStore"_attr = timeInStore.toBSON()); + + // Check if we need to refresh from the configsvrs. If so, then do that and get the results, + // otherwise (this is a lookup only to incorporate updated connection strings from the RSM), + // then get the equivalent values from the previously cached data. 
+ auto [returnData, + returnTopologyTime, + returnForceReloadIncrement, + removedShards, + fetchedFromConfigServers] = [&]() + -> std::tuple<ShardRegistryData, Timestamp, Increment, ShardRegistryData::ShardMap, bool> { + if (timeInStore.topologyTime > cachedData.getTime().topologyTime || + timeInStore.forceReloadIncrement > cachedData.getTime().forceReloadIncrement) { + auto [reloadedData, maxTopologyTime] = + ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get()); + if (!useActualTopologyTime()) { + // If not using the actual topology time, then just use the topologyTime currently + // in the cache, instead of the maximum topologyTime value from config.shards. This + // is necessary during upgrade/downgrade when topologyTime might not be gossiped by + // all nodes (and so isn't being used). + maxTopologyTime = cachedData.getTime().topologyTime; + } + + auto [mergedData, removedShards] = + ShardRegistryData::mergeExisting(*cachedData, reloadedData); + + return { + mergedData, maxTopologyTime, timeInStore.forceReloadIncrement, removedShards, true}; + } else { + return {*cachedData, + cachedData.getTime().topologyTime, + cachedData.getTime().forceReloadIncrement, + {}, + false}; } - } -} + }(); -int ShardRegistry::getNumShards() const { - std::set<ShardId> seen; - { - stdx::lock_guard<Latch> lk(_mutex); - _data.getAllShardIds(seen); - } - return seen.size(); -} + // Always apply the latest conn strings. + auto [latestConnStrings, rsmIncrementForConnStrings] = _getLatestConnStrings(); -void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { - invariant(newConnString.type() == ConnectionString::SET || - newConnString.type() == ConnectionString::CUSTOM); // For dbtests + for (const auto& latestConnString : latestConnStrings) { + // TODO SERVER-50909: Optimise by only doing this work if the latest conn string differs. - // to prevent update config shard connection string during init - stdx::unique_lock<Latch> lock(_mutex); + auto shard = returnData.findByRSName(latestConnString.first.toString()); + if (!shard) { + continue; + } - auto shard = _data.findByRSName(newConnString.getSetName()); - if (!shard) { - return; + auto newData = ShardRegistryData::createFromExisting( + returnData, latestConnString.second, _shardFactory.get()); + returnData = newData; } - auto [data, updatedShard] = - ShardRegistryData::createFromExisting(_data, newConnString, _shardFactory.get()); + // Remove RSMs that are not in the catalog any more. + for (auto& pair : removedShards) { + auto& shardId = pair.first; + auto& shard = pair.second; + invariant(shard); - if (updatedShard && updatedShard->isConfig()) { - _configShard = updatedShard; + auto name = shard->getConnString().getSetName(); + ReplicaSetMonitor::remove(name); + for (auto& callback : _shardRemovalHooks) { + // Run callbacks asynchronously. + // TODO SERVER-50906: Consider running these callbacks synchronously. + ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()) + .getAsync([=](const Status&) { callback(shardId); }); + } } - _data = data; -} - -void ShardRegistry::init() { - invariant(!_isInitialized.load()); - { - stdx::unique_lock<Latch> lock(_mutex); - _configShard = - _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS); - _data = ShardRegistryData::createWithConfigShardOnly(_configShard); + // The registry is "up" once there has been a successful lookup from the config servers. 
+ if (fetchedFromConfigServers) { + _isUp.store(true); } - _isInitialized.store(true); + + Time returnTime{returnTopologyTime, rsmIncrementForConnStrings, returnForceReloadIncrement}; + LOGV2_DEBUG(4620251, + 2, + "Finished ShardRegistry::_lookup", + "returnData"_attr = returnData.toBSON(), + "returnTime"_attr = returnTime); + return Cache::LookupResult{returnData, returnTime}; } -void ShardRegistry::startup(OperationContext* opCtx) { - // startup() must be called only once +void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) { + invariant(_isInitialized.load()); + // startupPeriodicReloader() must be called only once invariant(!_executor); auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); @@ -240,16 +222,17 @@ void ShardRegistry::startup(OperationContext* opCtx) { // construct task executor auto net = executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hookList)); auto netPtr = net.get(); - _executor = std::make_unique<ThreadPoolTaskExecutor>( - std::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); + _executor = std::make_unique<executor::ThreadPoolTaskExecutor>( + std::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net)); LOGV2_DEBUG(22724, 1, "Starting up task executor for periodic reloading of ShardRegistry"); _executor->startup(); auto status = - _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _internalReload(cbArgs); }); + _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); }); if (status.getStatus() == ErrorCodes::ShutdownInProgress) { - LOGV2_DEBUG(22725, 1, "Cant schedule Shard Registry reload. Executor shutdown in progress"); + LOGV2_DEBUG( + 22725, 1, "Can't schedule Shard Registry reload. Executor shutdown in progress"); return; } @@ -261,7 +244,27 @@ void ShardRegistry::startup(OperationContext* opCtx) { } } -void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) { +void ShardRegistry::shutdownPeriodicReloader() { + if (_executor) { + LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry"); + _executor->shutdown(); + _executor->join(); + _executor.reset(); + } +} + +void ShardRegistry::shutdown() { + shutdownPeriodicReloader(); + + if (!_isShutdown.load()) { + LOGV2_DEBUG(4620235, 1, "Shutting down shard registry"); + _threadPool.shutdown(); + _threadPool.join(); + _isShutdown.store(true); + } +} + +void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) { LOGV2_DEBUG(22726, 1, "Reloading shardRegistry"); if (!cbArgs.status.isOK()) { LOGV2_WARNING(22734, @@ -275,21 +278,26 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) { auto opCtx = tc->makeOperationContext(); + auto refreshPeriod = kRefreshPeriod; + try { reload(opCtx.get()); } catch (const DBException& e) { + if (e.code() == ErrorCodes::ReadConcernMajorityNotAvailableYet) { + refreshPeriod = Seconds(1); + } LOGV2(22727, "Error running periodic reload of shard registry caused by {error}; will retry after " "{shardRegistryReloadInterval}", "Error running periodic reload of shard registry", "error"_attr = redact(e), - "shardRegistryReloadInterval"_attr = kRefreshPeriod); + "shardRegistryReloadInterval"_attr = refreshPeriod); } // reschedule itself auto status = - _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod, - [this](const CallbackArgs& cbArgs) { _internalReload(cbArgs); }); + _executor->scheduleWorkAt(_executor->now() + refreshPeriod, + [this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); }); if 
(status.getStatus() == ErrorCodes::ShutdownInProgress) { LOGV2_DEBUG( @@ -305,85 +313,155 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) { } } -bool ShardRegistry::isUp() const { - return _isUp.load(); +ConnectionString ShardRegistry::getConfigServerConnectionString() const { + return getConfigShard()->getConnString(); } -bool ShardRegistry::reload(OperationContext* opCtx) { - stdx::unique_lock<Latch> reloadLock(_reloadMutex); - - if (_reloadState == ReloadState::Reloading) { - // Another thread is already in the process of reloading so no need to do duplicate work. - // There is also an issue if multiple threads are allowed to call getAllShards() - // simultaneously because there is no good way to determine which of the threads has the - // more recent version of the data. - try { - opCtx->waitForConditionOrInterrupt( - _inReloadCV, reloadLock, [&] { return _reloadState != ReloadState::Reloading; }); - } catch (const DBException& e) { - LOGV2_DEBUG(22729, - 1, - "Error reloading shard registry caused by {error}", - "Error reloading shard registry", - "error"_attr = redact(e)); - return false; - } +std::shared_ptr<Shard> ShardRegistry::getConfigShard() const { + stdx::lock_guard<Latch> lk(_mutex); + return _configShardData.findShard(kConfigServerShardId); +} - if (_reloadState == ReloadState::Idle) { - return false; +StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx, + const ShardId& shardId) { + // First check if this is a config shard lookup. + { + stdx::lock_guard<Latch> lk(_mutex); + if (auto shard = _configShardData.findShard(shardId)) { + return shard; } - // else proceed to reload since an error occured on the last reload attempt. - invariant(_reloadState == ReloadState::Failed); } - _reloadState = ReloadState::Reloading; - reloadLock.unlock(); + if (auto shard = _getData(opCtx)->findShard(shardId)) { + return shard; + } - auto nextReloadState = ReloadState::Failed; + // Reload and try again if the shard was not in the registry + reload(opCtx); + if (auto shard = _getData(opCtx)->findShard(shardId)) { + return shard; + } - auto failGuard = makeGuard([&] { - if (!reloadLock.owns_lock()) { - reloadLock.lock(); - } - _reloadState = nextReloadState; - _inReloadCV.notify_all(); - }); + return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"}; +} - ShardRegistryData reloadedData = - ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get(), getConfigShard()); +void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) { + std::set<ShardId> seen; + auto data = _getData(opCtx); + data->getAllShardIds(seen); + if (seen.empty()) { + reload(opCtx); + data = _getData(opCtx); + data->getAllShardIds(seen); + } + all->assign(seen.begin(), seen.end()); +} +int ShardRegistry::getNumShards(OperationContext* opCtx) { + std::set<ShardId> seen; + auto data = _getData(opCtx); + data->getAllShardIds(seen); + return seen.size(); +} + +std::pair<std::vector<ShardRegistry::LatestConnStrings::value_type>, ShardRegistry::Increment> +ShardRegistry::_getLatestConnStrings() const { stdx::unique_lock<Latch> lock(_mutex); + return {{_latestConnStrings.begin(), _latestConnStrings.end()}, _rsmIncrement.load()}; +} - auto [mergedData, removedShards] = ShardRegistryData::mergeExisting(_data, reloadedData); - _data = std::move(mergedData); +void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) { + invariant(newConnString.type() == ConnectionString::SET || + 
newConnString.type() == ConnectionString::CUSTOM); // For dbtests - lock.unlock(); + stdx::lock_guard<Latch> lk(_mutex); + if (auto shard = _configShardData.findByRSName(newConnString.getSetName())) { + auto newData = ShardRegistryData::createFromExisting( + _configShardData, newConnString, _shardFactory.get()); + _configShardData = newData; + + } else { + // Stash the new connection string and bump the RSM increment. + _latestConnStrings[newConnString.getSetName()] = newConnString; + auto value = _rsmIncrement.addAndFetch(1); + LOGV2_DEBUG(4620252, + 2, + "ShardRegistry stashed new connection string", + "newConnString"_attr = newConnString, + "newRSMIncrement"_attr = value); + } + + // Schedule a lookup, to incorporate the new connection string. + // TODO SERVER-50910: To avoid needing to use a separate thread to schedule the lookup, make + // _getData() async. + auto status = Grid::get(_service)->getExecutorPool()->getFixedExecutor()->scheduleWork( + [this](const CallbackArgs& cbArgs) { + ThreadClient tc("shard-registry-rsm-reload", _service); + + auto opCtx = tc->makeOperationContext(); + + try { + _getData(opCtx.get()); + } catch (const DBException& e) { + LOGV2(4620201, + "Error running reload of ShardRegistry for RSM update, caused by {error}", + "Error running reload of ShardRegistry for RSM update", + "error"_attr = redact(e)); + } + }); - // Remove RSMs that are not in the catalog any more. - for (auto& pair : removedShards) { - auto& shardId = pair.first; - auto& shard = pair.second; - invariant(shard); + if (status.getStatus() == ErrorCodes::ShutdownInProgress) { + LOGV2_DEBUG( + 4620202, + 1, + "Can't schedule ShardRegistry reload for RSM update, executor shutdown in progress"); + return; + } - auto name = shard->getConnString().getSetName(); - ReplicaSetMonitor::remove(name); - for (auto& callback : _shardRemovalHooks) { - // Run callbacks asynchronously. - ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()) - .getAsync([=](const Status&) { callback(shardId); }); - } + if (!status.isOK()) { + LOGV2_FATAL(4620203, + "Error scheduling ShardRegistry reload for RSM update, caused by {error}", + "Error scheduling ShardRegistry reload for RSM update", + "error"_attr = redact(status.getStatus())); + } +} + +std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const { + return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr); +} + +bool ShardRegistry::isUp() const { + return _isUp.load(); +} + +void ShardRegistry::toBSON(BSONObjBuilder* result) const { + BSONObjBuilder map; + BSONObjBuilder hosts; + BSONObjBuilder connStrings; + auto data = _getCachedData(); + data->toBSON(&map, &hosts, &connStrings); + { + stdx::lock_guard<Latch> lk(_mutex); + _configShardData.toBSON(&map, &hosts, &connStrings); } + result->append("map", map.obj()); + result->append("hosts", hosts.obj()); + result->append("connStrings", connStrings.obj()); +} + +bool ShardRegistry::reload(OperationContext* opCtx) { + // Make the next acquire do a lookup. + auto value = _forceReloadIncrement.addAndFetch(1); + LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value); + + // Force it to actually happen now. 
+ _getData(opCtx); - nextReloadState = ReloadState::Idle; - // first successful reload means that registry is up - _isUp.store(true); return true; } void ShardRegistry::clearEntries() { - ShardRegistryData empty; - stdx::lock_guard<Latch> lk(_mutex); - _data = empty; + _cache->invalidateAll(); } void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext, @@ -393,7 +471,8 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex auto opCtx = tc->makeOperationContext(); auto const grid = Grid::get(opCtx.get()); - std::shared_ptr<Shard> s = grid->shardRegistry()->lookupRSName(connStr.getSetName()); + std::shared_ptr<Shard> s = + grid->shardRegistry()->_getShardForRSNameNoReload(connStr.getSetName()); if (!s) { LOGV2_DEBUG(22730, 1, @@ -427,25 +506,93 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex } } -void ShardRegistry::toBSON(BSONObjBuilder* result) const { - std::vector<std::shared_ptr<Shard>> shards; +// Inserts the initial empty ShardRegistryData into the cache, if the cache is empty. +void ShardRegistry::_initializeCacheIfNecessary() const { + if (!_cache->peekLatestCached(_kSingleton)) { + stdx::lock_guard<Latch> lk(_mutex); + if (!_cache->peekLatestCached(_kSingleton)) { + _cache->insertOrAssign(_kSingleton, {}, Date_t::now(), Time()); + } + } +} + +ShardRegistry::Cache::ValueHandle ShardRegistry::_getData(OperationContext* opCtx) { + _initializeCacheIfNecessary(); + + // If the forceReloadIncrement is 0, then we've never done a lookup, so we should be sure to do + // one now. + Increment uninitializedIncrement{0}; + _forceReloadIncrement.compareAndSwap(&uninitializedIncrement, 1); + + // Update the time the cache should be aiming for. + auto now = VectorClock::get(opCtx)->getTime(); + // The topologyTime should be advanced to either the actual topologyTime (if it is being + // gossiped), or else the previously cached topologyTime value (so that this part of the cache's + // time doesn't advance, if topologyTime isn't being gossiped). + Timestamp topologyTime = useActualTopologyTime() + ? now.topologyTime().asTimestamp() + : _cache->peekLatestCached(_kSingleton).getTime().topologyTime; + _cache->advanceTimeInStore( + _kSingleton, Time(topologyTime, _rsmIncrement.load(), _forceReloadIncrement.load())); + + return _cache->acquire(opCtx, _kSingleton, CacheCausalConsistency::kLatestKnown); +} + +// TODO SERVER-50206: Remove usage of these non-causally consistent accessors. + +ShardRegistry::Cache::ValueHandle ShardRegistry::_getCachedData() const { + _initializeCacheIfNecessary(); + return _cache->peekLatestCached(_kSingleton); +} + +std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) const { + // First check if this is a config shard lookup. { stdx::lock_guard<Latch> lk(_mutex); - _data.getAllShards(shards); + if (auto shard = _configShardData.findShard(shardId)) { + return shard; + } } + auto data = _getCachedData(); + return data->findShard(shardId); +} - std::sort(std::begin(shards), - std::end(shards), - [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) { - return lhs->getId() < rhs->getId(); - }); +std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) const { + // First check if this is a config shard lookup. 
+ { + stdx::lock_guard<Latch> lk(_mutex); + if (auto shard = _configShardData.findByHostAndPort(host)) { + return shard; + } + } + auto data = _getCachedData(); + return data->findByHostAndPort(host); +} - BSONObjBuilder mapBob(result->subobjStart("map")); - for (auto&& shard : shards) { - // Intentionally calling getConnString while not holding _mutex - // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote. - mapBob.append(shard->getId(), shard->getConnString().toString()); +void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const { + std::set<ShardId> seen; + auto data = _getCachedData(); + data->getAllShardIds(seen); + all->assign(seen.begin(), seen.end()); +} + +int ShardRegistry::getNumShardsNoReload() const { + std::set<ShardId> seen; + auto data = _getCachedData(); + data->getAllShardIds(seen); + return seen.size(); +} + +std::shared_ptr<Shard> ShardRegistry::_getShardForRSNameNoReload(const std::string& name) const { + // First check if this is a config shard lookup. + { + stdx::lock_guard<Latch> lk(_mutex); + if (auto shard = _configShardData.findByRSName(name)) { + return shard; + } } + auto data = _getCachedData(); + return data->findByRSName(name); } ////////////// ShardRegistryData ////////////////// @@ -456,9 +603,8 @@ ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<S return data; } -ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* opCtx, - ShardFactory* shardFactory, - std::shared_ptr<Shard> configShard) { +std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClient( + OperationContext* opCtx, ShardFactory* shardFactory) { auto const catalogClient = Grid::get(opCtx)->catalogClient(); auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern; @@ -489,6 +635,7 @@ ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* o // Do this before re-taking the mutex to avoid deadlock with the ReplicaSetMonitor updating // hosts for a given shard. std::vector<std::tuple<std::string, ConnectionString>> shardsInfo; + Timestamp maxTopologyTime; for (const auto& shardType : shards) { // This validation should ideally go inside the ShardType::validate call. 
However, doing // it there would prevent us from loading previously faulty shard hosts, which might have @@ -502,11 +649,15 @@ ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* o continue; } + if (auto thisTopologyTime = shardType.getTopologyTime(); + maxTopologyTime < thisTopologyTime) { + maxTopologyTime = thisTopologyTime; + } + shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue())); } ShardRegistryData data; - data._addShard(configShard, true); for (auto& shardInfo : shardsInfo) { if (std::get<0>(shardInfo) == "config") { continue; @@ -517,7 +668,7 @@ ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* o data._addShard(std::move(shard), false); } - return data; + return {data, maxTopologyTime}; } std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mergeExisting( @@ -550,21 +701,20 @@ std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mer return {mergedData, removedShards}; } -std::pair<ShardRegistryData, std::shared_ptr<Shard>> ShardRegistryData::createFromExisting( - const ShardRegistryData& existingData, - const ConnectionString& newConnString, - ShardFactory* shardFactory) { +ShardRegistryData ShardRegistryData::createFromExisting(const ShardRegistryData& existingData, + const ConnectionString& newConnString, + ShardFactory* shardFactory) { ShardRegistryData data(existingData); auto it = data._rsLookup.find(newConnString.getSetName()); if (it == data._rsLookup.end()) { - return {data, nullptr}; + return data; } invariant(it->second); auto updatedShard = shardFactory->createShard(it->second->getId(), newConnString); data._addShard(updatedShard, true); - return {data, updatedShard}; + return data; } std::shared_ptr<Shard> ShardRegistryData::findByRSName(const std::string& name) const { @@ -682,4 +832,70 @@ void ShardRegistryData::_addShard(std::shared_ptr<Shard> shard, bool useOriginal } } +void ShardRegistryData::toBSON(BSONObjBuilder* map, + BSONObjBuilder* hosts, + BSONObjBuilder* connStrings) const { + std::vector<std::shared_ptr<Shard>> shards; + getAllShards(shards); + + std::sort(std::begin(shards), + std::end(shards), + [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) { + return lhs->getId() < rhs->getId(); + }); + + if (map) { + for (auto&& shard : shards) { + map->append(shard->getId(), shard->getConnString().toString()); + } + } + + if (hosts) { + for (const auto& hostIt : _hostLookup) { + hosts->append(hostIt.first.toString(), hostIt.second->getId()); + } + } + + if (connStrings) { + for (const auto& connStringIt : _connStringLookup) { + connStrings->append(connStringIt.first.toString(), connStringIt.second->getId()); + } + } +} + +void ShardRegistryData::toBSON(BSONObjBuilder* result) const { + std::vector<std::shared_ptr<Shard>> shards; + getAllShards(shards); + + std::sort(std::begin(shards), + std::end(shards), + [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) { + return lhs->getId() < rhs->getId(); + }); + + BSONObjBuilder mapBob(result->subobjStart("map")); + for (auto&& shard : shards) { + mapBob.append(shard->getId(), shard->getConnString().toString()); + } + mapBob.done(); + + BSONObjBuilder hostsBob(result->subobjStart("hosts")); + for (const auto& hostIt : _hostLookup) { + hostsBob.append(hostIt.first.toString(), hostIt.second->getId()); + } + hostsBob.done(); + + BSONObjBuilder connStringsBob(result->subobjStart("connStrings")); + for (const auto& connStringIt : _connStringLookup) 
{ + connStringsBob.append(connStringIt.first.toString(), connStringIt.second->getId()); + } + connStringsBob.done(); +} + +BSONObj ShardRegistryData::toBSON() const { + BSONObjBuilder bob; + toBSON(&bob); + return bob.obj(); +} + } // namespace mongo diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 486788c57dd..711c8e0d34b 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -29,30 +29,24 @@ #pragma once -#include <memory> -#include <set> #include <string> #include <vector> -#include "mongo/db/jsobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/client/connection_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/server_options.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/s/client/shard.h" -#include "mongo/stdx/condition_variable.h" +#include "mongo/s/client/shard_factory.h" #include "mongo/stdx/unordered_map.h" -#include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/read_through_cache.h" namespace mongo { -class BSONObjBuilder; -struct HostAndPort; -class NamespaceString; -class OperationContext; -class ServiceContext; -class ShardFactory; -class Shard; -class ShardType; - class ShardRegistryData { public: using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>; @@ -67,9 +61,8 @@ public: /** * Reads shards docs from the catalog client and fills in maps. */ - static ShardRegistryData createFromCatalogClient(OperationContext* opCtx, - ShardFactory* shardFactory, - std::shared_ptr<Shard> configShard); + static std::pair<ShardRegistryData, Timestamp> createFromCatalogClient( + OperationContext* opCtx, ShardFactory* shardFactory); /** * Merges alreadyCachedData and configServerData into a new ShardRegistryData. @@ -93,10 +86,9 @@ public: * Create a duplicate of existingData, but additionally updates the shard for newConnString. * Used when notified by the RSM of a new connection string from a shard. */ - static std::pair<ShardRegistryData, std::shared_ptr<Shard>> createFromExisting( - const ShardRegistryData& existingData, - const ConnectionString& newConnString, - ShardFactory* shardFactory); + static ShardRegistryData createFromExisting(const ShardRegistryData& existingData, + const ConnectionString& newConnString, + ShardFactory* shardFactory); /** * Returns the shard with the given shard id, connection string, or host and port. @@ -128,6 +120,10 @@ public: */ void getAllShards(std::vector<std::shared_ptr<Shard>>& result) const; + void toBSON(BSONObjBuilder* result) const; + void toBSON(BSONObjBuilder* map, BSONObjBuilder* hosts, BSONObjBuilder* connStrings) const; + BSONObj toBSON() const; + private: /** * Returns the shard with the given shard id, or nullptr if no such shard. @@ -198,70 +194,69 @@ public: ~ShardRegistry(); /** - * Starts ReplicaSetMonitor by adding a config shard. + * Initializes ShardRegistry with config shard. Must be called outside c-tor to avoid calls on + * this while its still not fully constructed. */ - void startup(OperationContext* opCtx); + void init(ServiceContext* service); /** - * This is invalid to use on the config server and will hit an invariant if it is done. - * If the config server has need of a connection string for itself, it should get it from the - * replication state. - * - * Returns the connection string for the config server. 
+ * Startup the periodic reloader of the ShardRegistry. + * Can be called only after ShardRegistry::init() */ - ConnectionString getConfigServerConnectionString() const; + void startupPeriodicReloader(OperationContext* opCtx); /** - * Reloads the ShardRegistry based on the contents of the config server's config.shards - * collection. Returns true if this call performed a reload and false if this call only waited - * for another thread to perform the reload and did not actually reload. Because of this, it is - * possible that calling reload once may not result in the most up to date view. If strict - * reloading is required, the caller should call this method one more time if the first call - * returned false. + * Shutdown the periodic reloader of the ShardRegistry. */ - bool reload(OperationContext* opCtx); + void shutdownPeriodicReloader(); /** - * Clears all entries from the shard registry entries, which will force the registry to do a - * reload on next access. + * Shuts down the threadPool. Needs to be called explicitly because ShardRegistry is never + * destroyed as it's owned by the static grid object. */ - void clearEntries(); + void shutdown(); /** - * Takes a connection string describing either a shard or config server replica set, looks - * up the corresponding Shard object based on the replica set name, then updates the - * ShardRegistry's notion of what hosts make up that shard. + * This is invalid to use on the config server and will hit an invariant if it is done. + * If the config server has need of a connection string for itself, it should get it from the + * replication state. + * + * Returns the connection string for the config server. */ - void updateReplSetHosts(const ConnectionString& newConnString); + ConnectionString getConfigServerConnectionString() const; + + /** + * Returns shared pointer to the shard object representing the config servers. + * + * The config shard is always known, so this function never blocks. + */ + std::shared_ptr<Shard> getConfigShard() const; /** * Returns a shared pointer to the shard object with the given shard id, or ShardNotFound error * otherwise. * * May refresh the shard registry if there's no cached information about the shard. The shardId - * parameter can actually be the shard name or the HostAndPort for any - * server in the shard. + * parameter can actually be the shard name or the HostAndPort for any server in the shard. */ StatusWith<std::shared_ptr<Shard>> getShard(OperationContext* opCtx, const ShardId& shardId); /** - * Returns a shared pointer to the shard object with the given shard id. The shardId parameter - * can actually be the shard name or the HostAndPort for any server in the shard. Will not - * refresh the shard registry or otherwise perform any network traffic. This means that if the - * shard was recently added it may not be found. USE WITH CAUTION. + * Populates all known shard ids into the given vector. */ - std::shared_ptr<Shard> getShardNoReload(const ShardId& shardId); + void getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all); /** - * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not - * refresh the shard registry or otherwise perform any network traffic. + * Returns the number of shards. */ - std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost); + int getNumShards(OperationContext* opCtx); /** - * Returns shared pointer to the shard object representing the config servers. 
+ * Takes a connection string describing either a shard or config server replica set, looks + * up the corresponding Shard object based on the replica set name, then updates the + * ShardRegistry's notion of what hosts make up that shard. */ - std::shared_ptr<Shard> getConfigShard() const; + void updateReplSetHosts(const ConnectionString& newConnString); /** * Instantiates a new detached shard connection, which does not appear in the list of shards @@ -274,36 +269,28 @@ public: std::unique_ptr<Shard> createConnection(const ConnectionString& connStr) const; /** - * Lookup shard by replica set name. Returns nullptr if the name can't be found. - * Note: this doesn't refresh the table if the name isn't found, so it's possible that a - * newly added shard/Replica Set may not be found. + * The ShardRegistry is "up" once a successful lookup from the config servers has been + * completed. */ - std::shared_ptr<Shard> lookupRSName(const std::string& name) const; - - void getAllShardIdsNoReload(std::vector<ShardId>* all) const; - - /** - * Like getAllShardIdsNoReload(), but does a reload internally in the case that - * getAllShardIdsNoReload() comes back empty - */ - void getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all); - - int getNumShards() const; + bool isUp() const; void toBSON(BSONObjBuilder* result) const; - bool isUp() const; /** - * Initializes ShardRegistry with config shard. Must be called outside c-tor to avoid calls on - * this while its still not fully constructed. + * Reloads the ShardRegistry based on the contents of the config server's config.shards + * collection. Returns true if this call performed a reload and false if this call only waited + * for another thread to perform the reload and did not actually reload. Because of this, it is + * possible that calling reload once may not result in the most up to date view. If strict + * reloading is required, the caller should call this method one more time if the first call + * returned false. */ - void init(); + bool reload(OperationContext* opCtx); /** - * Shuts down _executor. Needs to be called explicitly because ShardRegistry is never destroyed - * as it's owned by the static grid object. + * Clears all entries from the shard registry entries, which will force the registry to do a + * reload on next access. */ - void shutdown(); + void clearEntries(); /** * For use in mongos which needs notifications about changes to shard replset membership to @@ -312,8 +299,136 @@ public: static void updateReplicaSetOnConfigServer(ServiceContext* serviceContex, const ConnectionString& connStr) noexcept; + // TODO SERVER-50206: Remove usage of these non-causally consistent accessors. + // + // Their most important current users are dispatching requests to hosts, and processing + // responses from hosts. These contexts need to know the shard that the host is associated + // with, but usually have no access to any associated opCtx (if there even is one), and also + // cannot tolerate waiting for further network activity (if the cache is stale and needs to be + // refreshed via _lookup()). + + /** + * Returns a shared pointer to the shard object with the given shard id. The shardId parameter + * can actually be the shard name or the HostAndPort for any server in the shard. Will not + * refresh the shard registry or otherwise perform any network traffic. This means that if the + * shard was recently added it may not be found. USE WITH CAUTION. 
+ */ + std::shared_ptr<Shard> getShardNoReload(const ShardId& shardId) const; + + /** + * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not + * refresh the shard registry or otherwise perform any network traffic. + */ + std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost) const; + + void getAllShardIdsNoReload(std::vector<ShardId>* all) const; + + int getNumShardsNoReload() const; + private: - void _internalReload(const executor::TaskExecutor::CallbackArgs& cbArgs); + /** + * The ShardRegistry uses the ReadThroughCache to handle refreshing itself. The cache stores + * a single entry, with key of Singleton, value of ShardRegistryData, and causal-consistency + * time which is primarily Timestamp (based on the TopologyTime), but with additional + * "increment"s that are used to flag additional refresh criteria. + */ + + using Increment = int64_t; + + struct Time { + explicit Time() {} + + explicit Time(Timestamp topologyTime, + Increment rsmIncrement, + Increment forceReloadIncrement) + : topologyTime(topologyTime), + rsmIncrement(rsmIncrement), + forceReloadIncrement(forceReloadIncrement) {} + + bool operator==(const Time& other) const { + return topologyTime == other.topologyTime && rsmIncrement == other.rsmIncrement && + forceReloadIncrement == other.forceReloadIncrement; + } + bool operator!=(const Time& other) const { + return !(*this == other); + } + bool operator>(const Time& other) const { + return topologyTime > other.topologyTime || rsmIncrement > other.rsmIncrement || + forceReloadIncrement > other.forceReloadIncrement; + } + bool operator>=(const Time& other) const { + return (*this > other) || (*this == other); + } + bool operator<(const Time& other) const { + return !(*this >= other); + } + bool operator<=(const Time& other) const { + return !(*this > other); + } + + BSONObj toBSON() const { + BSONObjBuilder bob; + bob.append("topologyTime", topologyTime); + bob.append("rsmIncrement", rsmIncrement); + bob.append("forceReloadIncrement", forceReloadIncrement); + return bob.obj(); + } + + Timestamp topologyTime; + + // The increments are used locally to trigger the lookup function. + // + // The rsmIncrement is used to indicate that that there are stashed RSM updates that need to + // be incorporated. + // + // The forceReloadIncrement is used to indicate that the latest data should be fetched from + // the configsvrs (ie. when the topologyTime can't be used for this, eg. in the first + // lookup, and in contexts like unittests where topologyTime isn't gossipped but the + // ShardRegistry still needs to be reloaded). This is how reload() is able to force a + // refresh from the config servers - incrementing the forceReloadIncrement causes the cache + // to call _lookup() (rather than having reload() attempt to do a synchronous refresh). + Increment rsmIncrement{0}; + Increment forceReloadIncrement{0}; + }; + + enum class Singleton { Only }; + static constexpr auto _kSingleton = Singleton::Only; + + using Cache = ReadThroughCache<Singleton, ShardRegistryData, Time>; + + Cache::LookupResult _lookup(OperationContext* opCtx, + const Singleton& key, + const Cache::ValueHandle& cachedData, + const Time& timeInStore); + + /** + * Gets a causally-consistent (ie. latest-known) copy of the ShardRegistryData, refreshing from + * the config servers if necessary. + */ + Cache::ValueHandle _getData(OperationContext* opCtx); + + /** + * Gets the latest-cached copy of the ShardRegistryData. Never fetches from the config servers. 
+ * Only used by the "NoReload" accessors. + * TODO SERVER-50206: Remove usage of this non-causally consistent accessor. + */ + Cache::ValueHandle _getCachedData() const; + + /** + * Lookup shard by replica set name. Returns nullptr if the name can't be found. + * Note: this doesn't refresh the table if the name isn't found, so it's possible that a + * newly added shard/Replica Set may not be found. + * TODO SERVER-50206: Remove usage of this non-causally consistent accessor. + */ + std::shared_ptr<Shard> _getShardForRSNameNoReload(const std::string& name) const; + + using LatestConnStrings = stdx::unordered_map<ShardId, ConnectionString, ShardId::Hasher>; + + std::pair<std::vector<LatestConnStrings::value_type>, Increment> _getLatestConnStrings() const; + + void _initializeCacheIfNecessary() const; + + void _periodicReload(const executor::TaskExecutor::CallbackArgs& cbArgs); /** * Factory to create shards. Never changed after startup so safe to access outside of _mutex. @@ -326,24 +441,37 @@ private: */ const ConnectionString _initConfigServerCS; - AtomicWord<bool> _isInitialized{false}; - /** * A list of callbacks to be called asynchronously when it has been discovered that a shard was * removed. */ const std::vector<ShardRemovalHook> _shardRemovalHooks; - // Protects the ShardRegistryData lookup maps in _data, and _configShard. + // Thread pool used when looking up new values for the cache (ie. in which _lookup() runs). + ThreadPool _threadPool; + + // Executor for periodically reloading the registry (ie. in which _periodicReload() runs). + std::unique_ptr<executor::TaskExecutor> _executor{}; + + mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("ShardRegistry::_cacheMutex"); + std::unique_ptr<Cache> _cache; + + // Counters for incrementing the rsmIncrement and forceReloadIncrement fields of the Time used + // by the _cache. See the comments for these fields in the Time class above for an explanation + // of their purpose. + AtomicWord<Increment> _rsmIncrement{0}; + AtomicWord<Increment> _forceReloadIncrement{0}; + + // Protects _configShardData, and _latestNewConnStrings. mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardRegistry::_mutex"); - ShardRegistryData _data; + // Store a reference to the configShard. + ShardRegistryData _configShardData; - // Store a separate reference to the configShard. - std::shared_ptr<Shard> _configShard; + // The key is replset name (the type is ShardId just to take advantage of its hasher). + LatestConnStrings _latestConnStrings; - // Executor for reloading. - std::unique_ptr<executor::TaskExecutor> _executor{}; + AtomicWord<bool> _isInitialized{false}; // The ShardRegistry is "up" once there has been a successful refresh. AtomicWord<bool> _isUp{false}; @@ -351,17 +479,7 @@ private: // Set to true in shutdown call to prevent calling it twice. AtomicWord<bool> _isShutdown{false}; - // Protects the _reloadState during startup and refresh. - mutable Mutex _reloadMutex = MONGO_MAKE_LATCH("ShardRegistry::_reloadMutex"); - stdx::condition_variable _inReloadCV; - - enum class ReloadState { - Idle, // no other thread is loading data from config server in reload(). - Reloading, // another thread is loading data from the config server in reload(). - Failed, // last call to reload() caused an error when contacting the config server. 
- }; - - ReloadState _reloadState{ReloadState::Idle}; + ServiceContext* _service{nullptr}; }; } // namespace mongo diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index c2e3cd011ac..ea28033590e 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -90,7 +90,7 @@ void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient, _executorPool = std::move(executorPool); _network = network; - _shardRegistry->init(); + _shardRegistry->init(grid.owner(this)); } bool Grid::isShardingInitialized() const { diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index e3baf3d6e60..2b73154e767 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -218,7 +218,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx, networkPtr); // The shard registry must be started once the grid is initialized - grid->shardRegistry()->startup(opCtx); + grid->shardRegistry()->startupPeriodicReloader(opCtx); // The catalog client must be started after the shard registry has been started up grid->catalogClient()->startup(); diff --git a/src/mongo/util/invalidating_lru_cache.h b/src/mongo/util/invalidating_lru_cache.h index c8ead4adecc..2852313ef22 100644 --- a/src/mongo/util/invalidating_lru_cache.h +++ b/src/mongo/util/invalidating_lru_cache.h @@ -67,6 +67,18 @@ struct CacheNotCausallyConsistent { }; /** + * Helper for determining if a given type is CacheNotCausallyConsistent or not. + */ +template <typename T> +struct isCausallyConsistentImpl : std::true_type {}; + +template <> +struct isCausallyConsistentImpl<CacheNotCausallyConsistent> : std::false_type {}; + +template <class T> +inline constexpr bool isCausallyConsistent = isCausallyConsistentImpl<T>::value; + +/** * Specifies the desired causal consistency for calls to 'get' (and 'acquire', respectively in the * ReadThroughCache, which is its main consumer). */ @@ -200,12 +212,8 @@ public: // doesn't support pinning items. Their only usage must be in the authorization mananager // for the internal authentication user. explicit ValueHandle(Value&& value) - : _value(std::make_shared<StoredValue>(nullptr, - 0, - boost::none, - std::move(value), - CacheNotCausallyConsistent(), - CacheNotCausallyConsistent())) {} + : _value(std::make_shared<StoredValue>( + nullptr, 0, boost::none, std::move(value), Time(), Time())) {} explicit ValueHandle(Value&& value, const Time& t) : _value( @@ -218,9 +226,15 @@ public: } bool isValid() const { + invariant(bool(*this)); return _value->isValid.loadRelaxed(); } + const Time& getTime() const { + invariant(bool(*this)); + return _value->time; + } + Value* get() { invariant(bool(*this)); return &_value->value; @@ -260,13 +274,16 @@ public: * was called, it will become invalidated. * * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise - * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows - * non-causally-consistent users to not have to pass a second parameter, but would fail - * compilation if causally-consistent users forget to pass it. + * (since the time never changes). 
*/ - void insertOrAssign(const Key& key, - Value&& value, - const Time& time = CacheNotCausallyConsistent()) { + void insertOrAssign(const Key& key, Value&& value) { + MONGO_STATIC_ASSERT_MSG( + !isCausallyConsistent<Time>, + "Time must be passed to insertOrAssign on causally consistent caches"); + insertOrAssign(key, std::move(value), Time()); + } + + void insertOrAssign(const Key& key, Value&& value, const Time& time) { LockGuardWithPostUnlockDestructor guard(_mutex); Time currentTime, currentTimeInStore; _invalidate(&guard, key, _cache.find(key), ¤tTime, ¤tTimeInStore); @@ -307,13 +324,16 @@ public: * destroyed. * * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise - * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows - * non-causally-consistent users to not have to pass a second parameter, but would fail - * compilation if causally-consistent users forget to pass it. + * (since the time never changes). */ - ValueHandle insertOrAssignAndGet(const Key& key, - Value&& value, - const Time& time = CacheNotCausallyConsistent()) { + ValueHandle insertOrAssignAndGet(const Key& key, Value&& value) { + MONGO_STATIC_ASSERT_MSG( + !isCausallyConsistent<Time>, + "Time must be passed to insertOrAssignAndGet on causally consistent caches"); + return insertOrAssignAndGet(key, std::move(value), Time()); + } + + ValueHandle insertOrAssignAndGet(const Key& key, Value&& value, const Time& time) { LockGuardWithPostUnlockDestructor guard(_mutex); Time currentTime, currentTimeInStore; _invalidate(&guard, key, _cache.find(key), ¤tTime, ¤tTimeInStore); diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h index 72b3e7a5771..32efbc576ae 100644 --- a/src/mongo/util/read_through_cache.h +++ b/src/mongo/util/read_through_cache.h @@ -152,6 +152,10 @@ public: return _valueHandle.isValid(); } + const Time& getTime() const { + return _valueHandle.getTime(); + } + Value* get() { return &_valueHandle->value; } @@ -302,6 +306,33 @@ public: /** * Invalidates the given 'key' and immediately replaces it with a new value. + * + * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise + * (since the time never changes). + */ + void insertOrAssign(const Key& key, Value&& newValue, Date_t updateWallClockTime) { + stdx::lock_guard lg(_mutex); + if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) + it->second->invalidateAndCancelCurrentLookupRound(lg); + _cache.insertOrAssign(key, {std::move(newValue), updateWallClockTime}); + } + + void insertOrAssign(const Key& key, + Value&& newValue, + Date_t updateWallClockTime, + const Time& time) { + stdx::lock_guard lg(_mutex); + if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) + it->second->invalidateAndCancelCurrentLookupRound(lg); + _cache.insertOrAssign(key, {std::move(newValue), updateWallClockTime}, time); + } + + /** + * Invalidates the given 'key' and immediately replaces it with a new value, returning a handle + * to the new value. + * + * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise + * (since the time never changes). 
*/ ValueHandle insertOrAssignAndGet(const Key& key, Value&& newValue, Date_t updateWallClockTime) { stdx::lock_guard lg(_mutex); @@ -310,6 +341,16 @@ public: return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime}); } + ValueHandle insertOrAssignAndGet(const Key& key, + Value&& newValue, + Date_t updateWallClockTime, + const Time& time) { + stdx::lock_guard lg(_mutex); + if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end()) + it->second->invalidateAndCancelCurrentLookupRound(lg); + return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime}, time); + } + /** * Indicates to the cache that the backing store has a newer version of 'key', corresponding to * 'newTime'. Subsequent calls to 'acquireAsync' with a causal consistency set to 'LatestKnown' @@ -377,9 +418,8 @@ public: return _cache.getCacheInfo(); } -protected: /** - * ReadThroughCache constructor, to be called by sub-classes, which implement 'lookup'. + * ReadThroughCache constructor. * * The 'mutex' is for the exclusive usage of the ReadThroughCache and must not be used in any * way by the implementing class. Having the mutex stored by the sub-class allows latch |
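
Note: the patch above replaces the ShardRegistry's manual reload()/ReloadState machinery with a ReadThroughCache keyed by a three-part Time (topologyTime, rsmIncrement, forceReloadIncrement). The following is a minimal standalone sketch, not MongoDB source: a plain integer stands in for mongo::Timestamp, the comparison logic is copied from the Time struct added in shard_registry.h, and the fetch condition mirrors the check in ShardRegistry::_lookup().

// Standalone sketch (assumed stand-ins, not MongoDB code): how advancing any
// component of the ShardRegistry cache's Time triggers a refresh.
#include <cstdint>
#include <iostream>

using Increment = int64_t;

struct Time {
    uint64_t topologyTime{0};          // stand-in for Timestamp (topologyTime from config.shards)
    Increment rsmIncrement{0};         // bumped when the RSM reports a new connection string
    Increment forceReloadIncrement{0}; // bumped by reload() to force a config-server fetch

    bool operator==(const Time& other) const {
        return topologyTime == other.topologyTime && rsmIncrement == other.rsmIncrement &&
            forceReloadIncrement == other.forceReloadIncrement;
    }
    // Any strictly newer component makes the whole Time "newer"; this is what makes
    // advanceTimeInStore() cause the next acquire() to run _lookup().
    bool operator>(const Time& other) const {
        return topologyTime > other.topologyTime || rsmIncrement > other.rsmIncrement ||
            forceReloadIncrement > other.forceReloadIncrement;
    }
};

int main() {
    Time cached;
    cached.topologyTime = 5;
    cached.rsmIncrement = 2;
    cached.forceReloadIncrement = 1;

    // Equivalent of ShardRegistry::reload(): bump only the force-reload counter.
    Time inStore = cached;
    inStore.forceReloadIncrement += 1;

    // _lookup() refetches from the config servers only when the topologyTime or the
    // forceReloadIncrement moved forward; an rsmIncrement-only bump just re-applies
    // stashed connection strings to the previously cached data.
    bool mustFetchFromConfigServers = inStore.topologyTime > cached.topologyTime ||
        inStore.forceReloadIncrement > cached.forceReloadIncrement;

    std::cout << std::boolalpha << "timeInStore > cached: " << (inStore > cached) << '\n'
              << "fetch from config servers: " << mustFetchFromConfigServers << '\n';
}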
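
The invalidating_lru_cache.h hunk drops the defaulted 'time = CacheNotCausallyConsistent()' argument in favor of overloads: a two-argument insertOrAssign()/insertOrAssignAndGet() that only compiles for non-causally-consistent caches, plus an explicit form taking a Time. Below is a self-contained sketch of that compile-time guard; the Cache class and its empty bodies are illustrative assumptions, not the real InvalidatingLRUCache.

// Standalone sketch of the isCausallyConsistent trait + static_assert pattern.
#include <type_traits>
#include <utility>

struct CacheNotCausallyConsistent {};  // marker "time" type for non-causally-consistent caches

template <typename T>
struct isCausallyConsistentImpl : std::true_type {};
template <>
struct isCausallyConsistentImpl<CacheNotCausallyConsistent> : std::false_type {};
template <class T>
inline constexpr bool isCausallyConsistent = isCausallyConsistentImpl<T>::value;

template <typename Key, typename Value, typename Time>
class Cache {
public:
    // Convenience overload: only legal when the cache is not causally consistent,
    // in which case the Time never changes and a default Time() is forwarded.
    void insertOrAssign(const Key& key, Value&& value) {
        static_assert(!isCausallyConsistent<Time>,
                      "Time must be passed to insertOrAssign on causally consistent caches");
        insertOrAssign(key, std::move(value), Time());
    }
    // Explicit overload: causally consistent callers must supply the time.
    void insertOrAssign(const Key& key, Value&& value, const Time& time) {
        (void)key;
        (void)value;
        (void)time;  // storage elided in this sketch
    }
};

int main() {
    Cache<int, int, CacheNotCausallyConsistent> plain;
    plain.insertOrAssign(1, 42);       // OK: convenience overload compiles

    Cache<int, int, long> causal;      // any other Time type counts as causally consistent
    causal.insertOrAssign(1, 42, 7L);  // must pass the time explicitly
    // causal.insertOrAssign(1, 42);   // would fail to compile via the static_assert
}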