summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_cursor.cpp
diff options
context:
space:
mode:
authorMatthew Russotto <matthew.russotto@10gen.com>2017-07-10 13:47:13 -0400
committerMatthew Russotto <matthew.russotto@10gen.com>2017-07-11 13:51:24 -0400
commit3bab15739e421e9eed4bf180cbcf5c7392a9a90d (patch)
treef346909f73f9cac8d1eaf3811944e521945cf8d8 /src/mongo/db/pipeline/document_source_cursor.cpp
parentd712243cb381d5ae98d4bc132ace16aac91d0fe9 (diff)
downloadmongo-3bab15739e421e9eed4bf180cbcf5c7392a9a90d.tar.gz
SERVER-29128 Make $changeNotification stage return a tailable, awaitData cursor that continuously gives out oplog entries
Diffstat (limited to 'src/mongo/db/pipeline/document_source_cursor.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp12
1 files changed, 10 insertions, 2 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 9704d826e53..f15bafd5b8b 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -31,7 +31,6 @@
#include "mongo/db/pipeline/document_source_cursor.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/pipeline/document.h"
@@ -99,12 +98,21 @@ void DocumentSourceCursor::loadBatch() {
memUsageBytes += _currentBatch.back().getApproximateSize();
- if (memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
+ // As long as we're waiting for inserts, we shouldn't do any batching at this level
+ // we need the whole pipeline to see each document to see if we should stop waiting.
+ if (shouldWaitForInserts(pExpCtx->opCtx) ||
+ memUsageBytes > internalDocumentSourceCursorBatchSizeBytes.load()) {
// End this batch and prepare PlanExecutor for yielding.
_exec->saveState();
return;
}
}
+ // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
+ // the PlanExecutor alive.
+ if (state == PlanExecutor::IS_EOF && pExpCtx->isTailable()) {
+ _exec->saveState();
+ return;
+ }
}
}