diff options
author | Allison Easton <allison.easton@mongodb.com> | 2023-03-13 07:21:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-03-13 09:23:43 +0000 |
commit | 0567e419cf9217ae48b778381e511a22deb1c280 (patch) | |
tree | 11559e37426e4ca35a6dbb25df69e21fd0b00c67 /src/mongo/s/query/cluster_aggregation_planner.cpp | |
parent | a9684a6176388f095a6a2b7857c042a89bf56530 (diff) | |
download | mongo-0567e419cf9217ae48b778381e511a22deb1c280.tar.gz |
SERVER-70799 Remove ShardVersion::IGNORED()
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 36 |
1 files changed, 23 insertions, 13 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 4faa2681041..d6c84dd2842 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -59,6 +59,7 @@ #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/query_analysis_sampler_util.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/s/shard_version_factory.h" #include "mongo/s/transaction_router.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -128,6 +129,7 @@ ShardId pickMergingShard(OperationContext* opCtx, BSONObj createCommandForMergingShard(Document serializedCommand, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const ShardId& shardId, + const boost::optional<ShardingIndexesCatalogCache> sii, bool mergingShardContributesData, const Pipeline* pipelineForMerging) { MutableDocument mergeCmd(serializedCommand); @@ -157,7 +159,11 @@ BSONObj createCommandForMergingShard(Document serializedCommand, // Attach the IGNORED chunk version to the command. On the shard, this will skip the actual // version check but will nonetheless mark the operation as versioned. - auto mergeCmdObj = appendShardVersion(mergeCmd.freeze().toBson(), ShardVersion::IGNORED()); + auto mergeCmdObj = appendShardVersion( + mergeCmd.freeze().toBson(), + ShardVersionFactory::make(ChunkVersion::IGNORED(), + sii ? boost::make_optional(sii->getCollectionIndexes()) + : boost::none)); // Attach the read and write concerns if needed, and return the final command object. return applyReadWriteConcern(mergeCtx->opCtx, @@ -170,7 +176,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex const ClusterAggregate::Namespaces& namespaces, Document serializedCommand, long long batchSize, - const boost::optional<ChunkManager>& cm, + const boost::optional<CollectionRoutingInfo>& cri, DispatchShardPipelineResults&& shardDispatchResults, BSONObjBuilder* result, const PrivilegeVector& privileges, @@ -209,16 +215,20 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we // therefore must have a valid routing table. - invariant(cm); + invariant(cri); const ShardId mergingShardId = pickMergingShard( - opCtx, shardDispatchResults.needsPrimaryShardMerge, targetedShards, cm->dbPrimary()); + opCtx, shardDispatchResults.needsPrimaryShardMerge, targetedShards, cri->cm.dbPrimary()); const bool mergingShardContributesData = std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) != targetedShards.end(); - auto mergeCmdObj = createCommandForMergingShard( - serializedCommand, expCtx, mergingShardId, mergingShardContributesData, mergePipeline); + auto mergeCmdObj = createCommandForMergingShard(serializedCommand, + expCtx, + mergingShardId, + cri->sii, + mergingShardContributesData, + mergePipeline); LOGV2_DEBUG(22835, 1, @@ -580,14 +590,14 @@ AggregationTargeter AggregationTargeter::make( OperationContext* opCtx, const NamespaceString& executionNss, const std::function<std::unique_ptr<Pipeline, PipelineDeleter>()> buildPipelineFn, - boost::optional<ChunkManager> cm, + boost::optional<CollectionRoutingInfo> cri, stdx::unordered_set<NamespaceString> involvedNamespaces, bool hasChangeStream, bool startsWithDocuments, bool allowedToPassthrough, bool perShardCursor) { if (perShardCursor) { - return {TargetingPolicy::kSpecificShardOnly, nullptr, cm}; + return {TargetingPolicy::kSpecificShardOnly, nullptr, cri}; } // Check if any of the involved collections are sharded. @@ -608,7 +618,7 @@ AggregationTargeter AggregationTargeter::make( // If we don't have a routing table, then this is either a $changeStream which must run on all // shards or a $documents stage which must not. - invariant(cm || (mustRunOnAllShards && hasChangeStream) || + invariant(cri || (mustRunOnAllShards && hasChangeStream) || (startsWithDocuments && !mustRunOnAllShards)); // A pipeline is allowed to passthrough to the primary shard iff the following conditions are @@ -621,14 +631,14 @@ AggregationTargeter AggregationTargeter::make( // $currentOp. // 4. Doesn't need transformation via DocumentSource::serialize(). For example, list sessions // needs to include information about users that can only be deduced on mongos. - if (cm && !cm->isSharded() && !mustRunOnAllShards && allowedToPassthrough && + if (cri && !cri->cm.isSharded() && !mustRunOnAllShards && allowedToPassthrough && !involvesShardedCollections) { - return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, cm}; + return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, cri}; } else { auto pipeline = buildPipelineFn(); auto policy = pipeline->requiredToRunOnMongos() ? TargetingPolicy::kMongosRequired : TargetingPolicy::kAnyShard; - return AggregationTargeter{policy, std::move(pipeline), cm}; + return AggregationTargeter{policy, std::move(pipeline), cri}; } } @@ -739,7 +749,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, namespaces, serializedCommand, batchSize, - targeter.cm, + targeter.cri, std::move(shardDispatchResults), result, privileges, |