Diffstat (limited to 'src/mongo/db/pipeline/document_source_cursor.cpp')
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp  77
1 file changed, 33 insertions(+), 44 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index 6125fc6746b..d4e75b1d2b3 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -134,58 +134,47 @@ void DocumentSourceCursor::loadBatch() {
     PlanExecutor::ExecState state;
     Document resultObj;
-    {
-        AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss());
-        uassertStatusOK(repl::ReplicationCoordinator::get(pExpCtx->opCtx)
-                            ->checkCanServeReadsFor(pExpCtx->opCtx, _exec->nss(), true));
-
-        _exec->restoreState();
-
-        {
-            ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); });
-
-            while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) {
-                _currentBatch.enqueue(transformDoc(std::move(resultObj)));
-
-                // As long as we're waiting for inserts, we shouldn't do any batching at this level;
-                // we need the whole pipeline to see each document to see if we should stop waiting.
-                if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
-                    static_cast<long long>(_currentBatch.memUsageBytes()) >
-                        internalDocumentSourceCursorBatchSizeBytes.load()) {
-                    // End this batch and prepare PlanExecutor for yielding.
-                    _exec->saveState();
-                    return;
-                }
-            }
-            // Special case for tailable cursor -- EOF doesn't preclude more results, so keep
-            // the PlanExecutor alive.
-            if (state == PlanExecutor::IS_EOF && pExpCtx->isTailableAwaitData()) {
+
+    AutoGetCollectionForRead autoColl(pExpCtx->opCtx, _exec->nss());
+    uassertStatusOK(repl::ReplicationCoordinator::get(pExpCtx->opCtx)
+                        ->checkCanServeReadsFor(pExpCtx->opCtx, _exec->nss(), true));
+
+    _exec->restoreState();
+
+    try {
+        ON_BLOCK_EXIT([this] { recordPlanSummaryStats(); });
+
+        while ((state = _exec->getNext(&resultObj, nullptr)) == PlanExecutor::ADVANCED) {
+            _currentBatch.enqueue(transformDoc(std::move(resultObj)));
+
+            // As long as we're waiting for inserts, we shouldn't do any batching at this level;
+            // we need the whole pipeline to see each document to see if we should stop waiting.
+            if (awaitDataState(pExpCtx->opCtx).shouldWaitForInserts ||
+                static_cast<long long>(_currentBatch.memUsageBytes()) >
+                    internalDocumentSourceCursorBatchSizeBytes.load()) {
+                // End this batch and prepare PlanExecutor for yielding.
                 _exec->saveState();
                 return;
             }
         }
-        // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we
-        // must hold a collection lock to destroy '_exec', but we can only assume that our locks are
-        // still held if '_exec' did not end in an error. If '_exec' encountered an error during a
-        // yield, the locks might be yielded.
-        if (state != PlanExecutor::FAILURE) {
-            cleanupExecutor();
-        }
-    }
+        invariant(state == PlanExecutor::IS_EOF);
-    switch (state) {
-        case PlanExecutor::ADVANCED:
-        case PlanExecutor::IS_EOF:
-            return;  // We've reached our limit or exhausted the cursor.
-        case PlanExecutor::FAILURE: {
-            _execStatus = WorkingSetCommon::getMemberObjectStatus(resultObj).withContext(
-                "Error in $cursor stage");
-            uassertStatusOK(_execStatus);
+        // Special case for tailable cursor -- EOF doesn't preclude more results, so keep the
+        // PlanExecutor alive.
+        if (pExpCtx->isTailableAwaitData()) {
+            _exec->saveState();
+            return;
         }
-        default:
-            MONGO_UNREACHABLE;
+    } catch (...) {
+        // Record error details before re-throwing the exception.
+        _execStatus = exceptionToStatus().withContext("Error in $cursor stage");
+        throw;
     }
+
+    // If we got here, there won't be any more documents, so destroy our PlanExecutor. Note we must
+    // hold a collection lock to destroy '_exec'.
+    cleanupExecutor();
 }

 void DocumentSourceCursor::_updateOplogTimestamp() {
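The patch replaces the old PlanExecutor::FAILURE switch with exception-based error handling: a failure inside getNext() now surfaces as a C++ exception, the catch block records a Status with added context before re-throwing, and the ON_BLOCK_EXIT guard still records plan summary stats on every exit path, including the throwing one. Below is a minimal, self-contained sketch of those two idioms. The names here (ScopeGuard, CursorStage) are illustrative stand-ins, not MongoDB's actual ON_BLOCK_EXIT, Status, or DocumentSourceCursor types.

#include <exception>
#include <functional>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

// Stand-in for a scope guard like mongo's ON_BLOCK_EXIT: runs its callback
// when the guard leaves scope, whether by return, fall-through, or a thrown
// exception (the destructor fires during stack unwinding).
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> f) : _f(std::move(f)) {}
    ~ScopeGuard() {
        _f();
    }

private:
    std::function<void()> _f;
};

// Hypothetical stage mirroring the control flow of the patched loadBatch().
class CursorStage {
public:
    void loadBatch() {
        try {
            ScopeGuard recordStats([] { std::cout << "stats recorded\n"; });
            // ... drain the executor here; a failure surfaces as an exception ...
            throw std::runtime_error("executor failed");
        } catch (const std::exception& e) {
            // Record error details before re-throwing, analogous to the patch's
            // exceptionToStatus().withContext("Error in $cursor stage").
            _execStatus = std::string("Error in $cursor stage :: caused by :: ") + e.what();
            throw;
        }
    }

    std::optional<std::string> _execStatus;
};

int main() {
    CursorStage stage;
    try {
        stage.loadBatch();
    } catch (const std::exception&) {
        // The recorded context survives the re-throw and can be reported later.
        std::cout << *stage._execStatus << "\n";
    }
    return 0;
}

Because the exception is re-thrown rather than swallowed, the caller still observes the failure immediately, while the stored _execStatus lets the stage report the same failure again on later calls.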