From 76bf2602159dece2a833073924638aa3b0796205 Mon Sep 17 00:00:00 2001 From: Jack Mulrow Date: Tue, 20 Dec 2022 21:25:37 +0000 Subject: Revert "SERVER-72088 Use ShardRemote for config shard in ShardRegistry" This reverts commit 07118e1f61dddb68170b7aa095166028b284d687. --- src/mongo/db/catalog_shard_feature_flag.idl | 9 --- src/mongo/db/mongod_main.cpp | 19 ++++- ...replication_coordinator_external_state_impl.cpp | 4 +- src/mongo/db/s/balancer/balancer.cpp | 25 +++---- .../balancer_chunk_selection_policy_impl.cpp | 15 ++-- .../balancer_defragmentation_policy_impl.cpp | 48 ++++++------ src/mongo/db/s/balancer/balancer_policy.cpp | 3 +- .../db/s/balancer/cluster_statistics_impl.cpp | 6 +- .../configsvr_abort_reshard_collection_command.cpp | 4 +- ...onfigsvr_cleanup_reshard_collection_command.cpp | 2 +- .../config/configsvr_clear_jumbo_flag_command.cpp | 2 +- ...configsvr_commit_reshard_collection_command.cpp | 3 +- ...nfigsvr_refine_collection_shard_key_command.cpp | 2 +- .../s/config/configsvr_remove_chunks_command.cpp | 15 ++-- .../db/s/config/configsvr_remove_shard_command.cpp | 2 +- .../db/s/config/configsvr_remove_tags_command.cpp | 15 ++-- .../s/config/configsvr_reshard_collection_cmd.cpp | 3 +- .../sharding_catalog_manager_add_shard_test.cpp | 4 +- .../sharding_catalog_manager_chunk_operations.cpp | 17 +++-- .../sharding_catalog_manager_remove_shard_test.cpp | 4 +- src/mongo/db/s/config_server_op_observer_test.cpp | 4 +- .../periodic_sharded_index_consistency_checker.cpp | 4 +- .../resharding/resharding_coordinator_service.cpp | 12 +-- src/mongo/db/s/sharding_initialization_mongod.cpp | 72 ++---------------- src/mongo/db/s/sharding_initialization_mongod.h | 12 +-- .../s/catalog/sharding_catalog_client_impl.cpp | 22 +----- src/mongo/s/catalog_cache.cpp | 6 +- src/mongo/s/client/shard_registry.cpp | 85 +++++++--------------- src/mongo/s/client/shard_registry.h | 20 +---- src/mongo/s/client/shard_remote.cpp | 26 ++----- src/mongo/s/cluster_identity_loader.cpp | 9 +-- src/mongo/s/cluster_identity_loader.h | 6 +- src/mongo/s/cluster_identity_loader_test.cpp | 58 +++++++-------- src/mongo/s/config_server_catalog_cache_loader.cpp | 6 +- src/mongo/s/sharding_initialization.cpp | 3 +- 35 files changed, 182 insertions(+), 365 deletions(-) diff --git a/src/mongo/db/catalog_shard_feature_flag.idl b/src/mongo/db/catalog_shard_feature_flag.idl index 490ba239ad0..256489888e9 100644 --- a/src/mongo/db/catalog_shard_feature_flag.idl +++ b/src/mongo/db/catalog_shard_feature_flag.idl @@ -34,12 +34,3 @@ feature_flags: description: "Feature flag for enabling shared config server/shard server cluster role" cpp_varname: gFeatureFlagCatalogShard default: false - # TODO SERVER-72282: Replace with featureFlagCatalogShard once it is stable. - # - # Temporary feature flag to get coverage for always using config server ShardRemote in the all - # feature flags variants. Can be replaced with featureFlagCatalogShard once that flag is stable - # enough to run in the all feature flags variants. 
- featureFlagConfigServerAlwaysShardRemote: - description: "Feature flag for always using a ShardRemote for ShardRegistry config shard" - cpp_varname: gFeatureFlagConfigServerAlwaysShardRemote - default: false diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 062624f2d9f..afd65cb1266 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -137,6 +137,7 @@ #include "mongo/db/s/collection_sharding_state_factory_shard.h" #include "mongo/db/s/collection_sharding_state_factory_standalone.h" #include "mongo/db/s/config/configsvr_coordinator_service.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/config_server_op_observer.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/op_observer_sharding_impl.h" @@ -739,7 +740,23 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { } if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - initializeGlobalShardingStateForConfigServer(startupOpCtx.get()); + initializeGlobalShardingStateForMongoD( + startupOpCtx.get(), ShardId::kConfigServerId, ConnectionString::forLocal()); + + // ShardLocal to use for explicitly local commands on the config server. + auto localConfigShard = + Grid::get(serviceContext)->shardRegistry()->createLocalConfigShard(); + auto localCatalogClient = std::make_unique(localConfigShard); + + ShardingCatalogManager::create( + startupOpCtx->getServiceContext(), + makeShardingTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor")), + std::move(localConfigShard), + std::move(localCatalogClient)); + + if (!gFeatureFlagCatalogShard.isEnabledAndIgnoreFCV()) { + Grid::get(startupOpCtx.get())->setShardingInitialized(); + } } if (serverGlobalParams.clusterRole == ClusterRole::None && diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index f83fe715268..92b8d4d1dd3 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -917,9 +917,7 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook // to ShardingCatalogManager::initializeConfigDatabaseIfNeeded above), this read can // only meaningfully fail if the node is shutting down. 
status = ClusterIdentityLoader::get(opCtx)->loadClusterId( - opCtx, - ShardingCatalogManager::get(opCtx)->localCatalogClient(), - repl::ReadConcernLevel::kLocalReadConcern); + opCtx, repl::ReadConcernLevel::kLocalReadConcern); if (ErrorCodes::isShutdownError(status.code())) { return; diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index d7e3839b533..3b0a5777c9b 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -382,9 +382,8 @@ Status Balancer::rebalanceSingleChunk(OperationContext* opCtx, return refreshStatus; } - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - auto coll = - catalogClient->getCollection(opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern); + auto coll = Grid::get(opCtx)->catalogClient()->getCollection( + opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern); auto maxChunkSize = coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes()); @@ -409,9 +408,8 @@ Status Balancer::moveSingleChunk(OperationContext* opCtx, return moveAllowedStatus; } - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - auto coll = - catalogClient->getCollection(opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern); + auto coll = Grid::get(opCtx)->catalogClient()->getCollection( + opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern); const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll); MoveChunkSettings settings(maxChunkSize, secondaryThrottle, waitForDelete); @@ -428,9 +426,8 @@ Status Balancer::moveRange(OperationContext* opCtx, const NamespaceString& nss, const ConfigsvrMoveRange& request, bool issuedByRemoteUser) { - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - auto coll = - catalogClient->getCollection(opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern); + auto coll = Grid::get(opCtx)->catalogClient()->getCollection( + opCtx, nss, repl::ReadConcernLevel::kMajorityReadConcern); const auto maxChunkSize = getMaxChunkSizeBytes(opCtx, coll); const auto [fromShardId, min] = [&]() { @@ -657,7 +654,6 @@ void Balancer::_mainThread() { Client::initThread("Balancer"); auto opCtx = cc().makeOperationContext(); auto shardingContext = Grid::get(opCtx.get()); - const auto catalogClient = ShardingCatalogManager::get(opCtx.get())->localCatalogClient(); LOGV2(21856, "CSRS balancer is starting"); @@ -771,7 +767,7 @@ void Balancer::_mainThread() { // Collect and apply up-to-date configuration values on the cluster collections. 
{ OperationContext* ctx = opCtx.get(); - auto allCollections = catalogClient->getCollections(ctx, {}); + auto allCollections = Grid::get(ctx)->catalogClient()->getCollections(ctx, {}); for (const auto& coll : allCollections) { _defragmentationPolicy->startCollectionDefragmentation(ctx, coll); } @@ -1024,7 +1020,7 @@ int Balancer::_moveChunks(OperationContext* opCtx, const MigrateInfoVector& chunksToRebalance, const MigrateInfoVector& chunksToDefragment) { auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration(); - auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + auto catalogClient = Grid::get(opCtx)->catalogClient(); // If the balancer was disabled since we started this round, don't start new chunk moves if (_stopRequested() || !balancerConfig->shouldBalance() || @@ -1041,7 +1037,7 @@ int Balancer::_moveChunks(OperationContext* opCtx, return *migrateInfo.optMaxChunkSizeBytes; } - auto coll = catalogClient->getCollection( + auto coll = Grid::get(opCtx)->catalogClient()->getCollection( opCtx, migrateInfo.nss, repl::ReadConcernLevel::kMajorityReadConcern); return coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes()); }(); @@ -1160,10 +1156,9 @@ SharedSemiFuture Balancer::applyLegacyChunkSizeConstraintsOnClusterData( BalancerCollectionStatusResponse Balancer::getBalancerStatusForNs(OperationContext* opCtx, const NamespaceString& ns) { - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); CollectionType coll; try { - coll = catalogClient->getCollection(opCtx, ns, {}); + coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, ns, {}); } catch (const ExceptionFor&) { uasserted(ErrorCodes::NamespaceNotSharded, "Collection unsharded or undefined"); } diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp index 0cdcb5727fb..98f9709efeb 100644 --- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -38,7 +38,6 @@ #include "mongo/base/status_with.h" #include "mongo/bson/bsonobj_comparator_interface.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/sharding_config_server_parameters_gen.h" #include "mongo/db/s/sharding_util.h" #include "mongo/logv2/log.h" @@ -176,8 +175,7 @@ getDataSizeInfoForCollections(OperationContext* opCtx, CollectionDataSizeInfoForBalancing getDataSizeInfoForCollection(OperationContext* opCtx, const NamespaceString& nss) { - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - const auto coll = catalogClient->getCollection(opCtx, nss); + const auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss); std::vector vec{coll}; return std::move(getDataSizeInfoForCollections(opCtx, vec).at(nss)); } @@ -350,8 +348,7 @@ StatusWith BalancerChunkSelectionPolicyImpl::selectChunksToSpli const auto& shardStats = shardStatsStatus.getValue(); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - auto collections = catalogClient->getCollections(opCtx, {}); + auto collections = Grid::get(opCtx)->catalogClient()->getCollections(opCtx, {}); if (collections.empty()) { return SplitInfoVector{}; } @@ -415,8 +412,7 @@ StatusWith BalancerChunkSelectionPolicyImpl::selectChunksToMo return MigrateInfoVector{}; } - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - auto 
collections = catalogClient->getCollections(opCtx, {}); + auto collections = Grid::get(opCtx)->catalogClient()->getCollections(opCtx, {}); if (collections.empty()) { return MigrateInfoVector{}; } @@ -510,7 +506,7 @@ StatusWith BalancerChunkSelectionPolicyImpl::selectChunk // Used to check locally if the collection exists, it should throw NamespaceNotFound if it // doesn't. - ShardingCatalogManager::get(opCtx)->localCatalogClient()->getCollection(opCtx, nss); + Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss); stdx::unordered_set availableShards; std::transform(shardStats.begin(), @@ -573,8 +569,7 @@ Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* opCt return shardStatsStatus.getStatus(); } - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - const CollectionType collection = catalogClient->getCollection( + const CollectionType collection = Grid::get(opCtx)->catalogClient()->getCollection( opCtx, chunk.getCollectionUUID(), repl::ReadConcernLevel::kLocalReadConcern); const auto& nss = collection.getNss(); diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp index c7d43d723df..971c7eeddc9 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp @@ -69,17 +69,16 @@ ShardVersion getShardVersion(OperationContext* opCtx, } std::vector getCollectionChunks(OperationContext* opCtx, const CollectionType& coll) { - auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - return uassertStatusOK( - catalogClient->getChunks(opCtx, - BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/, - BSON(ChunkType::min() << 1) /*sort*/, - boost::none /*limit*/, - nullptr /*opTime*/, - coll.getEpoch(), - coll.getTimestamp(), - repl::ReadConcernLevel::kLocalReadConcern, - boost::none)); + return uassertStatusOK(Grid::get(opCtx)->catalogClient()->getChunks( + opCtx, + BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/, + BSON(ChunkType::min() << 1) /*sort*/, + boost::none /*limit*/, + nullptr /*opTime*/, + coll.getEpoch(), + coll.getTimestamp(), + repl::ReadConcernLevel::kLocalReadConcern, + boost::none)); } uint64_t getCollectionMaxChunkSizeBytes(OperationContext* opCtx, const CollectionType& coll) { @@ -1182,17 +1181,16 @@ class SplitChunksPhase : public DefragmentationPhase { public: static std::unique_ptr build(OperationContext* opCtx, const CollectionType& coll) { - auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - auto collectionChunks = uassertStatusOK( - catalogClient->getChunks(opCtx, - BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/, - BSON(ChunkType::min() << 1) /*sort*/, - boost::none /*limit*/, - nullptr /*opTime*/, - coll.getEpoch(), - coll.getTimestamp(), - repl::ReadConcernLevel::kLocalReadConcern, - boost::none)); + auto collectionChunks = uassertStatusOK(Grid::get(opCtx)->catalogClient()->getChunks( + opCtx, + BSON(ChunkType::collectionUUID() << coll.getUuid()) /*query*/, + BSON(ChunkType::min() << 1) /*sort*/, + boost::none /*limit*/, + nullptr /*opTime*/, + coll.getEpoch(), + coll.getTimestamp(), + repl::ReadConcernLevel::kLocalReadConcern, + boost::none)); stdx::unordered_map pendingActionsByShards; @@ -1423,8 +1421,7 @@ void BalancerDefragmentationPolicyImpl::startCollectionDefragmentation(Operation void 
BalancerDefragmentationPolicyImpl::abortCollectionDefragmentation(OperationContext* opCtx, const NamespaceString& nss) { stdx::lock_guard lk(_stateMutex); - auto coll = - ShardingCatalogManager::get(opCtx)->localCatalogClient()->getCollection(opCtx, nss, {}); + auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss, {}); if (coll.getDefragmentCollection()) { if (_defragmentationStates.contains(coll.getUuid())) { // Notify phase to abort current phase @@ -1596,8 +1593,7 @@ bool BalancerDefragmentationPolicyImpl::_advanceToNextActionablePhase(OperationC boost::optional coll(boost::none); while (phaseTransitionNeeded()) { if (!coll) { - coll = ShardingCatalogManager::get(opCtx)->localCatalogClient()->getCollection( - opCtx, collUuid); + coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, collUuid); } currentPhase = _transitionPhases(opCtx, *coll, currentPhase->getNextPhase()); advanced = true; diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp index 71d42629e52..8f28c3cef8f 100644 --- a/src/mongo/db/s/balancer/balancer_policy.cpp +++ b/src/mongo/db/s/balancer/balancer_policy.cpp @@ -35,7 +35,6 @@ #include #include "mongo/db/s/balancer/type_migration.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/logv2/log.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_shard.h" @@ -201,7 +200,7 @@ StatusWith ZoneInfo::getZonesForCollection(OperationContext* opCtx, const NamespaceString& nss, const KeyPattern& keyPattern) { const auto swCollectionZones = - ShardingCatalogManager::get(opCtx)->localCatalogClient()->getTagsForCollection(opCtx, nss); + Grid::get(opCtx)->catalogClient()->getTagsForCollection(opCtx, nss); if (!swCollectionZones.isOK()) { return swCollectionZones.getStatus().withContext( str::stream() << "Unable to load zones for collection " << nss); diff --git a/src/mongo/db/s/balancer/cluster_statistics_impl.cpp b/src/mongo/db/s/balancer/cluster_statistics_impl.cpp index 97e114b0f8d..d1bf5ee2e80 100644 --- a/src/mongo/db/s/balancer/cluster_statistics_impl.cpp +++ b/src/mongo/db/s/balancer/cluster_statistics_impl.cpp @@ -37,7 +37,6 @@ #include "mongo/base/status_with.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/read_preference.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" @@ -116,9 +115,8 @@ StatusWith> ClusterStatisticsImpl::_getStats( // db.serverStatus() (mem.mapped) to all shards. // // TODO: skip unresponsive shards and mark information as stale. 
- const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - auto shardsStatus = - catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern); + auto shardsStatus = Grid::get(opCtx)->catalogClient()->getAllShards( + opCtx, repl::ReadConcernLevel::kMajorityReadConcern); if (!shardsStatus.isOK()) { return shardsStatus.getStatus(); } diff --git a/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp b/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp index 57f11dd1472..bfedddd6a3a 100644 --- a/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_abort_reshard_collection_command.cpp @@ -51,8 +51,8 @@ UUID retrieveReshardingUUID(OperationContext* opCtx, const NamespaceString& ns) repl::ReadConcernArgs::get(opCtx) = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - const auto collEntry = - ShardingCatalogManager::get(opCtx)->localCatalogClient()->getCollection(opCtx, ns); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); + const auto collEntry = catalogClient->getCollection(opCtx, ns); uassert(ErrorCodes::NoSuchReshardCollection, "Could not find resharding-related metadata that matches the given namespace", diff --git a/src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp b/src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp index dcaf9bb5b4d..d3a253f6610 100644 --- a/src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_cleanup_reshard_collection_command.cpp @@ -90,7 +90,7 @@ public: repl::ReadConcernArgs::get(opCtx) = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); auto collEntry = catalogClient->getCollection(opCtx, ns()); if (!collEntry.getReshardingFields()) { // If the collection entry doesn't have resharding fields, we assume that the diff --git a/src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp b/src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp index e8b3761e21a..baf26278f65 100644 --- a/src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp +++ b/src/mongo/db/s/config/configsvr_clear_jumbo_flag_command.cpp @@ -63,7 +63,7 @@ public: repl::ReadConcernArgs::get(opCtx) = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); CollectionType collType; try { diff --git a/src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp b/src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp index 0fa75ee14b3..8e151c60625 100644 --- a/src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_commit_reshard_collection_command.cpp @@ -33,7 +33,6 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/repl/primary_only_service.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/resharding/resharding_coordinator_service.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include "mongo/logv2/log.h" @@ -52,7 +51,7 @@ UUID retrieveReshardingUUID(OperationContext* opCtx, const NamespaceString& ns) 
repl::ReadConcernArgs::get(opCtx) = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); const auto collEntry = catalogClient->getCollection(opCtx, ns); uassert(ErrorCodes::NoSuchReshardCollection, diff --git a/src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp b/src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp index e952b0e9cd4..01c13128c15 100644 --- a/src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp +++ b/src/mongo/db/s/config/configsvr_refine_collection_shard_key_command.cpp @@ -74,7 +74,7 @@ public: repl::ReadConcernArgs::get(opCtx) = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); // Validate the given namespace is (i) sharded, (ii) doesn't already have the proposed // key, and (iii) has the same epoch as the router that received diff --git a/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp index a842c597425..c54390d759a 100644 --- a/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp +++ b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp @@ -35,7 +35,6 @@ #include "mongo/db/commands.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/remove_chunks_gen.h" #include "mongo/db/session/session_catalog_mongod.h" #include "mongo/db/transaction/transaction_participant.h" @@ -95,13 +94,13 @@ public: // Write with localWriteConcern because we cannot wait for replication with a // session checked out. The command will wait for majority WC on the epilogue after // the session has been checked in. 
- const auto catalogClient = - ShardingCatalogManager::get(newOpCtxPtr.get())->localCatalogClient(); - uassertStatusOK(catalogClient->removeConfigDocuments( - newOpCtxPtr.get(), - ChunkType::ConfigNS, - BSON(ChunkType::collectionUUID << collectionUUID), - ShardingCatalogClient::kLocalWriteConcern)); + uassertStatusOK( + Grid::get(newOpCtxPtr.get()) + ->catalogClient() + ->removeConfigDocuments(newOpCtxPtr.get(), + ChunkType::ConfigNS, + BSON(ChunkType::collectionUUID << collectionUUID), + ShardingCatalogClient::kLocalWriteConcern)); } // Since no write happened on this txnNumber, we need to make a dummy write so that diff --git a/src/mongo/db/s/config/configsvr_remove_shard_command.cpp b/src/mongo/db/s/config/configsvr_remove_shard_command.cpp index 486de3cb501..efebf6f1cf9 100644 --- a/src/mongo/db/s/config/configsvr_remove_shard_command.cpp +++ b/src/mongo/db/s/config/configsvr_remove_shard_command.cpp @@ -118,7 +118,7 @@ public: return shard->getId(); }(); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); const auto shardingCatalogManager = ShardingCatalogManager::get(opCtx); const auto shardDrainingStatus = [&] { diff --git a/src/mongo/db/s/config/configsvr_remove_tags_command.cpp b/src/mongo/db/s/config/configsvr_remove_tags_command.cpp index 8d0c530bc65..90b2317c001 100644 --- a/src/mongo/db/s/config/configsvr_remove_tags_command.cpp +++ b/src/mongo/db/s/config/configsvr_remove_tags_command.cpp @@ -35,7 +35,6 @@ #include "mongo/db/commands.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/remove_tags_gen.h" #include "mongo/db/session/session_catalog_mongod.h" #include "mongo/db/transaction/transaction_participant.h" @@ -90,13 +89,13 @@ public: auto newOpCtxPtr = CancelableOperationContext( cc().makeOperationContext(), opCtx->getCancellationToken(), executor); - const auto catalogClient = - ShardingCatalogManager::get(newOpCtxPtr.get())->localCatalogClient(); - uassertStatusOK(catalogClient->removeConfigDocuments( - newOpCtxPtr.get(), - TagsType::ConfigNS, - BSON(TagsType::ns(nss.ns())), - ShardingCatalogClient::kLocalWriteConcern)); + uassertStatusOK( + Grid::get(newOpCtxPtr.get()) + ->catalogClient() + ->removeConfigDocuments(newOpCtxPtr.get(), + TagsType::ConfigNS, + BSON(TagsType::ns(nss.ns())), + ShardingCatalogClient::kLocalWriteConcern)); } // Since no write happened on this txnNumber, we need to make a dummy write so that diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index 77d3f19dded..87267b0fbf5 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -36,7 +36,6 @@ #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/resharding/coordinator_document_gen.h" #include "mongo/db/s/resharding/resharding_coordinator_service.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" @@ -78,7 +77,7 @@ public: const NamespaceString& nss = ns(); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); try { const auto 
collEntry = catalogClient->getCollection(opCtx, nss); uassert(ErrorCodes::NotImplemented, diff --git a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp index a5f1703688f..5ac1e13ff2b 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_add_shard_test.cpp @@ -85,8 +85,8 @@ protected: setUpAndInitializeConfigDb(); auto clusterIdLoader = ClusterIdentityLoader::get(operationContext()); - ASSERT_OK(clusterIdLoader->loadClusterId( - operationContext(), catalogClient(), repl::ReadConcernLevel::kLocalReadConcern)); + ASSERT_OK(clusterIdLoader->loadClusterId(operationContext(), + repl::ReadConcernLevel::kLocalReadConcern)); _clusterId = clusterIdLoader->getClusterId(); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 80ffbc46248..960ae41c796 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -1949,14 +1949,15 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction( const auto chunkQuery = BSON(ChunkType::collectionUUID << migratedChunk->getCollectionUUID() << ChunkType::shard << migratedChunk->getShard()); - auto findResponse = uassertStatusOK(_localConfigShard->exhaustiveFindOnConfig( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - ChunkType::ConfigNS, - chunkQuery, - BSONObj(), - 1 /* limit */)); + auto findResponse = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + chunkQuery, + BSONObj(), + 1 /* limit */)); return findResponse.docs.empty(); }(); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp index 5d00ebc34be..37da8bfe872 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_remove_shard_test.cpp @@ -85,8 +85,8 @@ protected: setUpAndInitializeConfigDb(); auto clusterIdLoader = ClusterIdentityLoader::get(operationContext()); - ASSERT_OK(clusterIdLoader->loadClusterId( - operationContext(), catalogClient(), repl::ReadConcernLevel::kLocalReadConcern)); + ASSERT_OK(clusterIdLoader->loadClusterId(operationContext(), + repl::ReadConcernLevel::kLocalReadConcern)); _clusterId = clusterIdLoader->getClusterId(); ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn()); diff --git a/src/mongo/db/s/config_server_op_observer_test.cpp b/src/mongo/db/s/config_server_op_observer_test.cpp index e2ed5c1ff6d..83a1ef8694a 100644 --- a/src/mongo/db/s/config_server_op_observer_test.cpp +++ b/src/mongo/db/s/config_server_op_observer_test.cpp @@ -45,8 +45,8 @@ protected: setUpAndInitializeConfigDb(); auto clusterIdLoader = ClusterIdentityLoader::get(operationContext()); - ASSERT_OK(clusterIdLoader->loadClusterId( - operationContext(), catalogClient(), repl::ReadConcernLevel::kLocalReadConcern)); + ASSERT_OK(clusterIdLoader->loadClusterId(operationContext(), + repl::ReadConcernLevel::kLocalReadConcern)); _clusterId = clusterIdLoader->getClusterId(); } diff --git 
a/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp b/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp index fb5b113216d..b5378f38e02 100644 --- a/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp +++ b/src/mongo/db/s/periodic_sharded_index_consistency_checker.cpp @@ -35,7 +35,6 @@ #include "mongo/db/auth/privilege.h" #include "mongo/db/curop.h" #include "mongo/db/operation_context.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" @@ -126,8 +125,7 @@ void PeriodicShardedIndexConsistencyChecker::_launchShardedIndexConsistencyCheck try { long long numShardedCollsWithInconsistentIndexes = 0; - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); - auto collections = catalogClient->getCollections( + auto collections = Grid::get(opCtx)->catalogClient()->getCollections( opCtx, {}, repl::ReadConcernLevel::kLocalReadConcern); for (const auto& coll : collections) { diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 8e035f95367..87a6016e264 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -589,7 +589,7 @@ void removeChunkAndTagsDocs(OperationContext* opCtx, const auto chunksQuery = BSON(ChunkType::collectionUUID() << collUUID); const auto tagDeleteOperationHint = BSON(TagsType::ns() << 1 << TagsType::min() << 1); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); uassertStatusOK(catalogClient->removeConfigDocuments( opCtx, ChunkType::ConfigNS, chunksQuery, kMajorityWriteConcern)); @@ -844,7 +844,7 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, // collection. So don't try to call remove as it will end up removing the metadata // for the real collection. 
if (!wasDecisionPersisted) { - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + const auto catalogClient = Grid::get(opCtx)->catalogClient(); uassertStatusOK(catalogClient->removeConfigDocuments( opCtx, @@ -2146,9 +2146,8 @@ void ReshardingCoordinator::_updateChunkImbalanceMetrics(const NamespaceString& Grid::get(opCtx)->catalogCache()->getShardedCollectionPlacementInfoWithRefresh(opCtx, nss)); - const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); const auto collectionZones = - uassertStatusOK(catalogClient->getTagsForCollection(opCtx, nss)); + uassertStatusOK(Grid::get(opCtx)->catalogClient()->getTagsForCollection(opCtx, nss)); const auto& keyPattern = routingInfo.getShardKeyPattern().getKeyPattern(); @@ -2160,8 +2159,9 @@ void ReshardingCoordinator::_updateChunkImbalanceMetrics(const NamespaceString& tag.getTag()))); } - const auto allShardsWithOpTime = uassertStatusOK( - catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kLocalReadConcern)); + const auto allShardsWithOpTime = + uassertStatusOK(Grid::get(opCtx)->catalogClient()->getAllShards( + opCtx, repl::ReadConcernLevel::kLocalReadConcern)); auto imbalanceCount = getMaxChunkImbalanceCount(routingInfo, allShardsWithOpTime.value, zoneInfo); diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 9a4caf90e93..be5483b21db 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -38,7 +38,6 @@ #include "mongo/client/replica_set_monitor.h" #include "mongo/db/audit.h" #include "mongo/db/catalog_raii.h" -#include "mongo/db/catalog_shard_feature_flag_gen.h" #include "mongo/db/client_metadata_propagation_egress_hook.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/dbhelpers.h" @@ -49,7 +48,6 @@ #include "mongo/db/ops/update.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/chunk_splitter.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/periodic_balancer_config_refresher.h" #include "mongo/db/s/read_only_catalog_cache_loader.h" #include "mongo/db/s/shard_local.h" @@ -57,11 +55,9 @@ #include "mongo/db/s/transaction_coordinator_service.h" #include "mongo/db/server_options.h" #include "mongo/db/vector_clock_metadata_hook.h" -#include "mongo/executor/network_interface_factory.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" -#include "mongo/s/catalog/sharding_catalog_client_impl.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_factory.h" #include "mongo/s/client/shard_remote.h" @@ -122,15 +118,6 @@ public: LOGV2(471692, "Unable to update the shard registry", "error"_attr = e); } - auto const shardingState = ShardingState::get(_serviceContext); - if (!shardingState->enabled()) { - // If our sharding state isn't enabled, we don't have a shard identity document, so - // there's nothing to update. Note technically this may race with the config server - // being added as a shard, but that shouldn't be a problem since addShard will use a - // valid connection string and should serialize with a replica set reconfig. 
- return; - } - auto setName = connStr.getSetName(); bool updateInProgress = false; { @@ -502,15 +489,8 @@ void ShardingInitializationMongoD::updateShardIdentityConfigString( } void ShardingInitializationMongoD::onSetCurrentConfig(OperationContext* opCtx) { - if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer || - !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) { - // Only config servers capable of acting as a shard set up the config shard in their shard - // registry with a real connection string. - return; - } - - auto myConnectionString = repl::ReplicationCoordinator::get(opCtx)->getConfigConnectionString(); - Grid::get(opCtx)->shardRegistry()->initConfigShardIfNecessary(myConnectionString); + // TODO SERVER-72088: Use the connection string from the config to construct a ShardRemote for + // the config server when in catalog shard mode. } void ShardingInitializationMongoD::onInitialDataAvailable(OperationContext* opCtx, @@ -527,42 +507,10 @@ void ShardingInitializationMongoD::onInitialDataAvailable(OperationContext* opCt } } -void initializeGlobalShardingStateForConfigServer(OperationContext* opCtx) { - ShardingInitializationMongoD::get(opCtx)->installReplicaSetChangeListener( - opCtx->getServiceContext()); - - auto configCS = []() -> boost::optional { - if (gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) { - // When the config server can operate as a shard, it sets up a ShardRemote for the - // config shard, which is created later after loading the local replica set config. - return boost::none; - } - return {ConnectionString::forLocal()}; - }(); - - initializeGlobalShardingStateForMongoD(opCtx, ShardId::kConfigServerId, configCS); - - // ShardLocal to use for explicitly local commands on the config server. 
- auto localConfigShard = Grid::get(opCtx)->shardRegistry()->createLocalConfigShard(); - auto localCatalogClient = std::make_unique(localConfigShard); - - ShardingCatalogManager::create( - opCtx->getServiceContext(), - makeShardingTaskExecutor(executor::makeNetworkInterface("AddShard-TaskExecutor")), - std::move(localConfigShard), - std::move(localCatalogClient)); - - if (!gFeatureFlagCatalogShard.isEnabledAndIgnoreFCV()) { - Grid::get(opCtx)->setShardingInitialized(); - } -} - void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, const ShardId& shardId, - const boost::optional& configCS) { - if (configCS) { - uassert(ErrorCodes::BadValue, "Unrecognized connection string.", *configCS); - } + const ConnectionString& configCS) { + uassert(ErrorCodes::BadValue, "Unrecognized connection string.", configCS); auto targeterFactory = std::make_unique(); auto targeterFactoryPtr = targeterFactory.get(); @@ -654,20 +602,16 @@ void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, } } - -void ShardingInitializationMongoD::installReplicaSetChangeListener(ServiceContext* service) { - _replicaSetChangeListener = - ReplicaSetMonitor::getNotifier().makeListener(service); -} - void ShardingInitializationMongoD::_initializeShardingEnvironmentOnShardServer( OperationContext* opCtx, const ShardIdentity& shardIdentity) { - installReplicaSetChangeListener(opCtx->getServiceContext()); + _replicaSetChangeListener = + ReplicaSetMonitor::getNotifier().makeListener( + opCtx->getServiceContext()); initializeGlobalShardingStateForMongoD(opCtx, shardIdentity.getShardName().toString(), - {shardIdentity.getConfigsvrConnectionString()}); + shardIdentity.getConfigsvrConnectionString()); // Determine primary/secondary/standalone state in order to properly initialize sharding // components. diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h index 78a56d48408..a2c092fb3f2 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.h +++ b/src/mongo/db/s/sharding_initialization_mongod.h @@ -106,11 +106,6 @@ public: _initFunc = std::move(func); } - /** - * Installs a listener for RSM change notifications. - */ - void installReplicaSetChangeListener(ServiceContext* service); - private: void _initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, const ShardIdentity& shardIdentity); @@ -146,11 +141,6 @@ private: */ void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, const ShardId& shardId, - const boost::optional& configCS); - -/** - * Initialize the sharding components for a config server. 
- */ -void initializeGlobalShardingStateForConfigServer(OperationContext* opCtx); + const ConnectionString& configCS); } // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 463e46ed9ff..2253e2ab55e 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -36,7 +36,6 @@ #include "mongo/bson/util/bson_extract.h" #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" -#include "mongo/db/catalog_shard_feature_flag_gen.h" #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" @@ -324,24 +323,11 @@ std::vector runCatalogAggregation(OperationContext* opCtx, AggregateCommandRequest& aggRequest, const repl::ReadConcernArgs& readConcern, const Milliseconds& maxTimeout) { - // Reads on the config server may run on any node in its replica set. Such reads use the config - // time as an afterClusterTime token, but config time is only inclusive of majority committed - // data, so we should not use a weaker read concern. Note if the local node is a config server, - // it can use these concerns safely with a ShardLocal, which would require relaxing this - // invariant. - invariant(readConcern.getLevel() == repl::ReadConcernLevel::kMajorityReadConcern || - readConcern.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern || - readConcern.getLevel() == repl::ReadConcernLevel::kLinearizableReadConcern, - str::stream() << "Disallowed read concern: " << readConcern.toBSONInner()); - aggRequest.setReadConcern(readConcern.toBSONInner()); aggRequest.setWriteConcern(WriteConcernOptions()); const auto readPref = [&]() -> ReadPreferenceSetting { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && - !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) { - // When the feature flag is on, the config server may read from any node in its replica - // set, so we should use the typical config server read preference. + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { return {}; } @@ -354,7 +340,6 @@ std::vector runCatalogAggregation(OperationContext* opCtx, aggRequest.setUnwrappedReadPref(readPref.toContainingBSON()); if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { - // Don't use a timeout on the config server to guarantee it can always refresh. const Milliseconds maxTimeMS = std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeout); aggRequest.setMaxTimeMS(durationCount(maxTimeMS)); } @@ -555,10 +540,7 @@ std::vector makeAndRunPlacementHistoryAggregation( // Run the aggregation const auto readConcern = [&]() -> repl::ReadConcernArgs { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && - !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) { - // When the feature flag is on, the config server may read from a secondary which may - // need to wait for replication, so we should use afterClusterTime. 
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { return {repl::ReadConcernLevel::kMajorityReadConcern}; } else { const auto time = VectorClock::get(opCtx)->getTime(); diff --git a/src/mongo/s/catalog_cache.cpp b/src/mongo/s/catalog_cache.cpp index 4c261b8b9cb..0bfb5078009 100644 --- a/src/mongo/s/catalog_cache.cpp +++ b/src/mongo/s/catalog_cache.cpp @@ -32,7 +32,6 @@ #include #include "mongo/bson/bsonobjbuilder.h" -#include "mongo/db/catalog_shard_feature_flag_gen.h" #include "mongo/db/curop.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/optime_with.h" @@ -1006,10 +1005,7 @@ CatalogCache::IndexCache::LookupResult CatalogCache::IndexCache::_lookupIndexes( "timeInStore"_attr = previousVersion); const auto readConcern = [&]() -> repl::ReadConcernArgs { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && - !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) { - // When the feature flag is on, the config server may read from a secondary which - // may need to wait for replication, so we should use afterClusterTime. + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { return {repl::ReadConcernLevel::kSnapshotReadConcern}; } else { const auto vcTime = VectorClock::get(opCtx)->getTime(); diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index ef939a50137..120562f6c3d 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -33,7 +33,6 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/client/replica_set_monitor.h" -#include "mongo/db/catalog_shard_feature_flag_gen.h" #include "mongo/db/client.h" #include "mongo/db/vector_clock.h" #include "mongo/db/vector_clock_metadata_hook.h" @@ -66,7 +65,7 @@ using CallbackArgs = executor::TaskExecutor::CallbackArgs; ShardRegistry::ShardRegistry(ServiceContext* service, std::unique_ptr shardFactory, - const boost::optional& configServerCS, + const ConnectionString& configServerCS, std::vector shardRemovalHooks) : _service(service), _shardFactory(std::move(shardFactory)), @@ -89,12 +88,7 @@ ShardRegistry::ShardRegistry(ServiceContext* service, const Time& timeInStore) { return _lookup(opCtx, key, cachedData, timeInStore); }, 1 /* cacheSize */)) { - if (_initConfigServerCS) { - invariant(_initConfigServerCS->isValid()); - } else { - invariant(gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()); - } - + invariant(_initConfigServerCS.isValid()); _threadPool.startup(); } @@ -105,24 +99,24 @@ ShardRegistry::~ShardRegistry() { void ShardRegistry::init() { invariant(!_isInitialized.load()); + LOGV2_DEBUG(5123000, + 1, + "Initializing ShardRegistry", + "configServers"_attr = _initConfigServerCS.toString()); + /* The creation of the config shard object will initialize the associated RSM monitor that in * turn will call ShardRegistry::updateReplSetHosts(). Hence the config shard object MUST be * created after the ShardRegistry is fully constructed. This is why `_configShardData` * is initialized here rather than in the ShardRegistry constructor. 
*/ - if (_initConfigServerCS) { - LOGV2_DEBUG( - 5123000, 1, "Initializing ShardRegistry", "configServers"_attr = _initConfigServerCS); - + { stdx::lock_guard lk(_mutex); - _initConfigShard(lk, *_initConfigServerCS); - _isInitialized.store(true); - } else { - LOGV2_DEBUG( - 7208800, - 1, - "Deferring ShardRegistry initialization until local replica set config is known"); + _configShardData = ShardRegistryData::createWithConfigShardOnly( + _shardFactory->createShard(ShardId::kConfigServerId, _initConfigServerCS)); + _latestConnStrings[_initConfigServerCS.getSetName()] = _initConfigServerCS; } + + _isInitialized.store(true); } ShardRegistry::Cache::LookupResult ShardRegistry::_lookup(OperationContext* opCtx, @@ -277,10 +271,7 @@ ConnectionString ShardRegistry::getConfigServerConnectionString() const { std::shared_ptr ShardRegistry::getConfigShard() const { stdx::lock_guard lk(_mutex); - auto configShard = _configShardData.findShard(ShardId::kConfigServerId); - // Note this should only throw if the local node has not learned its replica set config yet. - uassert(ErrorCodes::NotYetInitialized, "Config shard has not been set up yet", configShard); - return configShard; + return _configShardData.findShard(ShardId::kConfigServerId); } StatusWith> ShardRegistry::getShard(OperationContext* opCtx, @@ -407,6 +398,7 @@ void ShardRegistry::updateReplSetHosts(const ConnectionString& givenConnString, auto newData = ShardRegistryData::createFromExisting( _configShardData, newConnString, _shardFactory.get()); _configShardData = newData; + } else { auto value = _rsmIncrement.addAndFetch(1); LOGV2_DEBUG(4620252, @@ -418,7 +410,17 @@ void ShardRegistry::updateReplSetHosts(const ConnectionString& givenConnString, } // Schedule a lookup, to incorporate the new connection string. - _scheduleLookup(); + _getDataAsync() + .thenRunOn(Grid::get(_service)->getExecutorPool()->getFixedExecutor()) + .ignoreValue() + .getAsync([](const Status& status) { + if (!status.isOK()) { + LOGV2(4620201, + "Error running reload of ShardRegistry for RSM update, caused by {error}", + "Error running reload of ShardRegistry for RSM update", + "error"_attr = redact(status)); + } + }); } std::unique_ptr ShardRegistry::createConnection(const ConnectionString& connStr) const { @@ -555,41 +557,6 @@ std::shared_ptr ShardRegistry::getShardForHostNoReload(const HostAndPort& return data->findByHostAndPort(host); } -void ShardRegistry::_scheduleLookup() { - _getDataAsync() - .thenRunOn(Grid::get(_service)->getExecutorPool()->getFixedExecutor()) - .ignoreValue() - .getAsync([](const Status& status) { - if (!status.isOK()) { - LOGV2(4620201, - "Error running reload of ShardRegistry for RSM update, caused by {error}", - "Error running reload of ShardRegistry for RSM update", - "error"_attr = redact(status)); - } - }); -} - -void ShardRegistry::_initConfigShard(WithLock wl, const ConnectionString& configCS) { - _configShardData = ShardRegistryData::createWithConfigShardOnly( - _shardFactory->createShard(ShardId::kConfigServerId, configCS)); - _latestConnStrings[configCS.getSetName()] = configCS; -} - -void ShardRegistry::initConfigShardIfNecessary(const ConnectionString& configCS) { - { - stdx::lock_guard lk(_mutex); - if (_isInitialized.load()) { - return; - } - - _initConfigShard(lk, configCS); - _isInitialized.store(true); - } - - // Lookup can succeed now that the registry has a real config shard, so schedule one right away. 
- _scheduleLookup(); -} ////////////// ShardRegistryData ////////////////// ShardRegistryData ShardRegistryData::createWithConfigShardOnly(std::shared_ptr configShard) { diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 13daa987df8..fa29fbcc146 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -187,14 +187,13 @@ public: */ ShardRegistry(ServiceContext* service, std::unique_ptr shardFactory, - const boost::optional& configServerCS, + const ConnectionString& configServerCS, std::vector shardRemovalHooks = {}); ~ShardRegistry(); /** - * Initializes ShardRegistry with config shard, if a connection string was provided at - * construction. + * Initializes ShardRegistry with config shard. * * The creation of the config shard object will initialize the associated RSM monitor that in * turn will call ShardRegistry::updateReplSetHosts(). Hence the config shard object MUST be * created after the ShardRegistry is fully constructed. */ void init(); - /** - * Sets up the registry's config shard from the given connection string. Only takes effect if - * the registry has not already done this. - */ - void initConfigShardIfNecessary(const ConnectionString& configCS); - /** * Start up the periodic reloader of the ShardRegistry. * Can be called only after ShardRegistry::init() @@ -418,11 +411,6 @@ private: */ SharedSemiFuture _getDataAsync(); - /** - * Triggers a reload without waiting for it to complete. - */ - void _scheduleLookup(); - /** * Gets the latest-cached copy of the ShardRegistryData. Never fetches from the config servers. * Only used by the "NoReload" accessors. @@ -438,8 +426,6 @@ private: void _initializeCacheIfNecessary() const; - void _initConfigShard(WithLock, const ConnectionString& configCS); - SharedSemiFuture _reloadAsync(); ServiceContext* _service{nullptr}; @@ -453,7 +439,7 @@ private: * Specified in the ShardRegistry c-tor. It's used only in init() to initialize the config * shard. */ - const boost::optional _initConfigServerCS; + const ConnectionString _initConfigServerCS; /** * A list of callbacks to be called asynchronously when it has been discovered that a shard was diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 658c6de68ed..953d2f83612 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -264,14 +264,8 @@ StatusWith ShardRemote::_runExhaustiveCursorCommand( getMoreBob->append("collection", data.nss.coll()); }; - const Milliseconds requestTimeout = [&] { - auto minMaxTimeMS = std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeMSOverride); - if (minMaxTimeMS < Milliseconds::max()) { - return minMaxTimeMS; - } - // The Fetcher expects kNoTimeout when there is no maxTimeMS instead of Milliseconds::max(). - return RemoteCommandRequest::kNoTimeout; - }(); + const Milliseconds requestTimeout = + std::min(opCtx->getRemainingMaxTimeMillis(), maxTimeMSOverride); auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); Fetcher fetcher(executor.get(), @@ -313,17 +307,6 @@ StatusWith ShardRemote::_runExhaustiveCursorCommand( return response; } -Milliseconds getExhaustiveFindOnConfigMaxTimeMS(OperationContext* opCtx, - const NamespaceString& nss) { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - // Don't use a timeout on the config server to guarantee it can always refresh. - return Milliseconds::max(); - } - return std::min(opCtx->getRemainingMaxTimeMillis(), - nss == ChunkType::ConfigNS ? 
Milliseconds(gFindChunksOnConfigTimeoutMS.load()) - : Shard::kDefaultConfigCommandTimeout); -} StatusWith ShardRemote::_exhaustiveFindOnConfig( OperationContext* opCtx, @@ -358,7 +341,10 @@ StatusWith ShardRemote::_exhaustiveFindOnConfig( return bob.done().getObjectField(repl::ReadConcernArgs::kReadConcernFieldName).getOwned(); }(); - const Milliseconds maxTimeMS = getExhaustiveFindOnConfigMaxTimeMS(opCtx, nss); + const Milliseconds maxTimeMS = + std::min(opCtx->getRemainingMaxTimeMillis(), + nss == ChunkType::ConfigNS ? Milliseconds(gFindChunksOnConfigTimeoutMS.load()) + : kDefaultConfigCommandTimeout); BSONObjBuilder findCmdBuilder; diff --git a/src/mongo/s/cluster_identity_loader.cpp b/src/mongo/s/cluster_identity_loader.cpp index 67537e4cb2c..b51d0854b90 100644 --- a/src/mongo/s/cluster_identity_loader.cpp +++ b/src/mongo/s/cluster_identity_loader.cpp @@ -37,6 +37,7 @@ #include "mongo/db/service_context.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_config_version.h" +#include "mongo/s/grid.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding @@ -63,7 +64,6 @@ OID ClusterIdentityLoader::getClusterId() { } Status ClusterIdentityLoader::loadClusterId(OperationContext* opCtx, - ShardingCatalogClient* catalogClient, const repl::ReadConcernLevel& readConcernLevel) { stdx::unique_lock lk(_mutex); if (_initializationState == InitializationState::kInitialized) { @@ -82,7 +82,7 @@ Status ClusterIdentityLoader::loadClusterId(OperationContext* opCtx, _initializationState = InitializationState::kLoading; lk.unlock(); - auto loadStatus = _fetchClusterIdFromConfig(opCtx, catalogClient, readConcernLevel); + auto loadStatus = _fetchClusterIdFromConfig(opCtx, readConcernLevel); lk.lock(); invariant(_initializationState == InitializationState::kLoading); @@ -97,9 +97,8 @@ Status ClusterIdentityLoader::loadClusterId(OperationContext* opCtx, } StatusWith ClusterIdentityLoader::_fetchClusterIdFromConfig( - OperationContext* opCtx, - ShardingCatalogClient* catalogClient, - const repl::ReadConcernLevel& readConcernLevel) { + OperationContext* opCtx, const repl::ReadConcernLevel& readConcernLevel) { + auto catalogClient = Grid::get(opCtx)->catalogClient(); auto loadResult = catalogClient->getConfigVersion(opCtx, readConcernLevel); if (!loadResult.isOK()) { return loadResult.getStatus().withContext("Error loading clusterID"); diff --git a/src/mongo/s/cluster_identity_loader.h b/src/mongo/s/cluster_identity_loader.h index 43e51b9dc17..693c62483ad 100644 --- a/src/mongo/s/cluster_identity_loader.h +++ b/src/mongo/s/cluster_identity_loader.h @@ -34,7 +34,6 @@ #include "mongo/bson/oid.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/platform/mutex.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/stdx/condition_variable.h" #include "mongo/util/hierarchical_acquisition.h" @@ -74,9 +73,7 @@ public: * If another thread is already in the process of loading the cluster ID, concurrent calls will * wait for that thread to finish and then return its results. */ - Status loadClusterId(OperationContext* opCtx, - ShardingCatalogClient* catalogClient, - const repl::ReadConcernLevel& readConcernLevel); + Status loadClusterId(OperationContext* opCtx, const repl::ReadConcernLevel& readConcernLevel); /** * Called if the config.version document is rolled back. Notifies the ClusterIdentityLoader @@ -96,7 +93,6 @@ private: * the version document, and returns it. 
*/ StatusWith _fetchClusterIdFromConfig(OperationContext* opCtx, - ShardingCatalogClient* catalogClient, const repl::ReadConcernLevel& readConcernLevel); Mutex _mutex = diff --git a/src/mongo/s/cluster_identity_loader_test.cpp b/src/mongo/s/cluster_identity_loader_test.cpp index 1a06260d134..363bb33ceb3 100644 --- a/src/mongo/s/cluster_identity_loader_test.cpp +++ b/src/mongo/s/cluster_identity_loader_test.cpp @@ -104,10 +104,9 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) { // The first time you ask for the cluster ID it will have to be loaded from the config servers. auto future = launchAsync([&] { - auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->loadClusterId(operationContext(), - catalogClient(), - repl::ReadConcernLevel::kMajorityReadConcern); + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); @@ -118,20 +117,18 @@ TEST_F(ClusterIdentityTest, BasicLoadSuccess) { // Subsequent requests for the cluster ID should not require any network traffic as we consult // the cached version. - ASSERT_OK(ClusterIdentityLoader::get(operationContext()) - ->loadClusterId(operationContext(), - catalogClient(), - repl::ReadConcernLevel::kMajorityReadConcern)); + ASSERT_OK( + ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern)); } TEST_F(ClusterIdentityTest, NoConfigVersionDocument) { // If no version document is found on config server loadClusterId will return an error auto future = launchAsync([&] { - ASSERT_EQ(ClusterIdentityLoader::get(operationContext()) - ->loadClusterId(operationContext(), - catalogClient(), - repl::ReadConcernLevel::kMajorityReadConcern), - ErrorCodes::NoMatchingDocument); + ASSERT_EQ( + ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern), + ErrorCodes::NoMatchingDocument); }); expectConfigVersionLoad( @@ -144,26 +141,23 @@ TEST_F(ClusterIdentityTest, MultipleThreadsLoadingSuccess) { // Check that multiple threads calling getClusterId at once still results in only one network // operation. 
auto future1 = launchAsync([&] { - auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->loadClusterId(operationContext(), - catalogClient(), - repl::ReadConcernLevel::kMajorityReadConcern); + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); auto future2 = launchAsync([&] { - auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->loadClusterId(operationContext(), - catalogClient(), - repl::ReadConcernLevel::kMajorityReadConcern); + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); auto future3 = launchAsync([&] { - auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->loadClusterId(operationContext(), - catalogClient(), - repl::ReadConcernLevel::kMajorityReadConcern); + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); @@ -179,10 +173,9 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) { // The first time you ask for the cluster ID it will have to be loaded from the config servers. auto future = launchAsync([&] { - auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->loadClusterId(operationContext(), - catalogClient(), - repl::ReadConcernLevel::kMajorityReadConcern); + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_EQUALS(ErrorCodes::Interrupted, clusterIdStatus); }); @@ -193,10 +186,9 @@ TEST_F(ClusterIdentityTest, BasicLoadFailureFollowedBySuccess) { // After a failure to load the cluster ID, subsequent attempts to get the cluster ID should // retry loading it. 
future = launchAsync([&] { - auto clusterIdStatus = ClusterIdentityLoader::get(operationContext()) - ->loadClusterId(operationContext(), - catalogClient(), - repl::ReadConcernLevel::kMajorityReadConcern); + auto clusterIdStatus = + ClusterIdentityLoader::get(operationContext()) + ->loadClusterId(operationContext(), repl::ReadConcernLevel::kMajorityReadConcern); ASSERT_OK(clusterIdStatus); ASSERT_EQUALS(clusterId, ClusterIdentityLoader::get(operationContext())->getClusterId()); }); diff --git a/src/mongo/s/config_server_catalog_cache_loader.cpp b/src/mongo/s/config_server_catalog_cache_loader.cpp index 3be6957b8c6..2fb23a43bf9 100644 --- a/src/mongo/s/config_server_catalog_cache_loader.cpp +++ b/src/mongo/s/config_server_catalog_cache_loader.cpp @@ -34,7 +34,6 @@ #include -#include "mongo/db/catalog_shard_feature_flag_gen.h" #include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/replication_coordinator.h" @@ -59,10 +58,7 @@ CollectionAndChangedChunks getChangedChunks(OperationContext* opCtx, const NamespaceString& nss, ChunkVersion sinceVersion) { const auto readConcern = [&]() -> repl::ReadConcernArgs { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && - !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) { - // When the feature flag is on, the config server may read from a secondary which may - // need to wait for replication, so we should use afterClusterTime. + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { return {repl::ReadConcernLevel::kSnapshotReadConcern}; } else { const auto vcTime = VectorClock::get(opCtx)->getTime(); diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 18ba621c157..4df705e4a88 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -239,9 +239,8 @@ Status loadGlobalSettingsFromConfigServer(OperationContext* opCtx) { } try { - auto catalogClient = Grid::get(opCtx)->catalogClient(); uassertStatusOK(ClusterIdentityLoader::get(opCtx)->loadClusterId( - opCtx, catalogClient, repl::ReadConcernLevel::kMajorityReadConcern)); + opCtx, repl::ReadConcernLevel::kMajorityReadConcern)); // Assert will be raised on failure to talk to config server. loadCWWCFromConfigServerForReplication(opCtx); return Status::OK(); -- cgit v1.2.1