summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp128
1 files changed, 34 insertions, 94 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 2f6fce9e526..335ca4f5655 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -28,9 +28,7 @@
#include "mongo/platform/basic.h"
-// This file defines functions from both of these headers
#include "mongo/db/pipeline/pipeline.h"
-#include "mongo/db/pipeline/pipeline_optimizations.h"
#include <algorithm>
@@ -40,6 +38,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_geo_near.h"
@@ -47,6 +46,7 @@
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
@@ -329,13 +329,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
// Keep a copy of the original source list in case we need to reset the pipeline from split to
// unsplit later.
shardPipeline->_unsplitSources.emplace(_sources);
-
- // The order in which optimizations are applied can have significant impact on the
- // efficiency of the final pipeline. Be Careful!
- Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
- Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
- Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);
-
+ cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this);
shardPipeline->_splitState = SplitState::kSplitForShards;
_splitState = SplitState::kSplitForMerge;
@@ -366,87 +360,6 @@ void Pipeline::unsplitFromSharded(
stitch();
}
-void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
- while (!mergePipe->_sources.empty()) {
- intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();
- mergePipe->_sources.pop_front();
-
- // Check if this source is splittable.
- SplittableDocumentSource* splittable =
- dynamic_cast<SplittableDocumentSource*>(current.get());
-
- if (!splittable) {
- // Move the source from the merger _sources to the shard _sources.
- shardPipe->_sources.push_back(current);
- } else {
- // Split this source into 'merge' and 'shard' _sources.
- intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
- auto mergeSources = splittable->getMergeSources();
-
- // A source may not simultaneously be present on both sides of the split.
- invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
- mergeSources.end());
-
- if (shardSource)
- shardPipe->_sources.push_back(shardSource);
-
- // Add the stages in reverse order, so that they appear in the pipeline in the same
- // order as they were returned by the stage.
- for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
- mergePipe->_sources.push_front(*it);
- }
-
- break;
- }
- }
-}
-
-void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
- Pipeline* mergePipe) {
- while (!shardPipe->_sources.empty() &&
- dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) {
- mergePipe->_sources.push_front(shardPipe->_sources.back());
- shardPipe->_sources.pop_back();
- }
-}
-
-void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
- Pipeline* mergePipe) {
- auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
- ? DepsTracker::MetadataAvailable::kTextScore
- : DepsTracker::MetadataAvailable::kNoMetadata;
- DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
- if (mergeDeps.needWholeDocument)
- return; // the merge needs all fields, so nothing we can do.
-
- // Empty project is "special" so if no fields are needed, we just ask for _id instead.
- if (mergeDeps.fields.empty())
- mergeDeps.fields.insert("_id");
-
- // Remove metadata from dependencies since it automatically flows through projection and we
- // don't want to project it in to the document.
- mergeDeps.setNeedTextScore(false);
-
- // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
- // field dependencies. While this may not be 100% ideal in all cases, it is simple and
- // avoids the worst cases by ensuring that:
- // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
- // dependencies. This situation can happen when a $sort is before the first $project or
- // $group. Without the optimization, the shards would have to reify and transmit full
- // objects even though only a subset of fields are needed.
- // 2) Optimization IS NOT applied immediately following a $project or $group since it would
- // add an unnecessary project (and therefore a deep-copy).
- for (auto&& source : shardPipe->_sources) {
- DepsTracker dt(depsMetadata);
- if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
- return;
- }
- // if we get here, add the project.
- boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
- BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx);
- shardPipe->_sources.push_back(project);
-}
-
BSONObj Pipeline::getInitialQuery() const {
if (_sources.empty())
return BSONObj();
@@ -699,7 +612,35 @@ Status Pipeline::_pipelineCanRunOnMongoS() const {
return Status::OK();
}
-boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
+void Pipeline::pushBack(boost::intrusive_ptr<DocumentSource> newStage) {
+ newStage->setSource(_sources.back().get());
+ _sources.push_back(std::move(newStage));
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popBack() {
+ if (_sources.empty()) {
+ return nullptr;
+ }
+ auto targetStage = _sources.back();
+ _sources.pop_back();
+ return targetStage;
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFront() {
+ if (_sources.empty()) {
+ return nullptr;
+ }
+ auto targetStage = _sources.front();
+ _sources.pop_front();
+ stitch();
+ return targetStage;
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithName(StringData targetStageName) {
+ return popFrontWithNameAndCriteria(targetStageName, nullptr);
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria(
StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
return nullptr;
@@ -710,8 +651,7 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
return nullptr;
}
- _sources.pop_front();
- stitch();
- return targetStage;
+ return popFront();
}
+
} // namespace mongo