author     Allison Easton <allison.easton@mongodb.com>        2023-03-13 07:21:14 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2023-03-13 09:23:43 +0000
commit     0567e419cf9217ae48b778381e511a22deb1c280 (patch)
tree       11559e37426e4ca35a6dbb25df69e21fd0b00c67 /src/mongo/s/query/cluster_aggregation_planner.cpp
parent     a9684a6176388f095a6a2b7857c042a89bf56530 (diff)
SERVER-70799 Remove ShardVersion::IGNORED()
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp  36
1 file changed, 23 insertions(+), 13 deletions(-)
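
Note on the change below: the commit retires the ShardVersion::IGNORED() sentinel and instead builds the "ignored" version through ShardVersionFactory, so the collection-index information from the routing table (sii) travels alongside the ignored chunk version. The sketch that follows is not part of the commit; it only restates the construction pattern visible in the diff. The helper name makeIgnoredShardVersion and the chunk_version.h include are assumptions; only shard_version_factory.h and the calls inside the function are taken from the diff.

// Sketch only, not part of the commit. Assumes MongoDB's internal headers;
// the header declaring ShardingIndexesCatalogCache is omitted here.
#include <boost/optional.hpp>

#include "mongo/s/chunk_version.h"           // assumed include path
#include "mongo/s/shard_version_factory.h"   // added by this commit

namespace mongo {

// Hypothetical helper (name not in the commit): builds the "ignored" shard
// version attached to the merging-shard command. On the shard this skips the
// actual version check but still marks the operation as versioned.
ShardVersion makeIgnoredShardVersion(
    const boost::optional<ShardingIndexesCatalogCache>& sii) {
    return ShardVersionFactory::make(
        ChunkVersion::IGNORED(),
        sii ? boost::make_optional(sii->getCollectionIndexes()) : boost::none);
}

}  // namespace mongo
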
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 4faa2681041..d6c84dd2842 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -59,6 +59,7 @@
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/query_analysis_sampler_util.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/s/shard_version_factory.h"
#include "mongo/s/transaction_router.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kQuery
@@ -128,6 +129,7 @@ ShardId pickMergingShard(OperationContext* opCtx,
BSONObj createCommandForMergingShard(Document serializedCommand,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const ShardId& shardId,
+ const boost::optional<ShardingIndexesCatalogCache> sii,
bool mergingShardContributesData,
const Pipeline* pipelineForMerging) {
MutableDocument mergeCmd(serializedCommand);
@@ -157,7 +159,11 @@ BSONObj createCommandForMergingShard(Document serializedCommand,
// Attach the IGNORED chunk version to the command. On the shard, this will skip the actual
// version check but will nonetheless mark the operation as versioned.
- auto mergeCmdObj = appendShardVersion(mergeCmd.freeze().toBson(), ShardVersion::IGNORED());
+ auto mergeCmdObj = appendShardVersion(
+ mergeCmd.freeze().toBson(),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(),
+ sii ? boost::make_optional(sii->getCollectionIndexes())
+ : boost::none));
// Attach the read and write concerns if needed, and return the final command object.
return applyReadWriteConcern(mergeCtx->opCtx,
@@ -170,7 +176,7 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
const ClusterAggregate::Namespaces& namespaces,
Document serializedCommand,
long long batchSize,
- const boost::optional<ChunkManager>& cm,
+ const boost::optional<CollectionRoutingInfo>& cri,
DispatchShardPipelineResults&& shardDispatchResults,
BSONObjBuilder* result,
const PrivilegeVector& privileges,
@@ -209,16 +215,20 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
// If we are not merging on mongoS, then this is not a $changeStream aggregation, and we
// therefore must have a valid routing table.
- invariant(cm);
+ invariant(cri);
const ShardId mergingShardId = pickMergingShard(
- opCtx, shardDispatchResults.needsPrimaryShardMerge, targetedShards, cm->dbPrimary());
+ opCtx, shardDispatchResults.needsPrimaryShardMerge, targetedShards, cri->cm.dbPrimary());
const bool mergingShardContributesData =
std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) !=
targetedShards.end();
- auto mergeCmdObj = createCommandForMergingShard(
- serializedCommand, expCtx, mergingShardId, mergingShardContributesData, mergePipeline);
+ auto mergeCmdObj = createCommandForMergingShard(serializedCommand,
+ expCtx,
+ mergingShardId,
+ cri->sii,
+ mergingShardContributesData,
+ mergePipeline);
LOGV2_DEBUG(22835,
1,
@@ -580,14 +590,14 @@ AggregationTargeter AggregationTargeter::make(
OperationContext* opCtx,
const NamespaceString& executionNss,
const std::function<std::unique_ptr<Pipeline, PipelineDeleter>()> buildPipelineFn,
- boost::optional<ChunkManager> cm,
+ boost::optional<CollectionRoutingInfo> cri,
stdx::unordered_set<NamespaceString> involvedNamespaces,
bool hasChangeStream,
bool startsWithDocuments,
bool allowedToPassthrough,
bool perShardCursor) {
if (perShardCursor) {
- return {TargetingPolicy::kSpecificShardOnly, nullptr, cm};
+ return {TargetingPolicy::kSpecificShardOnly, nullptr, cri};
}
// Check if any of the involved collections are sharded.
@@ -608,7 +618,7 @@ AggregationTargeter AggregationTargeter::make(
// If we don't have a routing table, then this is either a $changeStream which must run on all
// shards or a $documents stage which must not.
- invariant(cm || (mustRunOnAllShards && hasChangeStream) ||
+ invariant(cri || (mustRunOnAllShards && hasChangeStream) ||
(startsWithDocuments && !mustRunOnAllShards));
// A pipeline is allowed to passthrough to the primary shard iff the following conditions are
@@ -621,14 +631,14 @@ AggregationTargeter AggregationTargeter::make(
// $currentOp.
// 4. Doesn't need transformation via DocumentSource::serialize(). For example, list sessions
// needs to include information about users that can only be deduced on mongos.
- if (cm && !cm->isSharded() && !mustRunOnAllShards && allowedToPassthrough &&
+ if (cri && !cri->cm.isSharded() && !mustRunOnAllShards && allowedToPassthrough &&
!involvesShardedCollections) {
- return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, cm};
+ return AggregationTargeter{TargetingPolicy::kPassthrough, nullptr, cri};
} else {
auto pipeline = buildPipelineFn();
auto policy = pipeline->requiredToRunOnMongos() ? TargetingPolicy::kMongosRequired
: TargetingPolicy::kAnyShard;
- return AggregationTargeter{policy, std::move(pipeline), cm};
+ return AggregationTargeter{policy, std::move(pipeline), cri};
}
}
@@ -739,7 +749,7 @@ Status dispatchPipelineAndMerge(OperationContext* opCtx,
namespaces,
serializedCommand,
batchSize,
- targeter.cm,
+ targeter.cri,
std::move(shardDispatchResults),
result,
privileges,
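
A note on the second half of the diff: the boost::optional<ChunkManager> parameters are widened to boost::optional<CollectionRoutingInfo>, which bundles the chunk manager (cm) with the sharding-index information (sii) that createCommandForMergingShard now forwards to the merging shard. The following hedged illustration mirrors the passthrough condition in AggregationTargeter::make() after the signature change; the free-function name is hypothetical and the include path for CollectionRoutingInfo is an assumption not confirmed by this diff.

// Sketch only: restates the passthrough check from AggregationTargeter::make()
// in terms of the new CollectionRoutingInfo parameter.
#include "mongo/s/catalog_cache.h"  // assumed header for CollectionRoutingInfo

namespace mongo {

bool canPassthroughToPrimaryShard(const boost::optional<CollectionRoutingInfo>& cri,
                                  bool mustRunOnAllShards,
                                  bool allowedToPassthrough,
                                  bool involvesShardedCollections) {
    // After this commit the routing table is reached through cri->cm, while
    // cri->sii carries the index metadata consumed elsewhere in this file.
    return cri && !cri->cm.isSharded() && !mustRunOnAllShards &&
        allowedToPassthrough && !involvesShardedCollections;
}

}  // namespace mongo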