From c0953d92e0a07dd130a0f4a4ac26a9fc4d7ef7f7 Mon Sep 17 00:00:00 2001
From: Enrico Golfieri
Date: Wed, 28 Sep 2022 15:27:45 +0000
Subject: SERVER-68929 Define a new method to query historical placement data on a namespace from a mongos process

---
 .../s/catalog/sharding_catalog_client_impl.cpp | 166 +++++++++++++++++++++
 1 file changed, 166 insertions(+)

(limited to 'src/mongo/s/catalog/sharding_catalog_client_impl.cpp')

diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 27e3ee8f319..f9ddf58e907 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -40,8 +40,12 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/document_source_add_fields.h"
+#include "mongo/db/pipeline/document_source_group.h"
 #include "mongo/db/pipeline/document_source_lookup.h"
 #include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sort.h"
 #include "mongo/db/pipeline/document_source_union_with.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/db/repl/read_concern_args.h"
@@ -1199,4 +1203,166 @@ StatusWith<std::vector<KeysCollectionDocument>> ShardingCatalogClientImpl::getNe
     return keys;
 }
 
+
+std::vector<ShardId> ShardingCatalogClientImpl::getShardsThatOwnDataForCollAtClusterTime(
+    OperationContext* opCtx, const NamespaceString& collName, const Timestamp& clusterTime) {
+    /*
+    Stage 1. Select only the entries with timestamp <= clusterTime and filter out every nss that
+    is neither the collection nor its parent database
+    Stage 2. Sort by timestamp
+    Stage 3. Extract the first (most recent) document for the collection and for the database
+    Stage 4. Access the list of shards currently active in the cluster
+    Stage 5. Count the number of currently active shards among the found ones
+    Stage 6. Do not return the list of active shards (used only for the count)
+    Stage 7. Discard the entries with empty shards (i.e. the collection was dropped or renamed)
+    Stage 8. Sort by namespace to ensure that the collection comes before the database
+    Stage 9. Extract only one document (the collection, or the database if the former was
+    discarded at Stage 7)
+
+    regex=^db(.collection)?$
+    [
+        {
+            "$match": { "timestamp": { "$lte": clusterTime }, "nss": { "$regex": regex } }
+        },
+        {
+            "$sort": { "timestamp": -1 }
+        },
+        {
+            "$group": {
+                "_id": "$nss",
+                "shards": { "$first": "$shards" }
+            }
+        },
+        {
+            "$lookup": {
+                "from": "shards",
+                "localField": "shards",
+                "foreignField": "_id",
+                "as": "activeShards"
+            }
+        },
+        { "$set": { "numActiveShards": { "$size": "$activeShards" } } },
+        { "$project": { "activeShards": 0 } },
+        { "$match": { "shards": { "$not": { "$size": 0 } } } },
+        { "$sort": { "_id": -1 } },
+        { "$limit": 1 }
+    ]
+    */
+    uassert(ErrorCodes::InvalidOptions,
+            "A full collection namespace must be specified",
+            !collName.coll().empty());
+
+    auto expCtx = make_intrusive<ExpressionContext>(
+        opCtx, nullptr /*collator*/, NamespaceString::kConfigsvrPlacementHistoryNamespace);
+    StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+    resolvedNamespaces[NamespaceString::kConfigsvrShardsNamespace.coll()] = {
+        NamespaceString::kConfigsvrShardsNamespace, std::vector<BSONObj>() /* pipeline */};
+    resolvedNamespaces[NamespaceString::kConfigsvrPlacementHistoryNamespace.coll()] = {
+        NamespaceString::kConfigsvrPlacementHistoryNamespace,
+        std::vector<BSONObj>() /* pipeline */};
+    expCtx->setResolvedNamespaces(resolvedNamespaces);
+
+    // 1. Get all the history entries prior to the requested time concerning either the collection
+    // or the parent database.
+    auto regex = "^" + collName.db() + "(." + collName.coll() + ")?$";
+    auto matchStage = DocumentSourceMatch::create(
+        BSON("timestamp" << BSON("$lte" << clusterTime) << "nss" << BSON("$regex" << regex)),
+        expCtx);
+
+    // 2 & 3. Sort by timestamp and extract the first document for collection and database
+    auto sortStage = DocumentSourceSort::create(expCtx, BSON("timestamp" << -1));
+    auto groupStageBson = BSON("_id"
+                               << "$nss"
+                               << "shards"
+                               << BSON("$first"
+                                       << "$shards"));
+    auto groupStage = DocumentSourceGroup::createFromBson(
+        Document{{"$group", std::move(groupStageBson)}}.toBson().firstElement(), expCtx);
+
+    // 4. Lookup active shards with left outer join on config.shards
+    Document lookupStageDoc = {
+        {"from", NamespaceString::kConfigsvrShardsNamespace.coll().toString()},
+        {"localField", StringData("shards")},
+        {"foreignField", StringData("_id")},
+        {"as", StringData("activeShards")}};
+
+    auto lookupStage = DocumentSourceLookUp::createFromBson(
+        Document{{"$lookup", std::move(lookupStageDoc)}}.toBson().firstElement(), expCtx);
+
+    // 5. Count the number of active shards
+    auto setStageDoc = Document(
+        {{"$set", Document{{"numActiveShards", Document{{"$size", "$activeShards"_sd}}}}}});
+    auto setStage =
+        DocumentSourceAddFields::createFromBson(setStageDoc.toBson().firstElement(), expCtx);
+
+    // 6. Drop the activeShards field to avoid sending it back to the client
+    auto projectStageDoc = Document{{"activeShards", 0}};
+    auto projectStage = DocumentSourceProject::create(
+        projectStageDoc.toBson(), expCtx, "getShardsThatOwnDataForCollAtClusterTime");
+
+    // Stage 7. Discard the entries with empty shards (i.e. the collection was dropped or renamed)
+    auto matchStage2 =
+        DocumentSourceMatch::create(BSON("shards" << BSON("$not" << BSON("$size" << 0))), expCtx);
+
+    // Stage 8. Sort by namespace to ensure that the collection comes before the database
+    auto sortStage2 = DocumentSourceSort::create(expCtx, BSON("_id" << -1));
+
+    // Stage 9. Extract only one document (the collection, or the database if the former was
+    // discarded at Stage 7)
+    auto limitStage = DocumentSourceLimit::create(expCtx, 1);
+
+    // Create the pipeline
+    Pipeline::SourceContainer stages;
+    stages.emplace_back(std::move(matchStage));
+    stages.emplace_back(std::move(sortStage));
+    stages.emplace_back(std::move(groupStage));
+    stages.emplace_back(std::move(lookupStage));
+    stages.emplace_back(std::move(setStage));
+    stages.emplace_back(std::move(projectStage));
+    stages.emplace_back(std::move(matchStage2));
+    stages.emplace_back(std::move(sortStage2));
+    stages.emplace_back(std::move(limitStage));
+
+    const auto pipeline = Pipeline::create(stages, expCtx);
+    auto aggRequest = AggregateCommandRequest(NamespaceString::kConfigsvrPlacementHistoryNamespace,
+                                              pipeline->serializeToBson());
+
+    // Run the aggregation
+    const auto readConcern = [&]() -> repl::ReadConcernArgs {
+        if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+            return {repl::ReadConcernLevel::kMajorityReadConcern};
+        } else {
+            const auto time = VectorClock::get(opCtx)->getTime();
+            return {time.configTime(), repl::ReadConcernLevel::kMajorityReadConcern};
+        }
+    }();
+
+    auto aggrResult =
+        runCatalogAggregation(opCtx, aggRequest, readConcern, Shard::kDefaultConfigCommandTimeout);
+
+    // This may happen when the placementHistory collection is empty
+    if (aggrResult.empty()) {
+        return {};
+    }
+
+    invariant(aggrResult.size() == 1);
+
+    // Extract the result
+    auto doc = aggrResult.front();
+    auto numActiveShards = doc.getField("numActiveShards").Int();
+    // Use Obj() instead of Array() to avoid instantiating a temporary std::vector.
+    auto shards = doc.getField("shards").Obj();
+
+    uassert(ErrorCodes::SnapshotTooOld,
+            "Part of the history may no longer be retrieved because of one or more removed "
+            "shards.",
+            numActiveShards == static_cast<int>(shards.nFields()));
+
+    std::vector<ShardId> activeShards;
+    for (const auto& shard : shards) {
+        activeShards.push_back(shard.String());
+    }
+
+    return activeShards;
+}
+
 }  // namespace mongo
-- 
cgit v1.2.1
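
Note that this excerpt is limited to sharding_catalog_client_impl.cpp, so the matching interface change lives elsewhere in the commit. A minimal sketch of what the corresponding declaration in sharding_catalog_client.h presumably looks like; the doc comment wording and the pure-virtual layout are assumptions, not part of the excerpt above:

    /**
     * Sketch of the presumed base-class declaration (not shown in this excerpt).
     * Queries config.placementHistory and returns the shards that owned data for the given
     * collection (or, as a fallback, its parent database) at the requested cluster time.
     */
    virtual std::vector<ShardId> getShardsThatOwnDataForCollAtClusterTime(
        OperationContext* opCtx,
        const NamespaceString& collName,
        const Timestamp& clusterTime) = 0;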
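On the caller side, a mongos process would reach the new method through the Grid's catalog client. A minimal usage sketch, assuming the declaration above; the helper function, its name, and the "testDb.testColl" namespace are illustrative only and not part of the commit:

    #include "mongo/db/operation_context.h"
    #include "mongo/s/grid.h"

    namespace mongo {

    // Illustrative helper, not part of the commit: resolves which shards owned data
    // for "testDb.testColl" at the given cluster time, as seen from a mongos process.
    std::vector<ShardId> shardsOwningCollAtTime(OperationContext* opCtx,
                                                const Timestamp& clusterTime) {
        const NamespaceString nss("testDb", "testColl");

        // Throws ErrorCodes::InvalidOptions if only a database name is supplied and
        // ErrorCodes::SnapshotTooOld if part of the history was lost to removed shards.
        return Grid::get(opCtx)->catalogClient()->getShardsThatOwnDataForCollAtClusterTime(
            opCtx, nss, clusterTime);
    }

    }  // namespace mongo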