diff options
author | Mathias Stearn <mathias@10gen.com> | 2013-07-18 16:49:14 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2013-07-22 15:42:39 -0400 |
commit | 6f1068dca58b4f51bf8859c98bba6fc28b10bdea (patch) | |
tree | ee41e07142fd3498044c6003a8332074d2660f31 | |
parent | 8f0c10ec3f576b9c44213114ce8540f8a6698206 (diff) | |
download | mongo-6f1068dca58b4f51bf8859c98bba6fc28b10bdea.tar.gz |
DocumentSourceCursor should be limit-aware
Follow-up to SERVER-6269 to prevent fetching more documents into a batch
than needed.
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 6 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 5 | ||||
-rw-r--r-- | src/mongo/dbtests/documentsourcetests.cpp | 39 |
4 files changed, 81 insertions, 1 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 47f56e2d66c..d336e3553e1 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -370,6 +370,7 @@ namespace mongo { virtual bool advance(); virtual Document getCurrent(); virtual void setSource(DocumentSource *pSource); + virtual bool coalesce(const intrusive_ptr<DocumentSource>& nextSource); /** * Release the Cursor and the read lock it requires, but without changing the other data. @@ -430,6 +431,9 @@ namespace mongo { void setSort(const BSONObj& sort) { _sort = sort; } void setProjection(const BSONObj& projection, const ParsedDeps& deps); + + /// returns -1 for no limit + long long getLimit() const; protected: // virtuals from DocumentSource virtual void sourceToBson(BSONObjBuilder *pBuilder, bool explain) const; @@ -450,6 +454,8 @@ namespace mongo { BSONObj _sort; shared_ptr<Projection> _projection; // shared with pClientCursor ParsedDeps _dependencies; + intrusive_ptr<DocumentSourceLimit> _limit; + long long _docsAddedToBatches; // for _limit enforcement string ns; // namespace CursorId _cursorId; diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index c432632d6fd..a93fd0032dc 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -149,6 +149,13 @@ namespace mongo { : Document(next)); } + if (_limit) { + if (++_docsAddedToBatches == _limit->getLimit()) { + break; + } + verify(_docsAddedToBatches < _limit->getLimit()); + } + memUsageBytes += _currentBatch.back().getApproximateSize(); if (memUsageBytes > MaxBytesToReturnToClientAtOnce) { @@ -179,6 +186,26 @@ namespace mongo { verify(false); } + long long DocumentSourceCursor::getLimit() const { + return _limit ? 
_limit->getLimit() : -1; + } + + bool DocumentSourceCursor::coalesce(const intrusive_ptr<DocumentSource>& nextSource) { + // Note: Currently we assume the $limit is logically after any $sort or + // $match. If we ever pull in $match or $sort using this method, we + // will need to keep track of the order of the sub-stages. + + if (!_limit) { + _limit = dynamic_cast<DocumentSourceLimit*>(nextSource.get()); + return _limit; // false if next is not a $limit + } + else { + return _limit->coalesce(nextSource); + } + + return false; + } + void DocumentSourceCursor::sourceToBson( BSONObjBuilder *pBuilder, bool explain) const { @@ -193,6 +220,10 @@ namespace mongo { pBuilder->append("sort", _sort); } + if (_limit) { + pBuilder->append("limit", _limit->getLimit()); + } + BSONObj projectionSpec; if (_projection) { projectionSpec = _projection->getSpec(); @@ -221,6 +252,7 @@ namespace mongo { const intrusive_ptr<ExpressionContext> &pCtx) : DocumentSource(pCtx) , unstarted(true) + , _docsAddedToBatches(0) , ns(ns) , _cursorId(cursorId) , _collMetadata(shardingState.needCollectionMetadata( ns ) diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 8be28311e8e..bd9e5d1e991 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -237,8 +237,11 @@ namespace mongo { pSource->setProjection(projection, dependencies); } + while (!sources.empty() && pSource->coalesce(sources.front())) { + sources.pop_front(); + } + // If we are in an explain, we won't actually use the created cursor so release it. - // This is important to avoid double locking when we use DBDirectClient to run explain. 
if (pPipeline->isExplain()) pSource->dispose(); diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index f3857bc72a2..77ee570a672 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -286,6 +286,44 @@ namespace DocumentSourceTests { WriterClientScope _writerScope; }; + /** Test coalescing a limit into a cursor */ + class LimitCoalesce : public Base { + public: + intrusive_ptr<DocumentSourceLimit> mkLimit(long long limit) { + return DocumentSourceLimit::create(ctx(), limit); + } + void run() { + client.insert( ns, BSON( "a" << 1 ) ); + client.insert( ns, BSON( "a" << 2 ) ); + client.insert( ns, BSON( "a" << 3 ) ); + createSource(); + + // initial limit becomes limit of cursor + ASSERT(source()->coalesce(mkLimit(10))); + ASSERT_EQUALS(source()->getLimit(), 10); + + // smaller limit lowers cursor limit + ASSERT(source()->coalesce(mkLimit(2))); + ASSERT_EQUALS(source()->getLimit(), 2); + + // higher limit doesn't affect cursor limit + ASSERT(source()->coalesce(mkLimit(3))); + ASSERT_EQUALS(source()->getLimit(), 2); + + // The cursor allows exactly 2 documents through + ASSERT( !source()->eof() ); + ASSERT_EQUALS( 1, source()->getCurrent()->getValue( "a" ).coerceToInt() ); + + ASSERT( source()->advance() ); + ASSERT( !source()->eof() ); + ASSERT_EQUALS( 2, source()->getCurrent()->getValue( "a" ).coerceToInt() ); + + ASSERT( !source()->advance() ); + ASSERT( source()->eof() ); + } + }; + + } // namespace DocumentSourceCursor namespace DocumentSourceLimit { @@ -1764,6 +1802,7 @@ namespace DocumentSourceTests { add<DocumentSourceCursor::Dispose>(); add<DocumentSourceCursor::IterateDispose>(); add<DocumentSourceCursor::Yield>(); + add<DocumentSourceCursor::LimitCoalesce>(); add<DocumentSourceLimit::DisposeSource>(); add<DocumentSourceLimit::DisposeSourceCascade>(); |