diff options
author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-10 13:47:13 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2017-07-11 13:51:24 -0400 |
commit | 3bab15739e421e9eed4bf180cbcf5c7392a9a90d (patch) | |
tree | f346909f73f9cac8d1eaf3811944e521945cf8d8 /src/mongo/db/pipeline/document_source_cursor.cpp | |
parent | d712243cb381d5ae98d4bc132ace16aac91d0fe9 (diff) | |
download | mongo-3bab15739e421e9eed4bf180cbcf5c7392a9a90d.tar.gz |
SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries
Diffstat (limited to 'src/mongo/db/pipeline/document_source_cursor.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 9704d826e53..f15bafd5b8b 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -31,7 +31,6 @@ #include "mongo/db/pipeline/document_source_cursor.h" #include "mongo/db/catalog/collection.h" -#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/pipeline/document.h" @@ -99,12 +98,21 @@ void DocumentSourceCursor::loadBatch() { memUsageBytes += _currentBatch.back().getApproximateSize(); - if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { + // As long as we're waiting for inserts, we shouldn't do any batching at this level + // we need the whole pipeline to see each document to see if we should stop waiting. + if (shouldWaitForInserts(pExpCtx->opCtx) || + memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) { // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); return; } } + // Special case for tailable cursor -- EOF doesn't preclude more results, so keep + // the PlanExecutor alive. + if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) { + _exec->saveState(); + return; + } } } |