diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-08-07 17:42:09 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-08-09 16:22:41 -0400 |
commit | c1aee1915536be2d1928d1098e133acbfdc3a575 (patch) | |
tree | e1fe23277781a498b95306cd57c3ddaedd81e56e /src/mongo/db/commands/run_aggregate.cpp | |
parent | bdaa2417d94bf1dab47264ebed36e45224b5d83e (diff) | |
download | mongo-c1aee1915536be2d1928d1098e133acbfdc3a575.tar.gz |
SERVER-29140 Close cursor for invalidate change notification entries.
This reverts commit ddcc982e9b1ecca89fc315d698990176ba73df25.
Diffstat (limited to 'src/mongo/db/commands/run_aggregate.cpp')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index d45cda33b07..753cc2bfebd 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -43,6 +43,7 @@ #include "mongo/db/exec/working_set_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/accumulator.h" +#include "mongo/db/pipeline/close_change_stream_exception.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/expression.h" @@ -99,7 +100,18 @@ bool handleCursorCommand(OperationContext* opCtx, // The initial getNext() on a PipelineProxyStage may be very expensive so we don't // do it when batchSize is 0 since that indicates a desire for a fast return. PlanExecutor::ExecState state; - if ((state = cursor->getExecutor()->getNext(&next, nullptr)) == PlanExecutor::IS_EOF) { + + try { + state = cursor->getExecutor()->getNext(&next, nullptr); + } catch (const CloseChangeStreamException& ex) { + // This exception is thrown when a $changeStream stage encounters an event + // that invalidates the cursor. We should close the cursor and return without + // error. + cursor = nullptr; + break; + } + + if (state == PlanExecutor::IS_EOF) { if (!cursor->isTailable()) { // make it an obvious error to use cursor or executor after this point cursor = nullptr; |