diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-11-29 09:56:42 -0500 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2019-02-13 07:30:10 -0500 |
commit | 87194fbe0c24525bc1f2d674012fe6978eca77d2 (patch) | |
tree | 0f13b14046152f0b475ae16d9a95b5e2ba0c4cbb /src/mongo/s/query/cluster_aggregation_planner.cpp | |
parent | 69f26fa3798b0d7927858aa704243cdac676c6e9 (diff) | |
download | mongo-87194fbe0c24525bc1f2d674012fe6978eca77d2.tar.gz |
SERVER-38311 Change out merging strategy
Allows an $out stage to run in parallel on all shards if the target
collection is sharded and so is the input collection to the aggregate.
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 33 |
1 files changed, 13 insertions, 20 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 1cc5dca28fd..88e82b93f2e 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -67,8 +67,6 @@ namespace { * * It is not safe to call this optimization multiple times. * - * NOTE: looks for NeedsMergerDocumentSources and uses that API - * * Returns the sort specification if the input streams are sorted, and false otherwise. */ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) { @@ -76,28 +74,23 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront(); // Check if this source is splittable. - NeedsMergerDocumentSource* splittable = - dynamic_cast<NeedsMergerDocumentSource*>(current.get()); - - if (!splittable) { + auto mergeLogic = current->mergingLogic(); + if (!mergeLogic) { // Move the source from the merger _sources to the shard _sources. shardPipe->push_back(current); - } else { - // Split this source into 'merge' and 'shard' _sources. - boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); - auto mergeLogic = splittable->mergingLogic(); + continue; + } - // A source may not simultaneously be present on both sides of the split. - invariant(shardSource != mergeLogic.mergingStage); + // A source may not simultaneously be present on both sides of the split. + invariant(mergeLogic->shardsStage != mergeLogic->mergingStage); - if (shardSource) - shardPipe->push_back(std::move(shardSource)); + if (mergeLogic->shardsStage) + shardPipe->push_back(std::move(mergeLogic->shardsStage)); - if (mergeLogic.mergingStage) - mergePipe->addInitialSource(std::move(mergeLogic.mergingStage)); + if (mergeLogic->mergingStage) + mergePipe->addInitialSource(std::move(mergeLogic->mergingStage)); - return mergeLogic.inputSortPattern; - } + return mergeLogic->inputSortPattern; } return boost::none; } @@ -296,8 +289,8 @@ ClusterClientCursorGuard convertPipelineToRouterStages( bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage, const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) { - if (auto needsMerger = dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) { - return needsMerger->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage); + if (stage->mergingLogic()) { + return stage->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage); } else { // This stage is fine to execute in parallel on each stream. For example, a $match can be // applied to each stream in parallel. |