author    Jack Mulrow <jack.mulrow@mongodb.com>            2022-12-20 21:31:18 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-12-21 00:20:57 +0000
commit    d1500aed6baa47ca511692c5ca326321c80cd50d (patch)
tree      54789abf914b3038cea7ad40f5f37d52571f678e
parent    22d2a4696c86a42662a86752a63fdb4ed9686ff7 (diff)
download  mongo-d1500aed6baa47ca511692c5ca326321c80cd50d.tar.gz
SERVER-72088 Use ShardRemote for config shard in ShardRegistry
 src/mongo/db/catalog_shard_feature_flag.idl                             |  9
 src/mongo/db/mongod_main.cpp                                            | 19
 src/mongo/db/repl/replication_coordinator_external_state_impl.cpp       |  4
 src/mongo/db/s/balancer/balancer.cpp                                    | 25
 src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp        | 15
 src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp        | 48
 src/mongo/db/s/balancer/balancer_policy.cpp                             |  3
 src/mongo/db/s/balancer/cluster_statistics_impl.cpp                     |  6
 src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp    |  4
 src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp  |  2
 src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp            |  2
 src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp   |  3
 src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp |  2
 src/mongo/db/s/config/configsvr_remove_chunks_command.cpp               | 15
 src/mongo/db/s/config/configsvr_remove_shard_command.cpp                |  2
 src/mongo/db/s/config/configsvr_remove_tags_command.cpp                 | 15
 src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp              |  3
 src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp       |  4
 src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp     | 49
 src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp    |  4
 src/mongo/db/s/config_server_op_observer_test.cpp                       |  4
 src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp           |  4
 src/mongo/db/s/resharding/resharding_coordinator_service.cpp            | 12
 src/mongo/db/s/sharding_initialization_mongod.cpp                       | 72
 src/mongo/db/s/sharding_initialization_mongod.h                         | 12
 src/mongo/s/catalog/sharding_catalog_client_impl.cpp                    | 22
 src/mongo/s/catalog_cache.cpp                                           |  6
 src/mongo/s/client/shard_registry.cpp                                   | 85
 src/mongo/s/client/shard_registry.h                                     | 20
 src/mongo/s/client/shard_remote.cpp                                     | 26
 src/mongo/s/cluster_identity_loader.cpp                                 |  9
 src/mongo/s/cluster_identity_loader.h                                   |  6
 src/mongo/s/cluster_identity_loader_test.cpp                            | 58
 src/mongo/s/config_server_catalog_cache_loader.cpp                      |  6
 src/mongo/s/sharding_initialization.cpp                                 |  3
 35 files changed, 381 insertions(+), 198 deletions(-)
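
At a high level, this patch does two things: config-server code that previously fetched the catalog client from Grid now goes through ShardingCatalogManager::localCatalogClient() (backed by a ShardLocal), and the ShardRegistry's config connection string becomes optional so that, under the new feature flag, the config shard is built later as a ShardRemote from the node's own replica set connection string. A condensed sketch of the resulting startup flow, assembled from the hunks below (simplified; not a verbatim excerpt of the tree):

// Sketch of the new config server startup flow; names come from the hunks
// below, error handling and the ShardingCatalogManager wiring are omitted.
void initializeGlobalShardingStateForConfigServer(OperationContext* opCtx) {
    boost::optional<ConnectionString> configCS;
    if (!gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
        // Legacy behavior: a kLocal connection string, so the shard factory
        // builds the config shard as a ShardLocal, available immediately.
        configCS = ConnectionString::forLocal();
    }
    // With boost::none the ShardRegistry defers creating its config shard
    // until onSetCurrentConfig() learns the replica set connection string.
    initializeGlobalShardingStateForMongoD(opCtx, ShardId::kConfigServerId, configCS);
}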
diff --git a/src/mongo/db/catalog_shard_feature_flag.idl b/src/mongo/db/catalog_shard_feature_flag.idl
index 256489888e9..490ba239ad0 100644
--- a/src/mongo/db/catalog_shard_feature_flag.idl
+++ b/src/mongo/db/catalog_shard_feature_flag.idl
@@ -34,3 +34,12 @@ feature_flags:
description: "Feature flag for enabling shared config server/shard server cluster role"
cpp_varname: gFeatureFlagCatalogShard
default: false
+ # TODO SERVER-72282: Replace with featureFlagCatalogShard once it is stable.
+ #
+ # Temporary feature flag to get coverage for always using config server ShardRemote in the all
+ # feature flags variants. Can be replaced with featureFlagCatalogShard once that flag is stable
+ # enough to run in the all feature flags variants.
+ featureFlagConfigServerAlwaysShardRemote:
+ description: "Feature flag for always using a ShardRemote for ShardRegistry config shard"
+ cpp_varname: gFeatureFlagConfigServerAlwaysShardRemote
+ default: false
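
For reference, the IDL compiler turns each entry above into a FeatureFlag object (here gFeatureFlagConfigServerAlwaysShardRemote, declared in the generated catalog_shard_feature_flag_gen.h), which the C++ hunks below gate on. A minimal usage sketch:

#include "mongo/db/catalog_shard_feature_flag_gen.h"

// Sketch: checking the generated flag irrespective of FCV, as the
// ShardRegistry and catalog client changes below do.
if (gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
    // Always represent the ShardRegistry's config shard as a ShardRemote.
}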
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index afd65cb1266..062624f2d9f 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -137,7 +137,6 @@
#include "mongo/db/s/collection_sharding_state_factory_shard.h"
#include "mongo/db/s/collection_sharding_state_factory_standalone.h"
#include "mongo/db/s/config/configsvr_coordinator_service.h"
-#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/config_server_op_observer.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/op_observer_sharding_impl.h"
@@ -740,23 +739,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
}
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- initializeGlobalShardingStateForMongoD(
- startupOpCtx.get(), ShardId::kConfigServerId, ConnectionString::forLocal());
-
- // ShardLocal to use for explicitly local commands on the config server.
- auto localConfigShard =
- Grid::get(serviceContext)->shardRegistry()->createLocalConfigShard();
- auto localCatalogClient = std::make_unique<ShardingCatalogClientImpl>(localConfigShard);
-
- ShardingCatalogManager::create(
- startupOpCtx->getServiceContext(),
- makeShardingTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor")),
- std::move(localConfigShard),
- std::move(localCatalogClient));
-
- if (!gFeatureFlagCatalogShard.isEnabledAndIgnoreFCV()) {
- Grid::get(startupOpCtx.get())->setShardingInitialized();
- }
+ initializeGlobalShardingStateForConfigServer(startupOpCtx.get());
}
if (serverGlobalParams.clusterRole == ClusterRole::None &&
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 92b8d4d1dd3..f83fe715268 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -917,7 +917,9 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
// to ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can
// only meaningfully fail if the node is shutting down.
status = ClusterIdentityLoader::get(opCtx)->loadClusterId(
- opCtx, repl::ReadConcernLevel::kLocalReadConcern);
+ opCtx,
+ ShardingCatalogManager::get(opCtx)->localCatalogClient(),
+ repl::ReadConcernLevel::kLocalReadConcern);
if (ErrorCodes::isShutdownError(status.code())) {
return;
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 3b0a5777c9b..d7e3839b533 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -382,8 +382,9 @@ Status Balancer::rebalanceSingleChunk(OperationContext* opCtx,
return refreshStatus;
}
- auto coll = Grid::get(opCtx)->catalogClient()->getCollection(
- opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ auto coll =
+ catalogClient->getCollection(opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
auto maxChunkSize =
coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes());
@@ -408,8 +409,9 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx,
return moveAllowedStatus;
}
- auto coll = Grid::get(opCtx)->catalogClient()->getCollection(
- opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ auto coll =
+ catalogClient->getCollection(opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll);
MoveChunkSettings settings(maxChunkSize, secondaryThrottle, waitForDelete);
@@ -426,8 +428,9 @@ Status Balancer::moveRange(OperationContext* opCtx,
const NamespaceString& nss,
const ConfigsvrMoveRange& request,
bool issuedByRemoteUser) {
- auto coll = Grid::get(opCtx)->catalogClient()->getCollection(
- opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ auto coll =
+ catalogClient->getCollection(opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern);
const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll);
const auto [fromShardId, min] = [&]() {
@@ -654,6 +657,7 @@ void Balancer::_mainThread() {
Client::initThread("Balancer");
auto opCtx = cc().makeOperationContext();
auto shardingContext = Grid::get(opCtx.get());
+ const auto catalogClient = ShardingCatalogManager::get(opCtx.get())->localCatalogClient();
LOGV2(21856, "CSRS balancer is starting");
@@ -767,7 +771,7 @@ void Balancer::_mainThread() {
// Collect and apply up-to-date configuration values on the cluster collections.
{
OperationContext* ctx = opCtx.get();
- auto allCollections = Grid::get(ctx)->catalogClient()->getCollections(ctx, {});
+ auto allCollections = catalogClient->getCollections(ctx, {});
for (const auto& coll : allCollections) {
_defragmentationPolicy->startCollectionDefragmentation(ctx, coll);
}
@@ -1020,7 +1024,7 @@ int Balancer::_moveChunks(OperationContext* opCtx,
const MigrateInfoVector& chunksToRebalance,
const MigrateInfoVector& chunksToDefragment) {
auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
- auto catalogClient = Grid::get(opCtx)->catalogClient();
+ auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
// If the balancer was disabled since we started this round, don't start new chunk moves
if (_stopRequested() || !balancerConfig->shouldBalance() ||
@@ -1037,7 +1041,7 @@ int Balancer::_moveChunks(OperationContext* opCtx,
return *migrateInfo.optMaxChunkSizeBytes;
}
- auto coll = Grid::get(opCtx)->catalogClient()->getCollection(
+ auto coll = catalogClient->getCollection(
opCtx, migrateInfo.nss, repl::ReadConcernLevel::kMajorityReadConcern);
return coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes());
}();
@@ -1156,9 +1160,10 @@ SharedSemiFuture<void> Balancer::applyLegacyChunkSizeConstraintsOnClusterData(
BalancerCollectionStatusResponse Balancer::getBalancerStatusForNs(OperationContext* opCtx,
const NamespaceString& ns) {
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
CollectionType coll;
try {
- coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, ns, {});
+ coll = catalogClient->getCollection(opCtx, ns, {});
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
uasserted(ErrorCodes::NamespaceNotSharded, "Collection unsharded or undefined");
}
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index 98f9709efeb..0cdcb5727fb 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -38,6 +38,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj_comparator_interface.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/sharding_config_server_parameters_gen.h"
#include "mongo/db/s/sharding_util.h"
#include "mongo/logv2/log.h"
@@ -175,7 +176,8 @@ getDataSizeInfoForCollections(OperationContext* opCtx,
CollectionDataSizeInfoForBalancing getDataSizeInfoForCollection(OperationContext* opCtx,
const NamespaceString& nss) {
- const auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss);
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ const auto coll = catalogClient->getCollection(opCtx, nss);
std::vector<CollectionType> vec{coll};
return std::move(getDataSizeInfoForCollections(opCtx, vec).at(nss));
}
@@ -348,7 +350,8 @@ StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToSpli
const auto& shardStats = shardStatsStatus.getValue();
- auto collections = Grid::get(opCtx)->catalogClient()->getCollections(opCtx, {});
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ auto collections = catalogClient->getCollections(opCtx, {});
if (collections.empty()) {
return SplitInfoVector{};
}
@@ -412,7 +415,8 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
return MigrateInfoVector{};
}
- auto collections = Grid::get(opCtx)->catalogClient()->getCollections(opCtx, {});
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ auto collections = catalogClient->getCollections(opCtx, {});
if (collections.empty()) {
return MigrateInfoVector{};
}
@@ -506,7 +510,7 @@ StatusWith<MigrateInfosWithReason> BalancerChunkSelectionPolicyImpl::selectChunk
// Used to check locally if the collection exists; it should throw NamespaceNotFound if it
// doesn't.
- Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss);
+ ShardingCatalogManager::get(opCtx)->localCatalogClient()->getCollection(opCtx, nss);
stdx::unordered_set<ShardId> availableShards;
std::transform(shardStats.begin(),
@@ -569,7 +573,8 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt
return shardStatsStatus.getStatus();
}
- const CollectionType collection = Grid::get(opCtx)->catalogClient()->getCollection(
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ const CollectionType collection = catalogClient->getCollection(
opCtx, chunk.getCollectionUUID(), repl::ReadConcernLevel::kLocalReadConcern);
const auto& nss = collection.getNss();
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
index 971c7eeddc9..c7d43d723df 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp
@@ -69,16 +69,17 @@ ShardVersion getShardVersion(OperationContext* opCtx,
}
std::vector<ChunkType> getCollectionChunks(OperationContext* opCtx, const CollectionType& coll) {
- return uassertStatusOK(Grid::get(opCtx)->catalogClient()->getChunks(
- opCtx,
- BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/,
- BSON(ChunkType::min() << 1) /*sort*/,
- boost::none /*limit*/,
- nullptr /*opTime*/,
- coll.getEpoch(),
- coll.getTimestamp(),
- repl::ReadConcernLevel::kLocalReadConcern,
- boost::none));
+ auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ return uassertStatusOK(
+ catalogClient->getChunks(opCtx,
+ BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/,
+ BSON(ChunkType::min() << 1) /*sort*/,
+ boost::none /*limit*/,
+ nullptr /*opTime*/,
+ coll.getEpoch(),
+ coll.getTimestamp(),
+ repl::ReadConcernLevel::kLocalReadConcern,
+ boost::none));
}
uint64_t getCollectionMaxChunkSizeBytes(OperationContext* opCtx, const CollectionType& coll) {
@@ -1181,16 +1182,17 @@ class SplitChunksPhase : public DefragmentationPhase {
public:
static std::unique_ptr<SplitChunksPhase> build(OperationContext* opCtx,
const CollectionType& coll) {
- auto collectionChunks = uassertStatusOK(Grid::get(opCtx)->catalogClient()->getChunks(
- opCtx,
- BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/,
- BSON(ChunkType::min() << 1) /*sort*/,
- boost::none /*limit*/,
- nullptr /*opTime*/,
- coll.getEpoch(),
- coll.getTimestamp(),
- repl::ReadConcernLevel::kLocalReadConcern,
- boost::none));
+ auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ auto collectionChunks = uassertStatusOK(
+ catalogClient->getChunks(opCtx,
+ BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/,
+ BSON(ChunkType::min() << 1) /*sort*/,
+ boost::none /*limit*/,
+ nullptr /*opTime*/,
+ coll.getEpoch(),
+ coll.getTimestamp(),
+ repl::ReadConcernLevel::kLocalReadConcern,
+ boost::none));
stdx::unordered_map<ShardId, PendingActions> pendingActionsByShards;
@@ -1421,7 +1423,8 @@ void BalancerDefragmentationPolicyImpl::startCollectionDefragmentation(Operation
void BalancerDefragmentationPolicyImpl::abortCollectionDefragmentation(OperationContext* opCtx,
const NamespaceString& nss) {
stdx::lock_guard<Latch> lk(_stateMutex);
- auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss, {});
+ auto coll =
+ ShardingCatalogManager::get(opCtx)->localCatalogClient()->getCollection(opCtx, nss, {});
if (coll.getDefragmentCollection()) {
if (_defragmentationStates.contains(coll.getUuid())) {
// Notify phase to abort current phase
@@ -1593,7 +1596,8 @@ bool BalancerDefragmentationPolicyImpl::_advanceToNextActionablePhase(OperationC
boost::optional<CollectionType> coll(boost::none);
while (phaseTransitionNeeded()) {
if (!coll) {
- coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, collUuid);
+ coll = ShardingCatalogManager::get(opCtx)->localCatalogClient()->getCollection(
+ opCtx, collUuid);
}
currentPhase = _transitionPhases(opCtx, *coll, currentPhase->getNextPhase());
advanced = true;
diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp
index 8f28c3cef8f..71d42629e52 100644
--- a/src/mongo/db/s/balancer/balancer_policy.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy.cpp
@@ -35,6 +35,7 @@
#include <random>
#include "mongo/db/s/balancer/type_migration.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/logv2/log.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/type_shard.h"
@@ -200,7 +201,7 @@ StatusWith<ZoneInfo> ZoneInfo::getZonesForCollection(OperationContext* opCtx,
const NamespaceString& nss,
const KeyPattern& keyPattern) {
const auto swCollectionZones =
- Grid::get(opCtx)->catalogClient()->getTagsForCollection(opCtx, nss);
+ ShardingCatalogManager::get(opCtx)->localCatalogClient()->getTagsForCollection(opCtx, nss);
if (!swCollectionZones.isOK()) {
return swCollectionZones.getStatus().withContext(
str::stream() << "Unable to load zones for collection " << nss);
diff --git a/src/mongo/db/s/balancer/cluster_statistics_impl.cpp b/src/mongo/db/s/balancer/cluster_statistics_impl.cpp
index d1bf5ee2e80..97e114b0f8d 100644
--- a/src/mongo/db/s/balancer/cluster_statistics_impl.cpp
+++ b/src/mongo/db/s/balancer/cluster_statistics_impl.cpp
@@ -37,6 +37,7 @@
#include "mongo/base/status_with.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/read_preference.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
@@ -115,8 +116,9 @@ StatusWith<std::vector<ShardStatistics>> ClusterStatisticsImpl::_getStats(
// db.serverStatus() (mem.mapped) to all shards.
//
// TODO: skip unresponsive shards and mark information as stale.
- auto shardsStatus = Grid::get(opCtx)->catalogClient()->getAllShards(
- opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ auto shardsStatus =
+ catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern);
if (!shardsStatus.isOK()) {
return shardsStatus.getStatus();
}
diff --git a/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp b/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp
index bfedddd6a3a..57f11dd1472 100644
--- a/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp
@@ -51,8 +51,8 @@ UUID retrieveReshardingUUID(OperationContext* opCtx, const NamespaceString& ns)
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
- const auto collEntry = catalogClient->getCollection(opCtx, ns);
+ const auto collEntry =
+ ShardingCatalogManager::get(opCtx)->localCatalogClient()->getCollection(opCtx, ns);
uassert(ErrorCodes::NoSuchReshardCollection,
"Could not find resharding-related metadata that matches the given namespace",
diff --git a/src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp b/src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp
index d3a253f6610..dcaf9bb5b4d 100644
--- a/src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp
@@ -90,7 +90,7 @@ public:
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
auto collEntry = catalogClient->getCollection(opCtx, ns());
if (!collEntry.getReshardingFields()) {
// If the collection entry doesn't have resharding fields, we assume that the
diff --git a/src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp b/src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp
index baf26278f65..e8b3761e21a 100644
--- a/src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp
+++ b/src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp
@@ -63,7 +63,7 @@ public:
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
CollectionType collType;
try {
diff --git a/src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp b/src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp
index 8e151c60625..0fa75ee14b3 100644
--- a/src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/repl/primary_only_service.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include "mongo/logv2/log.h"
@@ -51,7 +52,7 @@ UUID retrieveReshardingUUID(OperationContext* opCtx, const NamespaceString& ns)
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
const auto collEntry = catalogClient->getCollection(opCtx, ns);
uassert(ErrorCodes::NoSuchReshardCollection,
diff --git a/src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp b/src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp
index 01c13128c15..e952b0e9cd4 100644
--- a/src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp
+++ b/src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp
@@ -74,7 +74,7 @@ public:
repl::ReadConcernArgs::get(opCtx) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
// Validate the given namespace is (i) sharded, (ii) doesn't already have the proposed
// key, and (iii) has the same epoch as the router that received
diff --git a/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
index c54390d759a..a842c597425 100644
--- a/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
+++ b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/remove_chunks_gen.h"
#include "mongo/db/session/session_catalog_mongod.h"
#include "mongo/db/transaction/transaction_participant.h"
@@ -94,13 +95,13 @@ public:
// Write with localWriteConcern because we cannot wait for replication with a
// session checked out. The command will wait for majority WC on the epilogue after
// the session has been checked in.
- uassertStatusOK(
- Grid::get(newOpCtxPtr.get())
- ->catalogClient()
- ->removeConfigDocuments(newOpCtxPtr.get(),
- ChunkType::ConfigNS,
- BSON(ChunkType::collectionUUID << collectionUUID),
- ShardingCatalogClient::kLocalWriteConcern));
+ const auto catalogClient =
+ ShardingCatalogManager::get(newOpCtxPtr.get())->localCatalogClient();
+ uassertStatusOK(catalogClient->removeConfigDocuments(
+ newOpCtxPtr.get(),
+ ChunkType::ConfigNS,
+ BSON(ChunkType::collectionUUID << collectionUUID),
+ ShardingCatalogClient::kLocalWriteConcern));
}
// Since no write happened on this txnNumber, we need to make a dummy write so that
diff --git a/src/mongo/db/s/config/configsvr_remove_shard_command.cpp b/src/mongo/db/s/config/configsvr_remove_shard_command.cpp
index efebf6f1cf9..486de3cb501 100644
--- a/src/mongo/db/s/config/configsvr_remove_shard_command.cpp
+++ b/src/mongo/db/s/config/configsvr_remove_shard_command.cpp
@@ -118,7 +118,7 @@ public:
return shard->getId();
}();
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
const auto shardingCatalogManager = ShardingCatalogManager::get(opCtx);
const auto shardDrainingStatus = [&] {
diff --git a/src/mongo/db/s/config/configsvr_remove_tags_command.cpp b/src/mongo/db/s/config/configsvr_remove_tags_command.cpp
index 90b2317c001..8d0c530bc65 100644
--- a/src/mongo/db/s/config/configsvr_remove_tags_command.cpp
+++ b/src/mongo/db/s/config/configsvr_remove_tags_command.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/remove_tags_gen.h"
#include "mongo/db/session/session_catalog_mongod.h"
#include "mongo/db/transaction/transaction_participant.h"
@@ -89,13 +90,13 @@ public:
auto newOpCtxPtr = CancelableOperationContext(
cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
- uassertStatusOK(
- Grid::get(newOpCtxPtr.get())
- ->catalogClient()
- ->removeConfigDocuments(newOpCtxPtr.get(),
- TagsType::ConfigNS,
- BSON(TagsType::ns(nss.ns())),
- ShardingCatalogClient::kLocalWriteConcern));
+ const auto catalogClient =
+ ShardingCatalogManager::get(newOpCtxPtr.get())->localCatalogClient();
+ uassertStatusOK(catalogClient->removeConfigDocuments(
+ newOpCtxPtr.get(),
+ TagsType::ConfigNS,
+ BSON(TagsType::ns(nss.ns())),
+ ShardingCatalogClient::kLocalWriteConcern));
}
// Since no write happened on this txnNumber, we need to make a dummy write so that
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index 87267b0fbf5..77d3f19dded 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/primary_only_service.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/resharding/coordinator_document_gen.h"
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
@@ -77,7 +78,7 @@ public:
const NamespaceString& nss = ns();
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
try {
const auto collEntry = catalogClient->getCollection(opCtx, nss);
uassert(ErrorCodes::NotImplemented,
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp
index 5ac1e13ff2b..a5f1703688f 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp
@@ -85,8 +85,8 @@ protected:
setUpAndInitializeConfigDb();
auto clusterIdLoader = ClusterIdentityLoader::get(operationContext());
- ASSERT_OK(clusterIdLoader->loadClusterId(operationContext(),
- repl::ReadConcernLevel::kLocalReadConcern));
+ ASSERT_OK(clusterIdLoader->loadClusterId(
+ operationContext(), catalogClient(), repl::ReadConcernLevel::kLocalReadConcern));
_clusterId = clusterIdLoader->getClusterId();
}
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 960ae41c796..d622162ef09 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -227,8 +227,7 @@ StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse(
* Helper function to get the collection entry and version for nss. Always uses kLocalReadConcern.
*/
StatusWith<std::pair<CollectionType, ChunkVersion>> getCollectionAndVersion(
- OperationContext* opCtx, const NamespaceString& nss) {
- auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ OperationContext* opCtx, Shard* configShard, const NamespaceString& nss) {
auto findCollResponse =
configShard->exhaustiveFindOnConfig(opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
@@ -250,14 +249,13 @@ StatusWith<std::pair<CollectionType, ChunkVersion>> getCollectionAndVersion(
const auto chunksQuery = BSON(ChunkType::collectionUUID << coll.getUuid());
const auto version = uassertStatusOK(getMaxChunkVersionFromQueryResponse(
coll,
- Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- ChunkType::ConfigNS,
- chunksQuery, // Query all chunks for this namespace.
- BSON(ChunkType::lastmod << -1), // Sort by version.
- 1)) // Limit 1.
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ chunksQuery, // Query all chunks for this namespace.
+ BSON(ChunkType::lastmod << -1), // Sort by version.
+ 1)) // Limit 1.
);
return std::pair<CollectionType, ChunkVersion>{std::move(coll), std::move(version)};
@@ -367,7 +365,8 @@ void ShardingCatalogManager::bumpMajorVersionOneChunkPerShard(
const NamespaceString& nss,
TxnNumber txnNumber,
const std::vector<ShardId>& shardIds) {
- const auto [coll, curCollectionVersion] = uassertStatusOK(getCollectionAndVersion(opCtx, nss));
+ const auto [coll, curCollectionVersion] =
+ uassertStatusOK(getCollectionAndVersion(opCtx, _localConfigShard.get(), nss));
ChunkVersion targetChunkVersion(
{curCollectionVersion.epoch(), curCollectionVersion.getTimestamp()},
{curCollectionVersion.majorVersion() + 1, 0});
@@ -596,7 +595,7 @@ ShardingCatalogManager::commitChunkSplit(OperationContext* opCtx,
Lock::ExclusiveLock lk(opCtx, _kChunkOpLock);
// Get collection entry and max chunk version for this namespace.
- auto swCollAndVersion = getCollectionAndVersion(opCtx, nss);
+ auto swCollAndVersion = getCollectionAndVersion(opCtx, _localConfigShard.get(), nss);
if (!swCollAndVersion.isOK()) {
return swCollAndVersion.getStatus().withContext(
@@ -794,7 +793,8 @@ ShardingCatalogManager::commitChunksMerge(OperationContext* opCtx,
Lock::ExclusiveLock lk(opCtx, _kChunkOpLock);
// 1. Retrieve the initial collection version info to build up the logging info.
- const auto [coll, collVersion] = uassertStatusOK(getCollectionAndVersion(opCtx, nss));
+ const auto [coll, collVersion] =
+ uassertStatusOK(getCollectionAndVersion(opCtx, _localConfigShard.get(), nss));
uassert(ErrorCodes::StaleEpoch,
"Collection changed",
(!epoch || collVersion.epoch() == epoch) &&
@@ -1242,8 +1242,8 @@ void ShardingCatalogManager::upgradeChunksHistory(OperationContext* opCtx,
// migrations.
Lock::ExclusiveLock lk(opCtx, _kChunkOpLock);
- const auto [coll, collVersion] = uassertStatusOK(getCollectionAndVersion(opCtx, nss));
- auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ const auto [coll, collVersion] =
+ uassertStatusOK(getCollectionAndVersion(opCtx, _localConfigShard.get(), nss));
if (force) {
LOGV2(620650,
@@ -1251,7 +1251,7 @@ void ShardingCatalogManager::upgradeChunksHistory(OperationContext* opCtx,
"order to force all chunks' history to get recreated",
"namespace"_attr = nss.ns());
- BatchedCommandRequest request([&configShard, collUuid = coll.getUuid()] {
+ BatchedCommandRequest request([collUuid = coll.getUuid()] {
write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
updateOp.setUpdates({[&] {
write_ops::UpdateOpEntry entry;
@@ -1949,15 +1949,14 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction(
const auto chunkQuery =
BSON(ChunkType::collectionUUID << migratedChunk->getCollectionUUID() << ChunkType::shard
<< migratedChunk->getShard());
- auto findResponse = uassertStatusOK(
- Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- ChunkType::ConfigNS,
- chunkQuery,
- BSONObj(),
- 1 /* limit */));
+ auto findResponse = uassertStatusOK(_localConfigShard->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ chunkQuery,
+ BSONObj(),
+ 1 /* limit */));
return findResponse.docs.empty();
}();
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp
index 37da8bfe872..5d00ebc34be 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp
@@ -85,8 +85,8 @@ protected:
setUpAndInitializeConfigDb();
auto clusterIdLoader = ClusterIdentityLoader::get(operationContext());
- ASSERT_OK(clusterIdLoader->loadClusterId(operationContext(),
- repl::ReadConcernLevel::kLocalReadConcern));
+ ASSERT_OK(clusterIdLoader->loadClusterId(
+ operationContext(), catalogClient(), repl::ReadConcernLevel::kLocalReadConcern));
_clusterId = clusterIdLoader->getClusterId();
ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
diff --git a/src/mongo/db/s/config_server_op_observer_test.cpp b/src/mongo/db/s/config_server_op_observer_test.cpp
index 83a1ef8694a..e2ed5c1ff6d 100644
--- a/src/mongo/db/s/config_server_op_observer_test.cpp
+++ b/src/mongo/db/s/config_server_op_observer_test.cpp
@@ -45,8 +45,8 @@ protected:
setUpAndInitializeConfigDb();
auto clusterIdLoader = ClusterIdentityLoader::get(operationContext());
- ASSERT_OK(clusterIdLoader->loadClusterId(operationContext(),
- repl::ReadConcernLevel::kLocalReadConcern));
+ ASSERT_OK(clusterIdLoader->loadClusterId(
+ operationContext(), catalogClient(), repl::ReadConcernLevel::kLocalReadConcern));
_clusterId = clusterIdLoader->getClusterId();
}
diff --git a/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp b/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp
index b5378f38e02..fb5b113216d 100644
--- a/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp
+++ b/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/curop.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
@@ -125,7 +126,8 @@ void PeriodicShardedIndexConsistencyChecker::_launchShardedIndexConsistencyCheck
try {
long long numShardedCollsWithInconsistentIndexes = 0;
- auto collections = Grid::get(opCtx)->catalogClient()->getCollections(
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
+ auto collections = catalogClient->getCollections(
opCtx, {}, repl::ReadConcernLevel::kLocalReadConcern);
for (const auto& coll : collections) {
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 87a6016e264..8e035f95367 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -589,7 +589,7 @@ void removeChunkAndTagsDocs(OperationContext* opCtx,
const auto chunksQuery = BSON(ChunkType::collectionUUID() << collUUID);
const auto tagDeleteOperationHint = BSON(TagsType::ns() << 1 << TagsType::min() << 1);
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
uassertStatusOK(catalogClient->removeConfigDocuments(
opCtx, ChunkType::ConfigNS, chunksQuery, kMajorityWriteConcern));
@@ -844,7 +844,7 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
// collection. So don't try to call remove as it will end up removing the metadata
// for the real collection.
if (!wasDecisionPersisted) {
- const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
uassertStatusOK(catalogClient->removeConfigDocuments(
opCtx,
@@ -2146,8 +2146,9 @@ void ReshardingCoordinator::_updateChunkImbalanceMetrics(const NamespaceString&
Grid::get(opCtx)->catalogCache()->getShardedCollectionPlacementInfoWithRefresh(opCtx,
nss));
+ const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient();
const auto collectionZones =
- uassertStatusOK(Grid::get(opCtx)->catalogClient()->getTagsForCollection(opCtx, nss));
+ uassertStatusOK(catalogClient->getTagsForCollection(opCtx, nss));
const auto& keyPattern = routingInfo.getShardKeyPattern().getKeyPattern();
@@ -2159,9 +2160,8 @@ void ReshardingCoordinator::_updateChunkImbalanceMetrics(const NamespaceString&
tag.getTag())));
}
- const auto allShardsWithOpTime =
- uassertStatusOK(Grid::get(opCtx)->catalogClient()->getAllShards(
- opCtx, repl::ReadConcernLevel::kLocalReadConcern));
+ const auto allShardsWithOpTime = uassertStatusOK(
+ catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kLocalReadConcern));
auto imbalanceCount =
getMaxChunkImbalanceCount(routingInfo, allShardsWithOpTime.value, zoneInfo);
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index be5483b21db..9a4caf90e93 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -38,6 +38,7 @@
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/audit.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/catalog_shard_feature_flag_gen.h"
#include "mongo/db/client_metadata_propagation_egress_hook.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/dbhelpers.h"
@@ -48,6 +49,7 @@
#include "mongo/db/ops/update.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/chunk_splitter.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/s/periodic_balancer_config_refresher.h"
#include "mongo/db/s/read_only_catalog_cache_loader.h"
#include "mongo/db/s/shard_local.h"
@@ -55,9 +57,11 @@
#include "mongo/db/s/transaction_coordinator_service.h"
#include "mongo/db/server_options.h"
#include "mongo/db/vector_clock_metadata_hook.h"
+#include "mongo/executor/network_interface_factory.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/s/catalog/sharding_catalog_client_impl.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_factory.h"
#include "mongo/s/client/shard_remote.h"
@@ -118,6 +122,15 @@ public:
LOGV2(471692, "Unable to update the shard registry", "error"_attr = e);
}
+ auto const shardingState = ShardingState::get(_serviceContext);
+ if (!shardingState->enabled()) {
+ // If our sharding state isn't enabled, we don't have a shard identity document, so
+ // there's nothing to update. Note technically this may race with the config server
+ // being added as a shard, but that shouldn't be a problem since addShard will use a
+ // valid connection string and should serialize with a replica set reconfig.
+ return;
+ }
+
auto setName = connStr.getSetName();
bool updateInProgress = false;
{
@@ -489,8 +502,15 @@ void ShardingInitializationMongoD::updateShardIdentityConfigString(
}
void ShardingInitializationMongoD::onSetCurrentConfig(OperationContext* opCtx) {
- // TODO SERVER-72088: Use the connection string from the config to construct a ShardRemote for
- // the config server when in catalog shard mode.
+ if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer ||
+ !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
+ // Only config servers capable of acting as a shard set up the config shard in their shard
+ // registry with a real connection string.
+ return;
+ }
+
+ auto myConnectionString = repl::ReplicationCoordinator::get(opCtx)->getConfigConnectionString();
+ Grid::get(opCtx)->shardRegistry()->initConfigShardIfNecessary(myConnectionString);
}
void ShardingInitializationMongoD::onInitialDataAvailable(OperationContext* opCtx,
@@ -507,10 +527,42 @@ void ShardingInitializationMongoD::onInitialDataAvailable(OperationContext* opCt
}
}
+void initializeGlobalShardingStateForConfigServer(OperationContext* opCtx) {
+ ShardingInitializationMongoD::get(opCtx)->installReplicaSetChangeListener(
+ opCtx->getServiceContext());
+
+ auto configCS = []() -> boost::optional<ConnectionString> {
+ if (gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
+ // When the config server can operate as a shard, it sets up a ShardRemote for the
+ // config shard, which is created later after loading the local replica set config.
+ return boost::none;
+ }
+ return {ConnectionString::forLocal()};
+ }();
+
+ initializeGlobalShardingStateForMongoD(opCtx, ShardId::kConfigServerId, configCS);
+
+ // ShardLocal to use for explicitly local commands on the config server.
+ auto localConfigShard = Grid::get(opCtx)->shardRegistry()->createLocalConfigShard();
+ auto localCatalogClient = std::make_unique<ShardingCatalogClientImpl>(localConfigShard);
+
+ ShardingCatalogManager::create(
+ opCtx->getServiceContext(),
+ makeShardingTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor")),
+ std::move(localConfigShard),
+ std::move(localCatalogClient));
+
+ if (!gFeatureFlagCatalogShard.isEnabledAndIgnoreFCV()) {
+ Grid::get(opCtx)->setShardingInitialized();
+ }
+}
+
void initializeGlobalShardingStateForMongoD(OperationContext* opCtx,
const ShardId& shardId,
- const ConnectionString& configCS) {
- uassert(ErrorCodes::BadValue, "Unrecognized connection string.", configCS);
+ const boost::optional<ConnectionString>& configCS) {
+ if (configCS) {
+ uassert(ErrorCodes::BadValue, "Unrecognized connection string.", *configCS);
+ }
auto targeterFactory = std::make_unique<RemoteCommandTargeterFactoryImpl>();
auto targeterFactoryPtr = targeterFactory.get();
@@ -602,16 +654,20 @@ void initializeGlobalShardingStateForMongoD(OperationContext* opCtx,
}
}
+
+void ShardingInitializationMongoD::installReplicaSetChangeListener(ServiceContext* service) {
+ _replicaSetChangeListener =
+ ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>(service);
+}
+
void ShardingInitializationMongoD::_initializeShardingEnvironmentOnShardServer(
OperationContext* opCtx, const ShardIdentity& shardIdentity) {
- _replicaSetChangeListener =
- ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>(
- opCtx->getServiceContext());
+ installReplicaSetChangeListener(opCtx->getServiceContext());
initializeGlobalShardingStateForMongoD(opCtx,
shardIdentity.getShardName().toString(),
- shardIdentity.getConfigsvrConnectionString());
+ {shardIdentity.getConfigsvrConnectionString()});
// Determine primary/secondary/standalone state in order to properly initialize sharding
// components.
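
The deferred half of the handshake is onSetCurrentConfig() above: once the replication coordinator has applied a replica set config, the node's own connection string exists and the registry's config shard can be built as a ShardRemote. A sketch of that sequence under the new flag (names from the hunk above; the role and feature flag checks are abbreviated):

// Sketch: deferred config shard setup on a config server.
void onSetCurrentConfig(OperationContext* opCtx) {
    // Only runs on config servers with
    // featureFlagConfigServerAlwaysShardRemote enabled (checks elided).
    auto myConnectionString =
        repl::ReplicationCoordinator::get(opCtx)->getConfigConnectionString();
    // No-op if init() already installed a config shard from a connection
    // string supplied at construction.
    Grid::get(opCtx)->shardRegistry()->initConfigShardIfNecessary(myConnectionString);
}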
diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h
index a2c092fb3f2..78a56d48408 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.h
+++ b/src/mongo/db/s/sharding_initialization_mongod.h
@@ -106,6 +106,11 @@ public:
_initFunc = std::move(func);
}
+ /**
+ * Installs a listener for RSM change notifications.
+ */
+ void installReplicaSetChangeListener(ServiceContext* service);
+
private:
void _initializeShardingEnvironmentOnShardServer(OperationContext* opCtx,
const ShardIdentity& shardIdentity);
@@ -141,6 +146,11 @@ private:
*/
void initializeGlobalShardingStateForMongoD(OperationContext* opCtx,
const ShardId& shardId,
- const ConnectionString& configCS);
+ const boost::optional<ConnectionString>& configCS);
+
+/**
+ * Initialize the sharding components for a config server.
+ */
+void initializeGlobalShardingStateForConfigServer(OperationContext* opCtx);
} // namespace mongo
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 2253e2ab55e..463e46ed9ff 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -36,6 +36,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
+#include "mongo/db/catalog_shard_feature_flag_gen.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
@@ -323,11 +324,24 @@ std::vector<BSONObj> runCatalogAggregation(OperationContext* opCtx,
AggregateCommandRequest& aggRequest,
const repl::ReadConcernArgs& readConcern,
const Milliseconds& maxTimeout) {
+ // Reads on the config server may run on any node in its replica set. Such reads use the config
+ // time as an afterClusterTime token, but config time is only inclusive of majority committed
+ // data, so we should not use a weaker read concern. Note if the local node is a config server,
+ // it can use these concerns safely with a ShardLocal, which would require relaxing this
+ // invariant.
+ invariant(readConcern.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern ||
+ readConcern.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ||
+ readConcern.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern,
+ str::stream() << "Disallowed read concern: " << readConcern.toBSONInner());
+
aggRequest.setReadConcern(readConcern.toBSONInner());
aggRequest.setWriteConcern(WriteConcernOptions());
const auto readPref = [&]() -> ReadPreferenceSetting {
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
+ !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
+ // When the feature flag is on, the config server may read from any node in its replica
+ // set, so we should use the typical config server read preference.
return {};
}
@@ -340,6 +354,7 @@ std::vector<BSONObj> runCatalogAggregation(OperationContext* opCtx,
aggRequest.setUnwrappedReadPref(readPref.toContainingBSON());
if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
+ // Don't use a timeout on the config server to guarantee it can always refresh.
const Milliseconds maxTimeMS = std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeout);
aggRequest.setMaxTimeMS(durationCount<Milliseconds>(maxTimeMS));
}
@@ -540,7 +555,10 @@ std::vector<ShardId> makeAndRunPlacementHistoryAggregation(
// Run the aggregation
const auto readConcern = [&]() -> repl::ReadConcernArgs {
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
+ !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
+ // When the feature flag is on, the config server may read from a secondary which may
+ // need to wait for replication, so we should use afterClusterTime.
return {repl::ReadConcernLevel::kMajorityReadConcern};
} else {
const auto time = VectorClock::get(opCtx)->getTime();
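
Because a ShardRemote read may land on any member of the config replica set, the aggregation now pins a causal timestamp instead of assuming a local primary read. The tail of the lambda is truncated in this view; the sketch below assumes the non-config-server branch pins the vector clock's configTime as afterClusterTime, consistent with the comment in the hunk above:

// Sketch: read concern selection for catalog reads that may be served by a
// secondary. The afterClusterTime token makes a lagging node wait until it
// has replicated up to configTime, which covers only majority-committed data.
const auto readConcern = [&]() -> repl::ReadConcernArgs {
    if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
        !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
        return {repl::ReadConcernLevel::kMajorityReadConcern};
    }
    const auto time = VectorClock::get(opCtx)->getTime();
    return {time.configTime(), repl::ReadConcernLevel::kMajorityReadConcern};
}();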
diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp
index 0bfb5078009..4c261b8b9cb 100644
--- a/src/mongo/s/catalog_cache.cpp
+++ b/src/mongo/s/catalog_cache.cpp
@@ -32,6 +32,7 @@
#include <fmt/format.h>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/catalog_shard_feature_flag_gen.h"
#include "mongo/db/curop.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/optime_with.h"
@@ -1005,7 +1006,10 @@ CatalogCache::IndexCache::LookupResult CatalogCache::IndexCache::_lookupIndexes(
"timeInStore"_attr = previousVersion);
const auto readConcern = [&]() -> repl::ReadConcernArgs {
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
+ !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
+ // When the feature flag is on, the config server may read from a secondary which
+ // may need to wait for replication, so we should use afterClusterTime.
return {repl::ReadConcernLevel::kSnapshotReadConcern};
} else {
const auto vcTime = VectorClock::get(opCtx)->getTime();
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 120562f6c3d..ef939a50137 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -33,6 +33,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/client/replica_set_monitor.h"
+#include "mongo/db/catalog_shard_feature_flag_gen.h"
#include "mongo/db/client.h"
#include "mongo/db/vector_clock.h"
#include "mongo/db/vector_clock_metadata_hook.h"
@@ -65,7 +66,7 @@ using CallbackArgs = executor::TaskExecutor::CallbackArgs;
ShardRegistry::ShardRegistry(ServiceContext* service,
std::unique_ptr<ShardFactory> shardFactory,
- const ConnectionString& configServerCS,
+ const boost::optional<ConnectionString>& configServerCS,
std::vector<ShardRemovalHook> shardRemovalHooks)
: _service(service),
_shardFactory(std::move(shardFactory)),
@@ -88,7 +89,12 @@ ShardRegistry::ShardRegistry(ServiceContext* service,
const Time& timeInStore) { return _lookup(opCtx, key, cachedData, timeInStore); },
1 /* cacheSize */)) {
- invariant(_initConfigServerCS.isValid());
+ if (_initConfigServerCS) {
+ invariant(_initConfigServerCS->isValid());
+ } else {
+ invariant(gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV());
+ }
+
_threadPool.startup();
}
@@ -99,24 +105,24 @@ ShardRegistry::~ShardRegistry() {
void ShardRegistry::init() {
invariant(!_isInitialized.load());
- LOGV2_DEBUG(5123000,
- 1,
- "Initializing ShardRegistry",
- "configServers"_attr = _initConfigServerCS.toString());
-
/* The creation of the config shard object will initialize the associated RSM monitor that in
* turn will call ShardRegistry::updateReplSetHosts(). Hence the config shard object MUST be
* created after the ShardRegistry is fully constructed. This is why `_configShardData`
* is initialized here rather than in the ShardRegistry constructor.
*/
- {
+ if (_initConfigServerCS) {
+ LOGV2_DEBUG(
+ 5123000, 1, "Initializing ShardRegistry", "configServers"_attr = _initConfigServerCS);
+
stdx::lock_guard<Latch> lk(_mutex);
- _configShardData = ShardRegistryData::createWithConfigShardOnly(
- _shardFactory->createShard(ShardId::kConfigServerId, _initConfigServerCS));
- _latestConnStrings[_initConfigServerCS.getSetName()] = _initConfigServerCS;
+ _initConfigShard(lk, *_initConfigServerCS);
+ _isInitialized.store(true);
+ } else {
+ LOGV2_DEBUG(
+ 7208800,
+ 1,
+ "Deferring ShardRegistry initialization until local replica set config is known");
}
-
- _isInitialized.store(true);
}
ShardRegistry::Cache::LookupResult ShardRegistry::_lookup(OperationContext* opCtx,
@@ -271,7 +277,10 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const {
std::shared_ptr<Shard> ShardRegistry::getConfigShard() const {
stdx::lock_guard<Latch> lk(_mutex);
- return _configShardData.findShard(ShardId::kConfigServerId);
+ auto configShard = _configShardData.findShard(ShardId::kConfigServerId);
+ // Note this should only throw if the local node has not learned its replica set config yet.
+ uassert(ErrorCodes::NotYetInitialized, "Config shard has not been set up yet", configShard);
+ return configShard;
}
StatusWith<std::shared_ptr<Shard>> ShardRegistry::getShard(OperationContext* opCtx,
@@ -398,7 +407,6 @@ void ShardRegistry::updateReplSetHosts(const ConnectionString& givenConnString,
auto newData = ShardRegistryData::createFromExisting(
_configShardData, newConnString, _shardFactory.get());
_configShardData = newData;
-
} else {
auto value = _rsmIncrement.addAndFetch(1);
LOGV2_DEBUG(4620252,
@@ -410,17 +418,7 @@ void ShardRegistry::updateReplSetHosts(const ConnectionString& givenConnString,
}
// Schedule a lookup, to incorporate the new connection string.
- _getDataAsync()
- .thenRunOn(Grid::get(_service)->getExecutorPool()->getFixedExecutor())
- .ignoreValue()
- .getAsync([](const Status& status) {
- if (!status.isOK()) {
- LOGV2(4620201,
- "Error running reload of ShardRegistry for RSM update, caused by {error}",
- "Error running reload of ShardRegistry for RSM update",
- "error"_attr = redact(status));
- }
- });
+ _scheduleLookup();
}
std::unique_ptr<Shard> ShardRegistry::createConnection(const ConnectionString& connStr) const {
@@ -557,6 +555,41 @@ std::shared_ptr<Shard> ShardRegistry::getShardForHostNoReload(const HostAndPort&
return data->findByHostAndPort(host);
}
+void ShardRegistry::_scheduleLookup() {
+ _getDataAsync()
+ .thenRunOn(Grid::get(_service)->getExecutorPool()->getFixedExecutor())
+ .ignoreValue()
+ .getAsync([](const Status& status) {
+ if (!status.isOK()) {
+ LOGV2(4620201,
+ "Error running reload of ShardRegistry for RSM update, caused by {error}",
+ "Error running reload of ShardRegistry for RSM update",
+ "error"_attr = redact(status));
+ }
+ });
+}
+
+void ShardRegistry::_initConfigShard(WithLock wl, const ConnectionString& configCS) {
+ _configShardData = ShardRegistryData::createWithConfigShardOnly(
+ _shardFactory->createShard(ShardId::kConfigServerId, configCS));
+ _latestConnStrings[configCS.getSetName()] = configCS;
+}
+
+void ShardRegistry::initConfigShardIfNecessary(const ConnectionString& configCS) {
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (_isInitialized.load()) {
+ return;
+ }
+
+ _initConfigShard(lk, configCS);
+ _isInitialized.store(true);
+ }
+
+ // Lookup can succeed now that the registry has a real config shard, so schedule one right away.
+ _scheduleLookup();
+}
+
////////////// ShardRegistryData //////////////////
ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr<Shard> configShard) {
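
One consequence of deferring initialization: getConfigShard() can now fail with NotYetInitialized during the window before the local replica set config is known, per the uassert in the hunk above. A hypothetical caller that tolerates that window (illustrative only; not part of this patch):

// Hypothetical caller sketch: convert the startup-window failure into a
// Status instead of letting the exception escape.
StatusWith<std::shared_ptr<Shard>> getConfigShardIfReady(OperationContext* opCtx) {
    try {
        return Grid::get(opCtx)->shardRegistry()->getConfigShard();
    } catch (const ExceptionFor<ErrorCodes::NotYetInitialized>& ex) {
        // The node has not learned its replica set config yet.
        return ex.toStatus();
    }
}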
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index fa29fbcc146..13daa987df8 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -187,13 +187,14 @@ public:
*/
ShardRegistry(ServiceContext* service,
std::unique_ptr<ShardFactory> shardFactory,
- const ConnectionString& configServerCS,
+ const boost::optional<ConnectionString>& configServerCS,
std::vector<ShardRemovalHook> shardRemovalHooks = {});
~ShardRegistry();
/**
- * Initializes ShardRegistry with config shard.
+ * Initializes ShardRegistry with config shard, if a connection string was provided at
+ * construction.
*
* The creation of the config shard object will initialize the associated RSM monitor that in
* turn will call ShardRegistry::updateReplSetHosts(). Hence the config shard object MUST be
@@ -202,6 +203,12 @@ public:
void init();
/**
+ * Sets up the registry's config shard from the given connection string. Only takes effect if
+ * the registry has not already done this.
+ */
+ void initConfigShardIfNecessary(const ConnectionString& configCS);
+
+ /**
* Startup the periodic reloader of the ShardRegistry.
* Can be called only after ShardRegistry::init()
*/
@@ -412,6 +419,11 @@ private:
SharedSemiFuture<Cache::ValueHandle> _getDataAsync();
/**
+ * Triggers a reload without waiting for it to complete.
+ */
+ void _scheduleLookup();
+
+ /**
* Gets the latest-cached copy of the ShardRegistryData. Never fetches from the config servers.
* Only used by the "NoReload" accessors.
* TODO SERVER-50206: Remove usage of this non-causally consistent accessor.
@@ -426,6 +438,8 @@ private:
void _initializeCacheIfNecessary() const;
+ void _initConfigShard(WithLock, const ConnectionString& configCS);
+
SharedSemiFuture<Cache::ValueHandle> _reloadAsync();
ServiceContext* _service{nullptr};
@@ -439,7 +453,7 @@ private:
* Specified in the ShardRegistry c-tor. It's used only in init() to initialize the config
* shard.
*/
- const ConnectionString _initConfigServerCS;
+ const boost::optional<ConnectionString> _initConfigServerCS;
/**
* A list of callbacks to be called asynchronously when it has been discovered that a shard was
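The header change makes the config server connection string optional at construction, so a node that does not yet know it can defer setup to initConfigShardIfNecessary(). A compact sketch of that deferred-initialization shape, with std::optional standing in for boost::optional and toy types for the rest:

    #include <iostream>
    #include <optional>
    #include <string>
    #include <utility>

    struct ConnectionString {
        std::string uri;
    };

    class Registry {
    public:
        explicit Registry(std::optional<ConnectionString> configServerCS)
            : _initConfigServerCS(std::move(configServerCS)) {}

        void init() {
            // Only set up the config shard here if a connection string was
            // given at construction; otherwise a later explicit call supplies it.
            if (_initConfigServerCS) {
                std::cout << "config shard from ctor: " << _initConfigServerCS->uri << "\n";
            } else {
                std::cout << "config shard deferred\n";
            }
        }

    private:
        const std::optional<ConnectionString> _initConfigServerCS;
    };

    int main() {
        Registry withCS{ConnectionString{"configRS/cfg1:27019"}};
        Registry withoutCS{std::nullopt};
        withCS.init();
        withoutCS.init();
    }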
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 953d2f83612..658c6de68ed 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -264,8 +264,14 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand(
getMoreBob->append("collection", data.nss.coll());
};
- const Milliseconds requestTimeout =
- std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeMSOverride);
+ const Milliseconds requestTimeout = [&] {
+ auto minMaxTimeMS = std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeMSOverride);
+ if (minMaxTimeMS < Milliseconds::max()) {
+ return minMaxTimeMS;
+ }
+        // The Fetcher expects kNoTimeout, not Milliseconds::max(), when there is no maxTimeMS.
+ return RemoteCommandRequest::kNoTimeout;
+ }();
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
Fetcher fetcher(executor.get(),
@@ -307,6 +313,17 @@ StatusWith<Shard::QueryResponse> ShardRemote::_runExhaustiveCursorCommand(
return response;
}
+Milliseconds getExhaustiveFindOnConfigMaxTimeMS(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ // Don't use a timeout on the config server to guarantee it can always refresh.
+ return Milliseconds::max();
+ }
+
+ return std::min(opCtx->getRemainingMaxTimeMillis(),
+ nss == ChunkType::ConfigNS ? Milliseconds(gFindChunksOnConfigTimeoutMS.load())
+ : Shard::kDefaultConfigCommandTimeout);
+}
StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig(
OperationContext* opCtx,
@@ -341,10 +358,7 @@ StatusWith<Shard::QueryResponse> ShardRemote::_exhaustiveFindOnConfig(
return bob.done().getObjectField(repl::ReadConcernArgs::kReadConcernFieldName).getOwned();
}();
- const Milliseconds maxTimeMS =
- std::min(opCtx->getRemainingMaxTimeMillis(),
- nss == ChunkType::ConfigNS ? Milliseconds(gFindChunksOnConfigTimeoutMS.load())
- : kDefaultConfigCommandTimeout);
+ const Milliseconds maxTimeMS = getExhaustiveFindOnConfigMaxTimeMS(opCtx, nss);
BSONObjBuilder findCmdBuilder;
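The two timeout changes above share one idea: clamp the caller's remaining maxTimeMS against an override, but translate an "unlimited" result into the explicit no-timeout sentinel the lower layer expects, and skip the deadline entirely on the config server so it can always refresh. A self-contained sketch of the clamping half, with an illustrative sentinel value rather than the real RemoteCommandRequest::kNoTimeout:

    #include <algorithm>
    #include <chrono>
    #include <iostream>

    using Milliseconds = std::chrono::milliseconds;

    // Illustrative sentinel; the real code uses RemoteCommandRequest::kNoTimeout.
    constexpr Milliseconds kNoTimeout{-1};

    Milliseconds chooseRequestTimeout(Milliseconds remainingMaxTimeMS,
                                      Milliseconds maxTimeMSOverride) {
        // Honor whichever deadline is tighter...
        const Milliseconds clamped = std::min(remainingMaxTimeMS, maxTimeMSOverride);
        if (clamped < Milliseconds::max()) {
            return clamped;
        }
        // ...but an unlimited deadline becomes the explicit sentinel, since the
        // fetcher layer would treat Milliseconds::max() as a real (huge) timeout.
        return kNoTimeout;
    }

    int main() {
        std::cout << chooseRequestTimeout(Milliseconds{5000}, Milliseconds::max()).count()
                  << "\n";  // 5000: the caller's deadline wins.
        std::cout << chooseRequestTimeout(Milliseconds::max(), Milliseconds::max()).count()
                  << "\n";  // -1: both unlimited, so no timeout at all.
    }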
diff --git a/src/mongo/s/cluster_identity_loader.cpp b/src/mongo/s/cluster_identity_loader.cpp
index b51d0854b90..67537e4cb2c 100644
--- a/src/mongo/s/cluster_identity_loader.cpp
+++ b/src/mongo/s/cluster_identity_loader.cpp
@@ -37,7 +37,6 @@
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/type_config_version.h"
-#include "mongo/s/grid.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -64,6 +63,7 @@ OID ClusterIdentityLoader::getClusterId() {
}
Status ClusterIdentityLoader::loadClusterId(OperationContext* opCtx,
+ ShardingCatalogClient* catalogClient,
const repl::ReadConcernLevel& readConcernLevel) {
stdx::unique_lock<Latch> lk(_mutex);
if (_initializationState == InitializationState::kInitialized) {
@@ -82,7 +82,7 @@ Status ClusterIdentityLoader::loadClusterId(OperationContext* opCtx,
_initializationState = InitializationState::kLoading;
lk.unlock();
- auto loadStatus = _fetchClusterIdFromConfig(opCtx, readConcernLevel);
+ auto loadStatus = _fetchClusterIdFromConfig(opCtx, catalogClient, readConcernLevel);
lk.lock();
invariant(_initializationState == InitializationState::kLoading);
@@ -97,8 +97,9 @@ Status ClusterIdentityLoader::loadClusterId(OperationContext* opCtx,
}
StatusWith<OID> ClusterIdentityLoader::_fetchClusterIdFromConfig(
- OperationContext* opCtx, const repl::ReadConcernLevel& readConcernLevel) {
- auto catalogClient = Grid::get(opCtx)->catalogClient();
+ OperationContext* opCtx,
+ ShardingCatalogClient* catalogClient,
+ const repl::ReadConcernLevel& readConcernLevel) {
auto loadResult = catalogClient->getConfigVersion(opCtx, readConcernLevel);
if (!loadResult.isOK()) {
return loadResult.getStatus().withContext("Error loading clusterID");
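The ClusterIdentityLoader change is plain dependency injection: the catalog client arrives as a parameter instead of being fetched from the Grid singleton, so a config server acting as a shard, or a unit test, can hand in whichever client it wants. A toy version of the seam, with stand-in types in place of the real ShardingCatalogClient:

    #include <iostream>
    #include <string>

    // Stand-in for ShardingCatalogClient: only the one call the loader needs.
    class CatalogClient {
    public:
        virtual ~CatalogClient() = default;
        virtual std::string getConfigVersionClusterId() const = 0;
    };

    // A test double becomes trivial once the client is injected.
    class FakeCatalogClient : public CatalogClient {
    public:
        std::string getConfigVersionClusterId() const override {
            return "test-cluster-id";
        }
    };

    // Before this patch the loader reached for Grid::get(opCtx)->catalogClient();
    // now the caller supplies the client, so this function has no hidden globals.
    std::string loadClusterId(const CatalogClient& client) {
        return client.getConfigVersionClusterId();
    }

    int main() {
        FakeCatalogClient fake;
        std::cout << loadClusterId(fake) << "\n";
    }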
diff --git a/src/mongo/s/cluster_identity_loader.h b/src/mongo/s/cluster_identity_loader.h
index 693c62483ad..43e51b9dc17 100644
--- a/src/mongo/s/cluster_identity_loader.h
+++ b/src/mongo/s/cluster_identity_loader.h
@@ -34,6 +34,7 @@
#include "mongo/bson/oid.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/platform/mutex.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/hierarchical_acquisition.h"
@@ -73,7 +74,9 @@ public:
* If another thread is already in the process of loading the cluster ID, concurrent calls will
* wait for that thread to finish and then return its results.
*/
- Status loadClusterId(OperationContext* opCtx, const repl::ReadConcernLevel& readConcernLevel);
+ Status loadClusterId(OperationContext* opCtx,
+ ShardingCatalogClient* catalogClient,
+ const repl::ReadConcernLevel& readConcernLevel);
/**
* Called if the config.version document is rolled back. Notifies the ClusterIdentityLoader
@@ -93,6 +96,7 @@ private:
* the version document, and returns it.
*/
StatusWith<OID> _fetchClusterIdFromConfig(OperationContext* opCtx,
+ ShardingCatalogClient* catalogClient,
const repl::ReadConcernLevel& readConcernLevel);
Mutex _mutex =
diff --git a/src/mongo/s/cluster_identity_loader_test.cpp b/src/mongo/s/cluster_identity_loader_test.cpp
index 363bb33ceb3..1a06260d134 100644
--- a/src/mongo/s/cluster_identity_loader_test.cpp
+++ b/src/mongo/s/cluster_identity_loader_test.cpp
@@ -104,9 +104,10 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) {
// The first time you ask for the cluster ID it will have to be loaded from the config servers.
auto future = launchAsync([&] {
- auto clusterIdStatus =
- ClusterIdentityLoader::get(operationContext())
- ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ auto clusterIdStatus = ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ catalogClient(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
@@ -117,18 +118,20 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) {
// Subsequent requests for the cluster ID should not require any network traffic as we consult
// the cached version.
- ASSERT_OK(
- ClusterIdentityLoader::get(operationContext())
- ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern));
+ ASSERT_OK(ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ catalogClient(),
+ repl::ReadConcernLevel::kMajorityReadConcern));
}
TEST_F(ClusterIdentityTest, NoConfigVersionDocument) {
// If no version document is found on the config server, loadClusterId will return an error.
auto future = launchAsync([&] {
- ASSERT_EQ(
- ClusterIdentityLoader::get(operationContext())
- ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern),
- ErrorCodes::NoMatchingDocument);
+ ASSERT_EQ(ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ catalogClient(),
+ repl::ReadConcernLevel::kMajorityReadConcern),
+ ErrorCodes::NoMatchingDocument);
});
expectConfigVersionLoad(
@@ -141,23 +144,26 @@ TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) {
// Check that multiple threads calling getClusterId at once still results in only one network
// operation.
auto future1 = launchAsync([&] {
- auto clusterIdStatus =
- ClusterIdentityLoader::get(operationContext())
- ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ auto clusterIdStatus = ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ catalogClient(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
auto future2 = launchAsync([&] {
- auto clusterIdStatus =
- ClusterIdentityLoader::get(operationContext())
- ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ auto clusterIdStatus = ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ catalogClient(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
auto future3 = launchAsync([&] {
- auto clusterIdStatus =
- ClusterIdentityLoader::get(operationContext())
- ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ auto clusterIdStatus = ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ catalogClient(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
@@ -173,9 +179,10 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) {
// The first time you ask for the cluster ID it will have to be loaded from the config servers.
auto future = launchAsync([&] {
- auto clusterIdStatus =
- ClusterIdentityLoader::get(operationContext())
- ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ auto clusterIdStatus = ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ catalogClient(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus);
});
@@ -186,9 +193,10 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) {
// After a failure to load the cluster ID, subsequent attempts to get the cluster ID should
// retry loading it.
future = launchAsync([&] {
- auto clusterIdStatus =
- ClusterIdentityLoader::get(operationContext())
- ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern);
+ auto clusterIdStatus = ClusterIdentityLoader::get(operationContext())
+ ->loadClusterId(operationContext(),
+ catalogClient(),
+ repl::ReadConcernLevel::kMajorityReadConcern);
ASSERT_OK(clusterIdStatus);
ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId());
});
diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp
index 2fb23a43bf9..3be6957b8c6 100644
--- a/src/mongo/s/config_server_catalog_cache_loader.cpp
+++ b/src/mongo/s/config_server_catalog_cache_loader.cpp
@@ -34,6 +34,7 @@
#include <memory>
+#include "mongo/db/catalog_shard_feature_flag_gen.h"
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -58,7 +59,10 @@ CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx,
const NamespaceString& nss,
ChunkVersion sinceVersion) {
const auto readConcern = [&]() -> repl::ReadConcernArgs {
- if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer &&
+ !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) {
+            // When the feature flag is on, the config server may read from a secondary, which may
+ // need to wait for replication, so we should use afterClusterTime.
return {repl::ReadConcernLevel::kSnapshotReadConcern};
} else {
const auto vcTime = VectorClock::get(opCtx)->getTime();
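The read concern selection above now falls through to the afterClusterTime path when featureFlagConfigServerAlwaysShardRemote is enabled, so a possibly-secondary read waits for replication. A rough standalone model of that decision; the level returned on the fall-through branch is a placeholder, not the server's actual choice:

    #include <cstdint>
    #include <iostream>
    #include <optional>

    enum class ClusterRole { None, ShardServer, ConfigServer };

    struct ReadConcern {
        const char* level;
        std::optional<std::uint64_t> afterClusterTime;  // Stand-in for a Timestamp.
    };

    ReadConcern chooseReadConcern(ClusterRole role,
                                  bool alwaysShardRemote,
                                  std::uint64_t configTime) {
        if (role == ClusterRole::ConfigServer && !alwaysShardRemote) {
            // Dedicated config server: local snapshot read, no cluster time needed.
            return {"snapshot", std::nullopt};
        }
        // Remote (possibly secondary) read: wait for replication up to configTime.
        return {"snapshot", configTime};
    }

    int main() {
        const ReadConcern rc =
            chooseReadConcern(ClusterRole::ConfigServer, /*alwaysShardRemote=*/true, 42);
        std::cout << rc.level << ", afterClusterTime="
                  << (rc.afterClusterTime ? *rc.afterClusterTime : 0) << "\n";
    }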
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 4df705e4a88..18ba621c157 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -239,8 +239,9 @@ Status loadGlobalSettingsFromConfigServer(OperationContext* opCtx) {
}
try {
+ auto catalogClient = Grid::get(opCtx)->catalogClient();
uassertStatusOK(ClusterIdentityLoader::get(opCtx)->loadClusterId(
- opCtx, repl::ReadConcernLevel::kMajorityReadConcern));
+ opCtx, catalogClient, repl::ReadConcernLevel::kMajorityReadConcern));
// Assert will be raised on failure to talk to config server.
loadCWWCFromConfigServerForReplication(opCtx);
return Status::OK();
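The call site keeps the usual Status discipline: inner layers return a Status, context is appended as the error bubbles up (the withContext("Error loading clusterID") earlier in this patch), and uassertStatusOK converts failure into an exception at the boundary. A minimal stand-in for that flow; these types mimic, but are not, the real mongo::Status machinery:

    #include <iostream>
    #include <stdexcept>
    #include <string>

    struct Status {
        bool ok;
        std::string reason;
        // Prepend caller context so the final error names every layer it crossed.
        Status withContext(const std::string& ctx) const {
            return ok ? *this : Status{false, ctx + " :: " + reason};
        }
    };

    // Throw at the boundary instead of threading Status any further up.
    void uassertStatusOK(const Status& s) {
        if (!s.ok) throw std::runtime_error(s.reason);
    }

    Status fetchClusterId() {
        return Status{false, "NoMatchingDocument"};  // Simulated config lookup miss.
    }

    int main() {
        try {
            uassertStatusOK(fetchClusterId().withContext("Error loading clusterID"));
        } catch (const std::exception& e) {
            std::cout << "load failed: " << e.what() << "\n";
        }
    }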