summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2013-07-18 16:49:14 -0400
committerMathias Stearn <mathias@10gen.com>2013-07-22 15:42:39 -0400
commit6f1068dca58b4f51bf8859c98bba6fc28b10bdea (patch)
treeee41e07142fd3498044c6003a8332074d2660f31
parent8f0c10ec3f576b9c44213114ce8540f8a6698206 (diff)
downloadmongo-6f1068dca58b4f51bf8859c98bba6fc28b10bdea.tar.gz
DocumentSourceCursor should be limit-aware
Follow-up to SERVER-6269 to prevent fetching more documents into a batch than needed.
-rw-r--r--src/mongo/db/pipeline/document_source.h6
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp32
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp5
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp39
4 files changed, 81 insertions, 1 deletion
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 47f56e2d66c..d336e3553e1 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -370,6 +370,7 @@ namespace mongo {
virtual bool advance();
virtual Document getCurrent();
virtual void setSource(DocumentSource *pSource);
+ virtual bool coalesce(const intrusive_ptr<DocumentSource>& nextSource);
/**
* Release the Cursor and the read lock it requires, but without changing the other data.
@@ -430,6 +431,9 @@ namespace mongo {
void setSort(const BSONObj& sort) { _sort = sort; }
void setProjection(const BSONObj& projection, const ParsedDeps& deps);
+
+ /// returns -1 for no limit
+ long long getLimit() const;
protected:
// virtuals from DocumentSource
virtual void sourceToBson(BSONObjBuilder *pBuilder, bool explain) const;
@@ -450,6 +454,8 @@ namespace mongo {
BSONObj _sort;
shared_ptr<Projection> _projection; // shared with pClientCursor
ParsedDeps _dependencies;
+ intrusive_ptr<DocumentSourceLimit> _limit;
+ long long _docsAddedToBatches; // for _limit enforcement
string ns; // namespace
CursorId _cursorId;
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index c432632d6fd..a93fd0032dc 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -149,6 +149,13 @@ namespace mongo {
: Document(next));
}
+ if (_limit) {
+ if (++_docsAddedToBatches == _limit->getLimit()) {
+ break;
+ }
+ verify(_docsAddedToBatches < _limit->getLimit());
+ }
+
memUsageBytes += _currentBatch.back().getApproximateSize();
if (memUsageBytes > MaxBytesToReturnToClientAtOnce) {
@@ -179,6 +186,26 @@ namespace mongo {
verify(false);
}
+ long long DocumentSourceCursor::getLimit() const {
+ return _limit ? _limit->getLimit() : -1;
+ }
+
+ bool DocumentSourceCursor::coalesce(const intrusive_ptr<DocumentSource>& nextSource) {
+ // Note: Currently we assume the $limit is logically after any $sort or
+ // $match. If we ever pull in $match or $sort using this method, we
+ // will need to keep track of the order of the sub-stages.
+
+ if (!_limit) {
+ _limit = dynamic_cast<DocumentSourceLimit*>(nextSource.get());
+ return _limit; // false if next is not a $limit
+ }
+ else {
+ return _limit->coalesce(nextSource);
+ }
+
+ return false;
+ }
+
void DocumentSourceCursor::sourceToBson(
BSONObjBuilder *pBuilder, bool explain) const {
@@ -193,6 +220,10 @@ namespace mongo {
pBuilder->append("sort", _sort);
}
+ if (_limit) {
+ pBuilder->append("limit", _limit->getLimit());
+ }
+
BSONObj projectionSpec;
if (_projection) {
projectionSpec = _projection->getSpec();
@@ -221,6 +252,7 @@ namespace mongo {
const intrusive_ptr<ExpressionContext> &pCtx)
: DocumentSource(pCtx)
, unstarted(true)
+ , _docsAddedToBatches(0)
, ns(ns)
, _cursorId(cursorId)
, _collMetadata(shardingState.needCollectionMetadata( ns )
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 8be28311e8e..bd9e5d1e991 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -237,8 +237,11 @@ namespace mongo {
pSource->setProjection(projection, dependencies);
}
+ while (!sources.empty() && pSource->coalesce(sources.front())) {
+ sources.pop_front();
+ }
+
// If we are in an explain, we won't actually use the created cursor so release it.
- // This is important to avoid double locking when we use DBDirectClient to run explain.
if (pPipeline->isExplain())
pSource->dispose();
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index f3857bc72a2..77ee570a672 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -286,6 +286,44 @@ namespace DocumentSourceTests {
WriterClientScope _writerScope;
};
+ /** Test coalescing a limit into a cursor */
+ class LimitCoalesce : public Base {
+ public:
+ intrusive_ptr<DocumentSourceLimit> mkLimit(long long limit) {
+ return DocumentSourceLimit::create(ctx(), limit);
+ }
+ void run() {
+ client.insert( ns, BSON( "a" << 1 ) );
+ client.insert( ns, BSON( "a" << 2 ) );
+ client.insert( ns, BSON( "a" << 3 ) );
+ createSource();
+
+ // initial limit becomes limit of cursor
+ ASSERT(source()->coalesce(mkLimit(10)));
+ ASSERT_EQUALS(source()->getLimit(), 10);
+
+ // smaller limit lowers cursor limit
+ ASSERT(source()->coalesce(mkLimit(2)));
+ ASSERT_EQUALS(source()->getLimit(), 2);
+
+ // higher limit doesn't affect cursor limit
+ ASSERT(source()->coalesce(mkLimit(3)));
+ ASSERT_EQUALS(source()->getLimit(), 2);
+
+ // The cursor allows exactly 2 documents through
+ ASSERT( !source()->eof() );
+ ASSERT_EQUALS( 1, source()->getCurrent()->getValue( "a" ).coerceToInt() );
+
+ ASSERT( source()->advance() );
+ ASSERT( !source()->eof() );
+ ASSERT_EQUALS( 2, source()->getCurrent()->getValue( "a" ).coerceToInt() );
+
+ ASSERT( !source()->advance() );
+ ASSERT( source()->eof() );
+ }
+ };
+
+
} // namespace DocumentSourceCursor
namespace DocumentSourceLimit {
@@ -1764,6 +1802,7 @@ namespace DocumentSourceTests {
add<DocumentSourceCursor::Dispose>();
add<DocumentSourceCursor::IterateDispose>();
add<DocumentSourceCursor::Yield>();
+ add<DocumentSourceCursor::LimitCoalesce>();
add<DocumentSourceLimit::DisposeSource>();
add<DocumentSourceLimit::DisposeSourceCascade>();