author     Martin Neupauer <martin.neupauer@mongodb.com>   2018-10-09 11:37:04 -0400
committer  Martin Neupauer <martin.neupauer@mongodb.com>   2018-10-10 13:32:08 -0400
commit     beda255a13b61570683968d6ac18a02f1d9bf765 (patch)
tree       bd545566af4000cdc74f7823ad0f555a4ce86445
parent     670963110d9d226824842d22540a79154fce59a1 (diff)
download   mongo-beda255a13b61570683968d6ac18a02f1d9bf765.tar.gz
SERVER-37525 Error handling in the exchange can cause invariant failures in detach/reattach
-rw-r--r--  jstests/noPassthroughWithMongod/exchangeProducer.js   56
-rw-r--r--  src/mongo/db/pipeline/document_source_exchange.cpp    48
-rw-r--r--  src/mongo/db/pipeline/document_source_exchange.h        8
3 files changed, 98 insertions(+), 14 deletions(-)
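The substance of the fix is in the document_source_exchange.cpp hunk further down: the consumer that wins the race to load the next batch now does the loading inside a try/catch, records any failure in the shared _errorInLoadNextBatch status, wakes every consumer blocked on the condition variable, and rethrows; every consumer re-checks that status at the top of its wait loop, so a single failure during loading fails all exchange cursors instead of tripping the detach/reattach invariants. What follows is a minimal, self-contained sketch of that propagation pattern using plain std:: primitives; the names (ExchangeDemo, _batchReady, _error, kInvalid) are illustrative stand-ins rather than the mongo:: types in the patch, and the per-consumer buffers are collapsed into a single flag.

// Minimal stand-in for the exchange loading loop; hypothetical names, std:: types only.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

class ExchangeDemo {
public:
    void getNext(size_t consumerId) {
        std::unique_lock<std::mutex> lk(_mutex);
        for (;;) {
            // Every consumer re-checks the shared status on every pass, so once one
            // loader has failed, all consumers fail (mirrors _errorInLoadNextBatch).
            if (!_error.empty())
                throw std::runtime_error(_error);

            if (_batchReady)
                return;  // A batch is available; in the real exchange, pop a document.

            if (_loadingThreadId == kInvalid) {
                // This consumer won the race and will fill the buffers.
                _loadingThreadId = consumerId;
                try {
                    loadNextBatch();  // May throw, like the fail-point-injected error.
                    _batchReady = true;
                    _cond.notify_all();
                } catch (const std::exception& ex) {
                    _error = ex.what();  // Record the failure first ...
                    _cond.notify_all();  // ... then wake everyone so they can fail too.
                    throw;
                }
            } else {
                // Another consumer is loading; wait to be woken by progress or an error.
                _cond.wait(lk);
            }
        }
    }

private:
    static constexpr size_t kInvalid = static_cast<size_t>(-1);

    void loadNextBatch() {
        throw std::runtime_error("simulated loadNextBatch failure");
    }

    std::mutex _mutex;
    std::condition_variable _cond;
    size_t _loadingThreadId = kInvalid;
    bool _batchReady = false;
    std::string _error;
};

int main() {
    ExchangeDemo exchange;
    std::vector<std::thread> consumers;
    for (size_t id = 0; id < 4; ++id) {
        consumers.emplace_back([&exchange, id] {
            try {
                exchange.getNext(id);
            } catch (const std::exception& ex) {
                std::cout << "consumer " << id << " failed: " << ex.what() << "\n";
            }
        });
    }
    for (auto& t : consumers)
        t.join();
    return 0;
}

Every consumer in this sketch reports the same simulated failure, which is exactly what the new testRangeFailLoad test below asserts through the exchangeFailLoadNextBatch fail point.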
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js
index f2168232103..c0cc8343448 100644
--- a/jstests/noPassthroughWithMongod/exchangeProducer.js
+++ b/jstests/noPassthroughWithMongod/exchangeProducer.js
@@ -40,6 +40,22 @@ TestData.disableImplicitSessions = true;
return shell;
}
+ /**
+ * Runs a consumer in a parallel shell that reads the cursor and expects an error.
+ *
+ * @param {Object} cursor - the cursor that a consumer will read
+ * @param {int} code - the expected error code
+ */
+ function failingConsumer(cursor, code) {
+ let shell = startParallelShell(`{
+ const dbCursor = new DBCommandCursor(db, ${tojsononeline(cursor)});
+ const cmdRes = db.runCommand({getMore: dbCursor._cursorid, collection: dbCursor._collName});
+ assert.commandFailedWithCode(cmdRes, ${code});
+ }`);
+
+ return shell;
+ }
+
const numConsumers = 4;
// For simplicity we assume that we can evenly distribute documents among consumers.
assert.eq(0, numDocs % numConsumers);
@@ -240,4 +256,44 @@ TestData.disableImplicitSessions = true;
parallelShells[i]();
}
})();
+
+ /**
+ * Range - simulate an exception in loading the batch.
+ */
+ (function testRangeFailLoad() {
+ const kFailPointName = "exchangeFailLoadNextBatch";
+ try {
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: kFailPointName, mode: "alwaysOn"}));
+
+ let res = assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [],
+ exchange: {
+ policy: "keyRange",
+ consumers: NumberInt(numConsumers),
+ bufferSize: NumberInt(1024),
+ key: {a: 1},
+ boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}],
+ consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
+ },
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(numConsumers, res.cursors.length);
+
+ let parallelShells = [];
+
+ // All consumers will see the exchange fail error.
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells.push(failingConsumer(res.cursors[i], ErrorCodes.FailPointEnabled));
+ }
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells[i]();
+ }
+ } finally {
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: kFailPointName, mode: "off"}));
+ }
+ })();
+
})();
diff --git a/src/mongo/db/pipeline/document_source_exchange.cpp b/src/mongo/db/pipeline/document_source_exchange.cpp
index ad44df7df14..9aaeb851387 100644
--- a/src/mongo/db/pipeline/document_source_exchange.cpp
+++ b/src/mongo/db/pipeline/document_source_exchange.cpp
@@ -41,6 +41,8 @@
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(exchangeFailLoadNextBatch);
+
constexpr size_t Exchange::kMaxBufferSize;
constexpr size_t Exchange::kMaxNumberConsumers;
@@ -230,6 +232,10 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
stdx::unique_lock<stdx::mutex> lk(_mutex);
for (;;) {
+ // Execute only in case we have not encountered an error.
+ uassertStatusOKWithContext(_errorInLoadNextBatch,
+ "Exchange failed due to an error on different thread.");
+
// Check if we have a document.
if (!_consumers[consumerId]->isEmpty()) {
auto doc = _consumers[consumerId]->getNext();
@@ -247,13 +253,9 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
if (_loadingThreadId == kInvalidThreadId) {
LOG(3) << "A consumer " << consumerId << " begins loading";
- // This consumer won the race and will fill the buffers.
- _loadingThreadId = consumerId;
-
- {
- // Make sure we detach the context even when exceptions are thrown; we wrap the
- // detach in the guard.
- ON_BLOCK_EXIT([this] { _pipeline->detachFromOperationContext(); });
+ try {
+ // This consumer won the race and will fill the buffers.
+ _loadingThreadId = consumerId;
_pipeline->reattachToOperationContext(opCtx);
@@ -262,13 +264,29 @@ DocumentSource::GetNextResult Exchange::getNext(OperationContext* opCtx, size_t
// The return value is an index of a full consumer buffer.
size_t fullConsumerId = loadNextBatch();
+ if (MONGO_FAIL_POINT(exchangeFailLoadNextBatch)) {
+ log() << "exchangeFailLoadNextBatch fail point enabled.";
+ uasserted(ErrorCodes::FailPointEnabled,
+ "Asserting on loading the next batch due to failpoint.");
+ }
+
+ _pipeline->detachFromOperationContext();
+
// The loading cannot continue until the consumer with the full buffer consumes some
// documents.
_loadingThreadId = fullConsumerId;
- }
- // Wake up everybody and try to make some progress.
- _haveBufferSpace.notify_all();
+ // Wake up everybody and try to make some progress.
+ _haveBufferSpace.notify_all();
+ } catch (const DBException& ex) {
+ _errorInLoadNextBatch = ex.toStatus();
+
+ // We have to wake up all other blocked threads so they can detect the error and
+ // fail too. They can be woken up only after _errorInLoadNextBatch has been set.
+ _haveBufferSpace.notify_all();
+
+ throw;
+ }
} else {
// Some other consumer is already loading the buffers. There is nothing else we can do
// but wait.
@@ -361,14 +379,20 @@ size_t Exchange::getTargetConsumer(const Document& input) {
return cid;
}
-void Exchange::dispose(OperationContext* opCtx) {
+void Exchange::dispose(OperationContext* opCtx, size_t consumerId) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_disposeRunDown < getConsumers());
++_disposeRunDown;
- if (_disposeRunDown == getConsumers()) {
+ // If _errorInLoadNextBatch status is not OK then an exception was thrown. In that case the
+ // throwing thread will do the dispose.
+ if (!_errorInLoadNextBatch.isOK()) {
+ if (_loadingThreadId == consumerId) {
+ _pipeline->dispose(opCtx);
+ }
+ } else if (_disposeRunDown == getConsumers()) {
_pipeline->dispose(opCtx);
}
}
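To spell out the dispose contract in the hunk above: on the happy path the last consumer to run down disposes the shared pipeline exactly once, but once _errorInLoadNextBatch is set only the consumer that was loading when the error occurred performs the dispose, because the other consumers may already have failed out of getNext(). A small stand-alone sketch of that rundown rule follows; it uses plain std:: types and hypothetical names (ExchangeRundown, setError, disposePipeline) rather than the actual Exchange API.

// Stand-alone illustration of the dispose rundown; hypothetical names, std:: types only.
#include <cassert>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <string>

class ExchangeRundown {
public:
    explicit ExchangeRundown(size_t numConsumers) : _numConsumers(numConsumers) {}

    // Record a loading failure, remembering which consumer was the loading thread.
    void setError(size_t loadingThreadId, std::string error) {
        std::lock_guard<std::mutex> lk(_mutex);
        _loadingThreadId = loadingThreadId;
        _error = std::move(error);
    }

    // Each consumer calls dispose() exactly once when its cursor is torn down.
    void dispose(size_t consumerId) {
        std::lock_guard<std::mutex> lk(_mutex);
        assert(_disposeRunDown < _numConsumers);
        ++_disposeRunDown;
        if (!_error.empty()) {
            // An exception was thrown while loading; the loading thread cleans up,
            // since the other consumers may never reach their normal rundown path.
            if (_loadingThreadId == consumerId)
                disposePipeline();
        } else if (_disposeRunDown == _numConsumers) {
            // Happy path: only the last consumer to run down disposes the pipeline.
            disposePipeline();
        }
    }

private:
    void disposePipeline() {
        std::cout << "shared pipeline disposed exactly once\n";
    }

    std::mutex _mutex;
    size_t _numConsumers;
    size_t _disposeRunDown = 0;
    size_t _loadingThreadId = static_cast<size_t>(-1);
    std::string _error;
};

int main() {
    // Happy path: no error recorded, so the fourth (last) dispose tears down the pipeline.
    ExchangeRundown healthy(4);
    for (size_t id = 0; id < 4; ++id)
        healthy.dispose(id);

    // Error path: consumer 2 was the loading thread when the failure happened; only its
    // dispose() call tears the pipeline down, regardless of the order of the calls.
    ExchangeRundown failed(4);
    failed.setError(2, "simulated loadNextBatch failure");
    for (size_t id = 0; id < 4; ++id)
        failed.dispose(id);
    return 0;
}

The header diff below carries the matching declaration changes: dispose() gains the consumerId parameter and the class gains the _errorInLoadNextBatch member.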
diff --git a/src/mongo/db/pipeline/document_source_exchange.h b/src/mongo/db/pipeline/document_source_exchange.h
index 14f6943a677..5e77ef1b3e8 100644
--- a/src/mongo/db/pipeline/document_source_exchange.h
+++ b/src/mongo/db/pipeline/document_source_exchange.h
@@ -81,7 +81,7 @@ public:
return _spec;
}
- void dispose(OperationContext* opCtx);
+ void dispose(OperationContext* opCtx, size_t consumerId);
private:
size_t loadNextBatch();
@@ -143,6 +143,10 @@ private:
// A thread that is currently loading the exchange buffers.
size_t _loadingThreadId{kInvalidThreadId};
+ // A status indicating that an exception was thrown during loadNextBatch(). Once in the failed
+ // state all other producing threads will fail too.
+ Status _errorInLoadNextBatch{Status::OK()};
+
size_t _roundRobinCounter{0};
// A rundown counter of consumers disposing of the pipelines. Only the last consumer will
@@ -192,7 +196,7 @@ public:
}
void doDispose() final {
- _exchange->dispose(pExpCtx->opCtx);
+ _exchange->dispose(pExpCtx->opCtx, _consumerId);
}
auto getConsumerId() const {