author | Paolo Polato <paolo.polato@mongodb.com> | 2023-01-27 17:29:27 +0000
---|---|---
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-27 19:26:18 +0000
commit | 9c7c82b40b7493cb63f971429e04cfb419c62b93 (patch) |
tree | cc33f021b19156e409a8618cae280020f8834a4a |
parent | 25639888e57caae19d152e89c4d68aa935618142 (diff) |
download | mongo-9c7c82b40b7493cb63f971429e04cfb419c62b93.tar.gz |
SERVER-72872 Introduce configsvr command to query placementHistory in controlled conditions
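The new `_configsvrGetHistoricalPlacement` command and its reply are defined by the IDL file added in this commit (see `get_historical_placement_info.idl` below). The sketch that follows only illustrates the request/response shape: the command is internal, admin-only, restricted to config servers, and guarded by the `internal` cluster action, so ordinary clients cannot invoke it; the namespace and timestamp are made-up placeholders.

```javascript
// Illustrative only: _configsvrGetHistoricalPlacement is an internal, config-server-only
// command; external clients are not authorized to run it. Field names follow the IDL
// added by this commit; the namespace and timestamp are placeholders.
const configAdmin = db.getSiblingDB("admin");  // assumes a connection to the config server primary

// Placement of a single collection (or a whole database) at a point in time.
const res = configAdmin.runCommand({
    _configsvrGetHistoricalPlacement: "testDB.testColl",  // namespace passed as the command parameter
    at: Timestamp(1674840000, 1),                         // the requested cluster time
});
// Expected reply shape: { shards: [ "shard0000", "shard0001" ], isExact: true, ok: 1 }

// Placement of the whole cluster; the namespace parameter is ignored in this mode.
const clusterRes = configAdmin.runCommand({
    _configsvrGetHistoricalPlacement: "admin",
    at: Timestamp(1674840000, 1),
    targetWholeCluster: true,
});
```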
-rw-r--r-- | jstests/core/views/views_all_commands.js | 1
-rw-r--r-- | jstests/replsets/all_commands_downgrading_to_upgraded.js | 1
-rw-r--r-- | jstests/replsets/db_reads_while_recovering_all_commands.js | 1
-rw-r--r-- | jstests/sharding/read_write_concern_defaults_application.js | 1
-rw-r--r-- | src/mongo/db/s/SConscript | 1
-rw-r--r-- | src/mongo/db/s/config/configsvr_get_historical_placement_info.cpp | 123
-rw-r--r-- | src/mongo/s/SConscript | 1
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client.h | 13
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.cpp | 462
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.h | 15
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.cpp | 7
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_mock.h | 6
-rw-r--r-- | src/mongo/s/request_types/get_historical_placement_info.idl | 77
13 files changed, 494 insertions, 215 deletions
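On a config server, the reply is computed by the new `ShardingCatalogClient::getHistoricalPlacement()` method, which assembles an aggregation over `config.placementHistory` out of `DocumentSource` stages (see `sharding_catalog_client_impl.cpp` below). The shell pipeline that follows is a paraphrase of the comment embedded in that method; the `{ nss, shards, timestamp }` document shape of `config.placementHistory` is inferred from the fields the pipeline references and is not itself part of this diff.

```javascript
// Shell-level paraphrase of the aggregation built in getHistoricalPlacement(); the
// config.placementHistory document shape ({ nss, shards, timestamp }) is inferred.
const clusterTime = Timestamp(1674840000, 1);
const nssRegex = /^testDB(\.testColl)?$/;  // matches the collection or its parent database;
                                           // the nss predicate is omitted for whole-cluster queries

db.getSiblingDB("config").placementHistory.aggregate([
    // Stage 1: entries at or before the requested time for the targeted namespace(s).
    {$match: {timestamp: {$lte: clusterTime}, nss: {$regex: nssRegex}}},
    // Stages 2-3: keep only the most recent entry per namespace.
    {$sort: {timestamp: -1}},
    {$group: {_id: "$nss", shards: {$first: "$shards"}}},
    // Stage 4: drop namespaces whose latest entry carries no shards (dropped or renamed).
    {$match: {shards: {$not: {$size: 0}}}},
    // Stages 5-6: merge the per-namespace shard lists into a single de-duplicated set.
    {$group: {_id: "", shards: {$push: "$shards"}}},
    {$project: {shards: {$reduce: {
        input: "$shards", initialValue: [], in: {$setUnion: ["$$this", "$$value"]}}}}},
    // Stages 7-9: count how many of those shards still exist, without returning the join output.
    {$lookup: {from: "shards", localField: "shards", foreignField: "_id", as: "activeShards"}},
    {$set: {numActiveShards: {$size: "$activeShards"}}},
    {$project: {activeShards: 0}},
]);
// The catalog client raises SnapshotTooOld when numActiveShards is smaller than the size of
// the shards array, i.e. when part of the history now points at removed shards.
```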
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index 906886c51ba..e9891cb65eb 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -109,6 +109,7 @@ let viewsCommandTests = { _configsvrCreateDatabase: {skip: isAnInternalCommand}, _configsvrDropIndexCatalogEntry: {skip: isAnInternalCommand}, _configsvrEnsureChunkVersionIsGreaterThan: {skip: isAnInternalCommand}, + _configsvrGetHistoricalPlacement: {skip: isAnInternalCommand}, // TODO SERVER-73029 remove _configsvrMoveChunk: {skip: isAnInternalCommand}, // Can be removed once 6.0 is last LTS _configsvrMovePrimary: {skip: isAnInternalCommand}, _configsvrMoveRange: {skip: isAnInternalCommand}, diff --git a/jstests/replsets/all_commands_downgrading_to_upgraded.js b/jstests/replsets/all_commands_downgrading_to_upgraded.js index 322f9198f42..0e4d06fcfb0 100644 --- a/jstests/replsets/all_commands_downgrading_to_upgraded.js +++ b/jstests/replsets/all_commands_downgrading_to_upgraded.js @@ -63,6 +63,7 @@ const allCommands = { _configsvrCreateDatabase: {skip: isAnInternalCommand}, _configsvrDropIndexCatalogEntry: {skip: isAnInternalCommand}, _configsvrEnsureChunkVersionIsGreaterThan: {skip: isAnInternalCommand}, + _configsvrGetHistoricalPlacement: {skip: isAnInternalCommand}, // TODO SERVER-73029 remove _configsvrMoveChunk: {skip: isAnInternalCommand}, _configsvrMoveRange: {skip: isAnInternalCommand}, _configsvrRefineCollectionShardKey: {skip: isAnInternalCommand}, diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js index 8e04bdfd7c0..d24dc1603ad 100644 --- a/jstests/replsets/db_reads_while_recovering_all_commands.js +++ b/jstests/replsets/db_reads_while_recovering_all_commands.js @@ -48,6 +48,7 @@ const allCommands = { _configsvrCreateDatabase: {skip: isPrimaryOnly}, _configsvrDropIndexCatalogEntry: {skip: isPrimaryOnly}, _configsvrEnsureChunkVersionIsGreaterThan: {skip: isPrimaryOnly}, + _configsvrGetHistoricalPlacement: {skip: isAnInternalCommand}, // TODO SERVER-73029 remove _configsvrMoveChunk: {skip: isPrimaryOnly}, _configsvrMoveRange: {skip: isPrimaryOnly}, _configsvrRefineCollectionShardKey: {skip: isPrimaryOnly}, diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js index 81de7608132..6a011fa767b 100644 --- a/jstests/sharding/read_write_concern_defaults_application.js +++ b/jstests/sharding/read_write_concern_defaults_application.js @@ -101,6 +101,7 @@ let testCases = { _configsvrCreateDatabase: {skip: "internal command"}, _configsvrDropIndexCatalogEntry: {skip: "internal command"}, _configsvrEnsureChunkVersionIsGreaterThan: {skip: "internal command"}, + _configsvrGetHistoricalPlacement: {skip: "internal command"}, // TODO SERVER-73029 remove _configsvrMoveChunk: {skip: "internal command"}, _configsvrMovePrimary: {skip: "internal command"}, // Can be removed once 6.0 is last LTS _configsvrMoveRange: {skip: "internal command"}, diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 144838d2097..688b4e251a6 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -445,6 +445,7 @@ env.Library( 'config/configsvr_drop_index_catalog_command.cpp', 'config/configsvr_create_database_command.cpp', 'config/configsvr_ensure_chunk_version_is_greater_than_command.cpp', + 'config/configsvr_get_historical_placement_info.cpp', 
'config/configsvr_merge_all_chunks_on_shard_command.cpp', 'config/configsvr_merge_chunks_command.cpp', 'config/configsvr_move_chunk_command.cpp', diff --git a/src/mongo/db/s/config/configsvr_get_historical_placement_info.cpp b/src/mongo/db/s/config/configsvr_get_historical_placement_info.cpp new file mode 100644 index 00000000000..9e71224c7a0 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_get_historical_placement_info.cpp @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/s/request_types/get_historical_placement_info_gen.h" +#include "mongo/s/sharding_feature_flags_gen.h" + + +namespace mongo { +namespace { + +class ConfigsvrGetHistoricalPlacementCommand final + : public TypedCommand<ConfigsvrGetHistoricalPlacementCommand> { +public: + using Request = ConfigsvrGetHistoricalPlacement; + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + ConfigsvrGetHistoricalPlacementResponse typedRun(OperationContext* opCtx) { + const NamespaceString& nss = ns(); + + uassert(ErrorCodes::IllegalOperation, + "_configsvrGetHistoricalPlacement can only be run on config servers", + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + + // Set the operation context read concern level to majority for reads into the config + // database. 
+ repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kMajorityReadConcern); + + const auto catalogClient = ShardingCatalogManager::get(opCtx)->localCatalogClient(); + + if (!feature_flags::gHistoricalPlacementShardingCatalog.isEnabled( + serverGlobalParams.featureCompatibility)) { + auto shardsWithOpTime = uassertStatusOK(catalogClient->getAllShards( + opCtx, repl::ReadConcernLevel::kMajorityReadConcern)); + std::vector<ShardId> shardIds; + std::transform(shardsWithOpTime.value.begin(), + shardsWithOpTime.value.end(), + std::back_inserter(shardIds), + [](const ShardType& s) { return s.getName(); }); + ConfigsvrGetHistoricalPlacementResponse response(std::move(shardIds)); + response.setIsExact(false); + return response; + } + + boost::optional<NamespaceString> targetedNs = request().getTargetWholeCluster() + ? (boost::optional<NamespaceString>)boost::none + : nss; + return ConfigsvrGetHistoricalPlacementResponse( + catalogClient->getHistoricalPlacement(opCtx, request().getAt(), targetedNs)); + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return false; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + + bool skipApiVersionCheck() const override { + // Internal command (server to server). + return true; + } + + std::string help() const override { + return "Internal command, which is exported by the sharding config server. Do not call " + "directly. Allows to run queries concerning historical placement of a namespace in " + "a controlled way."; + } + + bool adminOnly() const override { + return true; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } +} configsvrGetHistoricalPlacementCmd; + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 1a410bbeefd..ae3e0b0fc72 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -220,6 +220,7 @@ env.Library( 'request_types/flush_resharding_state_change.idl', 'request_types/flush_routing_table_cache_updates.idl', 'request_types/get_database_version.idl', + 'request_types/get_historical_placement_info.idl', 'request_types/shardsvr_join_migrations_request.idl', 'request_types/merge_chunk_request.idl', 'request_types/migration_secondary_throttle_options.cpp', diff --git a/src/mongo/s/catalog/sharding_catalog_client.h b/src/mongo/s/catalog/sharding_catalog_client.h index d93e8e9bddf..31a171b6c4c 100644 --- a/src/mongo/s/catalog/sharding_catalog_client.h +++ b/src/mongo/s/catalog/sharding_catalog_client.h @@ -360,6 +360,19 @@ public: virtual std::vector<ShardId> getShardsThatOwnDataAtClusterTime( OperationContext* opCtx, const Timestamp& clusterTime) = 0; + /** + * Queries config.placementHistory to retrieve placement metadata on the requested namespace at + * a specific point in time. When no namespace is specified, placement metadata on the whole + * cluster will be returned. This function is meant to be exclusively invoked by config server + * nodes. 
+ * + * TODO (SERVER-73029): convert to private method of ShardingCatalogClientImpl + */ + virtual std::vector<ShardId> getHistoricalPlacement( + OperationContext* opCtx, + const Timestamp& atClusterTime, + const boost::optional<NamespaceString>& nss) = 0; + protected: ShardingCatalogClient() = default; diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp index 9afaf5e38ff..e3b78e19477 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -383,215 +383,6 @@ std::vector<BSONObj> runCatalogAggregation(OperationContext* opCtx, return aggResult; } -std::vector<ShardId> makeAndRunPlacementHistoryAggregation( - OperationContext* opCtx, - const std::shared_ptr<Shard>& configShard, - const Timestamp& minClusterTime, - const boost::optional<NamespaceString>& nss) { - /* - Stage 1. Select only the entry with timestamp <= clusterTime and filter out all nss that are - not the collection or the database - Stage 2. sort by timestamp - Stage 3. Extract the first document for each database and collection matching the received - namespace - Stage 4. Discard the entries with empty shards (i.e. the collection was dropped or - renamed) - Stage 5. Group all documents and concat shards (this will generate an array of arrays) - Stage 6. Flatten the array of arrays into a set (this will also remove duplicates) - - Stage 7. Access to the list of shards currently active in the cluster - Stage 8. Count the number of shards obtained on stage 6 that also appear in the list of - active shards - Stage 9. Do not return the list of active shards (used only for the count) - - regex=^db(\.collection)?$ // matches db or db.collection ( this is skipped in case the whole - cluster info is searched) - [ - { - "$match": { "timestamp": { "$lte": clusterTime } , "nss" : { $regex: regex} } - }, - { - "$sort": { "timestamp": -1 } - }, - { - "$group": { - _id: "$nss", - shards: { $first: "$shards" } - } - }, - { "$match": { shards: { $not: { $size: 0 } } } }, - { - "$group": { - _id: "", - shards: { $push: "$shards" } - } - }, - { - $project: { - "shards": { - $reduce: { - input: "$shards", - initialValue: [], - in: { "$setUnion": [ "$$this", "$$value"] } - } - } - } - }, - { - $lookup: - { - from: "shards", - localField: "shards", - foreignField: "_id", - as: "activeShards" - } - }, - { "$set" : { "numActiveShards" : { "$size" : "$activeShards" } } }, - { "$project": { "activeShards" : 0 } } - ] - */ - - auto expCtx = make_intrusive<ExpressionContext>( - opCtx, nullptr /*collator*/, NamespaceString::kConfigsvrPlacementHistoryNamespace); - StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; - resolvedNamespaces[NamespaceString::kConfigsvrShardsNamespace.coll()] = { - NamespaceString::kConfigsvrShardsNamespace, std::vector<BSONObj>() /* pipeline */}; - resolvedNamespaces[NamespaceString::kConfigsvrPlacementHistoryNamespace.coll()] = { - NamespaceString::kConfigsvrPlacementHistoryNamespace, - std::vector<BSONObj>() /* pipeline */}; - expCtx->setResolvedNamespaces(resolvedNamespaces); - - // 1. Get all the history entries prior to the requested time concerning either the collection - // or the parent database. 
- - - auto matchStage = [&]() { - bool isClusterSearch = !nss.has_value(); - if (isClusterSearch) - return DocumentSourceMatch::create(BSON("timestamp" << BSON("$lte" << minClusterTime)), - expCtx); - - bool isCollectionSearch = !nss->db().empty() && !nss->coll().empty(); - auto collMatchExpression = isCollectionSearch ? pcre_util::quoteMeta(nss->coll()) : ".*"; - auto regexString = - "^" + pcre_util::quoteMeta(nss->db()) + "(\\." + collMatchExpression + ")?$"; - return DocumentSourceMatch::create(BSON("timestamp" << BSON("$lte" << minClusterTime) - << "nss" - << BSON("$regex" << regexString)), - expCtx); - }(); - - // 2 & 3. Sort by timestamp and extract the first document for collection and database - auto sortStage = DocumentSourceSort::create(expCtx, BSON("timestamp" << -1)); - auto groupStageBson = BSON("_id" - << "$nss" - << "shards" - << BSON("$first" - << "$shards")); - auto groupStage = DocumentSourceGroup::createFromBson( - Document{{"$group", std::move(groupStageBson)}}.toBson().firstElement(), expCtx); - - // Stage 4. Discard the entries with empty shards (i.e. the collection was dropped or renamed) - auto noShardsFilter = - DocumentSourceMatch::create(BSON("shards" << BSON("$not" << BSON("$size" << 0))), expCtx); - - // Stage 5. Group all documents and concat shards (this will generate an array of arrays) - auto groupStageBson2 = BSON("_id" - << "" - << "shards" - << BSON("$push" - << "$shards")); - auto groupStageConcat = DocumentSourceGroup::createFromBson( - Document{{"$group", std::move(groupStageBson2)}}.toBson().firstElement(), expCtx); - - // Stage 6. Flatten the array of arrays into a set (this will also remove duplicates) - auto projectStageBson = - BSON("shards" << BSON("$reduce" << BSON("input" - << "$shards" - << "initialValue" << BSONArray() << "in" - << BSON("$setUnion" << BSON_ARRAY("$$this" - << "$$value"))))); - auto projectStageFlatten = DocumentSourceProject::createFromBson( - Document{{"$project", std::move(projectStageBson)}}.toBson().firstElement(), expCtx); - - - // Stage 7. Lookup active shards with left outer join on config.shards - Document lookupStageDoc = { - {"from", NamespaceString::kConfigsvrShardsNamespace.coll().toString()}, - {"localField", StringData("shards")}, - {"foreignField", StringData("_id")}, - {"as", StringData("activeShards")}}; - - auto lookupStage = DocumentSourceLookUp::createFromBson( - Document{{"$lookup", std::move(lookupStageDoc)}}.toBson().firstElement(), expCtx); - - // Stage 8. Count number of active shards - auto setStageDoc = Document( - {{"$set", Document{{"numActiveShards", Document{{"$size", "$activeShards"_sd}}}}}}); - auto setStage = - DocumentSourceAddFields::createFromBson(setStageDoc.toBson().firstElement(), expCtx); - - // Stage 9. 
Disable activeShards field to avoid sending it to the client - auto projectStageDoc = Document{{"activeShards", 0}}; - auto projectStage = DocumentSourceProject::create( - projectStageDoc.toBson(), expCtx, "getShardsThatOwnDataForNamespaceAtClusterTime"); - - // Create pipeline - Pipeline::SourceContainer stages; - stages.emplace_back(std::move(matchStage)); - stages.emplace_back(std::move(sortStage)); - stages.emplace_back(std::move(groupStage)); - stages.emplace_back(std::move(noShardsFilter)); - stages.emplace_back(std::move(groupStageConcat)); - stages.emplace_back(std::move(projectStageFlatten)); - stages.emplace_back(std::move(lookupStage)); - stages.emplace_back(std::move(setStage)); - stages.emplace_back(std::move(projectStage)); - - const auto pipeline = Pipeline::create(stages, expCtx); - auto aggRequest = AggregateCommandRequest(NamespaceString::kConfigsvrPlacementHistoryNamespace, - pipeline->serializeToBson()); - - - // Run the aggregation - const auto readConcern = [&]() -> repl::ReadConcernArgs { - if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && - !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) { - // When the feature flag is on, the config server may read from a secondary which may - // need to wait for replication, so we should use afterClusterTime. - return {repl::ReadConcernLevel::kMajorityReadConcern}; - } else { - const auto time = VectorClock::get(opCtx)->getTime(); - return {time.configTime(), repl::ReadConcernLevel::kMajorityReadConcern}; - } - }(); - - auto aggrResult = runCatalogAggregation( - opCtx, configShard, aggRequest, readConcern, Shard::kDefaultConfigCommandTimeout); - - // Parse the result - std::vector<ShardId> activeShards; - if (!aggrResult.empty()) { - invariant(aggrResult.size() == 1); - - // Extract the result - const auto& doc = aggrResult.front(); - auto numActiveShards = doc.getField("numActiveShards").Int(); - // Use Obj() instead of Array() to avoid instantiating a temporary std::vector. 
- const auto& shards = doc.getField("shards").Obj(); - - uassert(ErrorCodes::SnapshotTooOld, - "Part of the history may no longer be retrieved because of one or more removed " - "shards.", - numActiveShards == static_cast<int>(shards.nFields())); - - for (const auto& shard : shards) { - activeShards.push_back(shard.String()); - } - } - return activeShards; -} - } // namespace ShardingCatalogClientImpl::ShardingCatalogClientImpl(std::shared_ptr<Shard> overrideConfigShard) @@ -704,6 +495,24 @@ StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchData } } +std::vector<ShardId> ShardingCatalogClientImpl::_fetchPlacementMetadata( + OperationContext* opCtx, ConfigsvrGetHistoricalPlacement&& request) { + auto remoteResponse = uassertStatusOK(_getConfigShard(opCtx)->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + NamespaceString::kAdminDb.toString(), + request.toBSON(BSONObj()), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotentOrCursorInvalidated)); + + uassertStatusOK(remoteResponse.commandStatus); + + auto placementDetails = ConfigsvrGetHistoricalPlacementResponse::parse( + IDLParserContext("ShardingCatalogClient"), remoteResponse.response); + + return placementDetails.getShards(); +} + CollectionType ShardingCatalogClientImpl::getCollection(OperationContext* opCtx, const NamespaceString& nss, repl::ReadConcernLevel readConcernLevel) { @@ -1471,8 +1280,11 @@ std::vector<ShardId> ShardingCatalogClientImpl::getShardsThatOwnDataForCollAtClu "A full collection namespace must be specified", !collName.coll().empty()); - return makeAndRunPlacementHistoryAggregation( - opCtx, _getConfigShard(opCtx), clusterTime, collName); + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return getHistoricalPlacement(opCtx, clusterTime, collName); + } + + return _fetchPlacementMetadata(opCtx, ConfigsvrGetHistoricalPlacement(collName, clusterTime)); } @@ -1483,15 +1295,235 @@ std::vector<ShardId> ShardingCatalogClientImpl::getShardsThatOwnDataForDbAtClust "A full db namespace must be specified", dbName.coll().empty() && !dbName.db().empty()); - return makeAndRunPlacementHistoryAggregation( - opCtx, _getConfigShard(opCtx), clusterTime, dbName); + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return getHistoricalPlacement(opCtx, clusterTime, dbName); + } + + return _fetchPlacementMetadata(opCtx, ConfigsvrGetHistoricalPlacement(dbName, clusterTime)); } std::vector<ShardId> ShardingCatalogClientImpl::getShardsThatOwnDataAtClusterTime( OperationContext* opCtx, const Timestamp& clusterTime) { - return makeAndRunPlacementHistoryAggregation( - opCtx, _getConfigShard(opCtx), clusterTime, boost::none); + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + return getHistoricalPlacement(opCtx, clusterTime, boost::none); + } + + ConfigsvrGetHistoricalPlacement request(NamespaceString(), clusterTime); + request.setTargetWholeCluster(true); + return _fetchPlacementMetadata(opCtx, std::move(request)); +} + +std::vector<ShardId> ShardingCatalogClientImpl::getHistoricalPlacement( + OperationContext* opCtx, + const Timestamp& atClusterTime, + const boost::optional<NamespaceString>& nss) { + + // TODO (SERVER-73029): Remove the invariant + invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + auto configShard = _getConfigShard(opCtx); + /* + Stage 1. 
Select only the entry with timestamp <= clusterTime and filter out all nss that are + not the collection or the database + Stage 2. sort by timestamp + Stage 3. Extract the first document for each database and collection matching the received + namespace + Stage 4. Discard the entries with empty shards (i.e. the collection was dropped or + renamed) + Stage 5. Group all documents and concat shards (this will generate an array of arrays) + Stage 6. Flatten the array of arrays into a set (this will also remove duplicates) + + Stage 7. Access to the list of shards currently active in the cluster + Stage 8. Count the number of shards obtained on stage 6 that also appear in the list of + active shards + Stage 9. Do not return the list of active shards (used only for the count) + + regex=^db(\.collection)?$ // matches db or db.collection ( this is skipped in case the whole + cluster info is searched) + [ + { + "$match": { "timestamp": { "$lte": clusterTime } , "nss" : { $regex: regex} } + }, + { + "$sort": { "timestamp": -1 } + }, + { + "$group": { + _id: "$nss", + shards: { $first: "$shards" } + } + }, + { "$match": { shards: { $not: { $size: 0 } } } }, + { + "$group": { + _id: "", + shards: { $push: "$shards" } + } + }, + { + $project: { + "shards": { + $reduce: { + input: "$shards", + initialValue: [], + in: { "$setUnion": [ "$$this", "$$value"] } + } + } + } + }, + { + $lookup: + { + from: "shards", + localField: "shards", + foreignField: "_id", + as: "activeShards" + } + }, + { "$set" : { "numActiveShards" : { "$size" : "$activeShards" } } }, + { "$project": { "activeShards" : 0 } } + ] + */ + + auto expCtx = make_intrusive<ExpressionContext>( + opCtx, nullptr /*collator*/, NamespaceString::kConfigsvrPlacementHistoryNamespace); + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + resolvedNamespaces[NamespaceString::kConfigsvrShardsNamespace.coll()] = { + NamespaceString::kConfigsvrShardsNamespace, std::vector<BSONObj>() /* pipeline */}; + resolvedNamespaces[NamespaceString::kConfigsvrPlacementHistoryNamespace.coll()] = { + NamespaceString::kConfigsvrPlacementHistoryNamespace, + std::vector<BSONObj>() /* pipeline */}; + expCtx->setResolvedNamespaces(resolvedNamespaces); + + // 1. Get all the history entries prior to the requested time concerning either the collection + // or the parent database. + + + auto matchStage = [&]() { + bool isClusterSearch = !nss.has_value(); + if (isClusterSearch) + return DocumentSourceMatch::create(BSON("timestamp" << BSON("$lte" << atClusterTime)), + expCtx); + + bool isCollectionSearch = !nss->db().empty() && !nss->coll().empty(); + auto collMatchExpression = isCollectionSearch ? pcre_util::quoteMeta(nss->coll()) : ".*"; + auto regexString = + "^" + pcre_util::quoteMeta(nss->db()) + "(\\." + collMatchExpression + ")?$"; + return DocumentSourceMatch::create(BSON("timestamp" << BSON("$lte" << atClusterTime) + << "nss" + << BSON("$regex" << regexString)), + expCtx); + }(); + + // 2 & 3. Sort by timestamp and extract the first document for collection and database + auto sortStage = DocumentSourceSort::create(expCtx, BSON("timestamp" << -1)); + auto groupStageBson = BSON("_id" + << "$nss" + << "shards" + << BSON("$first" + << "$shards")); + auto groupStage = DocumentSourceGroup::createFromBson( + Document{{"$group", std::move(groupStageBson)}}.toBson().firstElement(), expCtx); + + // Stage 4. Discard the entries with empty shards (i.e. 
the collection was dropped or renamed) + auto noShardsFilter = + DocumentSourceMatch::create(BSON("shards" << BSON("$not" << BSON("$size" << 0))), expCtx); + + // Stage 5. Group all documents and concat shards (this will generate an array of arrays) + auto groupStageBson2 = BSON("_id" + << "" + << "shards" + << BSON("$push" + << "$shards")); + auto groupStageConcat = DocumentSourceGroup::createFromBson( + Document{{"$group", std::move(groupStageBson2)}}.toBson().firstElement(), expCtx); + + // Stage 6. Flatten the array of arrays into a set (this will also remove duplicates) + auto projectStageBson = + BSON("shards" << BSON("$reduce" << BSON("input" + << "$shards" + << "initialValue" << BSONArray() << "in" + << BSON("$setUnion" << BSON_ARRAY("$$this" + << "$$value"))))); + auto projectStageFlatten = DocumentSourceProject::createFromBson( + Document{{"$project", std::move(projectStageBson)}}.toBson().firstElement(), expCtx); + + + // Stage 7. Lookup active shards with left outer join on config.shards + Document lookupStageDoc = { + {"from", NamespaceString::kConfigsvrShardsNamespace.coll().toString()}, + {"localField", StringData("shards")}, + {"foreignField", StringData("_id")}, + {"as", StringData("activeShards")}}; + + auto lookupStage = DocumentSourceLookUp::createFromBson( + Document{{"$lookup", std::move(lookupStageDoc)}}.toBson().firstElement(), expCtx); + + // Stage 8. Count number of active shards + auto setStageDoc = Document( + {{"$set", Document{{"numActiveShards", Document{{"$size", "$activeShards"_sd}}}}}}); + auto setStage = + DocumentSourceAddFields::createFromBson(setStageDoc.toBson().firstElement(), expCtx); + + // Stage 9. Disable activeShards field to avoid sending it to the client + auto projectStageDoc = Document{{"activeShards", 0}}; + auto projectStage = DocumentSourceProject::create( + projectStageDoc.toBson(), expCtx, "getShardsThatOwnDataForNamespaceAtClusterTime"); + + // Create pipeline + Pipeline::SourceContainer stages; + stages.emplace_back(std::move(matchStage)); + stages.emplace_back(std::move(sortStage)); + stages.emplace_back(std::move(groupStage)); + stages.emplace_back(std::move(noShardsFilter)); + stages.emplace_back(std::move(groupStageConcat)); + stages.emplace_back(std::move(projectStageFlatten)); + stages.emplace_back(std::move(lookupStage)); + stages.emplace_back(std::move(setStage)); + stages.emplace_back(std::move(projectStage)); + + const auto pipeline = Pipeline::create(stages, expCtx); + auto aggRequest = AggregateCommandRequest(NamespaceString::kConfigsvrPlacementHistoryNamespace, + pipeline->serializeToBson()); + + + // Run the aggregation + const auto readConcern = [&]() -> repl::ReadConcernArgs { + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer && + !gFeatureFlagConfigServerAlwaysShardRemote.isEnabledAndIgnoreFCV()) { + // When the feature flag is on, the config server may read from a secondary which may + // need to wait for replication, so we should use afterClusterTime. 
+ return {repl::ReadConcernLevel::kMajorityReadConcern}; + } else { + const auto time = VectorClock::get(opCtx)->getTime(); + return {time.configTime(), repl::ReadConcernLevel::kMajorityReadConcern}; + } + }(); + + auto aggrResult = runCatalogAggregation( + opCtx, configShard, aggRequest, readConcern, Shard::kDefaultConfigCommandTimeout); + + // Parse the result + std::vector<ShardId> activeShards; + if (!aggrResult.empty()) { + invariant(aggrResult.size() == 1); + + // Extract the result + const auto& doc = aggrResult.front(); + auto numActiveShards = doc.getField("numActiveShards").Int(); + // Use Obj() instead of Array() to avoid instantiating a temporary std::vector. + const auto& shards = doc.getField("shards").Obj(); + + uassert(ErrorCodes::SnapshotTooOld, + "Part of the history may no longer be retrieved because of one or more removed " + "shards.", + numActiveShards == static_cast<int>(shards.nFields())); + + for (const auto& shard : shards) { + activeShards.push_back(shard.String()); + } + } + return activeShards; } std::shared_ptr<Shard> ShardingCatalogClientImpl::_getConfigShard(OperationContext* opCtx) { diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.h b/src/mongo/s/catalog/sharding_catalog_client_impl.h index 49006332c4c..6bd6f16bcc3 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_impl.h +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.h @@ -34,6 +34,7 @@ #include "mongo/platform/mutex.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/request_types/get_historical_placement_info_gen.h" namespace mongo { @@ -204,6 +205,11 @@ public: std::vector<ShardId> getShardsThatOwnDataAtClusterTime(OperationContext* opCtx, const Timestamp& clusterTime) override; + std::vector<ShardId> getHistoricalPlacement( + OperationContext* opCtx, + const Timestamp& atClusterTime, + const boost::optional<NamespaceString>& nss) override; + private: /** * Updates a single document (if useMultiUpdate is false) or multiple documents (if @@ -247,6 +253,15 @@ private: repl::ReadConcernLevel readConcernLevel); /** + * Queries the config server to retrieve placement data based on the Request object. + * TODO (SERVER-73029): Remove the method - and replace its invocations with + * runPlacementHistoryAggregation() + */ + std::vector<ShardId> _fetchPlacementMetadata(OperationContext* opCtx, + ConfigsvrGetHistoricalPlacement&& request); + + + /** * Returns the Shard type that should be used to access the config server. Unless an instance * was provided at construction, which may be done e.g. to force using local operations, falls * back to using the config shard from the ShardRegistry. 
diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp index 47c96b6c81c..60feb3833a3 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.cpp @@ -223,4 +223,11 @@ std::vector<ShardId> ShardingCatalogClientMock::getShardsThatOwnDataAtClusterTim OperationContext* opCtx, const Timestamp& clusterTime) { uasserted(ErrorCodes::InternalError, "Method not implemented"); } + +std::vector<ShardId> ShardingCatalogClientMock::getHistoricalPlacement( + OperationContext* opCtx, + const Timestamp& atClusterTime, + const boost::optional<NamespaceString>& nss) { + uasserted(ErrorCodes::InternalError, "Method not implemented"); +} } // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_client_mock.h b/src/mongo/s/catalog/sharding_catalog_client_mock.h index 888d00da2c5..30cf2ed0826 100644 --- a/src/mongo/s/catalog/sharding_catalog_client_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_client_mock.h @@ -162,6 +162,12 @@ public: std::vector<ShardId> getShardsThatOwnDataAtClusterTime(OperationContext* opCtx, const Timestamp& clusterTime) override; + std::vector<ShardId> getHistoricalPlacement( + OperationContext* opCtx, + const Timestamp& atClusterTime, + const boost::optional<NamespaceString>& nss) override; + + private: StatusWith<repl::OpTimeWith<std::vector<BSONObj>>> _exhaustiveFindOnConfig( OperationContext* opCtx, diff --git a/src/mongo/s/request_types/get_historical_placement_info.idl b/src/mongo/s/request_types/get_historical_placement_info.idl new file mode 100644 index 00000000000..aca7f49a4cf --- /dev/null +++ b/src/mongo/s/request_types/get_historical_placement_info.idl @@ -0,0 +1,77 @@ +# Copyright (C) 2023-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. 
+ +# TODO (SERVER-73029): remove this file and the class implementing _configsvrGetHistoricalPlacement + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/db/basic_types.idl" + - "mongo/s/sharding_types.idl" + +structs: + + ConfigsvrGetHistoricalPlacementResponse: + description: "Response for the _configsvrGetHistoricalPlacement command" + strict: false + is_command_reply: true + fields: + shards: + type: array<shard_id> + description: "The set of shard IDs containing data on the requested nss/cluster at + the point in time" + isExact: + type: bool + description: "When true, the returned list of shards is an accurate recording of + the placement info at the requested point in time. + When false, the result value represents an approximation based on + a present/past reading of config.shards" + default: true + + +commands: + + _configsvrGetHistoricalPlacement: + command_name: _configsvrGetHistoricalPlacement + cpp_name: ConfigsvrGetHistoricalPlacement + description: "Internal command to retrieve the list of shard IDs hosting data for the + namespace/cluster being targeted at a specific point in time" + namespace: type + api_version: "" + type: namespacestring + strict: false + fields: + at: + type: timestamp + description: "The requested point in time" + optional: false + targetWholeCluster: + type: bool + description: "When true, the command retrieves placement information concerning + the whole cluster (ignoring the namespace parameter)" + default: false |
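The `isExact` field in the reply distinguishes the command's two code paths: when `gHistoricalPlacementShardingCatalog` is enabled, the answer is reconstructed from `config.placementHistory`; otherwise the command falls back to listing every shard currently registered in `config.shards` and flags the approximation. A sketch of the two reply variants, with field names taken from the IDL above and shard names purely illustrative:

```javascript
// Field names follow ConfigsvrGetHistoricalPlacementResponse; shard names are illustrative.

// Feature flag enabled: exact placement reconstructed from config.placementHistory.
const exactReply = {shards: ["shard0000", "shard0001"], isExact: true, ok: 1};

// Feature flag disabled: every shard currently registered in config.shards, marked as
// an approximation of the historical placement.
const approximateReply = {shards: ["shard0000", "shard0001", "shard0002"], isExact: false, ok: 1};
```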