diff options
-rw-r--r-- | jstests/noPassthroughWithMongod/exchangeProducer.js | 11 | ||||
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 7 |
4 files changed, 18 insertions, 7 deletions
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js index c0cc8343448..f3f23ee4e0d 100644 --- a/jstests/noPassthroughWithMongod/exchangeProducer.js +++ b/jstests/noPassthroughWithMongod/exchangeProducer.js @@ -282,12 +282,15 @@ TestData.disableImplicitSessions = true; assert.eq(numConsumers, res.cursors.length); let parallelShells = []; + failingConsumer(res.cursors[0], ErrorCodes.FailPointEnabled)(); - // All consumers will see the exchange fail error. - for (let i = 0; i < numConsumers; ++i) { - parallelShells.push(failingConsumer(res.cursors[i], ErrorCodes.FailPointEnabled)); + // After the first consumer sees an error, each subsequent consumer should see an + // 'ExchangePassthrough' error. + for (let i = 0; i < numConsumers - 1; ++i) { + parallelShells.push( + failingConsumer(res.cursors[i + 1], ErrorCodes.ExchangePassthrough)); } - for (let i = 0; i < numConsumers; ++i) { + for (let i = 0; i < numConsumers - 1; ++i) { parallelShells[i](); } } finally { diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 0d79c3a1618..c46116b6c11 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -272,6 +272,7 @@ error_code("JSInterpreterFailureWithStack", 271, extra="JSExceptionInfo") error_code("MigrationConflict", 272) error_code("ProducerConsumerQueueProducerQueueDepthExceeded", 273) error_code("ProducerConsumerQueueConsumed", 274) +error_code("ExchangePassthrough", 275) # For exchange execution in aggregation. Do not reuse. # Error codes 4000-8999 are reserved. # Non-sequential error codes (for compatibility only) diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp index 365f7073789..461e26b0109 100644 --- a/src/mongo/db/pipeline/document_source_exchange.cpp +++ b/src/mongo/db/pipeline/document_source_exchange.cpp @@ -235,8 +235,10 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t for (;;) { // Execute only in case we have not encountered an error. - uassertStatusOKWithContext(_errorInLoadNextBatch, - "Exchange failed due to an error on different thread."); + if (!_errorInLoadNextBatch.isOK()) { + uasserted(ErrorCodes::ExchangePassthrough, + "Exchange failed due to an error on different thread."); + } // Check if we have a document. if (!_consumers[consumerId]->isEmpty()) { diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index cae5f7944a5..685c86affeb 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -569,7 +569,12 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t remote.status = std::move(status); // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We // remove the unreachable host entirely from consideration by marking it as exhausted. - if (_params.getAllowPartialResults()) { + // + // The ExchangePassthrough error code is an internal-only error code used specifically to + // communicate that an error has occurred, but some other thread is responsible for returning + // the error to the user. In order to avoid polluting the user's error message, we ingore such + // errors with the expectation that all outstanding cursors will be closed promptly. + if (_params.getAllowPartialResults() || remote.status == ErrorCodes::ExchangePassthrough) { remote.status = Status::OK(); // Clear the results buffer and cursor id. |