author     Tommaso Tocci <tommaso.tocci@mongodb.com>  2020-09-16 00:11:16 +0200
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-09-15 23:32:25 +0000
commit     945ca4f1112f8846e793b40ff47610927b6fc298 (patch)
tree       809ba4107d38f25570b0688aa99c3b1c171df6f3
parent     f9f7f4832d8e94c233e7281790cbeb9b70129382 (diff)
download   mongo-945ca4f1112f8846e793b40ff47610927b6fc298.tar.gz
Revert "SERVER-46202 Implement ShardRegistry on top of ReadThroughCache"
This reverts commit a8913858697363a26b06996f0821045b550bea27.
-rw-r--r--   src/mongo/db/s/migration_coordinator.cpp               41
-rw-r--r--   src/mongo/db/s/migration_util.cpp                       19
-rw-r--r--   src/mongo/db/s/sessions_collection_config_server.cpp     2
-rw-r--r--   src/mongo/db/s/sharding_mongod_test_fixture.cpp          4
-rw-r--r--   src/mongo/s/client/shard_registry.cpp                  684
-rw-r--r--   src/mongo/s/client/shard_registry.h                    310
-rw-r--r--   src/mongo/s/grid.cpp                                      2
-rw-r--r--   src/mongo/s/sharding_initialization.cpp                   2
-rw-r--r--   src/mongo/util/invalidating_lru_cache.h                  56
-rw-r--r--   src/mongo/util/read_through_cache.h                      44
10 files changed, 377 insertions, 787 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index d72311a3369..22102845062 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -235,32 +235,21 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
23899, 2, "Making abort decision durable", "migrationId"_attr = _migrationInfo.getId());
migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
- try {
- LOGV2_DEBUG(23900,
- 2,
- "Bumping transaction number with lsid {lsid} and current txnNumber "
- "{currentTxnNumber} on "
- "recipient shard {recipientShardId} for abort of collection {nss}",
- "Bumping transaction number on recipient shard for abort",
- "namespace"_attr = _migrationInfo.getNss(),
- "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
- "lsid"_attr = _migrationInfo.getLsid(),
- "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
- "migrationId"_attr = _migrationInfo.getId());
- migrationutil::advanceTransactionOnRecipient(opCtx,
- _migrationInfo.getRecipientShardId(),
- _migrationInfo.getLsid(),
- _migrationInfo.getTxnNumber());
- } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) {
- LOGV2_DEBUG(4620231,
- 1,
- "Failed to advance transaction number on recipient shard for abort",
- "namespace"_attr = _migrationInfo.getNss(),
- "migrationId"_attr = _migrationInfo.getId(),
- "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
- "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
- "error"_attr = exShardNotFound);
- }
+ LOGV2_DEBUG(
+ 23900,
+ 2,
+ "Bumping transaction number with lsid {lsid} and current txnNumber {currentTxnNumber} on "
+ "recipient shard {recipientShardId} for abort of collection {nss}",
+ "Bumping transaction number on recipient shard for abort",
+ "namespace"_attr = _migrationInfo.getNss(),
+ "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
+ "lsid"_attr = _migrationInfo.getLsid(),
+ "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
+ "migrationId"_attr = _migrationInfo.getId());
+ migrationutil::advanceTransactionOnRecipient(opCtx,
+ _migrationInfo.getRecipientShardId(),
+ _migrationInfo.getLsid(),
+ _migrationInfo.getTxnNumber());
hangBeforeSendingAbortDecision.pauseWhileSet();
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 7d289fb979e..f1c7cec016a 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -679,20 +679,11 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
opCtx, "ready remote range deletion", [&](OperationContext* newOpCtx) {
hangInReadyRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx);
- try {
- sendToRecipient(
- newOpCtx,
- recipientId,
- updateOp,
- BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
- } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) {
- LOGV2_DEBUG(4620232,
- 1,
- "Failed to mark range deletion task on recipient shard as ready",
- "migrationId"_attr = migrationId,
- "error"_attr = exShardNotFound);
- return;
- }
+ sendToRecipient(
+ newOpCtx,
+ recipientId,
+ updateOp,
+ BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
if (hangInReadyRangeDeletionOnRecipientThenSimulateErrorUninterruptible.shouldFail()) {
hangInReadyRangeDeletionOnRecipientThenSimulateErrorUninterruptible.pauseWhileSet(
diff --git a/src/mongo/db/s/sessions_collection_config_server.cpp b/src/mongo/db/s/sessions_collection_config_server.cpp
index 99eb2277d02..95ee087e5ae 100644
--- a/src/mongo/db/s/sessions_collection_config_server.cpp
+++ b/src/mongo/db/s/sessions_collection_config_server.cpp
@@ -61,7 +61,7 @@ void SessionsCollectionConfigServer::_shardCollectionIfNeeded(OperationContext*
uassert(ErrorCodes::ShardNotFound,
str::stream() << "Failed to create " << NamespaceString::kLogicalSessionsNamespace
<< ": cannot create the collection until there are shards",
- Grid::get(opCtx)->shardRegistry()->getNumShardsNoReload() != 0);
+ Grid::get(opCtx)->shardRegistry()->getNumShards() != 0);
// First, shard the sessions collection to create it.
ConfigsvrShardCollectionRequest shardCollection;
diff --git a/src/mongo/db/s/sharding_mongod_test_fixture.cpp b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
index 78bd0f861ae..a78fea13156 100644
--- a/src/mongo/db/s/sharding_mongod_test_fixture.cpp
+++ b/src/mongo/db/s/sharding_mongod_test_fixture.cpp
@@ -266,6 +266,10 @@ Status ShardingMongodTestFixture::initializeGlobalShardingStateForMongodForTest(
std::move(executorPoolPtr),
_mockNetwork);
+ // NOTE: ShardRegistry::startup() is not called because it starts a task executor with a
+ // self-rescheduling task to reload the ShardRegistry over the network.
+ // grid->shardRegistry()->startup();
+
if (grid->catalogClient()) {
grid->catalogClient()->startup();
}
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 842da344bec..e0a5cc13476 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -33,41 +33,50 @@
#include "mongo/s/client/shard_registry.h"
+#include <memory>
+#include <set>
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/client.h"
#include "mongo/db/logical_time_metadata_hook.h"
-#include "mongo/db/vector_clock.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/server_options.h"
+#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/platform/mutex.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_factory.h"
#include "mongo/s/grid.h"
+#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/str.h"
namespace mongo {
-namespace {
+using executor::NetworkInterface;
+using executor::NetworkInterfaceThreadPool;
+using executor::TaskExecutor;
+using executor::TaskExecutorPool;
+using executor::ThreadPoolTaskExecutor;
+using CallbackArgs = TaskExecutor::CallbackArgs;
+using CallbackHandle = TaskExecutor::CallbackHandle;
-const Seconds kRefreshPeriod(30);
-
-/**
- * Whether or not the actual topologyTime should be used. When this is false, the
- * topologyTime part of the cache's Time will stay fixed and not advance.
- */
-bool useActualTopologyTime() {
- return serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- serverGlobalParams.featureCompatibility.isGreaterThanOrEqualTo(
- ServerGlobalParams::FeatureCompatibility::Version::kVersion47);
-}
+namespace {
+const Seconds kRefreshPeriod(30);
} // namespace
-using CallbackArgs = executor::TaskExecutor::CallbackArgs;
-
const ShardId ShardRegistry::kConfigServerShardId = ShardId("config");
ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
@@ -75,145 +84,154 @@ ShardRegistry::ShardRegistry(std::unique_ptr<ShardFactory> shardFactory,
std::vector<ShardRemovalHook> shardRemovalHooks)
: _shardFactory(std::move(shardFactory)),
_initConfigServerCS(configServerCS),
- _shardRemovalHooks(std::move(shardRemovalHooks)),
- _threadPool([] {
- ThreadPool::Options options;
- options.poolName = "ShardRegistry";
- options.minThreads = 0;
- options.maxThreads = 1;
- return options;
- }()) {
+ _shardRemovalHooks(std::move(shardRemovalHooks)) {
invariant(_initConfigServerCS.isValid());
- _threadPool.startup();
}
ShardRegistry::~ShardRegistry() {
shutdown();
}
-void ShardRegistry::init(ServiceContext* service) {
- invariant(!_isInitialized.load());
+void ShardRegistry::shutdown() {
+ if (_executor && !_isShutdown.load()) {
+ LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry");
+ _executor->shutdown();
+ _executor->join();
+ _isShutdown.store(true);
+ }
+}
- invariant(!_service);
- _service = service;
+ConnectionString ShardRegistry::getConfigServerConnectionString() const {
+ return getConfigShard()->getConnString();
+}
- auto lookupFn = [this](OperationContext* opCtx,
- const Singleton& key,
- const Cache::ValueHandle& cachedData,
- const Time& timeInStore) {
- return _lookup(opCtx, key, cachedData, timeInStore);
- };
+StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
+ const ShardId& shardId) {
+ // If we know about the shard, return it.
+ auto shard = getShardNoReload(shardId);
+ if (shard) {
+ return shard;
+ }
- _cache =
- std::make_unique<Cache>(_cacheMutex, _service, _threadPool, lookupFn, 1 /* cacheSize */);
+ // If we can't find the shard, attempt to reload the ShardRegistry.
+ bool didReload = reload(opCtx);
+ shard = getShardNoReload(shardId);
- {
- stdx::lock_guard<Latch> lk(_mutex);
- _configShardData = ShardRegistryData::createWithConfigShardOnly(
- _shardFactory->createShard(kConfigServerShardId, _initConfigServerCS));
+ // If we found the shard, return it.
+ if (shard) {
+ return shard;
}
- _isInitialized.store(true);
+ // If we did not find the shard but performed the reload
+ // ourselves, return, because it means the shard does not exist.
+ if (didReload) {
+ return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
+ }
+
+ // If we did not perform the reload ourselves (because there was a concurrent reload), force a
+ // reload again to ensure that we have seen data at least as up to date as our first reload.
+ reload(opCtx);
+ shard = getShardNoReload(shardId);
+
+ if (shard) {
+ return shard;
+ }
+
+ return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
}
-ShardRegistry::Cache::LookupResult ShardRegistry::_lookup(OperationContext* opCtx,
- const Singleton& key,
- const Cache::ValueHandle& cachedData,
- const Time& timeInStore) {
- invariant(key == _kSingleton);
- invariant(cachedData, "ShardRegistry::_lookup called but the cache is empty");
-
- LOGV2_DEBUG(4620250,
- 2,
- "Starting ShardRegistry::_lookup",
- "cachedData"_attr = cachedData->toBSON(),
- "cachedData.getTime()"_attr = cachedData.getTime().toBSON(),
- "timeInStore"_attr = timeInStore.toBSON());
-
- // Check if we need to refresh from the configsvrs. If so, then do that and get the results,
- // otherwise (this is a lookup only to incorporate updated connection strings from the RSM),
- // then get the equivalent values from the previously cached data.
- auto [returnData,
- returnTopologyTime,
- returnForceReloadIncrement,
- removedShards,
- fetchedFromConfigServers] = [&]()
- -> std::tuple<ShardRegistryData, Timestamp, Increment, ShardRegistryData::ShardMap, bool> {
- if (timeInStore.topologyTime > cachedData.getTime().topologyTime ||
- timeInStore.forceReloadIncrement > cachedData.getTime().forceReloadIncrement) {
- auto [reloadedData, maxTopologyTime] =
- ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get());
- if (!useActualTopologyTime()) {
- // If not using the actual topology time, then just use the topologyTime currently
- // in the cache, instead of the maximum topologyTime value from config.shards. This
- // is necessary during upgrade/downgrade when topologyTime might not be gossiped by
- // all nodes (and so isn't being used).
- maxTopologyTime = cachedData.getTime().topologyTime;
- }
-
- auto [mergedData, removedShards] =
- ShardRegistryData::mergeExisting(*cachedData, reloadedData);
-
- return {
- mergedData, maxTopologyTime, timeInStore.forceReloadIncrement, removedShards, true};
- } else {
- return {*cachedData,
- cachedData.getTime().topologyTime,
- cachedData.getTime().forceReloadIncrement,
- {},
- false};
- }
- }();
+std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _data.findShard(shardId);
+}
- // Always apply the latest conn strings.
- auto [latestConnStrings, rsmIncrementForConnStrings] = _getLatestConnStrings();
+std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _data.findByHostAndPort(host);
+}
- for (const auto& latestConnString : latestConnStrings) {
- // TODO SERVER-50909: Optimise by only doing this work if the latest conn string differs.
+std::shared_ptr<Shard> ShardRegistry::getConfigShard() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ invariant(_configShard);
+ return _configShard;
+}
- auto shard = returnData.findByRSName(latestConnString.first.toString());
- if (!shard) {
- continue;
+std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
+ return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr);
+}
+
+std::shared_ptr<Shard> ShardRegistry::lookupRSName(const std::string& name) const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _data.findByRSName(name);
+}
+
+void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const {
+ std::set<ShardId> seen;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data.getAllShardIds(seen);
+ }
+ all->assign(seen.begin(), seen.end());
+}
+
+void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) {
+ getAllShardIdsNoReload(all);
+ if (all->empty()) {
+ bool didReload = reload(opCtx);
+ getAllShardIdsNoReload(all);
+ // If we didn't do the reload ourselves, we should retry to ensure
+ // that the reload is actually initiated while we're executing this
+ if (!didReload && all->empty()) {
+ reload(opCtx);
+ getAllShardIdsNoReload(all);
}
+ }
+}
- auto newData = ShardRegistryData::createFromExisting(
- returnData, latestConnString.second, _shardFactory.get());
- returnData = newData;
+int ShardRegistry::getNumShards() const {
+ std::set<ShardId> seen;
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data.getAllShardIds(seen);
}
+ return seen.size();
+}
- // Remove RSMs that are not in the catalog any more.
- for (auto& pair : removedShards) {
- auto& shardId = pair.first;
- auto& shard = pair.second;
- invariant(shard);
+void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
+ invariant(newConnString.type() == ConnectionString::SET ||
+ newConnString.type() == ConnectionString::CUSTOM); // For dbtests
- auto name = shard->getConnString().getSetName();
- ReplicaSetMonitor::remove(name);
- for (auto& callback : _shardRemovalHooks) {
- // Run callbacks asynchronously.
- // TODO SERVER-50906: Consider running these callbacks synchronously.
- ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
- .getAsync([=](const Status&) { callback(shardId); });
- }
+ // to prevent update config shard connection string during init
+ stdx::unique_lock<Latch> lock(_mutex);
+
+ auto shard = _data.findByRSName(newConnString.getSetName());
+ if (!shard) {
+ return;
}
- // The registry is "up" once there has been a successful lookup from the config servers.
- if (fetchedFromConfigServers) {
- _isUp.store(true);
+ auto [data, updatedShard] =
+ ShardRegistryData::createFromExisting(_data, newConnString, _shardFactory.get());
+
+ if (updatedShard && updatedShard->isConfig()) {
+ _configShard = updatedShard;
}
- Time returnTime{returnTopologyTime, rsmIncrementForConnStrings, returnForceReloadIncrement};
- LOGV2_DEBUG(4620251,
- 2,
- "Finished ShardRegistry::_lookup",
- "returnData"_attr = returnData.toBSON(),
- "returnTime"_attr = returnTime);
- return Cache::LookupResult{returnData, returnTime};
+ _data = data;
}
-void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
- invariant(_isInitialized.load());
- // startupPeriodicReloader() must be called only once
+void ShardRegistry::init() {
+ invariant(!_isInitialized.load());
+ {
+ stdx::unique_lock<Latch> lock(_mutex);
+ _configShard =
+ _shardFactory->createShard(ShardRegistry::kConfigServerShardId, _initConfigServerCS);
+ _data = ShardRegistryData::createWithConfigShardOnly(_configShard);
+ }
+ _isInitialized.store(true);
+}
+
+void ShardRegistry::startup(OperationContext* opCtx) {
+ // startup() must be called only once
invariant(!_executor);
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
@@ -222,17 +240,16 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
// construct task executor
auto net = executor::makeNetworkInterface("ShardRegistryUpdater", nullptr, std::move(hookList));
auto netPtr = net.get();
- _executor = std::make_unique<executor::ThreadPoolTaskExecutor>(
- std::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net));
+ _executor = std::make_unique<ThreadPoolTaskExecutor>(
+ std::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
LOGV2_DEBUG(22724, 1, "Starting up task executor for periodic reloading of ShardRegistry");
_executor->startup();
auto status =
- _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); });
+ _executor->scheduleWork([this](const CallbackArgs& cbArgs) { _internalReload(cbArgs); });
if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
- LOGV2_DEBUG(
- 22725, 1, "Can't schedule Shard Registry reload. Executor shutdown in progress");
+        LOGV2_DEBUG(22725, 1, "Can't schedule Shard Registry reload. Executor shutdown in progress");
return;
}
@@ -244,27 +261,7 @@ void ShardRegistry::startupPeriodicReloader(OperationContext* opCtx) {
}
}
-void ShardRegistry::shutdownPeriodicReloader() {
- if (_executor) {
- LOGV2_DEBUG(22723, 1, "Shutting down task executor for reloading shard registry");
- _executor->shutdown();
- _executor->join();
- _executor.reset();
- }
-}
-
-void ShardRegistry::shutdown() {
- shutdownPeriodicReloader();
-
- if (!_isShutdown.load()) {
- LOGV2_DEBUG(4620235, 1, "Shutting down shard registry");
- _threadPool.shutdown();
- _threadPool.join();
- _isShutdown.store(true);
- }
-}
-
-void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) {
+void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) {
LOGV2_DEBUG(22726, 1, "Reloading shardRegistry");
if (!cbArgs.status.isOK()) {
LOGV2_WARNING(22734,
@@ -278,26 +275,21 @@ void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) {
auto opCtx = tc->makeOperationContext();
- auto refreshPeriod = kRefreshPeriod;
-
try {
reload(opCtx.get());
} catch (const DBException& e) {
- if (e.code() == ErrorCodes::ReadConcernMajorityNotAvailableYet) {
- refreshPeriod = Seconds(1);
- }
LOGV2(22727,
"Error running periodic reload of shard registry caused by {error}; will retry after "
"{shardRegistryReloadInterval}",
"Error running periodic reload of shard registry",
"error"_attr = redact(e),
- "shardRegistryReloadInterval"_attr = refreshPeriod);
+ "shardRegistryReloadInterval"_attr = kRefreshPeriod);
}
// reschedule itself
auto status =
- _executor->scheduleWorkAt(_executor->now() + refreshPeriod,
- [this](const CallbackArgs& cbArgs) { _periodicReload(cbArgs); });
+ _executor->scheduleWorkAt(_executor->now() + kRefreshPeriod,
+ [this](const CallbackArgs& cbArgs) { _internalReload(cbArgs); });
if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
LOGV2_DEBUG(
@@ -313,155 +305,85 @@ void ShardRegistry::_periodicReload(const CallbackArgs& cbArgs) {
}
}
-ConnectionString ShardRegistry::getConfigServerConnectionString() const {
- return getConfigShard()->getConnString();
-}
-
-std::shared_ptr<Shard> ShardRegistry::getConfigShard() const {
- stdx::lock_guard<Latch> lk(_mutex);
- return _configShardData.findShard(kConfigServerShardId);
+bool ShardRegistry::isUp() const {
+ return _isUp.load();
}
-StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
- const ShardId& shardId) {
- // First check if this is a config shard lookup.
- {
- stdx::lock_guard<Latch> lk(_mutex);
- if (auto shard = _configShardData.findShard(shardId)) {
- return shard;
+bool ShardRegistry::reload(OperationContext* opCtx) {
+ stdx::unique_lock<Latch> reloadLock(_reloadMutex);
+
+ if (_reloadState == ReloadState::Reloading) {
+ // Another thread is already in the process of reloading so no need to do duplicate work.
+ // There is also an issue if multiple threads are allowed to call getAllShards()
+ // simultaneously because there is no good way to determine which of the threads has the
+ // more recent version of the data.
+ try {
+ opCtx->waitForConditionOrInterrupt(
+ _inReloadCV, reloadLock, [&] { return _reloadState != ReloadState::Reloading; });
+ } catch (const DBException& e) {
+ LOGV2_DEBUG(22729,
+ 1,
+ "Error reloading shard registry caused by {error}",
+ "Error reloading shard registry",
+ "error"_attr = redact(e));
+ return false;
}
- }
- if (auto shard = _getData(opCtx)->findShard(shardId)) {
- return shard;
+ if (_reloadState == ReloadState::Idle) {
+ return false;
+ }
+        // else proceed to reload since an error occurred on the last reload attempt.
+ invariant(_reloadState == ReloadState::Failed);
}
- // Reload and try again if the shard was not in the registry
- reload(opCtx);
- if (auto shard = _getData(opCtx)->findShard(shardId)) {
- return shard;
- }
+ _reloadState = ReloadState::Reloading;
+ reloadLock.unlock();
- return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
-}
+ auto nextReloadState = ReloadState::Failed;
-void ShardRegistry::getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all) {
- std::set<ShardId> seen;
- auto data = _getData(opCtx);
- data->getAllShardIds(seen);
- if (seen.empty()) {
- reload(opCtx);
- data = _getData(opCtx);
- data->getAllShardIds(seen);
- }
- all->assign(seen.begin(), seen.end());
-}
+ auto failGuard = makeGuard([&] {
+ if (!reloadLock.owns_lock()) {
+ reloadLock.lock();
+ }
+ _reloadState = nextReloadState;
+ _inReloadCV.notify_all();
+ });
-int ShardRegistry::getNumShards(OperationContext* opCtx) {
- std::set<ShardId> seen;
- auto data = _getData(opCtx);
- data->getAllShardIds(seen);
- return seen.size();
-}
+ ShardRegistryData reloadedData =
+ ShardRegistryData::createFromCatalogClient(opCtx, _shardFactory.get(), getConfigShard());
-std::pair<std::vector<ShardRegistry::LatestConnStrings::value_type>, ShardRegistry::Increment>
-ShardRegistry::_getLatestConnStrings() const {
stdx::unique_lock<Latch> lock(_mutex);
- return {{_latestConnStrings.begin(), _latestConnStrings.end()}, _rsmIncrement.load()};
-}
-
-void ShardRegistry::updateReplSetHosts(const ConnectionString& newConnString) {
- invariant(newConnString.type() == ConnectionString::SET ||
- newConnString.type() == ConnectionString::CUSTOM); // For dbtests
-
- stdx::lock_guard<Latch> lk(_mutex);
- if (auto shard = _configShardData.findByRSName(newConnString.getSetName())) {
- auto newData = ShardRegistryData::createFromExisting(
- _configShardData, newConnString, _shardFactory.get());
- _configShardData = newData;
-
- } else {
- // Stash the new connection string and bump the RSM increment.
- _latestConnStrings[newConnString.getSetName()] = newConnString;
- auto value = _rsmIncrement.addAndFetch(1);
- LOGV2_DEBUG(4620252,
- 2,
- "ShardRegistry stashed new connection string",
- "newConnString"_attr = newConnString,
- "newRSMIncrement"_attr = value);
- }
-
- // Schedule a lookup, to incorporate the new connection string.
- // TODO SERVER-50910: To avoid needing to use a separate thread to schedule the lookup, make
- // _getData() async.
- auto status = Grid::get(_service)->getExecutorPool()->getFixedExecutor()->scheduleWork(
- [this](const CallbackArgs& cbArgs) {
- ThreadClient tc("shard-registry-rsm-reload", _service);
-
- auto opCtx = tc->makeOperationContext();
-
- try {
- _getData(opCtx.get());
- } catch (const DBException& e) {
- LOGV2(4620201,
- "Error running reload of ShardRegistry for RSM update, caused by {error}",
- "Error running reload of ShardRegistry for RSM update",
- "error"_attr = redact(e));
- }
- });
-
- if (status.getStatus() == ErrorCodes::ShutdownInProgress) {
- LOGV2_DEBUG(
- 4620202,
- 1,
- "Can't schedule ShardRegistry reload for RSM update, executor shutdown in progress");
- return;
- }
- if (!status.isOK()) {
- LOGV2_FATAL(4620203,
- "Error scheduling ShardRegistry reload for RSM update, caused by {error}",
- "Error scheduling ShardRegistry reload for RSM update",
- "error"_attr = redact(status.getStatus()));
- }
-}
+ auto [mergedData, removedShards] = ShardRegistryData::mergeExisting(_data, reloadedData);
+ _data = std::move(mergedData);
-std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
- return _shardFactory->createUniqueShard(ShardId("<unnamed>"), connStr);
-}
+ lock.unlock();
-bool ShardRegistry::isUp() const {
- return _isUp.load();
-}
+ // Remove RSMs that are not in the catalog any more.
+ for (auto& pair : removedShards) {
+ auto& shardId = pair.first;
+ auto& shard = pair.second;
+ invariant(shard);
-void ShardRegistry::toBSON(BSONObjBuilder* result) const {
- BSONObjBuilder map;
- BSONObjBuilder hosts;
- BSONObjBuilder connStrings;
- auto data = _getCachedData();
- data->toBSON(&map, &hosts, &connStrings);
- {
- stdx::lock_guard<Latch> lk(_mutex);
- _configShardData.toBSON(&map, &hosts, &connStrings);
+ auto name = shard->getConnString().getSetName();
+ ReplicaSetMonitor::remove(name);
+ for (auto& callback : _shardRemovalHooks) {
+ // Run callbacks asynchronously.
+ ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
+ .getAsync([=](const Status&) { callback(shardId); });
+ }
}
- result->append("map", map.obj());
- result->append("hosts", hosts.obj());
- result->append("connStrings", connStrings.obj());
-}
-
-bool ShardRegistry::reload(OperationContext* opCtx) {
- // Make the next acquire do a lookup.
- auto value = _forceReloadIncrement.addAndFetch(1);
- LOGV2_DEBUG(4620253, 2, "Forcing ShardRegistry reload", "newForceReloadIncrement"_attr = value);
-
- // Force it to actually happen now.
- _getData(opCtx);
+ nextReloadState = ReloadState::Idle;
+ // first successful reload means that registry is up
+ _isUp.store(true);
return true;
}
void ShardRegistry::clearEntries() {
- _cache->invalidateAll();
+ ShardRegistryData empty;
+ stdx::lock_guard<Latch> lk(_mutex);
+ _data = empty;
}
void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext,
@@ -471,8 +393,7 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex
auto opCtx = tc->makeOperationContext();
auto const grid = Grid::get(opCtx.get());
- std::shared_ptr<Shard> s =
- grid->shardRegistry()->_getShardForRSNameNoReload(connStr.getSetName());
+ std::shared_ptr<Shard> s = grid->shardRegistry()->lookupRSName(connStr.getSetName());
if (!s) {
LOGV2_DEBUG(22730,
1,
@@ -506,93 +427,25 @@ void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContex
}
}
-// Inserts the initial empty ShardRegistryData into the cache, if the cache is empty.
-void ShardRegistry::_initializeCacheIfNecessary() const {
- if (!_cache->peekLatestCached(_kSingleton)) {
- stdx::lock_guard<Latch> lk(_mutex);
- if (!_cache->peekLatestCached(_kSingleton)) {
- _cache->insertOrAssign(_kSingleton, {}, Date_t::now(), Time());
- }
- }
-}
-
-ShardRegistry::Cache::ValueHandle ShardRegistry::_getData(OperationContext* opCtx) {
- _initializeCacheIfNecessary();
-
- // If the forceReloadIncrement is 0, then we've never done a lookup, so we should be sure to do
- // one now.
- Increment uninitializedIncrement{0};
- _forceReloadIncrement.compareAndSwap(&uninitializedIncrement, 1);
-
- // Update the time the cache should be aiming for.
- auto now = VectorClock::get(opCtx)->getTime();
- // The topologyTime should be advanced to either the actual topologyTime (if it is being
- // gossiped), or else the previously cached topologyTime value (so that this part of the cache's
- // time doesn't advance, if topologyTime isn't being gossiped).
- Timestamp topologyTime = useActualTopologyTime()
- ? now.topologyTime().asTimestamp()
- : _cache->peekLatestCached(_kSingleton).getTime().topologyTime;
- _cache->advanceTimeInStore(
- _kSingleton, Time(topologyTime, _rsmIncrement.load(), _forceReloadIncrement.load()));
-
- return _cache->acquire(opCtx, _kSingleton, CacheCausalConsistency::kLatestKnown);
-}
-
-// TODO SERVER-50206: Remove usage of these non-causally consistent accessors.
-
-ShardRegistry::Cache::ValueHandle ShardRegistry::_getCachedData() const {
- _initializeCacheIfNecessary();
- return _cache->peekLatestCached(_kSingleton);
-}
-
-std::shared_ptr<Shard> ShardRegistry::getShardNoReload(const ShardId& shardId) const {
- // First check if this is a config shard lookup.
- {
- stdx::lock_guard<Latch> lk(_mutex);
- if (auto shard = _configShardData.findShard(shardId)) {
- return shard;
- }
- }
- auto data = _getCachedData();
- return data->findShard(shardId);
-}
-
-std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort& host) const {
- // First check if this is a config shard lookup.
+void ShardRegistry::toBSON(BSONObjBuilder* result) const {
+ std::vector<std::shared_ptr<Shard>> shards;
{
stdx::lock_guard<Latch> lk(_mutex);
- if (auto shard = _configShardData.findByHostAndPort(host)) {
- return shard;
- }
+ _data.getAllShards(shards);
}
- auto data = _getCachedData();
- return data->findByHostAndPort(host);
-}
-void ShardRegistry::getAllShardIdsNoReload(std::vector<ShardId>* all) const {
- std::set<ShardId> seen;
- auto data = _getCachedData();
- data->getAllShardIds(seen);
- all->assign(seen.begin(), seen.end());
-}
-
-int ShardRegistry::getNumShardsNoReload() const {
- std::set<ShardId> seen;
- auto data = _getCachedData();
- data->getAllShardIds(seen);
- return seen.size();
-}
+ std::sort(std::begin(shards),
+ std::end(shards),
+ [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) {
+ return lhs->getId() < rhs->getId();
+ });
-std::shared_ptr<Shard> ShardRegistry::_getShardForRSNameNoReload(const std::string& name) const {
- // First check if this is a config shard lookup.
- {
- stdx::lock_guard<Latch> lk(_mutex);
- if (auto shard = _configShardData.findByRSName(name)) {
- return shard;
- }
+ BSONObjBuilder mapBob(result->subobjStart("map"));
+ for (auto&& shard : shards) {
+ // Intentionally calling getConnString while not holding _mutex
+ // because it can take ReplicaSetMonitor::SetState::mutex if it's ShardRemote.
+ mapBob.append(shard->getId(), shard->getConnString().toString());
}
- auto data = _getCachedData();
- return data->findByRSName(name);
}
////////////// ShardRegistryData //////////////////
@@ -603,8 +456,9 @@ ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<S
return data;
}
-std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClient(
- OperationContext* opCtx, ShardFactory* shardFactory) {
+ShardRegistryData ShardRegistryData::createFromCatalogClient(OperationContext* opCtx,
+ ShardFactory* shardFactory,
+ std::shared_ptr<Shard> configShard) {
auto const catalogClient = Grid::get(opCtx)->catalogClient();
auto readConcern = repl::ReadConcernLevel::kMajorityReadConcern;
@@ -635,7 +489,6 @@ std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClie
// Do this before re-taking the mutex to avoid deadlock with the ReplicaSetMonitor updating
// hosts for a given shard.
std::vector<std::tuple<std::string, ConnectionString>> shardsInfo;
- Timestamp maxTopologyTime;
for (const auto& shardType : shards) {
// This validation should ideally go inside the ShardType::validate call. However, doing
// it there would prevent us from loading previously faulty shard hosts, which might have
@@ -649,15 +502,11 @@ std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClie
continue;
}
- if (auto thisTopologyTime = shardType.getTopologyTime();
- maxTopologyTime < thisTopologyTime) {
- maxTopologyTime = thisTopologyTime;
- }
-
shardsInfo.push_back(std::make_tuple(shardType.getName(), shardHostStatus.getValue()));
}
ShardRegistryData data;
+ data._addShard(configShard, true);
for (auto& shardInfo : shardsInfo) {
if (std::get<0>(shardInfo) == "config") {
continue;
@@ -668,7 +517,7 @@ std::pair<ShardRegistryData, Timestamp> ShardRegistryData::createFromCatalogClie
data._addShard(std::move(shard), false);
}
- return {data, maxTopologyTime};
+ return data;
}
std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mergeExisting(
@@ -701,20 +550,21 @@ std::pair<ShardRegistryData, ShardRegistryData::ShardMap> ShardRegistryData::mer
return {mergedData, removedShards};
}
-ShardRegistryData ShardRegistryData::createFromExisting(const ShardRegistryData& existingData,
- const ConnectionString& newConnString,
- ShardFactory* shardFactory) {
+std::pair<ShardRegistryData, std::shared_ptr<Shard>> ShardRegistryData::createFromExisting(
+ const ShardRegistryData& existingData,
+ const ConnectionString& newConnString,
+ ShardFactory* shardFactory) {
ShardRegistryData data(existingData);
auto it = data._rsLookup.find(newConnString.getSetName());
if (it == data._rsLookup.end()) {
- return data;
+ return {data, nullptr};
}
invariant(it->second);
auto updatedShard = shardFactory->createShard(it->second->getId(), newConnString);
data._addShard(updatedShard, true);
- return data;
+ return {data, updatedShard};
}
std::shared_ptr<Shard> ShardRegistryData::findByRSName(const std::string& name) const {
@@ -832,70 +682,4 @@ void ShardRegistryData::_addShard(std::shared_ptr<Shard> shard, bool useOriginal
}
}
-void ShardRegistryData::toBSON(BSONObjBuilder* map,
- BSONObjBuilder* hosts,
- BSONObjBuilder* connStrings) const {
- std::vector<std::shared_ptr<Shard>> shards;
- getAllShards(shards);
-
- std::sort(std::begin(shards),
- std::end(shards),
- [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) {
- return lhs->getId() < rhs->getId();
- });
-
- if (map) {
- for (auto&& shard : shards) {
- map->append(shard->getId(), shard->getConnString().toString());
- }
- }
-
- if (hosts) {
- for (const auto& hostIt : _hostLookup) {
- hosts->append(hostIt.first.toString(), hostIt.second->getId());
- }
- }
-
- if (connStrings) {
- for (const auto& connStringIt : _connStringLookup) {
- connStrings->append(connStringIt.first.toString(), connStringIt.second->getId());
- }
- }
-}
-
-void ShardRegistryData::toBSON(BSONObjBuilder* result) const {
- std::vector<std::shared_ptr<Shard>> shards;
- getAllShards(shards);
-
- std::sort(std::begin(shards),
- std::end(shards),
- [](std::shared_ptr<const Shard> lhs, std::shared_ptr<const Shard> rhs) {
- return lhs->getId() < rhs->getId();
- });
-
- BSONObjBuilder mapBob(result->subobjStart("map"));
- for (auto&& shard : shards) {
- mapBob.append(shard->getId(), shard->getConnString().toString());
- }
- mapBob.done();
-
- BSONObjBuilder hostsBob(result->subobjStart("hosts"));
- for (const auto& hostIt : _hostLookup) {
- hostsBob.append(hostIt.first.toString(), hostIt.second->getId());
- }
- hostsBob.done();
-
- BSONObjBuilder connStringsBob(result->subobjStart("connStrings"));
- for (const auto& connStringIt : _connStringLookup) {
- connStringsBob.append(connStringIt.first.toString(), connStringIt.second->getId());
- }
- connStringsBob.done();
-}
-
-BSONObj ShardRegistryData::toBSON() const {
- BSONObjBuilder bob;
- toBSON(&bob);
- return bob.obj();
-}
-
} // namespace mongo
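The reload() restored above serializes concurrent refreshes with a small state machine (Idle / Reloading / Failed) behind _reloadMutex and _inReloadCV: a thread that finds a reload already in flight waits on the condition variable and returns false, which is why callers such as getShard() and getAllShardIds() issue one more reload when they merely waited. A minimal standalone sketch of that single-flight pattern follows (illustrative names and plain std:: primitives rather than the mongo Latch/stdx types; interruption and logging omitted):

#include <condition_variable>
#include <mutex>

// Minimal sketch of the single-flight reload pattern (illustrative names).
class Registry {
public:
    // Returns true if this call performed the reload itself, false if it only
    // waited for a reload that another thread had already started.
    bool reload() {
        std::unique_lock<std::mutex> lk(_reloadMutex);
        if (_state == State::kReloading) {
            _cv.wait(lk, [&] { return _state != State::kReloading; });
            if (_state == State::kIdle) {
                return false;  // another thread refreshed the data for us
            }
            // _state == State::kFailed: the other reload errored, so retry here
        }
        _state = State::kReloading;
        lk.unlock();

        State next = State::kFailed;
        try {
            fetchFromConfigServers();  // slow network fetch, done outside the mutex
            next = State::kIdle;
        } catch (...) {
            finish(lk, next);
            throw;  // as in the real code, the error propagates to the caller
        }
        finish(lk, next);
        return true;
    }

private:
    enum class State { kIdle, kReloading, kFailed };

    void finish(std::unique_lock<std::mutex>& lk, State next) {
        lk.lock();
        _state = next;
        _cv.notify_all();  // wake any threads waiting on this reload
    }

    void fetchFromConfigServers() { /* contact the config servers; may throw */ }

    std::mutex _reloadMutex;
    std::condition_variable _cv;
    State _state{State::kIdle};
};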
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 711c8e0d34b..486788c57dd 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -29,24 +29,30 @@
#pragma once
+#include <memory>
+#include <set>
#include <string>
#include <vector>
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/client/connection_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/server_options.h"
+#include "mongo/db/jsobj.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_factory.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/unordered_map.h"
-#include "mongo/util/concurrency/thread_pool.h"
-#include "mongo/util/net/hostandport.h"
-#include "mongo/util/read_through_cache.h"
+#include "mongo/util/concurrency/with_lock.h"
namespace mongo {
+class BSONObjBuilder;
+struct HostAndPort;
+class NamespaceString;
+class OperationContext;
+class ServiceContext;
+class ShardFactory;
+class Shard;
+class ShardType;
+
class ShardRegistryData {
public:
using ShardMap = stdx::unordered_map<ShardId, std::shared_ptr<Shard>, ShardId::Hasher>;
@@ -61,8 +67,9 @@ public:
/**
* Reads shards docs from the catalog client and fills in maps.
*/
- static std::pair<ShardRegistryData, Timestamp> createFromCatalogClient(
- OperationContext* opCtx, ShardFactory* shardFactory);
+ static ShardRegistryData createFromCatalogClient(OperationContext* opCtx,
+ ShardFactory* shardFactory,
+ std::shared_ptr<Shard> configShard);
/**
* Merges alreadyCachedData and configServerData into a new ShardRegistryData.
@@ -86,9 +93,10 @@ public:
* Create a duplicate of existingData, but additionally updates the shard for newConnString.
* Used when notified by the RSM of a new connection string from a shard.
*/
- static ShardRegistryData createFromExisting(const ShardRegistryData& existingData,
- const ConnectionString& newConnString,
- ShardFactory* shardFactory);
+ static std::pair<ShardRegistryData, std::shared_ptr<Shard>> createFromExisting(
+ const ShardRegistryData& existingData,
+ const ConnectionString& newConnString,
+ ShardFactory* shardFactory);
/**
* Returns the shard with the given shard id, connection string, or host and port.
@@ -120,10 +128,6 @@ public:
*/
void getAllShards(std::vector<std::shared_ptr<Shard>>& result) const;
- void toBSON(BSONObjBuilder* result) const;
- void toBSON(BSONObjBuilder* map, BSONObjBuilder* hosts, BSONObjBuilder* connStrings) const;
- BSONObj toBSON() const;
-
private:
/**
* Returns the shard with the given shard id, or nullptr if no such shard.
@@ -194,27 +198,9 @@ public:
~ShardRegistry();
/**
- * Initializes ShardRegistry with config shard. Must be called outside c-tor to avoid calls on
- * this while its still not fully constructed.
- */
- void init(ServiceContext* service);
-
- /**
- * Startup the periodic reloader of the ShardRegistry.
- * Can be called only after ShardRegistry::init()
+ * Starts ReplicaSetMonitor by adding a config shard.
*/
- void startupPeriodicReloader(OperationContext* opCtx);
-
- /**
- * Shutdown the periodic reloader of the ShardRegistry.
- */
- void shutdownPeriodicReloader();
-
- /**
- * Shuts down the threadPool. Needs to be called explicitly because ShardRegistry is never
- * destroyed as it's owned by the static grid object.
- */
- void shutdown();
+ void startup(OperationContext* opCtx);
/**
* This is invalid to use on the config server and will hit an invariant if it is done.
@@ -226,37 +212,56 @@ public:
ConnectionString getConfigServerConnectionString() const;
/**
- * Returns shared pointer to the shard object representing the config servers.
- *
- * The config shard is always known, so this function never blocks.
+ * Reloads the ShardRegistry based on the contents of the config server's config.shards
+ * collection. Returns true if this call performed a reload and false if this call only waited
+ * for another thread to perform the reload and did not actually reload. Because of this, it is
+ * possible that calling reload once may not result in the most up to date view. If strict
+ * reloading is required, the caller should call this method one more time if the first call
+ * returned false.
*/
- std::shared_ptr<Shard> getConfigShard() const;
+ bool reload(OperationContext* opCtx);
+
+ /**
+ * Clears all entries from the shard registry entries, which will force the registry to do a
+ * reload on next access.
+ */
+ void clearEntries();
+
+ /**
+ * Takes a connection string describing either a shard or config server replica set, looks
+ * up the corresponding Shard object based on the replica set name, then updates the
+ * ShardRegistry's notion of what hosts make up that shard.
+ */
+ void updateReplSetHosts(const ConnectionString& newConnString);
/**
* Returns a shared pointer to the shard object with the given shard id, or ShardNotFound error
* otherwise.
*
* May refresh the shard registry if there's no cached information about the shard. The shardId
- * parameter can actually be the shard name or the HostAndPort for any server in the shard.
+ * parameter can actually be the shard name or the HostAndPort for any
+ * server in the shard.
*/
StatusWith<std::shared_ptr<Shard>> getShard(OperationContext* opCtx, const ShardId& shardId);
/**
- * Populates all known shard ids into the given vector.
+ * Returns a shared pointer to the shard object with the given shard id. The shardId parameter
+ * can actually be the shard name or the HostAndPort for any server in the shard. Will not
+ * refresh the shard registry or otherwise perform any network traffic. This means that if the
+ * shard was recently added it may not be found. USE WITH CAUTION.
*/
- void getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all);
+ std::shared_ptr<Shard> getShardNoReload(const ShardId& shardId);
/**
- * Returns the number of shards.
+ * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not
+ * refresh the shard registry or otherwise perform any network traffic.
*/
- int getNumShards(OperationContext* opCtx);
+ std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost);
/**
- * Takes a connection string describing either a shard or config server replica set, looks
- * up the corresponding Shard object based on the replica set name, then updates the
- * ShardRegistry's notion of what hosts make up that shard.
+ * Returns shared pointer to the shard object representing the config servers.
*/
- void updateReplSetHosts(const ConnectionString& newConnString);
+ std::shared_ptr<Shard> getConfigShard() const;
/**
* Instantiates a new detached shard connection, which does not appear in the list of shards
@@ -269,28 +274,36 @@ public:
std::unique_ptr<Shard> createConnection(const ConnectionString& connStr) const;
/**
- * The ShardRegistry is "up" once a successful lookup from the config servers has been
- * completed.
+ * Lookup shard by replica set name. Returns nullptr if the name can't be found.
+ * Note: this doesn't refresh the table if the name isn't found, so it's possible that a
+ * newly added shard/Replica Set may not be found.
*/
- bool isUp() const;
+ std::shared_ptr<Shard> lookupRSName(const std::string& name) const;
+
+ void getAllShardIdsNoReload(std::vector<ShardId>* all) const;
+
+ /**
+ * Like getAllShardIdsNoReload(), but does a reload internally in the case that
+ * getAllShardIdsNoReload() comes back empty
+ */
+ void getAllShardIds(OperationContext* opCtx, std::vector<ShardId>* all);
+
+ int getNumShards() const;
void toBSON(BSONObjBuilder* result) const;
+ bool isUp() const;
/**
- * Reloads the ShardRegistry based on the contents of the config server's config.shards
- * collection. Returns true if this call performed a reload and false if this call only waited
- * for another thread to perform the reload and did not actually reload. Because of this, it is
- * possible that calling reload once may not result in the most up to date view. If strict
- * reloading is required, the caller should call this method one more time if the first call
- * returned false.
+ * Initializes ShardRegistry with config shard. Must be called outside c-tor to avoid calls on
+ * this while its still not fully constructed.
*/
- bool reload(OperationContext* opCtx);
+ void init();
/**
- * Clears all entries from the shard registry entries, which will force the registry to do a
- * reload on next access.
+ * Shuts down _executor. Needs to be called explicitly because ShardRegistry is never destroyed
+ * as it's owned by the static grid object.
*/
- void clearEntries();
+ void shutdown();
/**
* For use in mongos which needs notifications about changes to shard replset membership to
@@ -299,136 +312,8 @@ public:
static void updateReplicaSetOnConfigServer(ServiceContext* serviceContex,
const ConnectionString& connStr) noexcept;
- // TODO SERVER-50206: Remove usage of these non-causally consistent accessors.
- //
- // Their most important current users are dispatching requests to hosts, and processing
- // responses from hosts. These contexts need to know the shard that the host is associated
- // with, but usually have no access to any associated opCtx (if there even is one), and also
- // cannot tolerate waiting for further network activity (if the cache is stale and needs to be
- // refreshed via _lookup()).
-
- /**
- * Returns a shared pointer to the shard object with the given shard id. The shardId parameter
- * can actually be the shard name or the HostAndPort for any server in the shard. Will not
- * refresh the shard registry or otherwise perform any network traffic. This means that if the
- * shard was recently added it may not be found. USE WITH CAUTION.
- */
- std::shared_ptr<Shard> getShardNoReload(const ShardId& shardId) const;
-
- /**
- * Finds the Shard that the mongod listening at this HostAndPort is a member of. Will not
- * refresh the shard registry or otherwise perform any network traffic.
- */
- std::shared_ptr<Shard> getShardForHostNoReload(const HostAndPort& shardHost) const;
-
- void getAllShardIdsNoReload(std::vector<ShardId>* all) const;
-
- int getNumShardsNoReload() const;
-
private:
- /**
- * The ShardRegistry uses the ReadThroughCache to handle refreshing itself. The cache stores
- * a single entry, with key of Singleton, value of ShardRegistryData, and causal-consistency
- * time which is primarily Timestamp (based on the TopologyTime), but with additional
- * "increment"s that are used to flag additional refresh criteria.
- */
-
- using Increment = int64_t;
-
- struct Time {
- explicit Time() {}
-
- explicit Time(Timestamp topologyTime,
- Increment rsmIncrement,
- Increment forceReloadIncrement)
- : topologyTime(topologyTime),
- rsmIncrement(rsmIncrement),
- forceReloadIncrement(forceReloadIncrement) {}
-
- bool operator==(const Time& other) const {
- return topologyTime == other.topologyTime && rsmIncrement == other.rsmIncrement &&
- forceReloadIncrement == other.forceReloadIncrement;
- }
- bool operator!=(const Time& other) const {
- return !(*this == other);
- }
- bool operator>(const Time& other) const {
- return topologyTime > other.topologyTime || rsmIncrement > other.rsmIncrement ||
- forceReloadIncrement > other.forceReloadIncrement;
- }
- bool operator>=(const Time& other) const {
- return (*this > other) || (*this == other);
- }
- bool operator<(const Time& other) const {
- return !(*this >= other);
- }
- bool operator<=(const Time& other) const {
- return !(*this > other);
- }
-
- BSONObj toBSON() const {
- BSONObjBuilder bob;
- bob.append("topologyTime", topologyTime);
- bob.append("rsmIncrement", rsmIncrement);
- bob.append("forceReloadIncrement", forceReloadIncrement);
- return bob.obj();
- }
-
- Timestamp topologyTime;
-
- // The increments are used locally to trigger the lookup function.
- //
- // The rsmIncrement is used to indicate that that there are stashed RSM updates that need to
- // be incorporated.
- //
- // The forceReloadIncrement is used to indicate that the latest data should be fetched from
- // the configsvrs (ie. when the topologyTime can't be used for this, eg. in the first
- // lookup, and in contexts like unittests where topologyTime isn't gossipped but the
- // ShardRegistry still needs to be reloaded). This is how reload() is able to force a
- // refresh from the config servers - incrementing the forceReloadIncrement causes the cache
- // to call _lookup() (rather than having reload() attempt to do a synchronous refresh).
- Increment rsmIncrement{0};
- Increment forceReloadIncrement{0};
- };
-
- enum class Singleton { Only };
- static constexpr auto _kSingleton = Singleton::Only;
-
- using Cache = ReadThroughCache<Singleton, ShardRegistryData, Time>;
-
- Cache::LookupResult _lookup(OperationContext* opCtx,
- const Singleton& key,
- const Cache::ValueHandle& cachedData,
- const Time& timeInStore);
-
- /**
- * Gets a causally-consistent (ie. latest-known) copy of the ShardRegistryData, refreshing from
- * the config servers if necessary.
- */
- Cache::ValueHandle _getData(OperationContext* opCtx);
-
- /**
- * Gets the latest-cached copy of the ShardRegistryData. Never fetches from the config servers.
- * Only used by the "NoReload" accessors.
- * TODO SERVER-50206: Remove usage of this non-causally consistent accessor.
- */
- Cache::ValueHandle _getCachedData() const;
-
- /**
- * Lookup shard by replica set name. Returns nullptr if the name can't be found.
- * Note: this doesn't refresh the table if the name isn't found, so it's possible that a
- * newly added shard/Replica Set may not be found.
- * TODO SERVER-50206: Remove usage of this non-causally consistent accessor.
- */
- std::shared_ptr<Shard> _getShardForRSNameNoReload(const std::string& name) const;
-
- using LatestConnStrings = stdx::unordered_map<ShardId, ConnectionString, ShardId::Hasher>;
-
- std::pair<std::vector<LatestConnStrings::value_type>, Increment> _getLatestConnStrings() const;
-
- void _initializeCacheIfNecessary() const;
-
- void _periodicReload(const executor::TaskExecutor::CallbackArgs& cbArgs);
+ void _internalReload(const executor::TaskExecutor::CallbackArgs& cbArgs);
/**
* Factory to create shards. Never changed after startup so safe to access outside of _mutex.
@@ -441,37 +326,24 @@ private:
*/
const ConnectionString _initConfigServerCS;
+ AtomicWord<bool> _isInitialized{false};
+
/**
* A list of callbacks to be called asynchronously when it has been discovered that a shard was
* removed.
*/
const std::vector<ShardRemovalHook> _shardRemovalHooks;
- // Thread pool used when looking up new values for the cache (ie. in which _lookup() runs).
- ThreadPool _threadPool;
-
- // Executor for periodically reloading the registry (ie. in which _periodicReload() runs).
- std::unique_ptr<executor::TaskExecutor> _executor{};
-
- mutable Mutex _cacheMutex = MONGO_MAKE_LATCH("ShardRegistry::_cacheMutex");
- std::unique_ptr<Cache> _cache;
-
- // Counters for incrementing the rsmIncrement and forceReloadIncrement fields of the Time used
- // by the _cache. See the comments for these fields in the Time class above for an explanation
- // of their purpose.
- AtomicWord<Increment> _rsmIncrement{0};
- AtomicWord<Increment> _forceReloadIncrement{0};
-
- // Protects _configShardData, and _latestNewConnStrings.
+ // Protects the ShardRegistryData lookup maps in _data, and _configShard.
mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardRegistry::_mutex");
- // Store a reference to the configShard.
- ShardRegistryData _configShardData;
+ ShardRegistryData _data;
- // The key is replset name (the type is ShardId just to take advantage of its hasher).
- LatestConnStrings _latestConnStrings;
+ // Store a separate reference to the configShard.
+ std::shared_ptr<Shard> _configShard;
- AtomicWord<bool> _isInitialized{false};
+ // Executor for reloading.
+ std::unique_ptr<executor::TaskExecutor> _executor{};
// The ShardRegistry is "up" once there has been a successful refresh.
AtomicWord<bool> _isUp{false};
@@ -479,7 +351,17 @@ private:
// Set to true in shutdown call to prevent calling it twice.
AtomicWord<bool> _isShutdown{false};
- ServiceContext* _service{nullptr};
+ // Protects the _reloadState during startup and refresh.
+ mutable Mutex _reloadMutex = MONGO_MAKE_LATCH("ShardRegistry::_reloadMutex");
+ stdx::condition_variable _inReloadCV;
+
+ enum class ReloadState {
+ Idle, // no other thread is loading data from config server in reload().
+ Reloading, // another thread is loading data from the config server in reload().
+ Failed, // last call to reload() caused an error when contacting the config server.
+ };
+
+ ReloadState _reloadState{ReloadState::Idle};
};
} // namespace mongo
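The header comment above documents the reload() contract restored by this revert: a false return means the caller only waited on another thread's reload. A hypothetical caller that needs "strict" freshness would therefore look roughly like the sketch below (the helper name is illustrative and not part of the patch; ShardRegistry::getShard() applies the same idea internally):

#include "mongo/s/client/shard_registry.h"
#include "mongo/util/str.h"

namespace mongo {

// Hypothetical helper showing the documented contract in use: if the first
// reload() only waited on a concurrent reload, call it once more to observe
// data at least as fresh as our own request.
StatusWith<std::shared_ptr<Shard>> findShardStrict(OperationContext* opCtx,
                                                   ShardRegistry* registry,
                                                   const ShardId& shardId) {
    if (auto shard = registry->getShardNoReload(shardId)) {
        return shard;
    }
    if (!registry->reload(opCtx)) {
        registry->reload(opCtx);  // first call only waited; force our own reload
    }
    if (auto shard = registry->getShardNoReload(shardId)) {
        return shard;
    }
    return {ErrorCodes::ShardNotFound, str::stream() << "Shard " << shardId << " not found"};
}

}  // namespace mongo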
diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp
index ea28033590e..c2e3cd011ac 100644
--- a/src/mongo/s/grid.cpp
+++ b/src/mongo/s/grid.cpp
@@ -90,7 +90,7 @@ void Grid::init(std::unique_ptr<ShardingCatalogClient> catalogClient,
_executorPool = std::move(executorPool);
_network = network;
- _shardRegistry->init(grid.owner(this));
+ _shardRegistry->init();
}
bool Grid::isShardingInitialized() const {
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 2b73154e767..e3baf3d6e60 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -218,7 +218,7 @@ Status initializeGlobalShardingState(OperationContext* opCtx,
networkPtr);
// The shard registry must be started once the grid is initialized
- grid->shardRegistry()->startupPeriodicReloader(opCtx);
+ grid->shardRegistry()->startup(opCtx);
// The catalog client must be started after the shard registry has been started up
grid->catalogClient()->startup();
diff --git a/src/mongo/util/invalidating_lru_cache.h b/src/mongo/util/invalidating_lru_cache.h
index 2852313ef22..c8ead4adecc 100644
--- a/src/mongo/util/invalidating_lru_cache.h
+++ b/src/mongo/util/invalidating_lru_cache.h
@@ -67,18 +67,6 @@ struct CacheNotCausallyConsistent {
};
/**
- * Helper for determining if a given type is CacheNotCausallyConsistent or not.
- */
-template <typename T>
-struct isCausallyConsistentImpl : std::true_type {};
-
-template <>
-struct isCausallyConsistentImpl<CacheNotCausallyConsistent> : std::false_type {};
-
-template <class T>
-inline constexpr bool isCausallyConsistent = isCausallyConsistentImpl<T>::value;
-
-/**
* Specifies the desired causal consistency for calls to 'get' (and 'acquire', respectively in the
* ReadThroughCache, which is its main consumer).
*/
@@ -212,8 +200,12 @@ public:
// doesn't support pinning items. Their only usage must be in the authorization mananager
// for the internal authentication user.
explicit ValueHandle(Value&& value)
- : _value(std::make_shared<StoredValue>(
- nullptr, 0, boost::none, std::move(value), Time(), Time())) {}
+ : _value(std::make_shared<StoredValue>(nullptr,
+ 0,
+ boost::none,
+ std::move(value),
+ CacheNotCausallyConsistent(),
+ CacheNotCausallyConsistent())) {}
explicit ValueHandle(Value&& value, const Time& t)
: _value(
@@ -226,15 +218,9 @@ public:
}
bool isValid() const {
- invariant(bool(*this));
return _value->isValid.loadRelaxed();
}
- const Time& getTime() const {
- invariant(bool(*this));
- return _value->time;
- }
-
Value* get() {
invariant(bool(*this));
return &_value->value;
@@ -274,16 +260,13 @@ public:
* was called, it will become invalidated.
*
* The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
- * (since the time never changes).
+ * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows
+ * non-causally-consistent users to not have to pass a second parameter, but would fail
+ * compilation if causally-consistent users forget to pass it.
*/
- void insertOrAssign(const Key& key, Value&& value) {
- MONGO_STATIC_ASSERT_MSG(
- !isCausallyConsistent<Time>,
- "Time must be passed to insertOrAssign on causally consistent caches");
- insertOrAssign(key, std::move(value), Time());
- }
-
- void insertOrAssign(const Key& key, Value&& value, const Time& time) {
+ void insertOrAssign(const Key& key,
+ Value&& value,
+ const Time& time = CacheNotCausallyConsistent()) {
LockGuardWithPostUnlockDestructor guard(_mutex);
Time currentTime, currentTimeInStore;
_invalidate(&guard, key, _cache.find(key), &currentTime, &currentTimeInStore);
@@ -324,16 +307,13 @@ public:
* destroyed.
*
* The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
- * (since the time never changes).
+ * (since the time never changes). Using a default of '= CacheNotCausallyConsistent()' allows
+ * non-causally-consistent users to not have to pass a second parameter, but would fail
+ * compilation if causally-consistent users forget to pass it.
*/
- ValueHandle insertOrAssignAndGet(const Key& key, Value&& value) {
- MONGO_STATIC_ASSERT_MSG(
- !isCausallyConsistent<Time>,
- "Time must be passed to insertOrAssignAndGet on causally consistent caches");
- return insertOrAssignAndGet(key, std::move(value), Time());
- }
-
- ValueHandle insertOrAssignAndGet(const Key& key, Value&& value, const Time& time) {
+ ValueHandle insertOrAssignAndGet(const Key& key,
+ Value&& value,
+ const Time& time = CacheNotCausallyConsistent()) {
LockGuardWithPostUnlockDestructor guard(_mutex);
Time currentTime, currentTimeInStore;
_invalidate(&guard, key, _cache.find(key), &currentTime, &currentTimeInStore);
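The invalidating_lru_cache.h hunk swaps the static_assert overloads back for a defaulted time argument: the default only converts when Time is CacheNotCausallyConsistent, so causally consistent caches still fail to compile unless the caller passes an explicit time. A standalone sketch of that default-argument technique (illustrative names, not the real cache API):

#include <string>
#include <utility>

// The default argument below only converts when Time is NotCausallyConsistent,
// so causally consistent instantiations must pass an explicit time.
struct NotCausallyConsistent {};

template <typename Key, typename Value, typename Time = NotCausallyConsistent>
class TinyCache {
public:
    void insertOrAssign(const Key& key,
                        Value value,
                        const Time& time = NotCausallyConsistent()) {
        _key = key;
        _value = std::move(value);
        _time = time;
    }

private:
    Key _key{};
    Value _value{};
    Time _time{};
};

int main() {
    TinyCache<std::string, int> simple;                     // not causally consistent
    simple.insertOrAssign("answer", 42);                    // time can be omitted

    TinyCache<std::string, int, unsigned long> versioned;   // causally consistent
    versioned.insertOrAssign("answer", 42, 7UL);            // explicit time required
    // versioned.insertOrAssign("answer", 42);              // would not compile
    return 0;
}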
diff --git a/src/mongo/util/read_through_cache.h b/src/mongo/util/read_through_cache.h
index 32efbc576ae..72b3e7a5771 100644
--- a/src/mongo/util/read_through_cache.h
+++ b/src/mongo/util/read_through_cache.h
@@ -152,10 +152,6 @@ public:
return _valueHandle.isValid();
}
- const Time& getTime() const {
- return _valueHandle.getTime();
- }
-
Value* get() {
return &_valueHandle->value;
}
@@ -306,33 +302,6 @@ public:
/**
* Invalidates the given 'key' and immediately replaces it with a new value.
- *
- * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
- * (since the time never changes).
- */
- void insertOrAssign(const Key& key, Value&& newValue, Date_t updateWallClockTime) {
- stdx::lock_guard lg(_mutex);
- if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- it->second->invalidateAndCancelCurrentLookupRound(lg);
- _cache.insertOrAssign(key, {std::move(newValue), updateWallClockTime});
- }
-
- void insertOrAssign(const Key& key,
- Value&& newValue,
- Date_t updateWallClockTime,
- const Time& time) {
- stdx::lock_guard lg(_mutex);
- if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- it->second->invalidateAndCancelCurrentLookupRound(lg);
- _cache.insertOrAssign(key, {std::move(newValue), updateWallClockTime}, time);
- }
-
- /**
- * Invalidates the given 'key' and immediately replaces it with a new value, returning a handle
- * to the new value.
- *
- * The 'time' parameter is mandatory for causally-consistent caches, but not needed otherwise
- * (since the time never changes).
*/
ValueHandle insertOrAssignAndGet(const Key& key, Value&& newValue, Date_t updateWallClockTime) {
stdx::lock_guard lg(_mutex);
@@ -341,16 +310,6 @@ public:
return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime});
}
- ValueHandle insertOrAssignAndGet(const Key& key,
- Value&& newValue,
- Date_t updateWallClockTime,
- const Time& time) {
- stdx::lock_guard lg(_mutex);
- if (auto it = _inProgressLookups.find(key); it != _inProgressLookups.end())
- it->second->invalidateAndCancelCurrentLookupRound(lg);
- return _cache.insertOrAssignAndGet(key, {std::move(newValue), updateWallClockTime}, time);
- }
-
/**
* Indicates to the cache that the backing store has a newer version of 'key', corresponding to
* 'newTime'. Subsequent calls to 'acquireAsync' with a causal consistency set to 'LatestKnown'
@@ -418,8 +377,9 @@ public:
return _cache.getCacheInfo();
}
+protected:
/**
- * ReadThroughCache constructor.
+ * ReadThroughCache constructor, to be called by sub-classes, which implement 'lookup'.
*
* The 'mutex' is for the exclusive usage of the ReadThroughCache and must not be used in any
* way by the implementing class. Having the mutex stored by the sub-class allows latch