summaryrefslogtreecommitdiff
path: root/src/mongo/db/commands/run_aggregate.cpp
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2017-08-07 17:42:09 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2017-08-09 16:22:41 -0400
commitc1aee1915536be2d1928d1098e133acbfdc3a575 (patch)
treee1fe23277781a498b95306cd57c3ddaedd81e56e /src/mongo/db/commands/run_aggregate.cpp
parentbdaa2417d94bf1dab47264ebed36e45224b5d83e (diff)
downloadmongo-c1aee1915536be2d1928d1098e133acbfdc3a575.tar.gz
SERVER-29140 Close cursor for invalidate change notification entries.
This reverts commit ddcc982e9b1ecca89fc315d698990176ba73df25.
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp14
1 files changed, 13 insertions, 1 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index d45cda33b07..753cc2bfebd 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/accumulator.h"
+#include "mongo/db/pipeline/close_change_stream_exception.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/expression.h"
@@ -99,7 +100,18 @@ bool handleCursorCommand(OperationContext* opCtx,
// The initial getNext() on a PipelineProxyStage may be very expensive so we don't
// do it when batchSize is 0 since that indicates a desire for a fast return.
PlanExecutor::ExecState state;
- if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) {
+
+ try {
+ state = cursor->getExecutor()->getNext(&next, nullptr);
+ } catch (const CloseChangeStreamException& ex) {
+ // This exception is thrown when a $changeStream stage encounters an event
+ // that invalidates the cursor. We should close the cursor and return without
+ // error.
+ cursor = nullptr;
+ break;
+ }
+
+ if (state == PlanExecutor::IS_EOF) {
if (!cursor->isTailable()) {
// make it an obvious error to use cursor or executor after this point
cursor = nullptr;