diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2022-02-25 14:52:13 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-25 15:56:46 +0000 |
commit | 586663fec7c3a7d4a8b0185ff24825bd15e80dff (patch) | |
tree | 57539dcde8d2a38184536582367a6c4f6c96a592 /src/mongo/s/query/cluster_aggregate.cpp | |
parent | f01a90660cb0a0a22d6b2166cd8b70d7990a6b12 (diff) | |
download | mongo-586663fec7c3a7d4a8b0185ff24825bd15e80dff.tar.gz |
SERVER-62738 implement aggregate $_passthroughToShard option
Diffstat (limited to 'src/mongo/s/query/cluster_aggregate.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 40 |
1 files changed, 38 insertions, 2 deletions
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 1950cbc0254..200cef980d2 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -325,14 +325,17 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, cm, involvedNamespaces, hasChangeStream, - liteParsedPipeline.allowedToPassthroughFromMongos()); + liteParsedPipeline.allowedToPassthroughFromMongos(), + request.getPassthroughToShard().has_value()); if (!expCtx) { // When the AggregationTargeter chooses a "passthrough" policy, it does not call the // 'pipelineBuilder' function, so we never get an expression context. Because this is a // passthrough, we only need a bare minimum expression context anyway. invariant(targeter.policy == - cluster_aggregation_planner::AggregationTargeter::kPassthrough); + cluster_aggregation_planner::AggregationTargeter::kPassthrough || + targeter.policy == + cluster_aggregation_planner::AggregationTargeter::kSpecificShardOnly); expCtx = make_intrusive<ExpressionContext>( opCtx, nullptr, namespaces.executionNss, boost::none, request.getLet()); } @@ -390,6 +393,39 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, result, hasChangeStream); } + case cluster_aggregation_planner::AggregationTargeter::TargetingPolicy:: + kSpecificShardOnly: { + // Mark expCtx as tailable and await data so CCC behaves accordingly. + expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; + + uassert(6273801, + "per shard cursor pipeline must contain $changeStream", + hasChangeStream); + + // Make sure the rest of the pipeline can be pushed down. + auto pipeline = request.getPipeline(); + std::vector<BSONObj> nonChangeStreamPart(pipeline.begin() + 1, pipeline.end()); + LiteParsedPipeline nonChangeStreamLite(request.getNamespace(), nonChangeStreamPart); + uassert(6273802, + "$_passthroughToShard specified with a stage that is not allowed to " + "passthrough from mongos", + nonChangeStreamLite.allowedToPassthroughFromMongos()); + ShardId shardId(std::string(request.getPassthroughToShard()->getShard())); + uassert(6273803, + "$_passthroughToShard not supported for queries against config replica set", + shardId != ShardId::kConfigServerId); + + return cluster_aggregation_planner::runPipelineOnSpecificShardOnly( + expCtx, + namespaces, + *targeter.cm, + request.getExplain(), + aggregation_request_helper::serializeToCommandDoc(request), + privileges, + shardId, + true, + result); + } MONGO_UNREACHABLE; } |