summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_aggregation_planner.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-11-29 09:56:42 -0500
committerCharlie Swanson <charlie.swanson@mongodb.com>2019-02-13 07:30:10 -0500
commit87194fbe0c24525bc1f2d674012fe6978eca77d2 (patch)
tree0f13b14046152f0b475ae16d9a95b5e2ba0c4cbb /src/mongo/s/query/cluster_aggregation_planner.cpp
parent69f26fa3798b0d7927858aa704243cdac676c6e9 (diff)
downloadmongo-87194fbe0c24525bc1f2d674012fe6978eca77d2.tar.gz
SERVER-38311 Change out merging strategy
Allows an $out stage to run in parallel on all shards if the target collection is sharded and so is the input collection to the aggregate.
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp33
1 files changed, 13 insertions, 20 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 1cc5dca28fd..88e82b93f2e 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -67,8 +67,6 @@ namespace {
*
* It is not safe to call this optimization multiple times.
*
- * NOTE: looks for NeedsMergerDocumentSources and uses that API
- *
* Returns the sort specification if the input streams are sorted, and false otherwise.
*/
boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pipeline* mergePipe) {
@@ -76,28 +74,23 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi
boost::intrusive_ptr<DocumentSource> current = mergePipe->popFront();
// Check if this source is splittable.
- NeedsMergerDocumentSource* splittable =
- dynamic_cast<NeedsMergerDocumentSource*>(current.get());
-
- if (!splittable) {
+ auto mergeLogic = current->mergingLogic();
+ if (!mergeLogic) {
// Move the source from the merger _sources to the shard _sources.
shardPipe->push_back(current);
- } else {
- // Split this source into 'merge' and 'shard' _sources.
- boost::intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
- auto mergeLogic = splittable->mergingLogic();
+ continue;
+ }
- // A source may not simultaneously be present on both sides of the split.
- invariant(shardSource != mergeLogic.mergingStage);
+ // A source may not simultaneously be present on both sides of the split.
+ invariant(mergeLogic->shardsStage != mergeLogic->mergingStage);
- if (shardSource)
- shardPipe->push_back(std::move(shardSource));
+ if (mergeLogic->shardsStage)
+ shardPipe->push_back(std::move(mergeLogic->shardsStage));
- if (mergeLogic.mergingStage)
- mergePipe->addInitialSource(std::move(mergeLogic.mergingStage));
+ if (mergeLogic->mergingStage)
+ mergePipe->addInitialSource(std::move(mergeLogic->mergingStage));
- return mergeLogic.inputSortPattern;
- }
+ return mergeLogic->inputSortPattern;
}
return boost::none;
}
@@ -296,8 +289,8 @@ ClusterClientCursorGuard convertPipelineToRouterStages(
bool stageCanRunInParallel(const boost::intrusive_ptr<DocumentSource>& stage,
const std::set<std::string>& nameOfShardKeyFieldsUponEntryToStage) {
- if (auto needsMerger = dynamic_cast<NeedsMergerDocumentSource*>(stage.get())) {
- return needsMerger->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage);
+ if (stage->mergingLogic()) {
+ return stage->canRunInParallelBeforeOut(nameOfShardKeyFieldsUponEntryToStage);
} else {
// This stage is fine to execute in parallel on each stream. For example, a $match can be
// applied to each stream in parallel.