summaryrefslogtreecommitdiff
path: root/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
diff options
context:
space:
mode:
authorEnrico Golfieri <enrico.golfieri@mongodb.com>2022-09-28 15:27:45 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-28 16:18:58 +0000
commitc0953d92e0a07dd130a0f4a4ac26a9fc4d7ef7f7 (patch)
treeb162846f7bbef0088ca3da1270933e8f89167268 /src/mongo/s/catalog/sharding_catalog_client_impl.cpp
parent4845d8574060d8c756e454161285327f74295b6e (diff)
downloadmongo-c0953d92e0a07dd130a0f4a4ac26a9fc4d7ef7f7.tar.gz
SERVER-68929 Define a new method to query historical placement data on a namespace from a mongos process
Diffstat (limited to 'src/mongo/s/catalog/sharding_catalog_client_impl.cpp')
-rw-r--r--src/mongo/s/catalog/sharding_catalog_client_impl.cpp166
1 files changed, 166 insertions, 0 deletions
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
index 27e3ee8f319..f9ddf58e907 100644
--- a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp
@@ -40,8 +40,12 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/document_source_add_fields.h"
+#include "mongo/db/pipeline/document_source_group.h"
#include "mongo/db/pipeline/document_source_lookup.h"
#include "mongo/db/pipeline/document_source_match.h"
+#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_union_with.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
@@ -1199,4 +1203,166 @@ StatusWith<std::vector<KeysCollectionDocument>> ShardingCatalogClientImpl::getNe
return keys;
}
+
+/**
+ * Returns the shards recorded in the placement history as owning data for 'collName' at
+ * 'clusterTime'.
+ *
+ * The most recent placement entry (timestamp <= clusterTime) of the collection is used; if that
+ * entry is missing or has an empty shard list, the most recent entry of the parent database is
+ * used instead.
+ *
+ * Throws InvalidOptions if 'collName' has no collection component.
+ * Throws SnapshotTooOld if at least one of the shards found in the placement entry has no
+ * matching document in config.shards.
+ * Returns an empty vector if the aggregation yields no document.
+ */
+std::vector<ShardId> ShardingCatalogClientImpl::getShardsThatOwnDataForCollAtClusterTime(
+    OperationContext* opCtx, const NamespaceString& collName, const Timestamp& clusterTime) {
+    /*
+    Stage 1. Select only the entries with timestamp <= clusterTime and filter out all nss that are
+             not the collection or the database
+    Stage 2. Sort by timestamp
+    Stage 3. Extract the first document for collection and database
+    Stage 4. Access the list of shards currently active in the cluster
+    Stage 5. Count the number of currently active shards among the found ones
+    Stage 6. Do not return the list of active shards (used only for the count)
+    Stage 7. Discard the entries with empty shards (i.e. the collection was dropped or renamed)
+    Stage 8. Sort by namespace to ensure that the collection is before the database
+    Stage 9. Extract only one document (the collection, or the database if the first was
+             discarded at Stage 7)
+
+    regex=^db(\.collection)?$   (db and collection are quoted so that they match literally)
+    [
+        {
+            "$match": { "timestamp": { "$lte": clusterTime }, "nss": { $regex: regex } }
+        },
+        {
+            "$sort": { "timestamp": -1 }
+        },
+        {
+            "$group": {
+                _id: "$nss",
+                shards: { $first: "$shards" }
+            }
+        },
+        {
+            $lookup:
+            {
+                from: "shards",
+                localField: "shards",
+                foreignField: "_id",
+                as: "activeShards"
+            }
+        },
+        { "$set": { "numActiveShards": { "$size": "$activeShards" } } },
+        { "$project": { "activeShards": 0 } },
+        { "$match": { shards: { $not: { $size: 0 } } } },
+        { $sort: { _id: -1 } },
+        { $limit: 1 }
+    ]
+    */
+    uassert(ErrorCodes::InvalidOptions,
+            "A full collection namespace must be specified",
+            !collName.coll().empty());
+    auto expCtx = make_intrusive<ExpressionContext>(
+        opCtx, nullptr /*collator*/, NamespaceString::kConfigsvrPlacementHistoryNamespace);
+    StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+    resolvedNamespaces[NamespaceString::kConfigsvrShardsNamespace.coll()] = {
+        NamespaceString::kConfigsvrShardsNamespace, std::vector<BSONObj>() /* pipeline */};
+    resolvedNamespaces[NamespaceString::kConfigsvrPlacementHistoryNamespace.coll()] = {
+        NamespaceString::kConfigsvrPlacementHistoryNamespace,
+        std::vector<BSONObj>() /* pipeline */};
+    expCtx->setResolvedNamespaces(resolvedNamespaces);
+
+    // Stage 1. Get all the history entries prior to the requested time concerning either the
+    // collection or the parent database.
+    //
+    // The namespace components are quoted before being embedded in the pattern: an unescaped
+    // '.' separator matches any character (so "db.coll" would also select a database named
+    // "db_coll"), and names may legally contain other regex metacharacters.
+    const auto quoteRegexMeta = [](StringData raw) {
+        std::string quoted;
+        quoted.reserve(raw.size() * 2);
+        for (const char c : raw) {
+            const bool isWordChar = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
+                (c >= '0' && c <= '9') || c == '_';
+            // Bytes with the high bit set are part of multi-byte UTF-8 sequences and must be
+            // copied unchanged; every other non-word ASCII character is backslash-escaped.
+            const bool isAscii = static_cast<unsigned char>(c) < 0x80;
+            if (!isWordChar && isAscii) {
+                quoted.push_back('\\');
+            }
+            quoted.push_back(c);
+        }
+        return quoted;
+    };
+    const std::string regex = "^" + quoteRegexMeta(collName.db()) + "(\\." +
+        quoteRegexMeta(collName.coll()) + ")?$";
+    auto matchStage = DocumentSourceMatch::create(
+        BSON("timestamp" << BSON("$lte" << clusterTime) << "nss" << BSON("$regex" << regex)),
+        expCtx);
+
+    // Stages 2 & 3. Sort by timestamp and extract the first document for collection and database
+    auto sortStage = DocumentSourceSort::create(expCtx, BSON("timestamp" << -1));
+    auto groupStageBson = BSON("_id"
+                               << "$nss"
+                               << "shards"
+                               << BSON("$first"
+                                       << "$shards"));
+    auto groupStage = DocumentSourceGroup::createFromBson(
+        Document{{"$group", std::move(groupStageBson)}}.toBson().firstElement(), expCtx);
+
+    // Stage 4. Lookup active shards with left outer join on config.shards
+    Document lookupStageDoc = {
+        {"from", NamespaceString::kConfigsvrShardsNamespace.coll().toString()},
+        {"localField", StringData("shards")},
+        {"foreignField", StringData("_id")},
+        {"as", StringData("activeShards")}};
+
+    auto lookupStage = DocumentSourceLookUp::createFromBson(
+        Document{{"$lookup", std::move(lookupStageDoc)}}.toBson().firstElement(), expCtx);
+
+    // Stage 5. Count number of active shards
+    auto setStageDoc = Document(
+        {{"$set", Document{{"numActiveShards", Document{{"$size", "$activeShards"_sd}}}}}});
+    auto setStage =
+        DocumentSourceAddFields::createFromBson(setStageDoc.toBson().firstElement(), expCtx);
+
+    // Stage 6. Disable activeShards field to avoid sending it to the client
+    auto projectStageDoc = Document{{"activeShards", 0}};
+    auto projectStage = DocumentSourceProject::create(
+        projectStageDoc.toBson(), expCtx, "getShardsThatOwnDataForCollAtClusterTime");
+
+    // Stage 7. Discard the entries with empty shards (i.e. the collection was dropped or renamed)
+    auto matchStage2 =
+        DocumentSourceMatch::create(BSON("shards" << BSON("$not" << BSON("$size" << 0))), expCtx);
+
+    // Stage 8. Sort by namespace to ensure that the collection is before the database
+    auto sortStage2 = DocumentSourceSort::create(expCtx, BSON("_id" << -1));
+
+    // Stage 9. Extract only one document (the collection or the database if the first was discarded
+    // at Stage 7)
+    auto limitStage = DocumentSourceLimit::create(expCtx, 1);
+
+    // Create pipeline
+    Pipeline::SourceContainer stages;
+    stages.emplace_back(std::move(matchStage));
+    stages.emplace_back(std::move(sortStage));
+    stages.emplace_back(std::move(groupStage));
+    stages.emplace_back(std::move(lookupStage));
+    stages.emplace_back(std::move(setStage));
+    stages.emplace_back(std::move(projectStage));
+    stages.emplace_back(std::move(matchStage2));
+    stages.emplace_back(std::move(sortStage2));
+    stages.emplace_back(std::move(limitStage));
+
+    const auto pipeline = Pipeline::create(stages, expCtx);
+    auto aggRequest = AggregateCommandRequest(NamespaceString::kConfigsvrPlacementHistoryNamespace,
+                                              pipeline->serializeToBson());
+
+    // Run the aggregation. On a config server the read uses majority read concern only; on any
+    // other role it additionally carries the config time taken from the vector clock.
+    const auto readConcern = [&]() -> repl::ReadConcernArgs {
+        if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+            return {repl::ReadConcernLevel::kMajorityReadConcern};
+        } else {
+            const auto time = VectorClock::get(opCtx)->getTime();
+            return {time.configTime(), repl::ReadConcernLevel::kMajorityReadConcern};
+        }
+    }();
+    auto aggrResult =
+        runCatalogAggregation(opCtx, aggRequest, readConcern, Shard::kDefaultConfigCommandTimeout);
+
+    // This may happen in case the placementHistory is empty
+    if (aggrResult.empty()) {
+        return {};
+    }
+
+    // The pipeline ends with {$limit: 1}, so at most one document can come back.
+    invariant(aggrResult.size() == 1);
+
+    // Extract the result
+    auto doc = aggrResult.front();
+    auto numActiveShards = doc.getField("numActiveShards").Int();
+    // Use Obj() instead of Array() to avoid instantiating a temporary std::vector.
+    auto shards = doc.getField("shards").Obj();
+    const auto numShards = static_cast<int>(shards.nFields());
+
+    // Every shard listed in the placement entry must still have a document in config.shards;
+    // otherwise the historical placement refers to a shard that is no longer known.
+    uassert(ErrorCodes::SnapshotTooOld,
+            "Part of the history may no longer be retrieved because of one or more removed "
+            "shards.",
+            numActiveShards == numShards);
+
+    std::vector<ShardId> activeShards;
+    activeShards.reserve(numShards);
+    for (const auto& shard : shards) {
+        activeShards.push_back(shard.String());
+    }
+
+    return activeShards;
+}
+
} // namespace mongo