summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthroughWithMongod/exchangeProducer.js11
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/pipeline/document_source_exchange.cpp6
-rw-r--r--src/mongo/s/query/async_results_merger.cpp7
4 files changed, 18 insertions, 7 deletions
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js
index c0cc8343448..f3f23ee4e0d 100644
--- a/jstests/noPassthroughWithMongod/exchangeProducer.js
+++ b/jstests/noPassthroughWithMongod/exchangeProducer.js
@@ -282,12 +282,15 @@ TestData.disableImplicitSessions = true;
assert.eq(numConsumers, res.cursors.length);
let parallelShells = [];
+ failingConsumer(res.cursors[0], ErrorCodes.FailPointEnabled)();
- // All consumers will see the exchange fail error.
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells.push(failingConsumer(res.cursors[i], ErrorCodes.FailPointEnabled));
+ // After the first consumer sees an error, each subsequent consumer should see an
+ // 'ExchangePassthrough' error.
+ for (let i = 0; i < numConsumers - 1; ++i) {
+ parallelShells.push(
+ failingConsumer(res.cursors[i + 1], ErrorCodes.ExchangePassthrough));
}
- for (let i = 0; i < numConsumers; ++i) {
+ for (let i = 0; i < numConsumers - 1; ++i) {
parallelShells[i]();
}
} finally {
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 0d79c3a1618..c46116b6c11 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -272,6 +272,7 @@ error_code("JSInterpreterFailureWithStack", 271, extra="JSExceptionInfo")
error_code("MigrationConflict", 272)
error_code("ProducerConsumerQueueProducerQueueDepthExceeded", 273)
error_code("ProducerConsumerQueueConsumed", 274)
+error_code("ExchangePassthrough", 275) # For exchange execution in aggregation. Do not reuse.
# Error codes 4000-8999 are reserved.
# Non-sequential error codes (for compatibility only)
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp
index 365f7073789..461e26b0109 100644
--- a/src/mongo/db/pipeline/document_source_exchange.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange.cpp
@@ -235,8 +235,10 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
for (;;) {
// Execute only in case we have not encountered an error.
- uassertStatusOKWithContext(_errorInLoadNextBatch,
- "Exchange failed due to an error on different thread.");
+ if (!_errorInLoadNextBatch.isOK()) {
+ uasserted(ErrorCodes::ExchangePassthrough,
+ "Exchange failed due to an error on different thread.");
+ }
// Check if we have a document.
if (!_consumers[consumerId]->isEmpty()) {
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index cae5f7944a5..685c86affeb 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -569,7 +569,12 @@ void AsyncResultsMerger::_cleanUpFailedBatch(WithLock lk, Status status, size_t
remote.status = std::move(status);
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
- if (_params.getAllowPartialResults()) {
+ //
+ // The ExchangePassthrough error code is an internal-only error code used specifically to
+ // communicate that an error has occurred, but some other thread is responsible for returning
+ // the error to the user. In order to avoid polluting the user's error message, we ingore such
+ // errors with the expectation that all outstanding cursors will be closed promptly.
+ if (_params.getAllowPartialResults() || remote.status == ErrorCodes::ExchangePassthrough) {
remote.status = Status::OK();
// Clear the results buffer and cursor id.