diff options
author | Mathias Stearn <mathias@10gen.com> | 2013-10-16 15:01:30 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2013-10-23 19:28:02 -0400 |
commit | 18634c0b8798ddfd98d1370e038f82f8b372db0d (patch) | |
tree | 23ae13ccd175debd808dec4d5664bf55cf137193 /src/mongo/db/pipeline/pipeline.cpp | |
parent | 007665da8b3c2c622ab5fe112f64b7d475bb3220 (diff) | |
download | mongo-18634c0b8798ddfd98d1370e038f82f8b372db0d.tar.gz |
Split up Pipeline optimizations into separate functions
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 145 |
1 files changed, 65 insertions, 80 deletions
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 7e4f80a34b1..b82f970c3e2 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -28,7 +28,9 @@ #include "mongo/pch.h" +// This file defines functions from both of these headers #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/pipeline/pipeline_optimizations.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/privilege.h" @@ -95,9 +97,9 @@ namespace mongo { ((const StageDesc *)pR)->pName); } - intrusive_ptr<Pipeline> Pipeline::parseCommand( - string &errmsg, BSONObj &cmdObj, - const intrusive_ptr<ExpressionContext> &pCtx) { + intrusive_ptr<Pipeline> Pipeline::parseCommand(string &errmsg, + BSONObj &cmdObj, + const intrusive_ptr<ExpressionContext> &pCtx) { intrusive_ptr<Pipeline> pPipeline(new Pipeline(pCtx)); vector<BSONElement> pipeline; @@ -199,28 +201,19 @@ namespace mongo { } } - /* if there aren't any pipeline stages, there's nothing more to do */ - if (sources.empty()) - return pPipeline; - - // NOTE the rest of this function is all optimizations. - // TODO find a clean way to break these up into separate functions. - - /* - Move filters up where possible. + // The order in which optimizations are applied can have significant impact on the + // efficiency of the final pipeline. Be Careful! + Optimizations::Local::moveMatchBeforeSort(pPipeline.get()); + Optimizations::Local::moveLimitBeforeSkip(pPipeline.get()); + Optimizations::Local::coalesceAdjacent(pPipeline.get()); + Optimizations::Local::optimizeEachDocumentSource(pPipeline.get()); + Optimizations::Local::duplicateMatchBeforeInitalRedact(pPipeline.get()); - CW TODO -- move filter past projections where possible, and noting - corresponding field renaming. - */ - - /* - Wherever there is a match immediately following a sort, swap them. - This means we sort fewer items. Neither changes the documents in - the stream, so this transformation shouldn't affect the result. + return pPipeline; + } - We do this first, because then when we coalesce operators below, - any adjacent matches will be combined. - */ + void Pipeline::Optimizations::Local::moveMatchBeforeSort(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; for (size_t srcn = sources.size(), srci = 1; srci < srcn; ++srci) { intrusive_ptr<DocumentSource> &pSource = sources[srci]; if (dynamic_cast<DocumentSourceMatch *>(pSource.get())) { @@ -233,11 +226,13 @@ namespace mongo { } } } + } + + void Pipeline::Optimizations::Local::moveLimitBeforeSkip(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; + if (sources.empty()) + return; - /* Move limits in front of skips. This is more optimal for sharding - * since currently, we can only split the pipeline at a single source - * and it is better to limit the results coming from each shard - */ for(int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) { DocumentSourceLimit* limit = dynamic_cast<DocumentSourceLimit*>(sources[i].get()); @@ -262,56 +257,44 @@ namespace mongo { i = sources.size(); // decremented before next pass } } + } - /* - Coalesce adjacent filters where possible. Two adjacent filters - are equivalent to one filter whose predicate is the conjunction of - the two original filters' predicates. For now, capture this by - giving any DocumentSource the option to absorb it's successor; this - will also allow adjacent projections to coalesce when possible. - - Run through the DocumentSources, and give each one the opportunity - to coalesce with its successor. If successful, remove the - successor. + void Pipeline::Optimizations::Local::coalesceAdjacent(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; + if (sources.empty()) + return; - Move all document sources to a temporary list. - */ + // move all sources to a temporary list SourceContainer tempSources; sources.swap(tempSources); - /* move the first one to the final list */ + // move the first one to the final list sources.push_back(tempSources[0]); - /* run through the sources, coalescing them or keeping them */ + // run through the sources, coalescing them or keeping them for (size_t tempn = tempSources.size(), tempi = 1; tempi < tempn; ++tempi) { - /* - If we can't coalesce the source with the last, then move it - to the final list, and make it the new last. (If we succeeded, - then we're still on the same last, and there's no need to move - or do anything with the source -- the destruction of tempSources - will take care of the rest.) - */ + // If we can't coalesce the source with the last, then move it + // to the final list, and make it the new last. (If we succeeded, + // then we're still on the same last, and there's no need to move + // or do anything with the source -- the destruction of tempSources + // will take care of the rest.) intrusive_ptr<DocumentSource> &pLastSource = sources.back(); intrusive_ptr<DocumentSource> &pTemp = tempSources[tempi]; verify(pTemp && pLastSource); if (!pLastSource->coalesce(pTemp)) sources.push_back(pTemp); } + } - /* optimize the elements in the pipeline */ - for(SourceContainer::iterator iter(sources.begin()), - listEnd(sources.end()); - iter != listEnd; - ++iter) { - if (!*iter) { - errmsg = "Pipeline received empty document as argument"; - return intrusive_ptr<Pipeline>(); - } - - (*iter)->optimize(); + void Pipeline::Optimizations::Local::optimizeEachDocumentSource(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; + for (SourceContainer::iterator it(sources.begin()); it != sources.end(); ++it) { + (*it)->optimize(); } + } - // Optimize [$redact, $match] to [$match, $redact, $match] if possible + void Pipeline::Optimizations::Local::duplicateMatchBeforeInitalRedact(Pipeline* pipeline) { + SourceContainer& sources = pipeline->sources; if (sources.size() >= 2 && dynamic_cast<DocumentSourceRedact*>(sources[0].get())) { if (DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources[1].get())) { const BSONObj redactSafePortion = match->redactSafePortion(); @@ -319,12 +302,10 @@ namespace mongo { sources.push_front( DocumentSourceMatch::createFromBson( BSON("$match" << redactSafePortion).firstElement(), - pCtx)); + pipeline->pCtx)); } } } - - return pPipeline; } void Pipeline::addRequiredPrivileges(Command* commandTemplate, @@ -374,39 +355,43 @@ namespace mongo { } intrusive_ptr<Pipeline> Pipeline::splitForSharded() { - /* create an initialize the shard spec we'll return */ - intrusive_ptr<Pipeline> pShardPipeline(new Pipeline(pCtx)); - pShardPipeline->explain = explain; + // Create and initialize the shard spec we'll return. We start with an empty pipeline on the + // shards and all work being done in the merger. Optimizations can move operations between + // the pipelines to be more efficient. + intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx)); + shardPipeline->explain = explain; - /* - Run through the pipeline, looking for points to split it into - shard pipelines, and the rest. - */ - while (!sources.empty()) { - // pop the first source - intrusive_ptr<DocumentSource> pSource = sources.front(); - sources.pop_front(); + // The order in which optimizations are applied can have significant impact on the + // efficiency of the final pipeline. Be Careful! + Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this); + + return shardPipeline; + } + + void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe, + Pipeline* mergePipe) { + while (!mergePipe->sources.empty()) { + intrusive_ptr<DocumentSource> current = mergePipe->sources.front(); + mergePipe->sources.pop_front(); // Check if this source is splittable SplittableDocumentSource* splittable = - dynamic_cast<SplittableDocumentSource*>(pSource.get()); + dynamic_cast<SplittableDocumentSource*>(current.get()); if (!splittable){ // move the source from the merger sources to the shard sources - pShardPipeline->sources.push_back(pSource); + shardPipe->sources.push_back(current); } else { // split this source into Merge and Shard sources intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource(); intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource(); - if (shardSource) pShardPipeline->sources.push_back(shardSource); - if (mergeSource) this->sources.push_front(mergeSource); + if (shardSource) shardPipe->sources.push_back(shardSource); + if (mergeSource) mergePipe->sources.push_front(mergeSource); break; } } - - return pShardPipeline; } BSONObj Pipeline::getInitialQuery() const { |