diff options
author | Mathias Stearn <mathias@10gen.com> | 2013-10-25 15:46:22 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2014-01-21 12:55:49 -0500 |
commit | c9fc8a468e1fa9d6421ef35f5a23db0e0f014b4f (patch) | |
tree | 4eb8943cc3c29091db59d0d8c7b72ee43532fcff /src/mongo/db/pipeline | |
parent | d0037946dc103ffa648f7e8937f2c55351b03c53 (diff) | |
download | mongo-c9fc8a468e1fa9d6421ef35f5a23db0e0f014b4f.tar.gz |
SERVER-8033 Sharded agg should only send needed fields from shards to mongos
*** BEFORE ***
> db.foo.aggregate([{$sort: {score:1}}, {$limit: 10}, {$project: {_id: 1}}], {explain:true})
{
"splitPipeline" : {
"shardsPart" : [
{
"$sort" : {
"sortKey" : {
"score" : 1
},
"limit" : NumberLong(10)
}
}
],
"mergerPart" : [
{
"$sort" : {
"sortKey" : {
"score" : 1
},
"mergePresorted" : true,
"limit" : NumberLong(10)
}
},
{
"$project" : {
"_id" : true
}
}
]
},
"shards" : {
"shard0000" : {
"host" : "localhost:30001",
"stages" : [
{
"$cursor" : {
"query" : {
},
"indexOnly" : false,
"cursorType" : "BasicCursor"
}
},
{
"$sort" : {
"sortKey" : {
"score" : 1
},
"limit" : NumberLong(10)
}
}
]
}
},
"ok" : 1
}
*** AFTER ***
> db.foo.aggregate([{$sort: {score:1}}, {$limit: 10}, {$project: {_id: 1}}], {explain:true})
{
"splitPipeline" : {
"shardsPart" : [
{
"$sort" : {
"sortKey" : {
"score" : 1
},
"limit" : NumberLong(10)
}
},
{
"$project" : {
"score" : true,
"_id" : true
}
}
],
"mergerPart" : [
{
"$sort" : {
"sortKey" : {
"score" : 1
},
"mergePresorted" : true,
"limit" : NumberLong(10)
}
},
{
"$project" : {
"_id" : true
}
}
]
},
"shards" : {
"shard0000" : {
"host" : "localhost:30001",
"stages" : [
{
"$cursor" : {
"query" : {
},
"fields" : {
"score" : 1,
"_id" : 1
},
"indexOnly" : false,
"cursorType" : "BasicCursor"
}
},
{
"$sort" : {
"sortKey" : {
"score" : 1
},
"limit" : NumberLong(10)
}
},
{
"$project" : {
"score" : true,
"_id" : true
}
}
]
}
},
"ok" : 1
}
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- | src/mongo/db/pipeline/document_source_project.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_optimizations.h | 8 |
5 files changed, 95 insertions, 47 deletions
diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp index 8ef3e1c6433..fdf9acc5b16 100644 --- a/src/mongo/db/pipeline/document_source_project.cpp +++ b/src/mongo/db/pipeline/document_source_project.cpp @@ -127,7 +127,7 @@ namespace mongo { pProject->_variables.reset(new Variables(idGenerator.getIdCount())); BSONObj projectObj = elem.Obj(); - pProject->_raw = projectObj.getOwned(); // probably not necessary, but better to be safe + pProject->_raw = projectObj.getOwned(); #if defined(_DEBUG) if (exprObj->isSimple()) {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 1a9f2ec7270..5daa8b3df3b 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -359,6 +359,7 @@ namespace mongo { // efficiency of the final pipeline. Be Careful! Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this); Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this); + Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this); return shardPipeline; } @@ -398,6 +399,42 @@ namespace mongo { } } + void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, + Pipeline* mergePipe) { + DepsTracker mergeDeps = mergePipe->getDependencies(shardPipe->getInitialQuery()); + if (mergeDeps.needWholeDocument) + return; // the merge needs all fields, so nothing we can do. + + // Empty project is "special" so if no fields are needed, we just ask for _id instead. + if (mergeDeps.fields.empty()) + mergeDeps.fields.insert("_id"); + + // Remove metadata from dependencies since it automatically flows through projection and we + // don't want to project it in to the document. + mergeDeps.needTextScore = false; + + // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of + // field dependencies. While this may not be 100% ideal in all cases, it is simple and + // avoids the worst cases by ensuring that: + // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of + // dependencies. This situation can happen when a $sort is before the first $project or + // $group. Without the optimization, the shards would have to reify and transmit full + // objects even though only a subset of fields are needed. + // 2) Optimization IS NOT applied immediately following a $project or $group since it would + // add an unnecessary project (and therefore a deep-copy). + for (size_t i = 0; i < shardPipe->sources.size(); i++) { + DepsTracker dt; // ignored + if (shardPipe->sources[i]->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS) + return; + } + + // if we get here, add the project. + shardPipe->sources.push_back( + DocumentSourceProject::createFromBson( + BSON("$project" << mergeDeps.toProjection()).firstElement(), + shardPipe->pCtx)); + } + BSONObj Pipeline::getInitialQuery() const { if (sources.empty()) return BSONObj(); @@ -502,4 +539,43 @@ namespace mongo { return true; } + DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const { + DepsTracker deps; + bool knowAllFields = false; + bool knowAllMeta = false; + for (size_t i=0; i < sources.size() && !(knowAllFields && knowAllMeta); i++) { + DepsTracker localDeps; + DocumentSource::GetDepsReturn status = sources[i]->getDependencies(&localDeps); + + if (status == DocumentSource::NOT_SUPPORTED) { + // Assume this stage needs everything. We may still know something about our + // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or + // EXHAUSTIVE_META. + break; + } + + if (!knowAllFields) { + deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end()); + if (localDeps.needWholeDocument) + deps.needWholeDocument = true; + knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS; + } + + if (!knowAllMeta) { + if (localDeps.needTextScore) + deps.needTextScore = true; + + knowAllMeta = status & DocumentSource::EXHAUSTIVE_META; + } + } + + if (!knowAllFields) + deps.needWholeDocument = true; // don't know all fields we need + + // If doing a text query, assume we need the score if we can't prove we don't. + if (!knowAllMeta && DocumentSourceMatch::isTextQuery(initialQuery)) + deps.needTextScore = true; + + return deps; + } } // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index 4cd506975c2..7404239b70c 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -30,8 +30,6 @@ #include <deque> -#include "mongo/pch.h" - #include "mongo/db/pipeline/value.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/timer.h" @@ -39,14 +37,10 @@ namespace mongo { class BSONObj; class BSONObjBuilder; - class BSONArrayBuilder; class Command; class DocumentSource; - class DocumentSourceProject; - class Expression; + class DepsTracker; struct ExpressionContext; - class ExpressionNary; - struct OpDesc; // local private struct class Privilege; /** mongodb "commands" (sent via db.$cmd.findOne(...)) @@ -132,6 +126,14 @@ namespace mongo { * explain flag true (for DocumentSource::serializeToArray()). */ vector<Value> writeExplainOps() const; + + /** + * Returns the dependencies needed by this pipeline. + * + * initialQuery is used as a fallback for metadata dependency detection. The assumption is + * that any metadata produced by the query is needed unless we can prove it isn't. + */ + DepsTracker getDependencies(const BSONObj& initialQuery) const; /** The aggregation command name.
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 20fabe2a4e7..de595ebaf28 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -100,46 +100,8 @@ namespace { sources.pop_front(); } - - // Find the set of fields in the source documents depended on by this pipeline. - DepsTracker deps; // should be considered const after the following block. - { - bool knowAllFields = false; - bool knowAllMeta = false; - for (size_t i=0; i < sources.size() && !(knowAllFields && knowAllMeta); i++) { - DepsTracker localDeps; - DocumentSource::GetDepsReturn status = sources[i]->getDependencies(&localDeps); - - if (status == DocumentSource::NOT_SUPPORTED) { - // Assume this stage needs everything. We may still know something about our - // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or - // EXHAUSTIVE_META. - break; - } - - if (!knowAllFields) { - deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end()); - if (localDeps.needWholeDocument) - deps.needWholeDocument = true; - knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS; - } - - if (!knowAllMeta) { - if (localDeps.needTextScore) - deps.needTextScore = true; - - knowAllMeta = status & DocumentSource::EXHAUSTIVE_META; - } - } - - if (!knowAllFields) - deps.needWholeDocument = true; // don't know all fields we need - - // If doing a text query, assume we need the score if we can't prove we don't. - if (!knowAllMeta && DocumentSourceMatch::isTextQuery(queryObj)) - deps.needTextScore = true; - } + const DepsTracker deps = pPipeline->getDependencies(queryObj); // Passing query an empty projection since it is faster to use ParsedDeps::extractFields(). // This will need to change to support covering indexes (SERVER-12015). There is an
diff --git a/src/mongo/db/pipeline/pipeline_optimizations.h b/src/mongo/db/pipeline/pipeline_optimizations.h index fe75ba9655a..d8b89350c28 100644 --- a/src/mongo/db/pipeline/pipeline_optimizations.h +++ b/src/mongo/db/pipeline/pipeline_optimizations.h @@ -112,5 +112,13 @@ namespace mongo { * unwind. */ static void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe); + + /** + * Adds a stage to the end of shardPipe explicitly requesting all fields that mergePipe + * needs. This is only done if it heuristically determines that it is needed. This + * optimization can reduce the amount of network traffic and can also enable the shards to + * convert less source BSON into Documents. + */ + static void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe); }; } // namespace mongo |