diff options
author | Tommaso Tocci <tommaso.tocci@mongodb.com> | 2020-02-13 14:14:44 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-14 12:24:50 +0000 |
commit | 876b3af1091b299884869c34a41f7f37d4dcc0bb (patch) | |
tree | d4d774e229efedb769902bedfca8dced87f1a2b9 /src/mongo/db | |
parent | c25f0bc16a1f515f20972f30abf6bd569235a00f (diff) | |
download | mongo-876b3af1091b299884869c34a41f7f37d4dcc0bb.tar.gz |
SERVER-42617 Race in CloneDocumentsCatchesInsertErrors can causes it to return an unexpected error
When the batch insertion routine on the consumer thread fails,
ensure that we return the proper error instead of
ErrorCodes::ProducerConsumerQueueEndClosed.
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 36 |
1 files changed, 20 insertions, 16 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 54f4a01e87b..8124f36a16f 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -442,27 +442,31 @@ repl::OpTime MigrationDestinationManager::cloneDocumentsFromDonor( "causedBy_exceptionToStatus"_attr = causedBy(redact(exceptionToStatus()))); } }}; - auto inserterThreadJoinGuard = makeGuard([&] { - batches.closeProducerEnd(); - inserterThread.join(); - }); - while (true) { - opCtx->checkForInterrupt(); - auto res = fetchBatchFn(opCtx); - - opCtx->checkForInterrupt(); - batches.push(res.getOwned(), opCtx); - auto arr = res["objects"].Obj(); - if (arr.isEmpty()) { - inserterThreadJoinGuard.dismiss(); + { + auto inserterThreadJoinGuard = makeGuard([&] { + batches.closeProducerEnd(); inserterThread.join(); - opCtx->checkForInterrupt(); - break; + }); + + while (true) { + auto res = fetchBatchFn(opCtx); + try { + batches.push(res.getOwned(), opCtx); + auto arr = res["objects"].Obj(); + if (arr.isEmpty()) { + break; + } + } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) { + break; + } } - } + } // This scope ensures that the guard is destroyed + // This check is necessary because the consumer thread uses killOp to propagate errors to the + // producer thread (this thread) + opCtx->checkForInterrupt(); return lastOpApplied; } |