author    | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-10-09 11:37:04 -0400
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-10-10 13:32:08 -0400
commit    | beda255a13b61570683968d6ac18a02f1d9bf765 (patch)
tree      | bd545566af4000cdc74f7823ad0f555a4ce86445 /src/mongo/db/pipeline/document_source_exchange.cpp
parent    | 670963110d9d226824842d22540a79154fce59a1 (diff)
download  | mongo-beda255a13b61570683968d6ac18a02f1d9bf765.tar.gz
SERVER-37525 Error handling in the exchange can cause invariant failures in detach/reattach
Diffstat (limited to 'src/mongo/db/pipeline/document_source_exchange.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.cpp | 48
1 file changed, 36 insertions, 12 deletions
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp
index ad44df7df14..9aaeb851387 100644
--- a/src/mongo/db/pipeline/document_source_exchange.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange.cpp
@@ -41,6 +41,8 @@
 namespace mongo {
 
+MONGO_FAIL_POINT_DEFINE(exchangeFailLoadNextBatch);
+
 constexpr size_t Exchange::kMaxBufferSize;
 constexpr size_t Exchange::kMaxNumberConsumers;
 
@@ -230,6 +232,10 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
     stdx::unique_lock<stdx::mutex> lk(_mutex);
 
     for (;;) {
+        // Execute only in case we have not encountered an error.
+        uassertStatusOKWithContext(_errorInLoadNextBatch,
+                                   "Exchange failed due to an error on different thread.");
+
         // Check if we have a document.
         if (!_consumers[consumerId]->isEmpty()) {
             auto doc = _consumers[consumerId]->getNext();
@@ -247,13 +253,9 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
         if (_loadingThreadId == kInvalidThreadId) {
             LOG(3) << "A consumer " << consumerId << " begins loading";
 
-            // This consumer won the race and will fill the buffers.
-            _loadingThreadId = consumerId;
-
-            {
-                // Make sure we detach the context even when exceptions are thrown; we wrap the
-                // detach in the guard.
-                ON_BLOCK_EXIT([this] { _pipeline->detachFromOperationContext(); });
+            try {
+                // This consumer won the race and will fill the buffers.
+                _loadingThreadId = consumerId;
 
                 _pipeline->reattachToOperationContext(opCtx);
 
@@ -262,13 +264,29 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
                 // The return value is an index of a full consumer buffer.
                 size_t fullConsumerId = loadNextBatch();
 
+                if (MONGO_FAIL_POINT(exchangeFailLoadNextBatch)) {
+                    log() << "exchangeFailLoadNextBatch fail point enabled.";
+                    uasserted(ErrorCodes::FailPointEnabled,
+                              "Asserting on loading the next batch due to failpoint.");
+                }
+
+                _pipeline->detachFromOperationContext();
+
                 // The loading cannot continue until the consumer with the full buffer consumes some
                 // documents.
                 _loadingThreadId = fullConsumerId;
-            }
 
-            // Wake up everybody and try to make some progress.
-            _haveBufferSpace.notify_all();
+                // Wake up everybody and try to make some progress.
+                _haveBufferSpace.notify_all();
+            } catch (const DBException& ex) {
+                _errorInLoadNextBatch = ex.toStatus();
+
+                // We have to wake up all other blocked threads so they can detect the error and
+                // fail too. They can be woken up only after _errorInLoadNextBatch has been set.
+                _haveBufferSpace.notify_all();
+
+                throw;
+            }
         } else {
             // Some other consumer is already loading the buffers. There is nothing else we can do
             // but wait.
@@ -361,14 +379,20 @@ size_t Exchange::getTargetConsumer(const Document& input) {
     return cid;
 }
 
-void Exchange::dispose(OperationContext* opCtx) {
+void Exchange::dispose(OperationContext* opCtx, size_t consumerId) {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
 
     invariant(_disposeRunDown < getConsumers());
 
     ++_disposeRunDown;
 
-    if (_disposeRunDown == getConsumers()) {
+    // If _errorInLoadNextBatch status is not OK then an exception was thrown. In that case the
+    // throwing thread will do the dispose.
+    if (!_errorInLoadNextBatch.isOK()) {
+        if (_loadingThreadId == consumerId) {
+            _pipeline->dispose(opCtx);
+        }
+    } else if (_disposeRunDown == getConsumers()) {
         _pipeline->dispose(opCtx);
     }
 }
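
To make the concurrency pattern in this patch concrete, here is a minimal, self-contained C++ sketch (not MongoDB code) of the error-propagation idea it adopts: the thread that fails while loading records the error in shared state under the mutex, wakes every blocked consumer with notify_all(), and every consumer re-checks that shared status at the top of its wait loop before touching the buffers. Names such as Exchange, _errorMessage, and loadNextBatch below are illustrative stand-ins for the real _errorInLoadNextBatch and loadNextBatch() members, and the loading/waking protocol is deliberately simplified.

```cpp
// Minimal sketch (assumed names, not MongoDB code) of error propagation between
// cooperating exchange consumers: one thread loads batches for everybody, and a
// failure in the loader must wake and fail every other consumer as well.
#include <array>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

class Exchange {
public:
    static constexpr size_t kNumConsumers = 2;

    // Returns the next document for 'consumerId', or throws if any consumer hit an error.
    int getNext(size_t consumerId) {
        std::unique_lock<std::mutex> lk(_mutex);
        for (;;) {
            // Fail fast if another thread already recorded an error (the analogue of
            // uassertStatusOKWithContext(_errorInLoadNextBatch, ...)).
            if (!_errorMessage.empty())
                throw std::runtime_error(_errorMessage);

            if (!_buffers[consumerId].empty()) {
                int doc = _buffers[consumerId].front();
                _buffers[consumerId].pop_front();
                return doc;
            }

            if (_loadingThreadId == kInvalidThreadId) {
                // This consumer won the race and fills the buffers.
                _loadingThreadId = consumerId;
                try {
                    loadNextBatch();             // may throw
                    _loadingThreadId = kInvalidThreadId;
                    _haveData.notify_all();      // wake consumers waiting for documents
                } catch (const std::exception& ex) {
                    // Record the error *before* waking everyone, so each waiter sees it
                    // at the top of the loop and fails too.
                    _errorMessage = ex.what();
                    _haveData.notify_all();
                    throw;
                }
            } else {
                // Somebody else is loading; wait for documents or for an error.
                _haveData.wait(lk);
            }
        }
    }

private:
    void loadNextBatch() {
        if (++_batch > 3)
            throw std::runtime_error("simulated failure while loading a batch");
        for (auto& buf : _buffers)
            buf.push_back(_batch);
    }

    static constexpr size_t kInvalidThreadId = static_cast<size_t>(-1);

    std::mutex _mutex;
    std::condition_variable _haveData;
    std::array<std::deque<int>, kNumConsumers> _buffers;
    size_t _loadingThreadId = kInvalidThreadId;
    std::string _errorMessage;  // stands in for the patch's _errorInLoadNextBatch Status
    int _batch = 0;
};

int main() {
    Exchange exchange;
    std::vector<std::thread> consumers;
    for (size_t id = 0; id < Exchange::kNumConsumers; ++id) {
        consumers.emplace_back([&exchange, id] {
            try {
                for (;;)
                    exchange.getNext(id);
            } catch (const std::exception& ex) {
                std::cout << "consumer " << id << " failed: " << ex.what() << "\n";
            }
        });
    }
    for (auto& t : consumers)
        t.join();
}
```

In the actual change, the new exchangeFailLoadNextBatch fail point exists so a test can force loadNextBatch() to fail and exercise exactly this path; such a fail point would typically be enabled from a test via the configureFailPoint command before running the aggregation. The dispose() change follows the same reasoning: once _errorInLoadNextBatch is set, only the consumer that was loading (and thus still owns the attached pipeline) disposes of it, instead of waiting for all consumers to run down.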