From 0567e419cf9217ae48b778381e511a22deb1c280 Mon Sep 17 00:00:00 2001 From: Allison Easton Date: Mon, 13 Mar 2023 07:21:14 +0000 Subject: SERVER-70799 Remove ShardVersion::IGNORED() --- src/mongo/s/query/cluster_aggregation_planner.cpp | 36 +++++++++++++++-------- 1 file changed, 23 insertions(+), 13 deletions(-) (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp') diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 4faa2681041..d6c84dd2842 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -59,6 +59,7 @@ #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/query_analysis_sampler_util.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/s/shard_version_factory.h" #include "mongo/s/transaction_router.h" #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery @@ -128,6 +129,7 @@ ShardId pickMergingShard(OperationContext* opCtx, BSONObj createCommandForMergingShard(Document serializedCommand, const boost::intrusive_ptr& mergeCtx, const ShardId& shardId, + const boost::optional sii, bool mergingShardContributesData, const Pipeline* pipelineForMerging) { MutableDocument mergeCmd(serializedCommand); @@ -157,7 +159,11 @@ BSONObj createCommandForMergingShard(Document serializedCommand, // Attach the IGNORED chunk version to the command. On the shard, this will skip the actual // version check but will nonetheless mark the operation as versioned. - auto mergeCmdObj = appendShardVersion(mergeCmd.freeze().toBson(), ShardVersion::IGNORED()); + auto mergeCmdObj = appendShardVersion( + mergeCmd.freeze().toBson(), + ShardVersionFactory::make(ChunkVersion::IGNORED(), + sii ? boost::make_optional(sii->getCollectionIndexes()) + : boost::none)); // Attach the read and write concerns if needed, and return the final command object. return applyReadWriteConcern(mergeCtx->opCtx, @@ -170,7 +176,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr& ex const ClusterAggregate::Namespaces& namespaces, Document serializedCommand, long long batchSize, - const boost::optional& cm, + const boost::optional& cri, DispatchShardPipelineResults&& shardDispatchResults, BSONObjBuilder* result, const PrivilegeVector& privileges, @@ -209,16 +215,20 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr& ex // If we are not merging on mongoS, then this is not a $changeStream aggregation, and we // therefore must have a valid routing table. - invariant(cm); + invariant(cri); const ShardId mergingShardId = pickMergingShard( - opCtx, shardDispatchResults.needsPrimaryShardMerge, targetedShards, cm->dbPrimary()); + opCtx, shardDispatchResults.needsPrimaryShardMerge, targetedShards, cri->cm.dbPrimary()); const bool mergingShardContributesData = std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) != targetedShards.end(); - auto mergeCmdObj = createCommandForMergingShard( - serializedCommand, expCtx, mergingShardId, mergingShardContributesData, mergePipeline); + auto mergeCmdObj = createCommandForMergingShard(serializedCommand, + expCtx, + mergingShardId, + cri->sii, + mergingShardContributesData, + mergePipeline); LOGV2_DEBUG(22835, 1, @@ -580,14 +590,14 @@ AggregationTargeter AggregationTargeter::make( OperationContext* opCtx, const NamespaceString& executionNss, const std::function()> buildPipelineFn, - boost::optional cm, + boost::optional cri, stdx::unordered_set involvedNamespaces, bool hasChangeStream, bool startsWithDocuments, bool allowedToPassthrough, bool perShardCursor) { if (perShardCursor) { - return {TargetingPolicy::kSpecificShardOnly, nullptr, cm}; + return {TargetingPolicy::kSpecificShardOnly, nullptr, cri}; } // Check if any of the involved collections are sharded. @@ -608,7 +618,7 @@ AggregationTargeter AggregationTargeter::make( // If we don't have a routing table, then this is either a $changeStream which must run on all // shards or a $documents stage which must not. - invariant(cm || (mustRunOnAllShards && hasChangeStream) || + invariant(cri || (mustRunOnAllShards && hasChangeStream) || (startsWithDocuments && !mustRunOnAllShards)); // A pipeline is allowed to passthrough to the primary shard iff the following conditions are @@ -621,14 +631,14 @@ AggregationTargeter AggregationTargeter::make( // $currentOp. // 4. Doesn't need transformation via DocumentSource::serialize(). For example, list sessions // needs to include information about users that can only be deduced on mongos. - if (cm && !cm->isSharded() && !mustRunOnAllShards && allowedToPassthrough && + if (cri && !cri->cm.isSharded() && !mustRunOnAllShards && allowedToPassthrough && !involvesShardedCollections) { - return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, cm}; + return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, cri}; } else { auto pipeline = buildPipelineFn(); auto policy = pipeline->requiredToRunOnMongos() ? TargetingPolicy::kMongosRequired : TargetingPolicy::kAnyShard; - return AggregationTargeter{policy, std::move(pipeline), cm}; + return AggregationTargeter{policy, std::move(pipeline), cri}; } } @@ -739,7 +749,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx, namespaces, serializedCommand, batchSize, - targeter.cm, + targeter.cri, std::move(shardDispatchResults), result, privileges, -- cgit v1.2.1