path: root/src/mongo/db/pipeline/pipeline.cpp
author    Mathias Stearn <mathias@10gen.com>    2013-10-16 15:01:30 -0400
committer Mathias Stearn <mathias@10gen.com>    2013-10-23 19:28:02 -0400
commit    18634c0b8798ddfd98d1370e038f82f8b372db0d (patch)
tree      23ae13ccd175debd808dec4d5664bf55cf137193 /src/mongo/db/pipeline/pipeline.cpp
parent    007665da8b3c2c622ab5fe112f64b7d475bb3220 (diff)
Split up Pipeline optimizations into separate functions
Diffstat (limited to 'src/mongo/db/pipeline/pipeline.cpp')
-rw-r--r--  src/mongo/db/pipeline/pipeline.cpp | 145
1 file changed, 65 insertions(+), 80 deletions(-)
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 7e4f80a34b1..b82f970c3e2 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -28,7 +28,9 @@
#include "mongo/pch.h"
+// This file defines functions from both of these headers
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/pipeline/pipeline_optimizations.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/privilege.h"
@@ -95,9 +97,9 @@ namespace mongo {
((const StageDesc *)pR)->pName);
}
- intrusive_ptr<Pipeline> Pipeline::parseCommand(
- string &errmsg, BSONObj &cmdObj,
- const intrusive_ptr<ExpressionContext> &pCtx) {
+ intrusive_ptr<Pipeline> Pipeline::parseCommand(string &errmsg,
+ BSONObj &cmdObj,
+ const intrusive_ptr<ExpressionContext> &pCtx) {
intrusive_ptr<Pipeline> pPipeline(new Pipeline(pCtx));
vector<BSONElement> pipeline;
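
For context, Pipeline::parseCommand consumes the BSON body of an aggregate command. A minimal sketch of such a command object, using mongo's BSON builder macros; the "articles" collection and the predicate are hypothetical, and pCtx is assumed to be an ExpressionContext already in scope:

    BSONObj cmdObj = BSON("aggregate" << "articles" <<
                          "pipeline" << BSON_ARRAY(
                              BSON("$match" << BSON("status" << "A")) <<
                              BSON("$sort" << BSON("date" << -1))));
    string errmsg;
    intrusive_ptr<Pipeline> p = Pipeline::parseCommand(errmsg, cmdObj, pCtx);
    // On a parse error, errmsg is filled in and a null pointer is returned.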
@@ -199,28 +201,19 @@ namespace mongo {
}
}
- /* if there aren't any pipeline stages, there's nothing more to do */
- if (sources.empty())
- return pPipeline;
-
- // NOTE the rest of this function is all optimizations.
- // TODO find a clean way to break these up into separate functions.
-
- /*
- Move filters up where possible.
+ // The order in which optimizations are applied can have a significant impact on the
+ // efficiency of the final pipeline. Be careful!
+ Optimizations::Local::moveMatchBeforeSort(pPipeline.get());
+ Optimizations::Local::moveLimitBeforeSkip(pPipeline.get());
+ Optimizations::Local::coalesceAdjacent(pPipeline.get());
+ Optimizations::Local::optimizeEachDocumentSource(pPipeline.get());
+ Optimizations::Local::duplicateMatchBeforeInitalRedact(pPipeline.get());
- CW TODO -- move filter past projections where possible, and noting
- corresponding field renaming.
- */
-
- /*
- Wherever there is a match immediately following a sort, swap them.
- This means we sort fewer items. Neither changes the documents in
- the stream, so this transformation shouldn't affect the result.
+ return pPipeline;
+ }
- We do this first, because then when we coalesce operators below,
- any adjacent matches will be combined.
- */
+ void Pipeline::Optimizations::Local::moveMatchBeforeSort(Pipeline* pipeline) {
+ SourceContainer& sources = pipeline->sources;
for (size_t srcn = sources.size(), srci = 1; srci < srcn; ++srci) {
intrusive_ptr<DocumentSource> &pSource = sources[srci];
if (dynamic_cast<DocumentSourceMatch *>(pSource.get())) {
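
The body of moveMatchBeforeSort continues in the next hunk. As a simplified, standalone sketch of the transformation it preserves, with illustrative stand-in types (Stage, Match, and Sort are not the MongoDB classes):

    #include <cstddef>
    #include <deque>
    #include <memory>
    #include <utility>

    struct Stage { virtual ~Stage() = default; };
    struct Match : Stage {};
    struct Sort  : Stage {};

    // Swap every match that immediately follows a sort so the sort sees
    // fewer documents. Neither stage modifies documents, so the reorder
    // cannot change the result.
    void moveMatchBeforeSort(std::deque<std::shared_ptr<Stage>>& sources) {
        for (std::size_t i = 1; i < sources.size(); ++i) {
            if (dynamic_cast<Match*>(sources[i].get()) &&
                dynamic_cast<Sort*>(sources[i - 1].get())) {
                std::swap(sources[i], sources[i - 1]);
            }
        }
    }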
@@ -233,11 +226,13 @@ namespace mongo {
}
}
}
+ }
+
+ void Pipeline::Optimizations::Local::moveLimitBeforeSkip(Pipeline* pipeline) {
+ SourceContainer& sources = pipeline->sources;
+ if (sources.empty())
+ return;
- /* Move limits in front of skips. This is more optimal for sharding
- * since currently, we can only split the pipeline at a single source
- * and it is better to limit the results coming from each shard
- */
for(int i = sources.size() - 1; i >= 1 /* not looking at 0 */; i--) {
DocumentSourceLimit* limit =
dynamic_cast<DocumentSourceLimit*>(sources[i].get());
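
The adjustment that makes this reorder legal is worth spelling out: skip(a) followed by limit(b) passes documents a through a+b-1, which is exactly what limit(a+b) followed by skip(a) passes. A minimal sketch of the arithmetic (not code from this diff):

    #include <cstdint>
    #include <utility>

    // Returns {newLimit, newSkip} for running the limit first: the limit
    // must grow by the skip amount so it still covers the skipped prefix.
    std::pair<int64_t, int64_t> limitBeforeSkip(int64_t skip, int64_t limit) {
        return { skip + limit, skip };
    }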
@@ -262,56 +257,44 @@ namespace mongo {
i = sources.size(); // decremented before next pass
}
}
+ }
- /*
- Coalesce adjacent filters where possible. Two adjacent filters
- are equivalent to one filter whose predicate is the conjunction of
- the two original filters' predicates. For now, capture this by
- giving any DocumentSource the option to absorb it's successor; this
- will also allow adjacent projections to coalesce when possible.
-
- Run through the DocumentSources, and give each one the opportunity
- to coalesce with its successor. If successful, remove the
- successor.
+ void Pipeline::Optimizations::Local::coalesceAdjacent(Pipeline* pipeline) {
+ SourceContainer& sources = pipeline->sources;
+ if (sources.empty())
+ return;
- Move all document sources to a temporary list.
- */
+ // move all sources to a temporary list
SourceContainer tempSources;
sources.swap(tempSources);
- /* move the first one to the final list */
+ // move the first one to the final list
sources.push_back(tempSources[0]);
- /* run through the sources, coalescing them or keeping them */
+ // run through the sources, coalescing them or keeping them
for (size_t tempn = tempSources.size(), tempi = 1; tempi < tempn; ++tempi) {
- /*
- If we can't coalesce the source with the last, then move it
- to the final list, and make it the new last. (If we succeeded,
- then we're still on the same last, and there's no need to move
- or do anything with the source -- the destruction of tempSources
- will take care of the rest.)
- */
+ // If we can't coalesce the source with the last, then move it
+ // to the final list, and make it the new last. (If we succeeded,
+ // then we're still on the same last, and there's no need to move
+ // or do anything with the source -- the destruction of tempSources
+ // will take care of the rest.)
intrusive_ptr<DocumentSource> &pLastSource = sources.back();
intrusive_ptr<DocumentSource> &pTemp = tempSources[tempi];
verify(pTemp && pLastSource);
if (!pLastSource->coalesce(pTemp))
sources.push_back(pTemp);
}
+ }
- /* optimize the elements in the pipeline */
- for(SourceContainer::iterator iter(sources.begin()),
- listEnd(sources.end());
- iter != listEnd;
- ++iter) {
- if (!*iter) {
- errmsg = "Pipeline received empty document as argument";
- return intrusive_ptr<Pipeline>();
- }
-
- (*iter)->optimize();
+ void Pipeline::Optimizations::Local::optimizeEachDocumentSource(Pipeline* pipeline) {
+ SourceContainer& sources = pipeline->sources;
+ for (SourceContainer::iterator it(sources.begin()); it != sources.end(); ++it) {
+ (*it)->optimize();
}
+ }
- // Optimize [$redact, $match] to [$match, $redact, $match] if possible
+ void Pipeline::Optimizations::Local::duplicateMatchBeforeInitalRedact(Pipeline* pipeline) {
+ SourceContainer& sources = pipeline->sources;
if (sources.size() >= 2 && dynamic_cast<DocumentSourceRedact*>(sources[0].get())) {
if (DocumentSourceMatch* match = dynamic_cast<DocumentSourceMatch*>(sources[1].get())) {
const BSONObj redactSafePortion = match->redactSafePortion();
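
The coalescing loop in coalesceAdjacent above generalizes beyond filters: any stage may absorb its successor. A standalone sketch of the pattern, with an illustrative coalesce interface standing in for DocumentSource:

    #include <cstddef>
    #include <deque>
    #include <memory>

    struct Stage {
        virtual ~Stage() = default;
        // Return true if this stage absorbed `next` (e.g. two adjacent
        // filters combining into one conjunction).
        virtual bool coalesce(const std::shared_ptr<Stage>& next) { return false; }
    };

    void coalesceAdjacent(std::deque<std::shared_ptr<Stage>>& sources) {
        if (sources.empty())
            return;

        // Move everything to a temporary list, then rebuild `sources`,
        // letting each kept stage try to absorb its successor.
        std::deque<std::shared_ptr<Stage>> temp;
        sources.swap(temp);
        sources.push_back(temp[0]);
        for (std::size_t i = 1; i < temp.size(); ++i) {
            if (!sources.back()->coalesce(temp[i]))
                sources.push_back(temp[i]);
        }
    }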
@@ -319,12 +302,10 @@ namespace mongo {
sources.push_front(
DocumentSourceMatch::createFromBson(
BSON("$match" << redactSafePortion).firstElement(),
- pCtx));
+ pipeline->pCtx));
}
}
}
-
- return pPipeline;
}
void Pipeline::addRequiredPrivileges(Command* commandTemplate,
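
Concretely, duplicateMatchBeforeInitalRedact rewrites a pipeline of the form

    [ {$redact: <expr>}, {$match: {status: "A"}} ]

into

    [ {$match: {status: "A"}}, {$redact: <expr>}, {$match: {status: "A"}} ]

(the predicate here is hypothetical). Only the redact-safe portion of the match is copied to the front, so the new leading $match can use an index without changing what $redact sees; the original $match still runs afterward to keep the result correct.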
@@ -374,39 +355,43 @@ namespace mongo {
}
intrusive_ptr<Pipeline> Pipeline::splitForSharded() {
- /* create an initialize the shard spec we'll return */
- intrusive_ptr<Pipeline> pShardPipeline(new Pipeline(pCtx));
- pShardPipeline->explain = explain;
+ // Create and initialize the shard spec we'll return. We start with an empty pipeline on the
+ // shards and all work being done in the merger. Optimizations can move operations between
+ // the pipelines to be more efficient.
+ intrusive_ptr<Pipeline> shardPipeline(new Pipeline(pCtx));
+ shardPipeline->explain = explain;
- /*
- Run through the pipeline, looking for points to split it into
- shard pipelines, and the rest.
- */
- while (!sources.empty()) {
- // pop the first source
- intrusive_ptr<DocumentSource> pSource = sources.front();
- sources.pop_front();
+ // The order in which optimizations are applied can have a significant impact on the
+ // efficiency of the final pipeline. Be careful!
+ Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
+
+ return shardPipeline;
+ }
+
+ void Pipeline::Optimizations::Sharded::findSplitPoint(Pipeline* shardPipe,
+ Pipeline* mergePipe) {
+ while (!mergePipe->sources.empty()) {
+ intrusive_ptr<DocumentSource> current = mergePipe->sources.front();
+ mergePipe->sources.pop_front();
// Check if this source is splittable
SplittableDocumentSource* splittable =
- dynamic_cast<SplittableDocumentSource*>(pSource.get());
+ dynamic_cast<SplittableDocumentSource*>(current.get());
if (!splittable){
// move the source from the merger sources to the shard sources
- pShardPipeline->sources.push_back(pSource);
+ shardPipe->sources.push_back(current);
}
else {
// split this source into Merge and Shard sources
intrusive_ptr<DocumentSource> shardSource = splittable->getShardSource();
intrusive_ptr<DocumentSource> mergeSource = splittable->getMergeSource();
- if (shardSource) pShardPipeline->sources.push_back(shardSource);
- if (mergeSource) this->sources.push_front(mergeSource);
+ if (shardSource) shardPipe->sources.push_back(shardSource);
+ if (mergeSource) mergePipe->sources.push_front(mergeSource);
break;
}
}
-
- return pShardPipeline;
}
BSONObj Pipeline::getInitialQuery() const {
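
Finally, a simplified standalone sketch of the split findSplitPoint performs; Stage and Splittable are illustrative stand-ins for DocumentSource and SplittableDocumentSource:

    #include <deque>
    #include <memory>

    struct Stage { virtual ~Stage() = default; };

    // A stage that can split itself into a shard half and a merge half,
    // e.g. a sort: shards sort locally, the merger merge-sorts the results.
    struct Splittable : Stage {
        virtual std::shared_ptr<Stage> shardSource() = 0;
        virtual std::shared_ptr<Stage> mergeSource() = 0;
    };

    // Walk the merge pipeline front to back. Everything before the first
    // splittable stage moves to the shards; the splittable stage contributes
    // one half to each pipeline; everything after it stays in the merger.
    void findSplitPoint(std::deque<std::shared_ptr<Stage>>& shardPipe,
                        std::deque<std::shared_ptr<Stage>>& mergePipe) {
        while (!mergePipe.empty()) {
            std::shared_ptr<Stage> current = mergePipe.front();
            mergePipe.pop_front();

            auto* splittable = dynamic_cast<Splittable*>(current.get());
            if (!splittable) {
                shardPipe.push_back(current);  // runs on every shard
            } else {
                if (auto shard = splittable->shardSource()) shardPipe.push_back(shard);
                if (auto merge = splittable->mergeSource()) mergePipe.push_front(merge);
                break;  // split point found
            }
        }
    }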