summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2012-10-18 17:21:45 -0400
committerEric Milkie <milkie@10gen.com>2012-11-06 15:45:44 -0500
commitf4270abac0d8ca752216e11256f93eb176c6ed8c (patch)
tree47b518d6ae9840fa250429fa382a27aa500037ab
parentf43972dd586475132045bb11a96baac168f5b8cd (diff)
downloadmongo-f4270abac0d8ca752216e11256f93eb176c6ed8c.tar.gz
SERVER-7408 Correctly handle $skip and $limit in sharded agg
This bug only comes up if the first $skip or $limit precedes the first $sort or $limit. This is very rare, but should still be handled correctly.
-rw-r--r--jstests/aggregation/testshard1.js22
-rwxr-xr-xsrc/mongo/db/pipeline/document_source.h20
-rw-r--r--src/mongo/db/pipeline/document_source_limit.cpp5
-rw-r--r--src/mongo/db/pipeline/document_source_skip.cpp5
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp29
5 files changed, 73 insertions, 8 deletions
diff --git a/jstests/aggregation/testshard1.js b/jstests/aggregation/testshard1.js
index 3bd0ff5f1bf..8d5d4f376c4 100644
--- a/jstests/aggregation/testshard1.js
+++ b/jstests/aggregation/testshard1.js
@@ -131,5 +131,27 @@ for(i = 0; i < 6; ++i) {
'agg sharded test simple match failed');
}
+function testSkipLimit(ops, expectedCount) {
+ if (expectedCount > 10) {
+ // make shard -> mongos intermediate results less than 16MB
+ ops.unshift({$project: {_id:1}})
+ }
+
+ ops.push({$group: {_id:1, count: {$sum: 1}}});
+
+ var out = db.runCommand({aggregate:"ts1", pipeline:ops});
+ assert.commandWorked(out);
+ assert.eq(out.result[0].count, expectedCount);
+}
+
+testSkipLimit([], nItems); // control
+testSkipLimit([{$skip:10}], nItems - 10);
+testSkipLimit([{$limit:10}], 10);
+testSkipLimit([{$skip:5}, {$limit:10}], 10);
+testSkipLimit([{$limit:10}, {$skip:5}], 10 - 5);
+testSkipLimit([{$skip:5}, {$skip: 3}, {$limit:10}], 10);
+testSkipLimit([{$skip:5}, {$limit:10}, {$skip: 3}], 10 - 3);
+testSkipLimit([{$limit:10}, {$skip:5}, {$skip: 3}], 10 - 3 - 5);
+
// shut everything down
shardedAggTest.stop();
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 329564ac490..e1e2cffcf96 100755
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -965,7 +965,7 @@ namespace mongo {
class DocumentSourceLimit :
- public DocumentSource {
+ public SplittableDocumentSource {
public:
// virtuals from DocumentSource
virtual ~DocumentSourceLimit();
@@ -988,6 +988,14 @@ namespace mongo {
static intrusive_ptr<DocumentSourceLimit> create(
const intrusive_ptr<ExpressionContext> &pExpCtx);
+ // Virtuals for SplittableDocumentSource
+ // Need to run on router. Running on shard as well is an optimization.
+ virtual intrusive_ptr<DocumentSource> getShardSource() { return this; }
+ virtual intrusive_ptr<DocumentSource> getRouterSource() { return this; }
+
+ long long getLimit() const { return limit; }
+ void setLimit(long long newLimit) { limit = newLimit; }
+
/**
Create a limiting DocumentSource from BSON.
@@ -1019,7 +1027,7 @@ namespace mongo {
};
class DocumentSourceSkip :
- public DocumentSource {
+ public SplittableDocumentSource {
public:
// virtuals from DocumentSource
virtual ~DocumentSourceSkip();
@@ -1042,6 +1050,14 @@ namespace mongo {
static intrusive_ptr<DocumentSourceSkip> create(
const intrusive_ptr<ExpressionContext> &pExpCtx);
+ // Virtuals for SplittableDocumentSource
+ // Need to run on router. Can't run on shards.
+ virtual intrusive_ptr<DocumentSource> getShardSource() { return NULL; }
+ virtual intrusive_ptr<DocumentSource> getRouterSource() { return this; }
+
+ long long getSkip() const { return skip; }
+ void setSkip(long long newSkip) { skip = newSkip; }
+
/**
Create a skipping DocumentSource from BSON.
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp
index 8bbcaff2113..48d12d5850f 100644
--- a/src/mongo/db/pipeline/document_source_limit.cpp
+++ b/src/mongo/db/pipeline/document_source_limit.cpp
@@ -27,9 +27,8 @@
namespace mongo {
const char DocumentSourceLimit::limitName[] = "$limit";
- DocumentSourceLimit::DocumentSourceLimit(
- const intrusive_ptr<ExpressionContext> &pExpCtx):
- DocumentSource(pExpCtx),
+ DocumentSourceLimit::DocumentSourceLimit(const intrusive_ptr<ExpressionContext> &pExpCtx):
+ SplittableDocumentSource(pExpCtx),
limit(0),
count(0) {
}
diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp
index d4c1fc2caa6..5024b817a90 100644
--- a/src/mongo/db/pipeline/document_source_skip.cpp
+++ b/src/mongo/db/pipeline/document_source_skip.cpp
@@ -28,9 +28,8 @@ namespace mongo {
const char DocumentSourceSkip::skipName[] = "$skip";
- DocumentSourceSkip::DocumentSourceSkip(
- const intrusive_ptr<ExpressionContext> &pExpCtx):
- DocumentSource(pExpCtx),
+ DocumentSourceSkip::DocumentSourceSkip(const intrusive_ptr<ExpressionContext> &pExpCtx):
+ SplittableDocumentSource(pExpCtx),
skip(0),
count(0) {
}
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 0bd4096fe78..52f725887fc 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -227,6 +227,35 @@ namespace mongo {
}
}
+ /* Move limits in front of skips. This is more optimal for sharding
+ * since currently, we can only split the pipeline at a single source
+ * and it is better to limit the results coming from each shard
+ */
+ for(int i = pSourceVector->size() - 1; i >= 1 /* not looking at 0 */; i--) {
+ DocumentSourceLimit* limit =
+ dynamic_cast<DocumentSourceLimit*>((*pSourceVector)[i].get());
+ DocumentSourceSkip* skip =
+ dynamic_cast<DocumentSourceSkip*>((*pSourceVector)[i-1].get());
+ if (limit && skip) {
+ // Increase limit by skip since the skipped docs now pass through the $limit
+ limit->setLimit(limit->getLimit() + skip->getSkip());
+ swap((*pSourceVector)[i], (*pSourceVector)[i-1]);
+
+ // Start at back again. This is needed to handle cases with more than 1 $limit
+ // (S means skip, L means limit)
+ //
+ // These two would work without second pass (assuming back to front ordering)
+ // SL -> LS
+ // SSL -> LSS
+ //
+ // The following cases need a second pass to handle the second limit
+ // SLL -> LLS
+ // SSLL -> LLSS
+ // SLSL -> LLSS
+ i = pSourceVector->size(); // decremented before next pass
+ }
+ }
+
/*
Coalesce adjacent filters where possible. Two adjacent filters
are equivalent to one filter whose predicate is the conjunction of