diff options
author | Mathias Stearn <mathias@10gen.com> | 2012-10-18 17:21:45 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2012-11-06 15:45:44 -0500 |
commit | f4270abac0d8ca752216e11256f93eb176c6ed8c (patch) | |
tree | 47b518d6ae9840fa250429fa382a27aa500037ab | |
parent | f43972dd586475132045bb11a96baac168f5b8cd (diff) | |
download | mongo-f4270abac0d8ca752216e11256f93eb176c6ed8c.tar.gz |
SERVER-7408 Correctly handle $skip and $limit in sharded agg
This bug only comes up if the first $skip or $limit precedes the first
$sort or $limit. This is very rare, but should still be handled
correctly.
-rw-r--r-- | jstests/aggregation/testshard1.js | 22 | ||||
-rwxr-xr-x | src/mongo/db/pipeline/document_source.h | 20 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_limit.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_skip.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline.cpp | 29 |
5 files changed, 73 insertions, 8 deletions
diff --git a/jstests/aggregation/testshard1.js b/jstests/aggregation/testshard1.js index 3bd0ff5f1bf..8d5d4f376c4 100644 --- a/jstests/aggregation/testshard1.js +++ b/jstests/aggregation/testshard1.js @@ -131,5 +131,27 @@ for(i = 0; i < 6; ++i) { 'agg sharded test simple match failed'); } +function testSkipLimit(ops, expectedCount) { + if (expectedCount > 10) { + // make shard -> mongos intermediate results less than 16MB + ops.unshift({$project: {_id:1}}) + } + + ops.push({$group: {_id:1, count: {$sum: 1}}}); + + var out = db.runCommand({aggregate:"ts1", pipeline:ops}); + assert.commandWorked(out); + assert.eq(out.result[0].count, expectedCount); +} + +testSkipLimit([], nItems); // control +testSkipLimit([{$skip:10}], nItems - 10); +testSkipLimit([{$limit:10}], 10); +testSkipLimit([{$skip:5}, {$limit:10}], 10); +testSkipLimit([{$limit:10}, {$skip:5}], 10 - 5); +testSkipLimit([{$skip:5}, {$skip: 3}, {$limit:10}], 10); +testSkipLimit([{$skip:5}, {$limit:10}, {$skip: 3}], 10 - 3); +testSkipLimit([{$limit:10}, {$skip:5}, {$skip: 3}], 10 - 3 - 5); + // shut everything down shardedAggTest.stop(); diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 329564ac490..e1e2cffcf96 100755 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -965,7 +965,7 @@ namespace mongo { class DocumentSourceLimit : - public DocumentSource { + public SplittableDocumentSource { public: // virtuals from DocumentSource virtual ~DocumentSourceLimit(); @@ -988,6 +988,14 @@ namespace mongo { static intrusive_ptr<DocumentSourceLimit> create( const intrusive_ptr<ExpressionContext> &pExpCtx); + // Virtuals for SplittableDocumentSource + // Need to run on router. Running on shard as well is an optimization. 
+ virtual intrusive_ptr<DocumentSource> getShardSource() { return this; } + virtual intrusive_ptr<DocumentSource> getRouterSource() { return this; } + + long long getLimit() const { return limit; } + void setLimit(long long newLimit) { limit = newLimit; } + /** Create a limiting DocumentSource from BSON. @@ -1019,7 +1027,7 @@ namespace mongo { }; class DocumentSourceSkip : - public DocumentSource { + public SplittableDocumentSource { public: // virtuals from DocumentSource virtual ~DocumentSourceSkip(); @@ -1042,6 +1050,14 @@ namespace mongo { static intrusive_ptr<DocumentSourceSkip> create( const intrusive_ptr<ExpressionContext> &pExpCtx); + // Virtuals for SplittableDocumentSource + // Need to run on router. Can't run on shards. + virtual intrusive_ptr<DocumentSource> getShardSource() { return NULL; } + virtual intrusive_ptr<DocumentSource> getRouterSource() { return this; } + + long long getSkip() const { return skip; } + void setSkip(long long newSkip) { skip = newSkip; } + /** Create a skipping DocumentSource from BSON. 
diff --git a/src/mongo/db/pipeline/document_source_limit.cpp b/src/mongo/db/pipeline/document_source_limit.cpp index 8bbcaff2113..48d12d5850f 100644 --- a/src/mongo/db/pipeline/document_source_limit.cpp +++ b/src/mongo/db/pipeline/document_source_limit.cpp @@ -27,9 +27,8 @@ namespace mongo { const char DocumentSourceLimit::limitName[] = "$limit"; - DocumentSourceLimit::DocumentSourceLimit( - const intrusive_ptr<ExpressionContext> &pExpCtx): - DocumentSource(pExpCtx), + DocumentSourceLimit::DocumentSourceLimit(const intrusive_ptr<ExpressionContext> &pExpCtx): + SplittableDocumentSource(pExpCtx), limit(0), count(0) { } diff --git a/src/mongo/db/pipeline/document_source_skip.cpp b/src/mongo/db/pipeline/document_source_skip.cpp index d4c1fc2caa6..5024b817a90 100644 --- a/src/mongo/db/pipeline/document_source_skip.cpp +++ b/src/mongo/db/pipeline/document_source_skip.cpp @@ -28,9 +28,8 @@ namespace mongo { const char DocumentSourceSkip::skipName[] = "$skip"; - DocumentSourceSkip::DocumentSourceSkip( - const intrusive_ptr<ExpressionContext> &pExpCtx): - DocumentSource(pExpCtx), + DocumentSourceSkip::DocumentSourceSkip(const intrusive_ptr<ExpressionContext> &pExpCtx): + SplittableDocumentSource(pExpCtx), skip(0), count(0) { } diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 0bd4096fe78..52f725887fc 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -227,6 +227,35 @@ namespace mongo { } } + /* Move limits in front of skips. 
This is more optimal for sharding + * since currently, we can only split the pipeline at a single source + * and it is better to limit the results coming from each shard + */ + for(int i = pSourceVector->size() - 1; i >= 1 /* not looking at 0 */; i--) { + DocumentSourceLimit* limit = + dynamic_cast<DocumentSourceLimit*>((*pSourceVector)[i].get()); + DocumentSourceSkip* skip = + dynamic_cast<DocumentSourceSkip*>((*pSourceVector)[i-1].get()); + if (limit && skip) { + // Increase limit by skip since the skipped docs now pass through the $limit + limit->setLimit(limit->getLimit() + skip->getSkip()); + swap((*pSourceVector)[i], (*pSourceVector)[i-1]); + + // Start at back again. This is needed to handle cases with more than 1 $limit + // (S means skip, L means limit) + // + // These two would work without second pass (assuming back to front ordering) + // SL -> LS + // SSL -> LSS + // + // The following cases need a second pass to handle the second limit + // SLL -> LLS + // SSLL -> LLSS + // SLSL -> LLSS + i = pSourceVector->size(); // decremented before next pass + } + } + /* Coalesce adjacent filters where possible. Two adjacent filters are equivalent to one filter whose predicate is the conjunction of |