summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorTommaso Tocci <tommaso.tocci@mongodb.com>2020-02-13 14:14:44 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-14 12:24:50 +0000
commit876b3af1091b299884869c34a41f7f37d4dcc0bb (patch)
treed4d774e229efedb769902bedfca8dced87f1a2b9 /src/mongo/db
parentc25f0bc16a1f515f20972f30abf6bd569235a00f (diff)
downloadmongo-876b3af1091b299884869c34a41f7f37d4dcc0bb.tar.gz
SERVER-42617 Race in CloneDocumentsCatchesInsertErrors can causes it to return an unexpected error
When the batch insertion routine on the consumer thread fails, ensure that we return the proper error instead of ErrorCodes::ProducerConsumerQueueEndClosed.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp36
1 files changed, 20 insertions, 16 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 54f4a01e87b..8124f36a16f 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -442,27 +442,31 @@ repl::OpTime MigrationDestinationManager::cloneDocumentsFromDonor(
"causedBy_exceptionToStatus"_attr = causedBy(redact(exceptionToStatus())));
}
}};
- auto inserterThreadJoinGuard = makeGuard([&] {
- batches.closeProducerEnd();
- inserterThread.join();
- });
- while (true) {
- opCtx->checkForInterrupt();
- auto res = fetchBatchFn(opCtx);
-
- opCtx->checkForInterrupt();
- batches.push(res.getOwned(), opCtx);
- auto arr = res["objects"].Obj();
- if (arr.isEmpty()) {
- inserterThreadJoinGuard.dismiss();
+ {
+ auto inserterThreadJoinGuard = makeGuard([&] {
+ batches.closeProducerEnd();
inserterThread.join();
- opCtx->checkForInterrupt();
- break;
+ });
+
+ while (true) {
+ auto res = fetchBatchFn(opCtx);
+ try {
+ batches.push(res.getOwned(), opCtx);
+ auto arr = res["objects"].Obj();
+ if (arr.isEmpty()) {
+ break;
+ }
+ } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) {
+ break;
+ }
}
- }
+ } // This scope ensures that the guard is destroyed
+ // This check is necessary because the consumer thread uses killOp to propagate errors to the
+ // producer thread (this thread)
+ opCtx->checkForInterrupt();
return lastOpApplied;
}