Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/s/migration_coordinator.cpp              41
-rw-r--r--  src/mongo/db/s/migration_util.cpp                      19
-rw-r--r--  src/mongo/db/s/sessions_collection_config_server.cpp    2
-rw-r--r--  src/mongo/db/s/sharding_mongod_test_fixture.cpp         4
-rw-r--r--  src/mongo/s/client/shard_registry.cpp                 684
-rw-r--r--  src/mongo/s/client/shard_registry.h                   310
-rw-r--r--  src/mongo/s/grid.cpp                                     2
-rw-r--r--  src/mongo/s/sharding_initialization.cpp                  2
-rw-r--r--  src/mongo/util/invalidating_lru_cache.h                 56
-rw-r--r--  src/mongo/util/read_through_cache.h                     44
10 files changed, 787 insertions, 377 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index 22102845062..d72311a3369 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -235,21 +235,32 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
23899, 2, "Making abort decision durable", "migrationId"_attr = _migrationInfo.getId());
migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
- LOGV2_DEBUG(
- 23900,
- 2,
- "Bumping transaction number with lsid {lsid} and current txnNumber {currentTxnNumber} on "
- "recipient shard {recipientShardId} for abort of collection {nss}",
- "Bumping transaction number on recipient shard for abort",
- "namespace"_attr = _migrationInfo.getNss(),
- "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
- "lsid"_attr = _migrationInfo.getLsid(),
- "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
- "migrationId"_attr = _migrationInfo.getId());
- migrationutil::advanceTransactionOnRecipient(opCtx,
- _migrationInfo.getRecipientShardId(),
- _migrationInfo.getLsid(),
- _migrationInfo.getTxnNumber());
+ try {
+ LOGV2_DEBUG(23900,
+ 2,
+ "Bumping transaction number with lsid {lsid} and current txnNumber "
+ "{currentTxnNumber} on "
+ "recipient shard {recipientShardId} for abort of collection {nss}",
+ "Bumping transaction number on recipient shard for abort",
+ "namespace"_attr = _migrationInfo.getNss(),
+ "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
+ "lsid"_attr = _migrationInfo.getLsid(),
+ "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
+ "migrationId"_attr = _migrationInfo.getId());
+ migrationutil::advanceTransactionOnRecipient(opCtx,
+ _migrationInfo.getRecipientShardId(),
+ _migrationInfo.getLsid(),
+ _migrationInfo.getTxnNumber());
+ } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) {
+ LOGV2_DEBUG(4620231,
+ 1,
+ "Failed to advance transaction number on recipient shard for abort",
+ "namespace"_attr = _migrationInfo.getNss(),
+ "migrationId"_attr = _migrationInfo.getId(),
+ "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
+ "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
+ "error"_attr = exShardNotFound);
+ }
hangBeforeSendingAbortDecision.pauseWhileSet();
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index f1c7cec016a..7d289fb979e 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -679,11 +679,20 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
opCtx, "ready remote range deletion", [&](OperationContext* newOpCtx) {
hangInReadyRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx);
- sendToRecipient(
- newOpCtx,
- recipientId,
- updateOp,
- BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ try {
+ sendToRecipient(
+ newOpCtx,
+ recipientId,
+ updateOp,
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) {
+ LOGV2_DEBUG(4620232,
+ 1,
+ "Failed to mark range deletion task on recipient shard as ready",
+ "migrationId"_attr = migrationId,
+ "error"_attr = exShardNotFound);
+ return;
+ }
if (hangInReadyRangeDeletionOnRecipientThenSimulateErrorUninterruptible.shouldFail()) {
hangInReadyRangeDeletionOnRecipientThenSimulateErrorUninterruptible.pauseWhileSet(
diff --git a/src/mongo/db/s/sessions_collection_config_server.cpp b/src/mongo/db/s/sessions_collection_config_server.cpp
index 95ee087e5ae..99eb2277d02 100644
--- a/src/mongo/db/s/sessions_collection_config_server.cpp
+++ b/src/mongo/db/s/sessions_collection_config_server.cpp
@@ -61,7 +61,7 @@ void SessionsCollectionConfigServer::_shardCollectionIfNeeded(OperationContext*
uassert(ErrorCodes::ShardNotFound,
str::stream() << "Failed to create " << NamespaceString::kLogicalSessionsNamespace
<< ": cannot create the collection until there are shards",
- Grid::get(opCtx)->shardRegistry()->getNumShards() != 0);
+ Grid::get(opCtx)->shardRegistry()->getNumShardsNoReload() != 0);
// First, shard the sessions collection to create it.
ConfigsvrShardCollectionRequest shardCollection;
diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
index a78fea13156..78bd0f861ae 100644
--- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
@@ -266,10 +266,6 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest(
std::move(executorPoolPtr),
_mockNetwork);
- // NOTE: ShardRegistry::startup() is not called because it starts a task executor with a
- // self-rescheduling task to reload the ShardRegistry over the network.
- // grid->shardRegistry()->startup();
-
if (grid->catalogClient()) {
grid->catalogClient()->startup();
}
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index e0a5cc13476..842da344bec 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -33,50 +33,41 @@
#include "mongo/s/client/shard_registry.h"
-#include <memory>
-#include <set>
-
-#include "mongo/bson/bsonobj.h"
-#include "mongo/bson/util/bson_extract.h"
-#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/client.h"
#include "mongo/db/logical_time_metadata_hook.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/server_options.h"
-#include "mongo/executor/network_connection_hook.h"
+#include "mongo/db/vector_clock.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
-#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
-#include "mongo/platform/mutex.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_factory.h"
#include "mongo/s/grid.h"
-#include "mongo/util/concurrency/with_lock.h"
-#include "mongo/util/scopeguard.h"
#include "mongo/util/str.h"
namespace mongo {
-using executor::NetworkInterface;
-using executor::NetworkInterfaceThreadPool;
-using executor::TaskExecutor;
-using executor::TaskExecutorPool;
-using executor::ThreadPoolTaskExecutor;
-using CallbackArgs = TaskExecutor::CallbackArgs;
-using CallbackHandle = TaskExecutor::CallbackHandle;
-
-
namespace {
+
const Seconds kRefreshPeriod(30);
+
+/**
+ * Whether or not the actual topologyTime should be used. When this is false, the
+ * topologyTime part of the cache's Time will stay fixed and not advance.
+ */
+bool useActualTopologyTime() {
+ return serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo(
+ ServerGlobalParams::FeatureCompatibility::Version::kVersion47);
+}
+
} // namespace
+using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+
const ShardId ShardRegistry::kConfigServerShardId = ShardId("config");
ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
@@ -84,154 +75,145 @@ ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
std::vector<ShardRemovalHook> shardRemovalHooks)
: _shardFactory(std::move(shardFactory)),
_initConfigServerCS(configServerCS),
- _shardRemovalHooks(std::move(shardRemovalHooks)) {
+ _shardRemovalHooks(std::move(shardRemovalHooks)),
+ _threadPool([] {
+ ThreadPool::Options options;
+ options.poolName = "ShardRegistry";
+ options.minThreads = 0;
+ options.maxThreads = 1;
+ return options;
+ }()) {
invariant(_initConfigServerCS.isValid());
+ _threadPool.startup();
}
ShardRegistry::~ShardRegistry() {
shutdown();
}
-void ShardRegistry::shutdown() {
- if (_executor && !_isShutdown.load()) {
- LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry");
- _executor->shutdown();
- _executor->join();
- _isShutdown.store(true);
- }
-}
-
-ConnectionString ShardRegistry::getConfigServerConnectionString() const {
- return getConfigShard()->getConnString();
-}
-
-StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
- const ShardId& shardId) {
- // If we know about the shard, return it.
- auto shard = getShardNoReload(shardId);
- if (shard) {
- return shard;
- }
-
- // If we can't find the shard, attempt to reload the ShardRegistry.
- bool didReload = reload(opCtx);
- shard = getShardNoReload(shardId);
-
- // If we found the shard, return it.
- if (shard) {
- return shard;
- }
-
- // If we did not find the shard but performed the reload
- // ourselves, return, because it means the shard does not exist.
- if (didReload) {
- return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
- }
-
- // If we did not perform the reload ourselves (because there was a concurrent reload), force a
- // reload again to ensure that we have seen data at least as up to date as our first reload.
- reload(opCtx);
- shard = getShardNoReload(shardId);
-
- if (shard) {
- return shard;
- }
-
- return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
-}
-
-std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) {
- stdx::lock_guard<Latch> lk(_mutex);
- return _data.findShard(shardId);
-}
-
-std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) {
- stdx::lock_guard<Latch> lk(_mutex);
- return _data.findByHostAndPort(host);
-}
+void ShardRegistry::init(ServiceContext* service) {
+ invariant(!_isInitialized.load());
-std::shared_ptr<Shard> ShardRegistry::getConfigShard() const {
- stdx::lock_guard<Latch> lk(_mutex);
- invariant(_configShard);
- return _configShard;
-}
+ invariant(!_service);
+ _service = service;
-std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
- return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr);
-}
+ auto lookupFn = [this](OperationContext* opCtx,
+ const Singleton& key,
+ const Cache::ValueHandle& cachedData,
+ const Time& timeInStore) {
+ return _lookup(opCtx, key, cachedData, timeInStore);
+ };
-std::shared_ptr<Shard> ShardRegistry::lookupRSName(const std::string& name) const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _data.findByRSName(name);
-}
+ _cache =
+ std::make_unique<Cache>(_cacheMutex, _service, _threadPool, lookupFn, 1 /* cacheSize */);
-void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const {
- std::set<ShardId> seen;
{
stdx::lock_guard<Latch> lk(_mutex);
- _data.getAllShardIds(seen);
+ _configShardData = ShardRegistryData::createWithConfigShardOnly(
+ _shardFactory->createShard(kConfigServerShardId, _initConfigServerCS));
}
- all->assign(seen.begin(), seen.end());
+
+ _isInitialized.store(true);
}
-void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) {
- getAllShardIdsNoReload(all);
- if (all->empty()) {
- bool didReload = reload(opCtx);
- getAllShardIdsNoReload(all);
- // If we didn't do the reload ourselves, we should retry to ensure
- // that the reload is actually initiated while we're executing this
- if (!didReload && all->empty()) {
- reload(opCtx);
- getAllShardIdsNoReload(all);
+ShardRegistry::Cache::LookupResult ShardRegistry::_lookup(OperationContext* opCtx,
+ const Singleton& key,
+ const Cache::ValueHandle& cachedData,
+ const Time& timeInStore) {
+ invariant(key == _kSingleton);
+ invariant(cachedData, "ShardRegistry::_lookup called but the cache is empty");
+
+ LOGV2_DEBUG(4620250,
+ 2,
+ "Starting ShardRegistry::_lookup",
+ "cachedData"_attr = cachedData->toBSON(),
+ "cachedData.getTime()"_attr = cachedData.getTime().toBSON(),
+ "timeInStore"_attr = timeInStore.toBSON());
+
+ // Check if we need to refresh from the configsvrs. If so, then do that and get the results,
+ // otherwise (this is a lookup only to incorporate updated connection strings from the RSM),
+ // then get the equivalent values from the previously cached data.
+ auto [returnData,
+ returnTopologyTime,
+ returnForceReloadIncrement,
+ removedShards,
+ fetchedFromConfigServers] = [&]()
+ -> std::tuple<ShardRegistryData, Timestamp, Increment, ShardRegistryData::ShardMap, bool> {
+ if (timeInStore.topologyTime > cachedData.getTime().topologyTime ||
+ timeInStore.forceReloadIncrement > cachedData.getTime().forceReloadIncrement) {
+ auto [reloadedData, maxTopologyTime] =
+ ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get());
+ if (!useActualTopologyTime()) {
+ // If not using the actual topology time, then just use the topologyTime currently
+ // in the cache, instead of the maximum topologyTime value from config.shards. This
+ // is necessary during upgrade/downgrade when topologyTime might not be gossiped by
+ // all nodes (and so isn't being used).
+ maxTopologyTime = cachedData.getTime().topologyTime;
+ }
+
+ auto [mergedData, removedShards] =
+ ShardRegistryData::mergeExisting(*cachedData, reloadedData);
+
+ return {
+ mergedData, maxTopologyTime, timeInStore.forceReloadIncrement, removedShards, true};
+ } else {
+ return {*cachedData,
+ cachedData.getTime().topologyTime,
+ cachedData.getTime().forceReloadIncrement,
+ {},
+ false};
}
- }
-}
+ }();
-int ShardRegistry::getNumShards() const {
- std::set<ShardId> seen;
- {
- stdx::lock_guard<Latch> lk(_mutex);
- _data.getAllShardIds(seen);
- }
- return seen.size();
-}
+ // Always apply the latest conn strings.
+ auto [latestConnStrings, rsmIncrementForConnStrings] = _getLatestConnStrings();
-void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
- invariant(newConnString.type() == ConnectionString::SET ||
- newConnString.type() == ConnectionString::CUSTOM); // For dbtests
+ for (const auto& latestConnString : latestConnStrings) {
+ // TODO SERVER-50909: Optimise by only doing this work if the latest conn string differs.
- // to prevent update config shard connection string during init
- stdx::unique_lock<Latch> lock(_mutex);
+ auto shard = returnData.findByRSName(latestConnString.first.toString());
+ if (!shard) {
+ continue;
+ }
- auto shard = _data.findByRSName(newConnString.getSetName());
- if (!shard) {
- return;
+ auto newData = ShardRegistryData::createFromExisting(
+ returnData, latestConnString.second, _shardFactory.get());
+ returnData = newData;
}
- auto [data, updatedShard] =
- ShardRegistryData::createFromExisting(_data, newConnString, _shardFactory.get());
+ // Remove RSMs that are not in the catalog any more.
+ for (auto& pair : removedShards) {
+ auto& shardId = pair.first;
+ auto& shard = pair.second;
+ invariant(shard);
- if (updatedShard && updatedShard->isConfig()) {
- _configShard = updatedShard;
+ auto name = shard->getConnString().getSetName();
+ ReplicaSetMonitor::remove(name);
+ for (auto& callback : _shardRemovalHooks) {
+ // Run callbacks asynchronously.
+ // TODO SERVER-50906: Consider running these callbacks synchronously.
+ ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
+ .getAsync([=](const Status&) { callback(shardId); });
+ }
}
- _data = data;
-}
-
-void ShardRegistry::init() {
- invariant(!_isInitialized.load());
- {
- stdx::unique_lock<Latch> lock(_mutex);
- _configShard =
- _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS);
- _data = ShardRegistryData::createWithConfigShardOnly(_configShard);
+ // The registry is "up" once there has been a successful lookup from the config servers.
+ if (fetchedFromConfigServers) {
+ _isUp.store(true);
}
- _isInitialized.store(true);
+
+ Time returnTime{returnTopologyTime, rsmIncrementForConnStrings, returnForceReloadIncrement};
+ LOGV2_DEBUG(4620251,
+ 2,
+ "Finished ShardRegistry::_lookup",
+ "returnData"_attr = returnData.toBSON(),
+ "returnTime"_attr = returnTime);
+ return Cache::LookupResult{returnData, returnTime};
}
-void ShardRegistry::startup(OperationContext* opCtx) {
- // startup() must be called only once
+void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
+ invariant(_isInitialized.load());
+ // startupPeriodicReloader() must be called only once
invariant(!_executor);
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
@@ -240,16 +222,17 @@ void ShardRegistry::startup(OperationContext* opCtx) {
// construct task executor
auto net = executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hookList));
auto netPtr = net.get();
- _executor = std::make_unique<ThreadPoolTaskExecutor>(
- std::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
+ _executor = std::make_unique<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net));
LOGV2_DEBUG(22724, 1, "Starting up task executor for periodic reloading of ShardRegistry");
_executor->startup();
auto status =
- _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _internalReload(cbArgs); });
+ _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); });
if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
- LOGV2_DEBUG(22725, 1, "Cant schedule Shard Registry reload. Executor shutdown in progress");
+ LOGV2_DEBUG(
+ 22725, 1, "Can't schedule Shard Registry reload. Executor shutdown in progress");
return;
}
@@ -261,7 +244,27 @@ void ShardRegistry::startup(OperationContext* opCtx) {
}
}
-void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) {
+void ShardRegistry::shutdownPeriodicReloader() {
+ if (_executor) {
+ LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry");
+ _executor->shutdown();
+ _executor->join();
+ _executor.reset();
+ }
+}
+
+void ShardRegistry::shutdown() {
+ shutdownPeriodicReloader();
+
+ if (!_isShutdown.load()) {
+ LOGV2_DEBUG(4620235, 1, "Shutting down shard registry");
+ _threadPool.shutdown();
+ _threadPool.join();
+ _isShutdown.store(true);
+ }
+}
+
+void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) {
LOGV2_DEBUG(22726, 1, "Reloading shardRegistry");
if (!cbArgs.status.isOK()) {
LOGV2_WARNING(22734,
@@ -275,21 +278,26 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) {
auto opCtx = tc->makeOperationContext();
+ auto refreshPeriod = kRefreshPeriod;
+
try {
reload(opCtx.get());
} catch (const DBException& e) {
+ if (e.code() == ErrorCodes::ReadConcernMajorityNotAvailableYet) {
+ refreshPeriod = Seconds(1);
+ }
LOGV2(22727,
"Error running periodic reload of shard registry caused by {error}; will retry after "
"{shardRegistryReloadInterval}",
"Error running periodic reload of shard registry",
"error"_attr = redact(e),
- "shardRegistryReloadInterval"_attr = kRefreshPeriod);
+ "shardRegistryReloadInterval"_attr = refreshPeriod);
}
// reschedule itself
auto status =
- _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
- [this](const CallbackArgs& cbArgs) { _internalReload(cbArgs); });
+ _executor->scheduleWorkAt(_executor->now() + refreshPeriod,
+ [this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); });
if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
LOGV2_DEBUG(
@@ -305,85 +313,155 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) {
}
}
-bool ShardRegistry::isUp() const {
- return _isUp.load();
+ConnectionString ShardRegistry::getConfigServerConnectionString() const {
+ return getConfigShard()->getConnString();
}
-bool ShardRegistry::reload(OperationContext* opCtx) {
- stdx::unique_lock<Latch> reloadLock(_reloadMutex);
-
- if (_reloadState == ReloadState::Reloading) {
- // Another thread is already in the process of reloading so no need to do duplicate work.
- // There is also an issue if multiple threads are allowed to call getAllShards()
- // simultaneously because there is no good way to determine which of the threads has the
- // more recent version of the data.
- try {
- opCtx->waitForConditionOrInterrupt(
- _inReloadCV, reloadLock, [&] { return _reloadState != ReloadState::Reloading; });
- } catch (const DBException& e) {
- LOGV2_DEBUG(22729,
- 1,
- "Error reloading shard registry caused by {error}",
- "Error reloading shard registry",
- "error"_attr = redact(e));
- return false;
- }
+std::shared_ptr<Shard> ShardRegistry::getConfigShard() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _configShardData.findShard(kConfigServerShardId);
+}
- if (_reloadState == ReloadState::Idle) {
- return false;
+StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
+ const ShardId& shardId) {
+ // First check if this is a config shard lookup.
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (auto shard = _configShardData.findShard(shardId)) {
+ return shard;
}
- // else proceed to reload since an error occured on the last reload attempt.
- invariant(_reloadState == ReloadState::Failed);
}
- _reloadState = ReloadState::Reloading;
- reloadLock.unlock();
+ if (auto shard = _getData(opCtx)->findShard(shardId)) {
+ return shard;
+ }
- auto nextReloadState = ReloadState::Failed;
+ // Reload and try again if the shard was not in the registry
+ reload(opCtx);
+ if (auto shard = _getData(opCtx)->findShard(shardId)) {
+ return shard;
+ }
- auto failGuard = makeGuard([&] {
- if (!reloadLock.owns_lock()) {
- reloadLock.lock();
- }
- _reloadState = nextReloadState;
- _inReloadCV.notify_all();
- });
+ return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
+}
- ShardRegistryData reloadedData =
- ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get(), getConfigShard());
+void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) {
+ std::set<ShardId> seen;
+ auto data = _getData(opCtx);
+ data->getAllShardIds(seen);
+ if (seen.empty()) {
+ reload(opCtx);
+ data = _getData(opCtx);
+ data->getAllShardIds(seen);
+ }
+ all->assign(seen.begin(), seen.end());
+}
+int ShardRegistry::getNumShards(OperationContext* opCtx) {
+ std::set<ShardId> seen;
+ auto data = _getData(opCtx);
+ data->getAllShardIds(seen);
+ return seen.size();
+}
+
+std::pair<std::vector<ShardRegistry::LatestConnStrings::value_type>, ShardRegistry::Increment>
+ShardRegistry::_getLatestConnStrings() const {
stdx::unique_lock<Latch> lock(_mutex);
+ return {{_latestConnStrings.begin(), _latestConnStrings.end()}, _rsmIncrement.load()};
+}
- auto [mergedData, removedShards] = ShardRegistryData::mergeExisting(_data, reloadedData);
- _data = std::move(mergedData);
+void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
+ invariant(newConnString.type() == ConnectionString::SET ||
+ newConnString.type() == ConnectionString::CUSTOM); // For dbtests
- lock.unlock();
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (auto shard = _configShardData.findByRSName(newConnString.getSetName())) {
+ auto newData = ShardRegistryData::createFromExisting(
+ _configShardData, newConnString, _shardFactory.get());
+ _configShardData = newData;
+
+ } else {
+ // Stash the new connection string and bump the RSM increment.
+ _latestConnStrings[newConnString.getSetName()] = newConnString;
+ auto value = _rsmIncrement.addAndFetch(1);
+ LOGV2_DEBUG(4620252,
+ 2,
+ "ShardRegistry stashed new connection string",
+ "newConnString"_attr = newConnString,
+ "newRSMIncrement"_attr = value);
+ }
+
+ // Schedule a lookup, to incorporate the new connection string.
+ // TODO SERVER-50910: To avoid needing to use a separate thread to schedule the lookup, make
+ // _getData() async.
+ auto status = Grid::get(_service)->getExecutorPool()->getFixedExecutor()->scheduleWork(
+ [this](const CallbackArgs& cbArgs) {
+ ThreadClient tc("shard-registry-rsm-reload", _service);
+
+ auto opCtx = tc->makeOperationContext();
+
+ try {
+ _getData(opCtx.get());
+ } catch (const DBException& e) {
+ LOGV2(4620201,
+ "Error running reload of ShardRegistry for RSM update, caused by {error}",
+ "Error running reload of ShardRegistry for RSM update",
+ "error"_attr = redact(e));
+ }
+ });
- // Remove RSMs that are not in the catalog any more.
- for (auto& pair : removedShards) {
- auto& shardId = pair.first;
- auto& shard = pair.second;
- invariant(shard);
+ if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
+ LOGV2_DEBUG(
+ 4620202,
+ 1,
+ "Can't schedule ShardRegistry reload for RSM update, executor shutdown in progress");
+ return;
+ }
- auto name = shard->getConnString().getSetName();
- ReplicaSetMonitor::remove(name);
- for (auto& callback : _shardRemovalHooks) {
- // Run callbacks asynchronously.
- ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
- .getAsync([=](const Status&) { callback(shardId); });
- }
+ if (!status.isOK()) {
+ LOGV2_FATAL(4620203,
+ "Error scheduling ShardRegistry reload for RSM update, caused by {error}",
+ "Error scheduling ShardRegistry reload for RSM update",
+ "error"_attr = redact(status.getStatus()));
+ }
+}
+
+std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
+ return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr);
+}
+
+bool ShardRegistry::isUp() const {
+ return _isUp.load();
+}
+
+void ShardRegistry::toBSON(BSONObjBuilder* result) const {
+ BSONObjBuilder map;
+ BSONObjBuilder hosts;
+ BSONObjBuilder connStrings;
+ auto data = _getCachedData();
+ data->toBSON(&map, &hosts, &connStrings);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _configShardData.toBSON(&map, &hosts, &connStrings);
}
+ result->append("map", map.obj());
+ result->append("hosts", hosts.obj());
+ result->append("connStrings", connStrings.obj());
+}
+
+bool ShardRegistry::reload(OperationContext* opCtx) {
+ // Make the next acquire do a lookup.
+ auto value = _forceReloadIncrement.addAndFetch(1);
+ LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value);
+
+ // Force it to actually happen now.
+ _getData(opCtx);
- nextReloadState = ReloadState::Idle;
- // first successful reload means that registry is up
- _isUp.store(true);
return true;
}
void ShardRegistry::clearEntries() {
- ShardRegistryData empty;
- stdx::lock_guard<Latch> lk(_mutex);
- _data = empty;
+ _cache->invalidateAll();
}
void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext,
@@ -393,7 +471,8 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex
auto opCtx = tc->makeOperationContext();
auto const grid = Grid::get(opCtx.get());
- std::shared_ptr<Shard> s = grid->shardRegistry()->lookupRSName(connStr.getSetName());
+ std::shared_ptr<Shard> s =
+ grid->shardRegistry()->_getShardForRSNameNoReload(connStr.getSetName());
if (!s) {
LOGV2_DEBUG(22730,
1,
@@ -427,25 +506,93 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex
}
}
-void ShardRegistry::toBSON(BSONObjBuilder* result) const {
- std::vector<std::shared_ptr<Shard>> shards;
+// Inserts the initial empty ShardRegistryData into the cache, if the cache is empty.
+void ShardRegistry::_initializeCacheIfNecessary() const {
+ if (!_cache->peekLatestCached(_kSingleton)) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (!_cache->peekLatestCached(_kSingleton)) {
+ _cache->insertOrAssign(_kSingleton, {}, Date_t::now(), Time());
+ }
+ }
+}
+
+ShardRegistry::Cache::ValueHandle ShardRegistry::_getData(OperationContext* opCtx) {
+ _initializeCacheIfNecessary();
+
+ // If the forceReloadIncrement is 0, then we've never done a lookup, so we should be sure to do
+ // one now.
+ Increment uninitializedIncrement{0};
+ _forceReloadIncrement.compareAndSwap(&uninitializedIncrement, 1);
+
+ // Update the time the cache should be aiming for.
+ auto now = VectorClock::get(opCtx)->getTime();
+ // The topologyTime should be advanced to either the actual topologyTime (if it is being
+ // gossiped), or else the previously cached topologyTime value (so that this part of the cache's
+ // time doesn't advance, if topologyTime isn't being gossiped).
+ Timestamp topologyTime = useActualTopologyTime()
+ ? now.topologyTime().asTimestamp()
+ : _cache->peekLatestCached(_kSingleton).getTime().topologyTime;
+ _cache->advanceTimeInStore(
+ _kSingleton, Time(topologyTime, _rsmIncrement.load(), _forceReloadIncrement.load()));
+
+ return _cache->acquire(opCtx, _kSingleton, CacheCausalConsistency::kLatestKnown);
+}
+
+// TODO SERVER-50206: Remove usage of these non-causally consistent accessors.
+
+ShardRegistry::Cache::ValueHandle ShardRegistry::_getCachedData() const {
+ _initializeCacheIfNecessary();
+ return _cache->peekLatestCached(_kSingleton);
+}
+
+std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) const {
+ // First check if this is a config shard lookup.
{
stdx::lock_guard<Latch> lk(_mutex);
- _data.getAllShards(shards);
+ if (auto shard = _configShardData.findShard(shardId)) {
+ return shard;
+ }
}
+ auto data = _getCachedData();
+ return data->findShard(shardId);
+}
- std::sort(std::begin(shards),
- std::end(shards),
- [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) {
- return lhs->getId() < rhs->getId();
- });
+std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) const {
+ // First check if this is a config shard lookup.
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (auto shard = _configShardData.findByHostAndPort(host)) {
+ return shard;
+ }
+ }
+ auto data = _getCachedData();
+ return data->findByHostAndPort(host);
+}
- BSONObjBuilder mapBob(result->subobjStart("map"));
- for (auto&& shard : shards) {
- // Intentionally calling getConnString while not holding _mutex
- // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote.
- mapBob.append(shard->getId(), shard->getConnString().toString());
+void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const {
+ std::set<ShardId> seen;
+ auto data = _getCachedData();
+ data->getAllShardIds(seen);
+ all->assign(seen.begin(), seen.end());
+}
+
+int ShardRegistry::getNumShardsNoReload() const {
+ std::set<ShardId> seen;
+ auto data = _getCachedData();
+ data->getAllShardIds(seen);
+ return seen.size();
+}
+
+std::shared_ptr<Shard> ShardRegistry::_getShardForRSNameNoReload(const std::string& name) const {
+ // First check if this is a config shard lookup.
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (auto shard = _configShardData.findByRSName(name)) {
+ return shard;
+ }
}
+ auto data = _getCachedData();
+ return data->findByRSName(name);
}
////////////// ShardRegistryData //////////////////
@@ -456,9 +603,8 @@ ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<S
return data;
}
-ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* opCtx,
- ShardFactory* shardFactory,
- std::shared_ptr<Shard> configShard) {
+std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClient(
+ OperationContext* opCtx, ShardFactory* shardFactory) {
auto const catalogClient = Grid::get(opCtx)->catalogClient();
auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern;
@@ -489,6 +635,7 @@ ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* o
// Do this before re-taking the mutex to avoid deadlock with the ReplicaSetMonitor updating
// hosts for a given shard.
std::vector<std::tuple<std::string, ConnectionString>> shardsInfo;
+ Timestamp maxTopologyTime;
for (const auto& shardType : shards) {
// This validation should ideally go inside the ShardType::validate call. However, doing
// it there would prevent us from loading previously faulty shard hosts, which might have
@@ -502,11 +649,15 @@ ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* o
continue;
}
+ if (auto thisTopologyTime = shardType.getTopologyTime();
+ maxTopologyTime < thisTopologyTime) {
+ maxTopologyTime = thisTopologyTime;
+ }
+
shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue()));
}
ShardRegistryData data;
- data._addShard(configShard, true);
for (auto& shardInfo : shardsInfo) {
if (std::get<0>(shardInfo) == "config") {
continue;
@@ -517,7 +668,7 @@ ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* o
data._addShard(std::move(shard), false);
}
- return data;
+ return {data, maxTopologyTime};
}
std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mergeExisting(
@@ -550,21 +701,20 @@ std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mer
return {mergedData, removedShards};
}
-std::pair<ShardRegistryData, std::shared_ptr<Shard>> ShardRegistryData::createFromExisting(
- const ShardRegistryData& existingData,
- const ConnectionString& newConnString,
- ShardFactory* shardFactory) {
+ShardRegistryData ShardRegistryData::createFromExisting(const ShardRegistryData& existingData,
+ const ConnectionString& newConnString,
+ ShardFactory* shardFactory) {
ShardRegistryData data(existingData);
auto it = data._rsLookup.find(newConnString.getSetName());
if (it == data._rsLookup.end()) {
- return {data, nullptr};
+ return data;
}
invariant(it->second);
auto updatedShard = shardFactory->createShard(it->second->getId(), newConnString);
data._addShard(updatedShard, true);
- return {data, updatedShard};
+ return data;
}
std::shared_ptr<Shard> ShardRegistryData::findByRSName(const std::string& name) const {
@@ -682,4 +832,70 @@ void ShardRegistryData::_addShard(std::shared_ptr<Shard> shard, bool useOriginal
}
}
+void ShardRegistryData::toBSON(BSONObjBuilder* map,
+ BSONObjBuilder* hosts,
+ BSONObjBuilder* connStrings) const {
+ std::vector<std::shared_ptr<Shard>> shards;
+ getAllShards(shards);
+
+ std::sort(std::begin(shards),
+ std::end(shards),
+ [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) {
+ return lhs->getId() < rhs->getId();
+ });
+
+ if (map) {
+ for (auto&& shard : shards) {
+ map->append(shard->getId(), shard->getConnString().toString());
+ }
+ }
+
+ if (hosts) {
+ for (const auto& hostIt : _hostLookup) {
+ hosts->append(hostIt.first.toString(), hostIt.second->getId());
+ }
+ }
+
+ if (connStrings) {
+ for (const auto& connStringIt : _connStringLookup) {
+ connStrings->append(connStringIt.first.toString(), connStringIt.second->getId());
+ }
+ }
+}
+
+void ShardRegistryData::toBSON(BSONObjBuilder* result) const {
+ std::vector<std::shared_ptr<Shard>> shards;
+ getAllShards(shards);
+
+ std::sort(std::begin(shards),
+ std::end(shards),
+ [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) {
+ return lhs->getId() < rhs->getId();
+ });
+
+ BSONObjBuilder mapBob(result->subobjStart("map"));
+ for (auto&& shard : shards) {
+ mapBob.append(shard->getId(), shard->getConnString().toString());
+ }
+ mapBob.done();
+
+ BSONObjBuilder hostsBob(result->subobjStart("hosts"));
+ for (const auto& hostIt : _hostLookup) {
+ hostsBob.append(hostIt.first.toString(), hostIt.second->getId());
+ }
+ hostsBob.done();
+
+ BSONObjBuilder connStringsBob(result->subobjStart("connStrings"));
+ for (const auto& connStringIt : _connStringLookup) {
+ connStringsBob.append(connStringIt.first.toString(), connStringIt.second->getId());
+ }
+ connStringsBob.done();
+}
+
+BSONObj ShardRegistryData::toBSON() const {
+ BSONObjBuilder bob;
+ toBSON(&bob);
+ return bob.obj();
+}
+
} // namespace mongo
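The shard_registry.cpp changes above replace the old mutex-and-condition-variable reload protocol (_reloadMutex, _inReloadCV, ReloadState) with a read-through cache whose "time" combines the gossiped topologyTime with two local counters, rsmIncrement and forceReloadIncrement; bumping any component past the cached time makes the next acquire run _lookup(). The following is a minimal, self-contained sketch of that idea only, not MongoDB code; the names MiniRegistryCache, forceReload and noteConnStringUpdate are illustrative, and unlike the real _getData() this sketch does not bump forceReloadIncrement on first use to guarantee an initial fetch.

// Sketch: a single-entry cache refreshed whenever its composite "time" falls behind.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <string>

struct Time {
    uint64_t topologyTime{0};         // driven by the cluster's gossiped topology time
    int64_t rsmIncrement{0};          // bumped when a replica-set monitor update is stashed
    int64_t forceReloadIncrement{0};  // bumped to force a fetch from the config servers

    bool needsRefreshTo(const Time& want) const {
        // Refresh if any component of the desired time has moved past the cached one.
        return topologyTime < want.topologyTime || rsmIncrement < want.rsmIncrement ||
            forceReloadIncrement < want.forceReloadIncrement;
    }
};

class MiniRegistryCache {
public:
    // Analogue of reload(): make the next acquire() fetch, without fetching synchronously here.
    void forceReload() { _forceReload.fetch_add(1); }

    // Analogue of updateReplSetHosts(): record that a newer connection string is pending.
    void noteConnStringUpdate() { _rsm.fetch_add(1); }

    // Analogue of _getData(): refresh only if the desired time is ahead of the cached time.
    std::string acquire() {
        Time want{_topology.load(), _rsm.load(), _forceReload.load()};
        if (_cachedTime.needsRefreshTo(want)) {
            _cachedValue = "shard list (lookup #" + std::to_string(++_lookups) + ")";
            _cachedTime = want;
        }
        return _cachedValue;
    }

private:
    std::atomic<uint64_t> _topology{0};
    std::atomic<int64_t> _rsm{0};
    std::atomic<int64_t> _forceReload{0};
    Time _cachedTime{};
    std::string _cachedValue{"<never looked up>"};
    int _lookups{0};
};

int main() {
    MiniRegistryCache cache;
    std::cout << cache.acquire() << "\n";  // nothing has advanced: stays "<never looked up>"
    cache.forceReload();
    std::cout << cache.acquire() << "\n";  // forceReloadIncrement moved: lookup #1
    cache.noteConnStringUpdate();
    std::cout << cache.acquire() << "\n";  // rsmIncrement moved: lookup #2
    std::cout << cache.acquire() << "\n";  // nothing new: still lookup #2
}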
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 486788c57dd..711c8e0d34b 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -29,30 +29,24 @@
#pragma once
-#include <memory>
-#include <set>
#include <string>
#include <vector>
-#include "mongo/db/jsobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/connection_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/server_options.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/s/client/shard.h"
-#include "mongo/stdx/condition_variable.h"
+#include "mongo/s/client/shard_factory.h"
#include "mongo/stdx/unordered_map.h"
-#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/read_through_cache.h"
namespace mongo {
-class BSONObjBuilder;
-struct HostAndPort;
-class NamespaceString;
-class OperationContext;
-class ServiceContext;
-class ShardFactory;
-class Shard;
-class ShardType;
-
class ShardRegistryData {
public:
using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>;
@@ -67,9 +61,8 @@ public:
/**
* Reads shards docs from the catalog client and fills in maps.
*/
- static ShardRegistryData createFromCatalogClient(OperationContext* opCtx,
- ShardFactory* shardFactory,
- std::shared_ptr<Shard> configShard);
+ static std::pair<ShardRegistryData, Timestamp> createFromCatalogClient(
+ OperationContext* opCtx, ShardFactory* shardFactory);
/**
* Merges alreadyCachedData and configServerData into a new ShardRegistryData.
@@ -93,10 +86,9 @@ public:
* Create a duplicate of existingData, but additionally updates the shard for newConnString.
* Used when notified by the RSM of a new connection string from a shard.
*/
- static std::pair<ShardRegistryData, std::shared_ptr<Shard>> createFromExisting(
- const ShardRegistryData& existingData,
- const ConnectionString& newConnString,
- ShardFactory* shardFactory);
+ static ShardRegistryData createFromExisting(const ShardRegistryData& existingData,
+ const ConnectionString& newConnString,
+ ShardFactory* shardFactory);
/**
* Returns the shard with the given shard id, connection string, or host and port.
@@ -128,6 +120,10 @@ public:
*/
void getAllShards(std::vector<std::shared_ptr<Shard>>& result) const;
+ void toBSON(BSONObjBuilder* result) const;
+ void toBSON(BSONObjBuilder* map, BSONObjBuilder* hosts, BSONObjBuilder* connStrings) const;
+ BSONObj toBSON() const;
+
private:
/**
* Returns the shard with the given shard id, or nullptr if no such shard.
@@ -198,70 +194,69 @@ public:
~ShardRegistry();
/**
- * Starts ReplicaSetMonitor by adding a config shard.
+ * Initializes ShardRegistry with config shard. Must be called outside c-tor to avoid calls on
+ * this while it's still not fully constructed.
*/
- void startup(OperationContext* opCtx);
+ void init(ServiceContext* service);
/**
- * This is invalid to use on the config server and will hit an invariant if it is done.
- * If the config server has need of a connection string for itself, it should get it from the
- * replication state.
- *
- * Returns the connection string for the config server.
+ * Starts up the periodic reloader of the ShardRegistry.
+ * Can only be called after ShardRegistry::init().
*/
- ConnectionString getConfigServerConnectionString() const;
+ void startupPeriodicReloader(OperationContext* opCtx);
/**
- * Reloads the ShardRegistry based on the contents of the config server's config.shards
- * collection. Returns true if this call performed a reload and false if this call only waited
- * for another thread to perform the reload and did not actually reload. Because of this, it is
- * possible that calling reload once may not result in the most up to date view. If strict
- * reloading is required, the caller should call this method one more time if the first call
- * returned false.
+ * Shuts down the periodic reloader of the ShardRegistry.
*/
- bool reload(OperationContext* opCtx);
+ void shutdownPeriodicReloader();
/**
- * Clears all entries from the shard registry entries, which will force the registry to do a
- * reload on next access.
+ * Shuts down the threadPool. Needs to be called explicitly because ShardRegistry is never
+ * destroyed as it's owned by the static grid object.
*/
- void clearEntries();
+ void shutdown();
/**
- * Takes a connection string describing either a shard or config server replica set, looks
- * up the corresponding Shard object based on the replica set name, then updates the
- * ShardRegistry's notion of what hosts make up that shard.
+ * This is invalid to use on the config server and will hit an invariant if it is done.
+ * If the config server has need of a connection string for itself, it should get it from the
+ * replication state.
+ *
+ * Returns the connection string for the config server.
*/
- void updateReplSetHosts(const ConnectionString& newConnString);
+ ConnectionString getConfigServerConnectionString() const;
+
+ /**
+ * Returns shared pointer to the shard object representing the config servers.
+ *
+ * The config shard is always known, so this function never blocks.
+ */
+ std::shared_ptr<Shard> getConfigShard() const;
/**
* Returns a shared pointer to the shard object with the given shard id, or ShardNotFound error
* otherwise.
*
* May refresh the shard registry if there's no cached information about the shard. The shardId
- * parameter can actually be the shard name or the HostAndPort for any
- * server in the shard.
+ * parameter can actually be the shard name or the HostAndPort for any server in the shard.
*/
StatusWith<std::shared_ptr<Shard>> getShard(OperationContext* opCtx, const ShardId& shardId);
/**
- * Returns a shared pointer to the shard object with the given shard id. The shardId parameter
- * can actually be the shard name or the HostAndPort for any server in the shard. Will not
- * refresh the shard registry or otherwise perform any network traffic. This means that if the
- * shard was recently added it may not be found. USE WITH CAUTION.
+ * Populates all known shard ids into the given vector.
*/
- std::shared_ptr<Shard> getShardNoReload(const ShardId& shardId);
+ void getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all);
/**
- * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not
- * refresh the shard registry or otherwise perform any network traffic.
+ * Returns the number of shards.
*/
- std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost);
+ int getNumShards(OperationContext* opCtx);
/**
- * Returns shared pointer to the shard object representing the config servers.
+ * Takes a connection string describing either a shard or config server replica set, looks
+ * up the corresponding Shard object based on the replica set name, then updates the
+ * ShardRegistry's notion of what hosts make up that shard.
*/
- std::shared_ptr<Shard> getConfigShard() const;
+ void updateReplSetHosts(const ConnectionString& newConnString);
/**
* Instantiates a new detached shard connection, which does not appear in the list of shards
@@ -274,36 +269,28 @@ public:
std::unique_ptr<Shard> createConnection(const ConnectionString& connStr) const;
/**
- * Lookup shard by replica set name. Returns nullptr if the name can't be found.
- * Note: this doesn't refresh the table if the name isn't found, so it's possible that a
- * newly added shard/Replica Set may not be found.
+ * The ShardRegistry is "up" once a successful lookup from the config servers has been
+ * completed.
*/
- std::shared_ptr<Shard> lookupRSName(const std::string& name) const;
-
- void getAllShardIdsNoReload(std::vector<ShardId>* all) const;
-
- /**
- * Like getAllShardIdsNoReload(), but does a reload internally in the case that
- * getAllShardIdsNoReload() comes back empty
- */
- void getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all);
-
- int getNumShards() const;
+ bool isUp() const;
void toBSON(BSONObjBuilder* result) const;
- bool isUp() const;
/**
- * Initializes ShardRegistry with config shard. Must be called outside c-tor to avoid calls on
- * this while its still not fully constructed.
+ * Reloads the ShardRegistry based on the contents of the config server's config.shards
+ * collection. Returns true if this call performed a reload and false if this call only waited
+ * for another thread to perform the reload and did not actually reload. Because of this, it is
+ * possible that calling reload once may not result in the most up to date view. If strict
+ * reloading is required, the caller should call this method one more time if the first call
+ * returned false.
*/
- void init();
+ bool reload(OperationContext* opCtx);
/**
- * Shuts down _executor. Needs to be called explicitly because ShardRegistry is never destroyed
- * as it's owned by the static grid object.
+ * Clears all entries from the shard registry, which will force the registry to do a
+ * reload on next access.
*/
- void shutdown();
+ void clearEntries();
/**
* For use in mongos which needs notifications about changes to shard replset membership to
@@ -312,8 +299,136 @@ public:
static void updateReplicaSetOnConfigServer(ServiceContext* serviceContex,
const ConnectionString& connStr) noexcept;
+ // TODO SERVER-50206: Remove usage of these non-causally consistent accessors.
+ //
+ // Their most important current users are dispatching requests to hosts, and processing
+ // responses from hosts. These contexts need to know the shard that the host is associated
+ // with, but usually have no access to any associated opCtx (if there even is one), and also
+ // cannot tolerate waiting for further network activity (if the cache is stale and needs to be
+ // refreshed via _lookup()).
+
+ /**
+ * Returns a shared pointer to the shard object with the given shard id. The shardId parameter
+ * can actually be the shard name or the HostAndPort for any server in the shard. Will not
+ * refresh the shard registry or otherwise perform any network traffic. This means that if the
+ * shard was recently added it may not be found. USE WITH CAUTION.
+ */
+ std::shared_ptr<Shard> getShardNoReload(const ShardId& shardId) const;
+
+ /**
+ * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not
+ * refresh the shard registry or otherwise perform any network traffic.
+ */
+ std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost) const;
+
+ void getAllShardIdsNoReload(std::vector<ShardId>* all) const;
+
+ int getNumShardsNoReload() const;
+
private:
- void _internalReload(const executor::TaskExecutor::CallbackArgs& cbArgs);
+ /**
+ * The ShardRegistry uses the ReadThroughCache to handle refreshing itself. The cache stores
+ * a single entry, with key of Singleton, value of ShardRegistryData, and causal-consistency
+ * time which is primarily a Timestamp (based on the TopologyTime), plus additional
+ * "increment"s that are used to flag further refresh criteria.
+ */
+
+ using Increment = int64_t;
+
+ struct Time {
+ explicit Time() {}
+
+ explicit Time(Timestamp topologyTime,
+ Increment rsmIncrement,
+ Increment forceReloadIncrement)
+ : topologyTime(topologyTime),
+ rsmIncrement(rsmIncrement),
+ forceReloadIncrement(forceReloadIncrement) {}
+
+ bool operator==(const Time& other) const {
+ return topologyTime == other.topologyTime && rsmIncrement == other.rsmIncrement &&
+ forceReloadIncrement == other.forceReloadIncrement;
+ }
+ bool operator!=(const Time& other) const {
+ return !(*this == other);
+ }
+ bool operator>(const Time& other) const {
+ return topologyTime > other.topologyTime || rsmIncrement > other.rsmIncrement ||
+ forceReloadIncrement > other.forceReloadIncrement;
+ }
+ bool operator>=(const Time& other) const {
+ return (*this > other) || (*this == other);
+ }
+ bool operator<(const Time& other) const {
+ return !(*this >= other);
+ }
+ bool operator<=(const Time& other) const {
+ return !(*this > other);
+ }
+
+ BSONObj toBSON() const {
+ BSONObjBuilder bob;
+ bob.append("topologyTime", topologyTime);
+ bob.append("rsmIncrement", rsmIncrement);
+ bob.append("forceReloadIncrement", forceReloadIncrement);
+ return bob.obj();
+ }
+
+ Timestamp topologyTime;
+
+ // The increments are used locally to trigger the lookup function.
+ //
+ // The rsmIncrement is used to indicate that there are stashed RSM updates that need to
+ // be incorporated.
+ //
+ // The forceReloadIncrement is used to indicate that the latest data should be fetched from
+ // the configsvrs (ie. when the topologyTime can't be used for this, eg. in the first
+ // lookup, and in contexts like unittests where topologyTime isn't gossiped but the
+ // ShardRegistry still needs to be reloaded). This is how reload() is able to force a
+ // refresh from the config servers - incrementing the forceReloadIncrement causes the cache
+ // to call _lookup() (rather than having reload() attempt to do a synchronous refresh).
+ Increment rsmIncrement{0};
+ Increment forceReloadIncrement{0};
+ };
+
+ enum class Singleton { Only };
+ static constexpr auto _kSingleton = Singleton::Only;
+
+ using Cache = ReadThroughCache<Singleton, ShardRegistryData, Time>;
+
+ Cache::LookupResult _lookup(OperationContext* opCtx,
+ const Singleton& key,
+ const Cache::ValueHandle& cachedData,
+ const Time& timeInStore);
+
+ /**
+ * Gets a causally-consistent (ie. latest-known) copy of the ShardRegistryData, refreshing from
+ * the config servers if necessary.
+ */
+ Cache::ValueHandle _getData(OperationContext* opCtx);
+
+ /**
+ * Gets the latest-cached copy of the ShardRegistryData. Never fetches from the config servers.
+ * Only used by the "NoReload" accessors.
+ * TODO SERVER-50206: Remove usage of this non-causally consistent accessor.
+ */
+ Cache::ValueHandle _getCachedData() const;
+
+ /**
+ * Lookup shard by replica set name. Returns nullptr if the name can't be found.
+ * Note: this doesn't refresh the table if the name isn't found, so it's possible that a
+ * newly added shard/Replica Set may not be found.
+ * TODO SERVER-50206: Remove usage of this non-causally consistent accessor.
+ */
+ std::shared_ptr<Shard> _getShardForRSNameNoReload(const std::string& name) const;
+
+ using LatestConnStrings = stdx::unordered_map<ShardId, ConnectionString, ShardId::Hasher>;
+
+ std::pair<std::vector<LatestConnStrings::value_type>, Increment> _getLatestConnStrings() const;
+
+ void _initializeCacheIfNecessary() const;
+
+ void _periodicReload(const executor::TaskExecutor::CallbackArgs& cbArgs);
/**
* Factory to create shards. Never changed after startup so safe to access outside of _mutex.
@@ -326,24 +441,37 @@ private:
*/
const ConnectionString _initConfigServerCS;
- AtomicWord<bool> _isInitialized{false};
-
/**
* A list of callbacks to be called asynchronously when it has been discovered that a shard was
* removed.
*/
const std::vector<ShardRemovalHook> _shardRemovalHooks;
- // Protects the ShardRegistryData lookup maps in _data, and _configShard.
+ // Thread pool used when looking up new values for the cache (ie. in which _lookup() runs).
+ ThreadPool _threadPool;
+
+ // Executor for periodically reloading the registry (ie. in which _periodicReload() runs).
+ std::unique_ptr<executor::TaskExecutor> _executor{};
+
+ mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("ShardRegistry::_cacheMutex");
+ std::unique_ptr<Cache> _cache;
+
+ // Counters for incrementing the rsmIncrement and forceReloadIncrement fields of the Time used
+ // by the _cache. See the comments for these fields in the Time class above for an explanation
+ // of their purpose.
+ AtomicWord<Increment> _rsmIncrement{0};
+ AtomicWord<Increment> _forceReloadIncrement{0};
+
+ // Protects _configShardData and _latestConnStrings.
mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardRegistry::_mutex");
- ShardRegistryData _data;
+ // Store a reference to the configShard.
+ ShardRegistryData _configShardData;
- // Store a separate reference to the configShard.
- std::shared_ptr<Shard> _configShard;
+ // The key is the replica set name (the type is ShardId just to take advantage of its hasher).
+ LatestConnStrings _latestConnStrings;
- // Executor for reloading.
- std::unique_ptr<executor::TaskExecutor> _executor{};
+ AtomicWord<bool> _isInitialized{false};
// The ShardRegistry is "up" once there has been a successful refresh.
AtomicWord<bool> _isUp{false};
@@ -351,17 +479,7 @@ private:
// Set to true in shutdown call to prevent calling it twice.
AtomicWord<bool> _isShutdown{false};
- // Protects the _reloadState during startup and refresh.
- mutable Mutex _reloadMutex = MONGO_MAKE_LATCH("ShardRegistry::_reloadMutex");
- stdx::condition_variable _inReloadCV;
-
- enum class ReloadState {
- Idle, // no other thread is loading data from config server in reload().
- Reloading, // another thread is loading data from the config server in reload().
- Failed, // last call to reload() caused an error when contacting the config server.
- };
-
- ReloadState _reloadState{ReloadState::Idle};
+ ServiceContext* _service{nullptr};
};
} // namespace mongo
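As the header above describes, the registry now holds its data in a ReadThroughCache<Singleton, ShardRegistryData, Time> keyed by a one-member enum, so the generic cache machinery ends up with exactly one entry (cacheSize 1), and clearEntries() simply invalidates it. Below is a minimal sketch of that single-entry read-through pattern; it does not use the real ReadThroughCache API, and TinyReadThroughCache and its methods are made up for illustration.

// Sketch: a generic read-through cache given exactly one slot via a singleton key.
#include <functional>
#include <iostream>
#include <map>
#include <string>

template <typename Key, typename Value>
class TinyReadThroughCache {
public:
    explicit TinyReadThroughCache(std::function<Value(const Key&)> lookup)
        : _lookup(std::move(lookup)) {}

    // Return the cached value for 'key', running the lookup function on a miss.
    const Value& acquire(const Key& key) {
        auto it = _cache.find(key);
        if (it == _cache.end())
            it = _cache.emplace(key, _lookup(key)).first;
        return it->second;
    }

    // Drop everything; the next acquire() runs the lookup again (cf. clearEntries()).
    void invalidateAll() { _cache.clear(); }

private:
    std::function<Value(const Key&)> _lookup;
    std::map<Key, Value> _cache;
};

enum class Singleton { Only };            // the only key the registry ever uses
constexpr auto kSingleton = Singleton::Only;

int main() {
    int lookups = 0;
    TinyReadThroughCache<Singleton, std::string> cache(
        [&](const Singleton&) { return "shard list #" + std::to_string(++lookups); });

    std::cout << cache.acquire(kSingleton) << "\n";  // miss -> lookup
    std::cout << cache.acquire(kSingleton) << "\n";  // hit  -> cached
    cache.invalidateAll();
    std::cout << cache.acquire(kSingleton) << "\n";  // miss again after invalidation
}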
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index c2e3cd011ac..ea28033590e 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -90,7 +90,7 @@ void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient,
_executorPool = std::move(executorPool);
_network = network;
- _shardRegistry->init();
+ _shardRegistry->init(grid.owner(this));
}
bool Grid::isShardingInitialized() const {
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index e3baf3d6e60..2b73154e767 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -218,7 +218,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
networkPtr);
// The shard registry must be started once the grid is initialized
- grid->shardRegistry()->startup(opCtx);
+ grid->shardRegistry()->startupPeriodicReloader(opCtx);
// The catalog client must be started after the shard registry has been started up
grid->catalogClient()->startup();
diff --git a/src/mongo/util/invalidating_lru_cache.h b/src/mongo/util/invalidating_lru_cache.h
index c8ead4adecc..2852313ef22 100644
--- a/src/mongo/util/invalidating_lru_cache.h
+++ b/src/mongo/util/invalidating_lru_cache.h
@@ -67,6 +67,18 @@ struct CacheNotCausallyConsistent {
};
/**
+ * Helper for determining if a given type is CacheNotCausallyConsistent or not.
+ */
+template <typename T>
+struct isCausallyConsistentImpl : std::true_type {};
+
+template <>
+struct isCausallyConsistentImpl<CacheNotCausallyConsistent> : std::false_type {};
+
+template <class T>
+inline constexpr bool isCausallyConsistent = isCausallyConsistentImpl<T>::value;
+
+/**
* Specifies the desired causal consistency for calls to 'get' (and 'acquire', respectively in the
* ReadThroughCache, which is its main consumer).
*/
@@ -200,12 +212,8 @@ public:
// doesn't support pinning items. Their only usage must be in the authorization manager
// for the internal authentication user.
explicit ValueHandle(Value&& value)
- : _value(std::make_shared<StoredValue>(nullptr,
- 0,
- boost::none,
- std::move(value),
- CacheNotCausallyConsistent(),
- CacheNotCausallyConsistent())) {}
+ : _value(std::make_shared<StoredValue>(
+ nullptr, 0, boost::none, std::move(value), Time(), Time())) {}
explicit ValueHandle(Value&& value, const Time& t)
: _value(
@@ -218,9 +226,15 @@ public:
}
bool isValid() const {
+ invariant(bool(*this));
return _value->isValid.loadRelaxed();
}
+ const Time& getTime() const {
+ invariant(bool(*this));
+ return _value->time;
+ }
+
Value* get() {
invariant(bool(*this));
return &_value->value;
@@ -260,13 +274,16 @@ public:
* was called, it will become invalidated.
*
* The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
- * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows
- * non-causally-consistent users to not have to pass a second parameter, but would fail
- * compilation if causally-consistent users forget to pass it.
+ * (since the time never changes).
*/
- void insertOrAssign(const Key& key,
- Value&& value,
- const Time& time = CacheNotCausallyConsistent()) {
+ void insertOrAssign(const Key& key, Value&& value) {
+ MONGO_STATIC_ASSERT_MSG(
+ !isCausallyConsistent<Time>,
+ "Time must be passed to insertOrAssign on causally consistent caches");
+ insertOrAssign(key, std::move(value), Time());
+ }
+
+ void insertOrAssign(const Key& key, Value&& value, const Time& time) {
LockGuardWithPostUnlockDestructor guard(_mutex);
Time currentTime, currentTimeInStore;
_invalidate(&guard, key, _cache.find(key), &currentTime, &currentTimeInStore);
@@ -307,13 +324,16 @@ public:
* destroyed.
*
* The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
- * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows
- * non-causally-consistent users to not have to pass a second parameter, but would fail
- * compilation if causally-consistent users forget to pass it.
+ * (since the time never changes).
*/
- ValueHandle insertOrAssignAndGet(const Key& key,
- Value&& value,
- const Time& time = CacheNotCausallyConsistent()) {
+ ValueHandle insertOrAssignAndGet(const Key& key, Value&& value) {
+ MONGO_STATIC_ASSERT_MSG(
+ !isCausallyConsistent<Time>,
+ "Time must be passed to insertOrAssignAndGet on causally consistent caches");
+ return insertOrAssignAndGet(key, std::move(value), Time());
+ }
+
+ ValueHandle insertOrAssignAndGet(const Key& key, Value&& value, const Time& time) {
LockGuardWithPostUnlockDestructor guard(_mutex);
Time currentTime, currentTimeInStore;
_invalidate(&guard, key, _cache.find(key), &currentTime, &currentTimeInStore);
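The invalidating_lru_cache.h change above swaps the defaulted 'time' parameter for a pair of overloads guarded by the new isCausallyConsistent trait: non-causally-consistent callers may still omit the time, while causally-consistent callers get a compile-time error if they forget it. Below is a small standalone illustration of that trait-plus-static_assert pattern using a simplified, hypothetical TinyCache type; it is not the MongoDB class.

// Sketch: allow a defaulted "time" only for the non-causally-consistent marker type.
#include <type_traits>

struct CacheNotCausallyConsistent {};  // marker "time" type, mirroring the real one

template <typename T>
struct isCausallyConsistentImpl : std::true_type {};
template <>
struct isCausallyConsistentImpl<CacheNotCausallyConsistent> : std::false_type {};
template <typename T>
inline constexpr bool isCausallyConsistent = isCausallyConsistentImpl<T>::value;

template <typename Time>
struct TinyCache {
    // Only legal when Time is the non-causally-consistent marker; otherwise this body
    // fails to compile if it is ever instantiated.
    void insertOrAssign(int key, int value) {
        static_assert(!isCausallyConsistent<Time>,
                      "Time must be passed to insertOrAssign on causally consistent caches");
        insertOrAssign(key, value, Time());
    }
    void insertOrAssign(int /*key*/, int /*value*/, const Time& /*time*/) {
        // store the value; omitted for brevity
    }
};

int main() {
    TinyCache<CacheNotCausallyConsistent> simple;
    simple.insertOrAssign(1, 42);      // OK: the time defaults to a dummy value

    TinyCache<long> causal;            // 'long' stands in for a real timestamp type
    causal.insertOrAssign(1, 42, 7L);  // OK: explicit time
    // causal.insertOrAssign(1, 42);   // would fail to compile, as intended
}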
diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h
index 72b3e7a5771..32efbc576ae 100644
--- a/src/mongo/util/read_through_cache.h
+++ b/src/mongo/util/read_through_cache.h
@@ -152,6 +152,10 @@ public:
return _valueHandle.isValid();
}
+ const Time& getTime() const {
+ return _valueHandle.getTime();
+ }
+
Value* get() {
return &_valueHandle->value;
}
@@ -302,6 +306,33 @@ public:
/**
* Invalidates the given 'key' and immediately replaces it with a new value.
+ *
+ * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
+ * (since the time never changes).
+ */
+ void insertOrAssign(const Key& key, Value&& newValue, Date_t updateWallClockTime) {
+ stdx::lock_guard lg(_mutex);
+ if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
+ it->second->invalidateAndCancelCurrentLookupRound(lg);
+ _cache.insertOrAssign(key, {std::move(newValue), updateWallClockTime});
+ }
+
+ void insertOrAssign(const Key& key,
+ Value&& newValue,
+ Date_t updateWallClockTime,
+ const Time& time) {
+ stdx::lock_guard lg(_mutex);
+ if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
+ it->second->invalidateAndCancelCurrentLookupRound(lg);
+ _cache.insertOrAssign(key, {std::move(newValue), updateWallClockTime}, time);
+ }
+
+ /**
+ * Invalidates the given 'key' and immediately replaces it with a new value, returning a handle
+ * to the new value.
+ *
+ * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
+ * (since the time never changes).
*/
ValueHandle insertOrAssignAndGet(const Key& key, Value&& newValue, Date_t updateWallClockTime) {
stdx::lock_guard lg(_mutex);
@@ -310,6 +341,16 @@ public:
return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime});
}
+ ValueHandle insertOrAssignAndGet(const Key& key,
+ Value&& newValue,
+ Date_t updateWallClockTime,
+ const Time& time) {
+ stdx::lock_guard lg(_mutex);
+ if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
+ it->second->invalidateAndCancelCurrentLookupRound(lg);
+ return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime}, time);
+ }
+
/**
* Indicates to the cache that the backing store has a newer version of 'key', corresponding to
* 'newTime'. Subsequent calls to 'acquireAsync' with a causal consistency set to 'LatestKnown'
@@ -377,9 +418,8 @@ public:
return _cache.getCacheInfo();
}
-protected:
/**
- * ReadThroughCache constructor, to be called by sub-classes, which implement 'lookup'.
+ * ReadThroughCache constructor.
*
* The 'mutex' is for the exclusive usage of the ReadThroughCache and must not be used in any
* way by the implementing class. Having the mutex stored by the sub-class allows latch