diff options
Diffstat (limited to 'src/mongo/db/pipeline/cluster_aggregation_planner.h')
-rw-r--r-- | src/mongo/db/pipeline/cluster_aggregation_planner.h | 49 |
1 files changed, 42 insertions, 7 deletions
diff --git a/src/mongo/db/pipeline/cluster_aggregation_planner.h b/src/mongo/db/pipeline/cluster_aggregation_planner.h index f62f158f6ca..3b3aaa63df4 100644 --- a/src/mongo/db/pipeline/cluster_aggregation_planner.h +++ b/src/mongo/db/pipeline/cluster_aggregation_planner.h @@ -28,31 +28,66 @@ #pragma once +#include "mongo/db/pipeline/lite_parsed_pipeline.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/s/query/cluster_client_cursor_impl.h" namespace mongo { namespace cluster_aggregation_planner { /** - * Performs optimizations with the aim of reducing computing time and network traffic when a - * pipeline has been split into two pieces. Modifies 'shardPipeline' and 'mergingPipeline' such that - * they may contain different stages, but still compute the same results when executed. + * Represents the two halves of a pipeline that will execute in a sharded cluster. 'shardsPipeline' + * will execute in parallel on each shard, and 'mergePipeline' will execute on the merge host - + * either one of the shards or a mongos. */ -void performSplitPipelineOptimizations(Pipeline* shardPipeline, Pipeline* mergingPipeline); +struct SplitPipeline { + SplitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> shardsPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline, + boost::optional<BSONObj> shardCursorsSortSpec) + : shardsPipeline(std::move(shardsPipeline)), + mergePipeline(std::move(mergePipeline)), + shardCursorsSortSpec(std::move(shardCursorsSortSpec)) {} + + std::unique_ptr<Pipeline, PipelineDeleter> shardsPipeline; + std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; + + // If set, the cursors from the shards are expected to be sorted according to this spec, and to + // have populated a "$sortKey" metadata field which can be used to compare the results. + boost::optional<BSONObj> shardCursorsSortSpec; +}; /** - * Rips off an initial $sort stage that can be handled by cursor merging machinery. 
Returns the - * sort key pattern of such a $sort stage if there was one, and boost::none otherwise. + * Split the current Pipeline into a Pipeline for each shard, and a Pipeline that combines the + * results within a merging process. This call also performs optimizations with the aim of reducing + * computing time and network traffic when a pipeline has been split into two pieces. + * + * The 'mergePipeline' returned as part of the SplitPipeline here is not ready to execute until the + * 'shardsPipeline' has been sent to the shards and cursors have been established. Once cursors have + * been established, the merge pipeline can be made executable by calling 'addMergeCursorsSource()'. */ -boost::optional<BSONObj> popLeadingMergeSort(Pipeline* mergePipeline); +SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline); /** * Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the * front of 'mergePipeline'. */ void addMergeCursorsSource(Pipeline* mergePipeline, + const LiteParsedPipeline&, + BSONObj cmdSentToShards, std::vector<RemoteCursor> remoteCursors, + const std::vector<ShardId>& targetedShards, + boost::optional<BSONObj> shardCursorsSortSpec, executor::TaskExecutor*); +/** + * Builds a ClusterClientCursor which will execute 'pipeline'. If 'pipeline' consists entirely of + * $skip and $limit stages, the pipeline is eliminated entirely and replaced with a RouterExecStage + * tree that does the same thing but will avoid using a RouterStagePipeline. Avoiding a + * RouterStagePipeline will remove an expensive conversion from BSONObj -> Document for each result. + */ +ClusterClientCursorGuard buildClusterCursor(OperationContext* opCtx, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + ClusterClientCursorParams&&); + } // namespace cluster_aggregation_planner } // namespace mongo |