author     Martin Neupauer <martin.neupauer@mongodb.com>    2018-10-09 11:37:04 -0400
committer  Martin Neupauer <martin.neupauer@mongodb.com>    2018-10-10 13:32:08 -0400
commit     beda255a13b61570683968d6ac18a02f1d9bf765 (patch)
tree       bd545566af4000cdc74f7823ad0f555a4ce86445 /src/mongo/db/pipeline/document_source_exchange.cpp
parent     670963110d9d226824842d22540a79154fce59a1 (diff)
SERVER-37525 Error handling in the exchange can cause invariant failures in detach/reattach
Diffstat (limited to 'src/mongo/db/pipeline/document_source_exchange.cpp')
-rw-r--r--  src/mongo/db/pipeline/document_source_exchange.cpp  48
1 file changed, 36 insertions(+), 12 deletions(-)
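The fix replaces the loading thread's unconditional ON_BLOCK_EXIT detach with an explicit try/catch: the thread that fails while filling the buffers records the error under the exchange mutex, wakes every blocked consumer, and rethrows, while every consumer re-checks the shared status before doing anything else. Below is a minimal, self-contained sketch of that propagation pattern, not the real Exchange code: plain std:: types stand in for mongo's stdx:: wrappers, and MiniExchange, loadNextBatch, and the error fields are illustrative names.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Illustrative stand-in for the Exchange error-propagation pattern.
class MiniExchange {
public:
    void getNext() {
        std::unique_lock<std::mutex> lk(_mutex);
        for (;;) {
            // Execute only if no thread has recorded an error yet.
            if (_failed)
                throw std::runtime_error("exchange failed on another thread: " + _errorMessage);

            if (_loading) {
                // Another consumer is loading; block until it makes progress or fails.
                _haveBufferSpace.wait(lk);
                continue;
            }

            // This consumer won the race and will fill the buffers.
            _loading = true;
            try {
                loadNextBatch();  // may throw
                _loading = false;
                _haveBufferSpace.notify_all();
                return;
            } catch (const std::exception& ex) {
                _failed = true;
                _errorMessage = ex.what();
                // Wake everybody only after the error is recorded, so each
                // waiter observes it on the next loop iteration and fails too.
                _haveBufferSpace.notify_all();
                throw;
            }
        }
    }

private:
    void loadNextBatch() { throw std::runtime_error("simulated load failure"); }

    std::mutex _mutex;
    std::condition_variable _haveBufferSpace;
    bool _loading = false;
    bool _failed = false;
    std::string _errorMessage;
};

int main() {
    MiniExchange exchange;
    std::vector<std::thread> consumers;
    for (int i = 0; i < 2; ++i) {
        consumers.emplace_back([&exchange, i] {
            try {
                exchange.getNext();
            } catch (const std::exception& ex) {
                std::cout << "consumer " << i << ": " << ex.what() << '\n';
            }
        });
    }
    for (auto& t : consumers)
        t.join();
}

Run it and both consumer threads report the same failure: the loader via its own rethrow, the waiter via the recorded message.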
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp
index ad44df7df14..9aaeb851387 100644
--- a/src/mongo/db/pipeline/document_source_exchange.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange.cpp
@@ -41,6 +41,8 @@
 namespace mongo {
 
+MONGO_FAIL_POINT_DEFINE(exchangeFailLoadNextBatch);
+
 constexpr size_t Exchange::kMaxBufferSize;
 constexpr size_t Exchange::kMaxNumberConsumers;
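MONGO_FAIL_POINT_DEFINE registers a named fail point that tests can flip on at runtime; the new exchangeFailLoadNextBatch point lets a test force the error path through loadNextBatch(). A sketch of toggling it from C++ test code, assuming the fail point service API of this era (getGlobalFailPointRegistry() plus FailPoint::setMode()); the helper name is hypothetical and the exact signatures may differ at this commit:

#include "mongo/util/fail_point_service.h"

namespace mongo {

// Hypothetical test helper: force loadNextBatch() to uassert, or restore
// normal behavior. The fail point name matches the definition above.
void setExchangeLoadFailure(bool enabled) {
    auto failPoint = getGlobalFailPointRegistry()->getFailPoint("exchangeFailLoadNextBatch");
    failPoint->setMode(enabled ? FailPoint::alwaysOn : FailPoint::off);
}

}  // namespace mongo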
@@ -230,6 +232,10 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
     stdx::unique_lock<stdx::mutex> lk(_mutex);
 
     for (;;) {
+        // Execute only in case we have not encountered an error.
+        uassertStatusOKWithContext(_errorInLoadNextBatch,
+                                   "Exchange failed due to an error on different thread.");
+
         // Check if we have a document.
         if (!_consumers[consumerId]->isEmpty()) {
             auto doc = _consumers[consumerId]->getNext();
@@ -247,13 +253,9 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
         if (_loadingThreadId == kInvalidThreadId) {
             LOG(3) << "A consumer " << consumerId << " begins loading";
 
-            // This consumer won the race and will fill the buffers.
-            _loadingThreadId = consumerId;
-
-            {
-                // Make sure we detach the context even when exceptions are thrown; we wrap the
-                // detach in the guard.
-                ON_BLOCK_EXIT([this] { _pipeline->detachFromOperationContext(); });
+            try {
+                // This consumer won the race and will fill the buffers.
+                _loadingThreadId = consumerId;
 
                 _pipeline->reattachToOperationContext(opCtx);
 
@@ -262,13 +264,29 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
                 // The return value is an index of a full consumer buffer.
                 size_t fullConsumerId = loadNextBatch();
 
+                if (MONGO_FAIL_POINT(exchangeFailLoadNextBatch)) {
+                    log() << "exchangeFailLoadNextBatch fail point enabled.";
+                    uasserted(ErrorCodes::FailPointEnabled,
+                              "Asserting on loading the next batch due to failpoint.");
+                }
+
+                _pipeline->detachFromOperationContext();
+
                 // The loading cannot continue until the consumer with the full buffer consumes some
                 // documents.
                 _loadingThreadId = fullConsumerId;
-            }
 
-            // Wake up everybody and try to make some progress.
-            _haveBufferSpace.notify_all();
+                // Wake up everybody and try to make some progress.
+                _haveBufferSpace.notify_all();
+            } catch (const DBException& ex) {
+                _errorInLoadNextBatch = ex.toStatus();
+
+                // We have to wake up all other blocked threads so they can detect the error and
+                // fail too. They can be woken up only after _errorInLoadNextBatch has been set.
+                _haveBufferSpace.notify_all();
+
+                throw;
+            }
         } else {
             // Some other consumer is already loading the buffers. There is nothing else we can do
             // but wait.
@@ -361,14 +379,20 @@ size_t Exchange::getTargetConsumer(const Document& input) {
     return cid;
 }
 
-void Exchange::dispose(OperationContext* opCtx) {
+void Exchange::dispose(OperationContext* opCtx, size_t consumerId) {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
 
     invariant(_disposeRunDown < getConsumers());
 
     ++_disposeRunDown;
 
-    if (_disposeRunDown == getConsumers()) {
+    // If _errorInLoadNextBatch status is not OK then an exception was thrown. In that case the
+    // throwing thread will do the dispose.
+    if (!_errorInLoadNextBatch.isOK()) {
+        if (_loadingThreadId == consumerId) {
+            _pipeline->dispose(opCtx);
+        }
+    } else if (_disposeRunDown == getConsumers()) {
         _pipeline->dispose(opCtx);
     }
 }
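The reworked dispose() encodes a two-mode rundown: on a clean run the last consumer to run down disposes the shared inner pipeline, while after an error only the thread that was loading may do it, presumably because on the error path the pipeline was never detached from that thread's OperationContext (the detach call now sits inside the try block, after the points that can fail). A minimal sketch of the rule, with illustrative names rather than the actual Exchange interface:

#include <cassert>
#include <cstddef>
#include <mutex>

// Illustrative stand-in for the dispose rundown rule.
class ExchangeRundown {
public:
    explicit ExchangeRundown(size_t consumers) : _consumers(consumers) {}

    // Each consumer calls this exactly once while shutting down.
    void dispose(size_t consumerId) {
        std::lock_guard<std::mutex> lk(_mutex);
        assert(_disposeRunDown < _consumers);
        ++_disposeRunDown;

        if (_failed) {
            // Error mode: only the (former) loading thread disposes the
            // inner pipeline, mirroring the _loadingThreadId check above.
            if (_loadingThreadId == consumerId)
                disposeInnerPipeline();
        } else if (_disposeRunDown == _consumers) {
            // Clean mode: the last consumer out cleans up.
            disposeInnerPipeline();
        }
    }

private:
    void disposeInnerPipeline() { /* tear down the shared inner pipeline */ }

    std::mutex _mutex;
    const size_t _consumers;
    size_t _disposeRunDown = 0;
    size_t _loadingThreadId = 0;
    bool _failed = false;
};

Note that every consumer still increments the rundown counter in both modes, so the invariant that dispose() runs at most once per consumer continues to hold.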