diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_cursor.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 77 |
1 files changed, 33 insertions, 44 deletions
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index 6125fc6746b..d4e75b1d2b3 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -134,58 +134,47 @@ void DocumentSourceCursor::loadBatch() { PlanExecutor::ExecState state; Document resultObj; - { - AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss()); - uassertStatusOK(repl::ReplicationCoordinator::get(pExpCtx->opCtx) - ->checkCanServeReadsFor(pExpCtx->opCtx, _exec->nss(), true)); - - _exec->restoreState(); - - { - ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); - - while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) { - _currentBatch.enqueue(transformDoc(std::move(resultObj))); - - // As long as we're waiting for inserts, we shouldn't do any batching at this level - // we need the whole pipeline to see each document to see if we should stop waiting. - if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || - static_cast<long long>(_currentBatch.memUsageBytes()) > - internalDocumentSourceCursorBatchSizeBytes.load()) { - // End this batch and prepare PlanExecutor for yielding. - _exec->saveState(); - return; - } - } - // Special case for tailable cursor -- EOF doesn't preclude more results, so keep - // the PlanExecutor alive. - if (state == PlanExecutor::IS_EOF && pExpCtx->isTailableAwaitData()) { + + AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss()); + uassertStatusOK(repl::ReplicationCoordinator::get(pExpCtx->opCtx) + ->checkCanServeReadsFor(pExpCtx->opCtx, _exec->nss(), true)); + + _exec->restoreState(); + + try { + ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); }); + + while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) { + _currentBatch.enqueue(transformDoc(std::move(resultObj))); + + // As long as we're waiting for inserts, we shouldn't do any batching at this level we + // need the whole pipeline to see each document to see if we should stop waiting. + if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts || + static_cast<long long>(_currentBatch.memUsageBytes()) > + internalDocumentSourceCursorBatchSizeBytes.load()) { + // End this batch and prepare PlanExecutor for yielding. _exec->saveState(); return; } } - // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we - // must hold a collection lock to destroy '_exec', but we can only assume that our locks are - // still held if '_exec' did not end in an error. If '_exec' encountered an error during a - // yield, the locks might be yielded. - if (state != PlanExecutor::FAILURE) { - cleanupExecutor(); - } - } + invariant(state == PlanExecutor::IS_EOF); - switch (state) { - case PlanExecutor::ADVANCED: - case PlanExecutor::IS_EOF: - return; // We've reached our limit or exhausted the cursor. - case PlanExecutor::FAILURE: { - _execStatus = WorkingSetCommon::getMemberObjectStatus(resultObj).withContext( - "Error in $cursor stage"); - uassertStatusOK(_execStatus); + // Special case for tailable cursor -- EOF doesn't preclude more results, so keep the + // PlanExecutor alive. + if (pExpCtx->isTailableAwaitData()) { + _exec->saveState(); + return; } - default: - MONGO_UNREACHABLE; + } catch (...) { + // Record error details before re-throwing the exception. + _execStatus = exceptionToStatus().withContext("Error in $cursor stage"); + throw; } + + // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we must + // hold a collection lock to destroy '_exec'. + cleanupExecutor(); } void DocumentSourceCursor::_updateOplogTimestamp() { |