diff options
field | value | date |
---|---|---|
author | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-10-09 11:37:04 -0400 |
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-10-10 13:32:08 -0400 |
commit | beda255a13b61570683968d6ac18a02f1d9bf765 (patch) | |
tree | bd545566af4000cdc74f7823ad0f555a4ce86445 | |
parent | 670963110d9d226824842d22540a79154fce59a1 (diff) | |
download | mongo-beda255a13b61570683968d6ac18a02f1d9bf765.tar.gz | |
SERVER-37525 Error handling in the exchange can cause invariant failures in detach/reattach
-rw-r--r-- | jstests/noPassthroughWithMongod/exchangeProducer.js | 56 |
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.cpp | 48 |
-rw-r--r-- | src/mongo/db/pipeline/document_source_exchange.h | 8 |
3 files changed, 98 insertions, 14 deletions
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js index f2168232103..c0cc8343448 100644 --- a/jstests/noPassthroughWithMongod/exchangeProducer.js +++ b/jstests/noPassthroughWithMongod/exchangeProducer.js @@ -40,6 +40,22 @@ TestData.disableImplicitSessions = true; return shell; } + /** + * A consumer runs in a parallel shell reading the cursor expecting an error. + * + * @param {Object} cursor - the cursor that a consumer will read + * @param {int} code - the expected error code + */ + function failingConsumer(cursor, code) { + let shell = startParallelShell(`{ + const dbCursor = new DBCommandCursor(db, ${tojsononeline(cursor)}); + const cmdRes = db.runCommand({getMore: dbCursor._cursorid, collection: dbCursor._collName}); + assert.commandFailedWithCode(cmdRes, ${code}); + }`); + + return shell; + } + const numConsumers = 4; // For simplicity we assume that we can evenly distribute documents among consumers. assert.eq(0, numDocs % numConsumers); @@ -240,4 +256,44 @@ TestData.disableImplicitSessions = true; parallelShells[i](); } })(); + + /** + * Range - simulate an exception in loading the batch. + */ + (function testRangeFailLoad() { + const kFailPointName = "exchangeFailLoadNextBatch"; + try { + assert.commandWorked( + db.adminCommand({configureFailPoint: kFailPointName, mode: "alwaysOn"})); + + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: { + policy: "keyRange", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024), + key: {a: 1}, + boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}], + consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + // All consumers will see the exchange fail error. 
+ for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(failingConsumer(res.cursors[i], ErrorCodes.FailPointEnabled)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } + } finally { + assert.commandWorked( + db.adminCommand({configureFailPoint: kFailPointName, mode: "off"})); + } + })(); + })(); diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp index ad44df7df14..9aaeb851387 100644 --- a/src/mongo/db/pipeline/document_source_exchange.cpp +++ b/src/mongo/db/pipeline/document_source_exchange.cpp @@ -41,6 +41,8 @@ namespace mongo { +MONGO_FAIL_POINT_DEFINE(exchangeFailLoadNextBatch); + constexpr size_t Exchange::kMaxBufferSize; constexpr size_t Exchange::kMaxNumberConsumers; @@ -230,6 +232,10 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t stdx::unique_lock<stdx::mutex> lk(_mutex); for (;;) { + // Execute only in case we have not encountered an error. + uassertStatusOKWithContext(_errorInLoadNextBatch, + "Exchange failed due to an error on different thread."); + // Check if we have a document. if (!_consumers[consumerId]->isEmpty()) { auto doc = _consumers[consumerId]->getNext(); @@ -247,13 +253,9 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t if (_loadingThreadId == kInvalidThreadId) { LOG(3) << "A consumer " << consumerId << " begins loading"; - // This consumer won the race and will fill the buffers. - _loadingThreadId = consumerId; - - { - // Make sure we detach the context even when exceptions are thrown; we wrap the - // detach in the guard. - ON_BLOCK_EXIT([this] { _pipeline->detachFromOperationContext(); }); + try { + // This consumer won the race and will fill the buffers. 
+ _loadingThreadId = consumerId; _pipeline->reattachToOperationContext(opCtx); @@ -262,13 +264,29 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t // The return value is an index of a full consumer buffer. size_t fullConsumerId = loadNextBatch(); + if (MONGO_FAIL_POINT(exchangeFailLoadNextBatch)) { + log() << "exchangeFailLoadNextBatch fail point enabled."; + uasserted(ErrorCodes::FailPointEnabled, + "Asserting on loading the next batch due to failpoint."); + } + + _pipeline->detachFromOperationContext(); + // The loading cannot continue until the consumer with the full buffer consumes some // documents. _loadingThreadId = fullConsumerId; - } - // Wake up everybody and try to make some progress. - _haveBufferSpace.notify_all(); + // Wake up everybody and try to make some progress. + _haveBufferSpace.notify_all(); + } catch (const DBException& ex) { + _errorInLoadNextBatch = ex.toStatus(); + + // We have to wake up all other blocked threads so they can detect the error and + // fail too. They can be woken up only after _errorInLoadNextBatch has been set. + _haveBufferSpace.notify_all(); + + throw; + } } else { // Some other consumer is already loading the buffers. There is nothing else we can do // but wait. @@ -361,14 +379,20 @@ size_t Exchange::getTargetConsumer(const Document& input) { return cid; } -void Exchange::dispose(OperationContext* opCtx) { +void Exchange::dispose(OperationContext* opCtx, size_t consumerId) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_disposeRunDown < getConsumers()); ++_disposeRunDown; - if (_disposeRunDown == getConsumers()) { + // If _errorInLoadNextBatch status is not OK then an exception was thrown. In that case the + // throwing thread will do the dispose. 
+ if (!_errorInLoadNextBatch.isOK()) { + if (_loadingThreadId == consumerId) { + _pipeline->dispose(opCtx); + } + } else if (_disposeRunDown == getConsumers()) { _pipeline->dispose(opCtx); } } diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h index 14f6943a677..5e77ef1b3e8 100644 --- a/src/mongo/db/pipeline/document_source_exchange.h +++ b/src/mongo/db/pipeline/document_source_exchange.h @@ -81,7 +81,7 @@ public: return _spec; } - void dispose(OperationContext* opCtx); + void dispose(OperationContext* opCtx, size_t consumerId); private: size_t loadNextBatch(); @@ -143,6 +143,10 @@ private: // A thread that is currently loading the exchange buffers. size_t _loadingThreadId{kInvalidThreadId}; + // A status indicating that the exception was thrown during loadNextBatch(). Once in the failed + // state all other producing threads will fail too. + Status _errorInLoadNextBatch{Status::OK()}; + size_t _roundRobinCounter{0}; // A rundown counter of consumers disposing of the pipelines. Only the last consumer will @@ -192,7 +196,7 @@ public: } void doDispose() final { - _exchange->dispose(pExpCtx->opCtx); + _exchange->dispose(pExpCtx->opCtx, _consumerId); } auto getConsumerId() const { |