Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 128
1 file changed, 34 insertions, 94 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 2f6fce9e526..335ca4f5655 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -28,9 +28,7 @@
 
 #include "mongo/platform/basic.h"
 
-// This file defines functions from both of these headers
 #include "mongo/db/pipeline/pipeline.h"
-#include "mongo/db/pipeline/pipeline_optimizations.h"
 
 #include <algorithm>
 
@@ -40,6 +38,7 @@
 #include "mongo/db/jsobj.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/cluster_aggregation_planner.h"
 #include "mongo/db/pipeline/document.h"
 #include "mongo/db/pipeline/document_source.h"
 #include "mongo/db/pipeline/document_source_geo_near.h"
@@ -47,6 +46,7 @@
 #include "mongo/db/pipeline/document_source_merge_cursors.h"
 #include "mongo/db/pipeline/document_source_out.h"
 #include "mongo/db/pipeline/document_source_project.h"
+#include "mongo/db/pipeline/document_source_sort.h"
 #include "mongo/db/pipeline/document_source_unwind.h"
 #include "mongo/db/pipeline/expression.h"
 #include "mongo/db/pipeline/expression_context.h"
@@ -329,13 +329,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::splitForSharded() {
     // Keep a copy of the original source list in case we need to reset the pipeline from split to
     // unsplit later.
     shardPipeline->_unsplitSources.emplace(_sources);
-
-    // The order in which optimizations are applied can have significant impact on the
-    // efficiency of the final pipeline. Be Careful!
-    Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
-    Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
-    Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);
-
+    cluster_aggregation_planner::performSplitPipelineOptimizations(shardPipeline.get(), this);
     shardPipeline->_splitState = SplitState::kSplitForShards;
     _splitState = SplitState::kSplitForMerge;
 
@@ -366,87 +360,6 @@ void Pipeline::unsplitFromSharded(
     stitch();
 }
 
-void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, Pipeline* mergePipe) {
-    while (!mergePipe->_sources.empty()) {
-        intrusive_ptr<DocumentSource> current = mergePipe->_sources.front();
-        mergePipe->_sources.pop_front();
-
-        // Check if this source is splittable.
-        SplittableDocumentSource* splittable =
-            dynamic_cast<SplittableDocumentSource*>(current.get());
-
-        if (!splittable) {
-            // Move the source from the merger _sources to the shard _sources.
-            shardPipe->_sources.push_back(current);
-        } else {
-            // Split this source into 'merge' and 'shard' _sources.
-            intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
-            auto mergeSources = splittable->getMergeSources();
-
-            // A source may not simultaneously be present on both sides of the split.
-            invariant(std::find(mergeSources.begin(), mergeSources.end(), shardSource) ==
-                      mergeSources.end());
-
-            if (shardSource)
-                shardPipe->_sources.push_back(shardSource);
-
-            // Add the stages in reverse order, so that they appear in the pipeline in the same
-            // order as they were returned by the stage.
-            for (auto it = mergeSources.rbegin(); it != mergeSources.rend(); ++it) {
-                mergePipe->_sources.push_front(*it);
-            }
-
-            break;
-        }
-    }
-}
-
-void Pipeline::Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe,
-                                                                         Pipeline* mergePipe) {
-    while (!shardPipe->_sources.empty() &&
-           dynamic_cast<DocumentSourceUnwind*>(shardPipe->_sources.back().get())) {
-        mergePipe->_sources.push_front(shardPipe->_sources.back());
-        shardPipe->_sources.pop_back();
-    }
-}
-
-void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
-                                                                         Pipeline* mergePipe) {
-    auto depsMetadata = DocumentSourceMatch::isTextQuery(shardPipe->getInitialQuery())
-        ? DepsTracker::MetadataAvailable::kTextScore
-        : DepsTracker::MetadataAvailable::kNoMetadata;
-    DepsTracker mergeDeps(mergePipe->getDependencies(depsMetadata));
-    if (mergeDeps.needWholeDocument)
-        return;  // the merge needs all fields, so nothing we can do.
-
-    // Empty project is "special" so if no fields are needed, we just ask for _id instead.
-    if (mergeDeps.fields.empty())
-        mergeDeps.fields.insert("_id");
-
-    // Remove metadata from dependencies since it automatically flows through projection and we
-    // don't want to project it in to the document.
-    mergeDeps.setNeedTextScore(false);
-
-    // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
-    // field dependencies. While this may not be 100% ideal in all cases, it is simple and
-    // avoids the worst cases by ensuring that:
-    // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
-    //    dependencies. This situation can happen when a $sort is before the first $project or
-    //    $group. Without the optimization, the shards would have to reify and transmit full
-    //    objects even though only a subset of fields are needed.
-    // 2) Optimization IS NOT applied immediately following a $project or $group since it would
-    //    add an unnecessary project (and therefore a deep-copy).
-    for (auto&& source : shardPipe->_sources) {
-        DepsTracker dt(depsMetadata);
-        if (source->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
-            return;
-    }
-    // if we get here, add the project.
-    boost::intrusive_ptr<DocumentSource> project = DocumentSourceProject::createFromBson(
-        BSON("$project" << mergeDeps.toProjection()).firstElement(), shardPipe->pCtx);
-    shardPipe->_sources.push_back(project);
-}
-
 BSONObj Pipeline::getInitialQuery() const {
     if (_sources.empty())
         return BSONObj();
@@ -699,7 +612,35 @@ Status Pipeline::_pipelineCanRunOnMongoS() const {
     return Status::OK();
 }
 
-boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
+void Pipeline::pushBack(boost::intrusive_ptr<DocumentSource> newStage) {
+    newStage->setSource(_sources.back().get());
+    _sources.push_back(std::move(newStage));
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popBack() {
+    if (_sources.empty()) {
+        return nullptr;
+    }
+    auto targetStage = _sources.back();
+    _sources.pop_back();
+    return targetStage;
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFront() {
+    if (_sources.empty()) {
+        return nullptr;
+    }
+    auto targetStage = _sources.front();
+    _sources.pop_front();
+    stitch();
+    return targetStage;
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithName(StringData targetStageName) {
+    return popFrontWithNameAndCriteria(targetStageName, nullptr);
+}
+
+boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithNameAndCriteria(
     StringData targetStageName, stdx::function<bool(const DocumentSource* const)> predicate) {
     if (_sources.empty() || _sources.front()->getSourceName() != targetStageName) {
         return nullptr;
@@ -710,8 +651,7 @@ boost::intrusive_ptr<DocumentSource> Pipeline::popFrontWithCriteria(
         return nullptr;
     }
 
-    _sources.pop_front();
-    stitch();
-    return targetStage;
+    return popFront();
 }
+
 }  // namespace mongo
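For orientation, the sketch below models the deque-style editing semantics that this diff adds to Pipeline (pushBack, popBack, popFront, popFrontWithName, popFrontWithNameAndCriteria). It is a minimal, hypothetical stand-in, not the MongoDB classes: it uses std::list<std::string> in place of the real DocumentSource list and omits the setSource()/stitch() re-wiring, so it only illustrates the pop-only-if-name-and-predicate-match behaviour.

#include <functional>
#include <iostream>
#include <list>
#include <string>

// Hypothetical stand-in for Pipeline; stage names model DocumentSources.
class ToyPipeline {
public:
    void pushBack(std::string stage) {
        _sources.push_back(std::move(stage));
    }

    // Returns the last stage, or an empty string if the pipeline is empty.
    std::string popBack() {
        if (_sources.empty())
            return {};
        auto stage = _sources.back();
        _sources.pop_back();
        return stage;
    }

    // Returns the first stage, or an empty string if the pipeline is empty.
    std::string popFront() {
        if (_sources.empty())
            return {};
        auto stage = _sources.front();
        _sources.pop_front();
        return stage;
    }

    // Pops the first stage only if its name matches and the optional predicate accepts it.
    std::string popFrontWithNameAndCriteria(
        const std::string& name, std::function<bool(const std::string&)> predicate = nullptr) {
        if (_sources.empty() || _sources.front() != name)
            return {};
        if (predicate && !predicate(_sources.front()))
            return {};
        return popFront();
    }

private:
    std::list<std::string> _sources;
};

int main() {
    ToyPipeline pipeline;
    pipeline.pushBack("$match");
    pipeline.pushBack("$sort");

    // Succeeds: the front stage is "$match".
    std::cout << "popped: " << pipeline.popFrontWithNameAndCriteria("$match") << "\n";
    // Yields an empty result: the front stage is now "$sort", not "$limit".
    std::cout << "popped: '" << pipeline.popFrontWithNameAndCriteria("$limit") << "'\n";
}

As in the patched popFrontWithNameAndCriteria above, the name/predicate check delegates the actual removal to popFront(), so any bookkeeping tied to removal (stitch() in the real code) stays in one place.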