summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_aggregate.cpp
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2022-02-25 14:52:13 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-25 15:56:46 +0000
commit586663fec7c3a7d4a8b0185ff24825bd15e80dff (patch)
tree57539dcde8d2a38184536582367a6c4f6c96a592 /src/mongo/s/query/cluster_aggregate.cpp
parentf01a90660cb0a0a22d6b2166cd8b70d7990a6b12 (diff)
downloadmongo-586663fec7c3a7d4a8b0185ff24825bd15e80dff.tar.gz
SERVER-62738 implement aggregate $_passthroughToShard option
Diffstat (limited to 'src/mongo/s/query/cluster_aggregate.cpp')
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp40
1 files changed, 38 insertions, 2 deletions
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 1950cbc0254..200cef980d2 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -325,14 +325,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
cm,
involvedNamespaces,
hasChangeStream,
- liteParsedPipeline.allowedToPassthroughFromMongos());
+ liteParsedPipeline.allowedToPassthroughFromMongos(),
+ request.getPassthroughToShard().has_value());
if (!expCtx) {
// When the AggregationTargeter chooses a "passthrough" policy, it does not call the
// 'pipelineBuilder' function, so we never get an expression context. Because this is a
// passthrough, we only need a bare minimum expression context anyway.
invariant(targeter.policy ==
- cluster_aggregation_planner::AggregationTargeter::kPassthrough);
+ cluster_aggregation_planner::AggregationTargeter::kPassthrough ||
+ targeter.policy ==
+ cluster_aggregation_planner::AggregationTargeter::kSpecificShardOnly);
expCtx = make_intrusive<ExpressionContext>(
opCtx, nullptr, namespaces.executionNss, boost::none, request.getLet());
}
@@ -390,6 +393,39 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
result,
hasChangeStream);
}
+ case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy::
+ kSpecificShardOnly: {
+ // Mark expCtx as tailable and await data so CCC behaves accordingly.
+ expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
+
+ uassert(6273801,
+ "per shard cursor pipeline must contain $changeStream",
+ hasChangeStream);
+
+ // Make sure the rest of the pipeline can be pushed down.
+ auto pipeline = request.getPipeline();
+ std::vector<BSONObj> nonChangeStreamPart(pipeline.begin() + 1, pipeline.end());
+ LiteParsedPipeline nonChangeStreamLite(request.getNamespace(), nonChangeStreamPart);
+ uassert(6273802,
+ "$_passthroughToShard specified with a stage that is not allowed to "
+ "passthrough from mongos",
+ nonChangeStreamLite.allowedToPassthroughFromMongos());
+ ShardId shardId(std::string(request.getPassthroughToShard()->getShard()));
+ uassert(6273803,
+ "$_passthroughToShard not supported for queries against config replica set",
+ shardId != ShardId::kConfigServerId);
+
+ return cluster_aggregation_planner::runPipelineOnSpecificShardOnly(
+ expCtx,
+ namespaces,
+ *targeter.cm,
+ request.getExplain(),
+ aggregation_request_helper::serializeToCommandDoc(request),
+ privileges,
+ shardId,
+ true,
+ result);
+ }
MONGO_UNREACHABLE;
}