summary refs log tree commit diff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
author Mathias Stearn <mathias@10gen.com> 2013-10-25 15:46:22 -0400
committer Mathias Stearn <mathias@10gen.com> 2014-01-21 12:55:49 -0500
commit c9fc8a468e1fa9d6421ef35f5a23db0e0f014b4f (patch)
tree 4eb8943cc3c29091db59d0d8c7b72ee43532fcff /src/mongo/db/pipeline
parent d0037946dc103ffa648f7e8937f2c55351b03c53 (diff)
download mongo-c9fc8a468e1fa9d6421ef35f5a23db0e0f014b4f.tar.gz
SERVER-8033 Sharded agg should only send needed fields from shards to mongos
*** BEFORE *** > db.foo.aggregate([{$sort: {score:1}}, {$limit: 10}, {$project: {_id: 1}}], {explain:true}) { "splitPipeline" : { "shardsPart" : [ { "$sort" : { "sortKey" : { "score" : 1 }, "limit" : NumberLong(10) } } ], "mergerPart" : [ { "$sort" : { "sortKey" : { "score" : 1 }, "mergePresorted" : true, "limit" : NumberLong(10) } }, { "$project" : { "_id" : true } } ] }, "shards" : { "shard0000" : { "host" : "localhost:30001", "stages" : [ { "$cursor" : { "query" : { }, "indexOnly" : false, "cursorType" : "BasicCursor" } }, { "$sort" : { "sortKey" : { "score" : 1 }, "limit" : NumberLong(10) } } ] } }, "ok" : 1 } *** AFTER *** > db.foo.aggregate([{$sort: {score:1}}, {$limit: 10}, {$project: {_id: 1}}], {explain:true}) { "splitPipeline" : { "shardsPart" : [ { "$sort" : { "sortKey" : { "score" : 1 }, "limit" : NumberLong(10) } }, { "$project" : { "score" : true, "_id" : true } } ], "mergerPart" : [ { "$sort" : { "sortKey" : { "score" : 1 }, "mergePresorted" : true, "limit" : NumberLong(10) } }, { "$project" : { "_id" : true } } ] }, "shards" : { "shard0000" : { "host" : "localhost:30001", "stages" : [ { "$cursor" : { "query" : { }, "fields" : { "score" : 1, "_id" : 1 }, "indexOnly" : false, "cursorType" : "BasicCursor" } }, { "$sort" : { "sortKey" : { "score" : 1 }, "limit" : NumberLong(10) } }, { "$project" : { "score" : true, "_id" : true } } ] } }, "ok" : 1 }
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r-- src/mongo/db/pipeline/document_source_project.cpp 2
-rw-r--r-- src/mongo/db/pipeline/pipeline.cpp 76
-rw-r--r-- src/mongo/db/pipeline/pipeline.h 16
-rw-r--r-- src/mongo/db/pipeline/pipeline_d.cpp 40
-rw-r--r-- src/mongo/db/pipeline/pipeline_optimizations.h 8
5 files changed, 95 insertions, 47 deletions
diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp
index 8ef3e1c6433..fdf9acc5b16 100644
--- a/src/mongo/db/pipeline/document_source_project.cpp
+++ b/src/mongo/db/pipeline/document_source_project.cpp
@@ -127,7 +127,7 @@ namespace mongo {
pProject->_variables.reset(new Variables(idGenerator.getIdCount()));
BSONObj projectObj = elem.Obj();
- pProject->_raw = projectObj.getOwned(); // probably not necessary, but better to be safe
+ pProject->_raw = projectObj.getOwned();
#if defined(_DEBUG)
if (exprObj->isSimple()) {
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 1a9f2ec7270..5daa8b3df3b 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -359,6 +359,7 @@ namespace mongo {
// efficiency of the final pipeline. Be Careful!
Optimizations::Sharded::findSplitPoint(shardPipeline.get(), this);
Optimizations::Sharded::moveFinalUnwindFromShardsToMerger(shardPipeline.get(), this);
+ Optimizations::Sharded::limitFieldsSentFromShardsToMerger(shardPipeline.get(), this);
return shardPipeline;
}
@@ -398,6 +399,42 @@ namespace mongo {
}
}
+ void Pipeline::Optimizations::Sharded::limitFieldsSentFromShardsToMerger(Pipeline* shardPipe,
+ Pipeline* mergePipe) {
+ DepsTracker mergeDeps = mergePipe->getDependencies(shardPipe->getInitialQuery());
+ if (mergeDeps.needWholeDocument)
+ return; // the merge needs all fields, so nothing we can do.
+
+ // Empty project is "special" so if no fields are needed, we just ask for _id instead.
+ if (mergeDeps.fields.empty())
+ mergeDeps.fields.insert("_id");
+
+ // Remove metadata from dependencies since it automatically flows through projection and we
+ // don't want to project it in to the document.
+ mergeDeps.needTextScore = false;
+
+ // HEURISTIC: only apply optimization if none of the shard stages have an exhaustive list of
+ // field dependencies. While this may not be 100% ideal in all cases, it is simple and
+ // avoids the worst cases by ensuring that:
+ // 1) Optimization IS applied when the shards wouldn't have known their exhaustive list of
+ // dependencies. This situation can happen when a $sort is before the first $project or
+ // $group. Without the optimization, the shards would have to reify and transmit full
+ // objects even though only a subset of fields are needed.
+ // 2) Optimization IS NOT applied immediately following a $project or $group since it would
+ // add an unnecessary project (and therefore a deep-copy).
+ for (size_t i = 0; i < shardPipe->sources.size(); i++) {
+ DepsTracker dt; // ignored
+ if (shardPipe->sources[i]->getDependencies(&dt) & DocumentSource::EXHAUSTIVE_FIELDS)
+ return;
+ }
+
+ // if we get here, add the project.
+ shardPipe->sources.push_back(
+ DocumentSourceProject::createFromBson(
+ BSON("$project" << mergeDeps.toProjection()).firstElement(),
+ shardPipe->pCtx));
+ }
+
BSONObj Pipeline::getInitialQuery() const {
if (sources.empty())
return BSONObj();
@@ -502,4 +539,43 @@ namespace mongo {
return true;
}
+ DepsTracker Pipeline::getDependencies(const BSONObj& initialQuery) const {
+ DepsTracker deps;
+ bool knowAllFields = false;
+ bool knowAllMeta = false;
+ for (size_t i=0; i < sources.size() && !(knowAllFields && knowAllMeta); i++) {
+ DepsTracker localDeps;
+ DocumentSource::GetDepsReturn status = sources[i]->getDependencies(&localDeps);
+
+ if (status == DocumentSource::NOT_SUPPORTED) {
+ // Assume this stage needs everything. We may still know something about our
+ // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
+ // EXHAUSTIVE_META.
+ break;
+ }
+
+ if (!knowAllFields) {
+ deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end());
+ if (localDeps.needWholeDocument)
+ deps.needWholeDocument = true;
+ knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS;
+ }
+
+ if (!knowAllMeta) {
+ if (localDeps.needTextScore)
+ deps.needTextScore = true;
+
+ knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
+ }
+ }
+
+ if (!knowAllFields)
+ deps.needWholeDocument = true; // don't know all fields we need
+
+ // If doing a text query, assume we need the score if we can't prove we don't.
+ if (!knowAllMeta && DocumentSourceMatch::isTextQuery(initialQuery))
+ deps.needTextScore = true;
+
+ return deps;
+ }
} // namespace mongo
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 4cd506975c2..7404239b70c 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -30,8 +30,6 @@
#include <deque>
-#include "mongo/pch.h"
-
#include "mongo/db/pipeline/value.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/timer.h"
@@ -39,14 +37,10 @@
namespace mongo {
class BSONObj;
class BSONObjBuilder;
- class BSONArrayBuilder;
class Command;
class DocumentSource;
- class DocumentSourceProject;
- class Expression;
+ class DepsTracker;
struct ExpressionContext;
- class ExpressionNary;
- struct OpDesc; // local private struct
class Privilege;
/** mongodb "commands" (sent via db.$cmd.findOne(...))
@@ -132,6 +126,14 @@ namespace mongo {
* explain flag true (for DocumentSource::serializeToArray()).
*/
vector<Value> writeExplainOps() const;
+
+ /**
+ * Returns the dependencies needed by this pipeline.
+ *
+ * initialQuery is used as a fallback for metadata dependency detection. The assumption is
+ * that any metadata produced by the query is needed unless we can prove it isn't.
+ */
+ DepsTracker getDependencies(const BSONObj& initialQuery) const;
/**
The aggregation command name.
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 20fabe2a4e7..de595ebaf28 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -100,46 +100,8 @@ namespace {
sources.pop_front();
}
-
-
// Find the set of fields in the source documents depended on by this pipeline.
- DepsTracker deps; // should be considered const after the following block.
- {
- bool knowAllFields = false;
- bool knowAllMeta = false;
- for (size_t i=0; i < sources.size() && !(knowAllFields && knowAllMeta); i++) {
- DepsTracker localDeps;
- DocumentSource::GetDepsReturn status = sources[i]->getDependencies(&localDeps);
-
- if (status == DocumentSource::NOT_SUPPORTED) {
- // Assume this stage needs everything. We may still know something about our
- // dependencies if an earlier stage returned either EXHAUSTIVE_FIELDS or
- // EXHAUSTIVE_META.
- break;
- }
-
- if (!knowAllFields) {
- deps.fields.insert(localDeps.fields.begin(), localDeps.fields.end());
- if (localDeps.needWholeDocument)
- deps.needWholeDocument = true;
- knowAllFields = status & DocumentSource::EXHAUSTIVE_FIELDS;
- }
-
- if (!knowAllMeta) {
- if (localDeps.needTextScore)
- deps.needTextScore = true;
-
- knowAllMeta = status & DocumentSource::EXHAUSTIVE_META;
- }
- }
-
- if (!knowAllFields)
- deps.needWholeDocument = true; // don't know all fields we need
-
- // If doing a text query, assume we need the score if we can't prove we don't.
- if (!knowAllMeta && DocumentSourceMatch::isTextQuery(queryObj))
- deps.needTextScore = true;
- }
+ const DepsTracker deps = pPipeline->getDependencies(queryObj);
// Passing query an empty projection since it is faster to use ParsedDeps::extractFields().
// This will need to change to support covering indexes (SERVER-12015). There is an
diff --git a/src/mongo/db/pipeline/pipeline_optimizations.h b/src/mongo/db/pipeline/pipeline_optimizations.h
index fe75ba9655a..d8b89350c28 100644
--- a/src/mongo/db/pipeline/pipeline_optimizations.h
+++ b/src/mongo/db/pipeline/pipeline_optimizations.h
@@ -112,5 +112,13 @@ namespace mongo {
* unwind.
*/
static void moveFinalUnwindFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe);
+
+ /**
+ * Adds a stage to the end of shardPipe explicitly requesting all fields that mergePipe
+ * needs. This is only done if it heuristically determines that it is needed. This
+ * optimization can reduce the amount of network traffic and can also enable the shards to
+ * convert less source BSON into Documents.
+ */
+ static void limitFieldsSentFromShardsToMerger(Pipeline* shardPipe, Pipeline* mergePipe);
};
} // namespace mongo