| author | Esha Maharishi <esha.maharishi@mongodb.com> | 2020-03-26 17:21:23 -0400 |
|---|---|---|
| committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-27 00:34:36 +0000 |
| commit | c60e8ce46af84d273648bc49c495e44f08ea1fb0 (patch) | |
| tree | cee32f1c2ba989391d2c995a79e7283156d2fde5 | |
| parent | 98299f2f7eb295361cee2aea4dc03b952483d715 (diff) | |
| download | mongo-c60e8ce46af84d273648bc49c495e44f08ea1fb0.tar.gz | |
SERVER-47012 Make it more obvious that retryIdempotentWorkUntilSuccess will stop retrying if the node has stepped down
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 44
1 file changed, 27 insertions, 17 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index e85fbd1d5c5..75567a53488 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -145,14 +145,20 @@ static std::shared_ptr<ThreadPool> getMigrationUtilExecutor() {
 }
 
 /**
- * Runs doWork until it doesn't throw an error.
+ * Runs doWork until it doesn't throw an error, the node is shutting down, the node has stepped
+ * down, or the node has stepped down and up.
+ *
+ * Note that it is not guaranteed that 'doWork' will not be executed while the node is secondary
+ * or after the node has stepped down and up, only that 'doWork' will eventually stop being retried
+ * if one of those events has happened.
  *
  * Requirements:
  * - doWork must be idempotent.
  */
-void retryIdempotentWorkUntilSuccess(OperationContext* opCtx,
-                                     StringData taskDescription,
-                                     std::function<void(OperationContext*)> doWork) {
+void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
+    OperationContext* opCtx,
+    StringData taskDescription,
+    std::function<void(OperationContext*)> doWork) {
     const std::string newClientName = "{}-{}"_format(getThreadName(), taskDescription);
     const auto initialTerm = repl::ReplicationCoordinator::get(opCtx)->getTerm();
 
@@ -162,13 +168,17 @@ void retryIdempotentWorkUntilSuccess(OperationContext* opCtx,
             shutdown(waitForShutdown());
         }
 
-        // If the term changed, that means that the step up recovery could have run or is running
-        // so stop retrying in order to avoid duplicate work.
+        // If the node is no longer primary, stop retrying.
         uassert(ErrorCodes::InterruptedDueToReplStateChange,
                 "Stepped down while {}"_format(taskDescription),
                 repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
-                        repl::MemberState::RS_PRIMARY &&
-                    initialTerm == repl::ReplicationCoordinator::get(opCtx)->getTerm());
+                    repl::MemberState::RS_PRIMARY);
+
+        // If the term changed, that means that the step up recovery could have run or is running
+        // so stop retrying in order to avoid duplicate work.
+        uassert(ErrorCodes::InterruptedDueToReplStateChange,
+                "Term changed while {}"_format(taskDescription),
+                initialTerm == repl::ReplicationCoordinator::get(opCtx)->getTerm());
 
         try {
             auto newClient = opCtx->getServiceContext()->makeClient(newClientName);
@@ -494,7 +504,7 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
 }
 
 void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "persist migrate commit decision", [&](OperationContext* newOpCtx) {
             hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(newOpCtx);
@@ -515,7 +525,7 @@ void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
 }
 
 void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) {
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "persist migrate abort decision", [&](OperationContext* newOpCtx) {
             hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(newOpCtx);
@@ -543,7 +553,7 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
                                         false /*multi*/);
     deleteOp.setDeletes({query});
 
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "cancel range deletion on recipient", [&](OperationContext* newOpCtx) {
             hangInDeleteRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx);
@@ -565,7 +575,7 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
 void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
                                     const UUID& deletionTaskId,
                                     const WriteConcernOptions& writeConcern) {
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "cancel local range deletion", [&](OperationContext* newOpCtx) {
             hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx);
             PersistentTaskStore<RangeDeletionTask> store(newOpCtx,
@@ -594,7 +604,7 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
     updateEntry.setUpsert(false);
     updateOp.setUpdates({updateEntry});
 
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "ready remote range deletion", [&](OperationContext* newOpCtx) {
             hangInReadyRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx);
@@ -631,7 +641,7 @@ void advanceTransactionOnRecipient(OperationContext* opCtx,
                       << WriteConcernOptions::Majority << "lsid" << lsid.toBSON() << "txnNumber"
                       << currentTxnNumber + 1);
 
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "advance migration txn number", [&](OperationContext* newOpCtx) {
             hangInAdvanceTxnNumInterruptible.pauseWhileSet(newOpCtx);
             sendToRecipient(newOpCtx, recipientId, updateOp, passthroughFields);
@@ -649,7 +659,7 @@ void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& mi
     auto query = QUERY(RangeDeletionTask::kIdFieldName << migrationId);
     auto update = BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << ""));
 
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "ready local range deletion", [&](OperationContext* newOpCtx) {
             hangInReadyRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx);
             store.update(newOpCtx, query, update);
@@ -682,7 +692,7 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
     const auto ensureChunkVersionIsGreaterThanRequestBSON =
         ensureChunkVersionIsGreaterThanRequest.toBSON({});
 
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "ensureChunkVersionIsGreaterThan", [&](OperationContext* newOpCtx) {
             hangInEnsureChunkVersionIsGreaterThanInterruptible.pauseWhileSet(newOpCtx);
@@ -714,7 +724,7 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
 }
 
 void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) {
-    retryIdempotentWorkUntilSuccess(
+    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
         opCtx, "refreshFilteringMetadataUntilSuccess", [&nss](OperationContext* newOpCtx) {
             hangInRefreshFilteringMetadataUntilSuccessInterruptible.pauseWhileSet(newOpCtx);
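
For readers skimming the diff, here is the shape of the renamed helper as a minimal, self-contained sketch. MockReplCoordinator and StepdownError are hypothetical stand-ins for repl::ReplicationCoordinator and ErrorCodes::InterruptedDueToReplStateChange, and the real helper's shutdown check and per-attempt Client/OperationContext creation are omitted; this illustrates the retry-until-success-or-stepdown pattern under those assumptions, not the actual server code.

```cpp
#include <atomic>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for repl::ReplicationCoordinator.
struct MockReplCoordinator {
    std::atomic<bool> isPrimary{true};
    std::atomic<long long> term{1};
};

// Hypothetical stand-in for ErrorCodes::InterruptedDueToReplStateChange.
struct StepdownError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Retries doWork until it succeeds, but stops if the node is no longer
// primary or the term has changed (the node stepped down and back up).
void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
    MockReplCoordinator& repl,
    const std::string& taskDescription,
    const std::function<void()>& doWork) {
    const long long initialTerm = repl.term.load();
    while (true) {
        // Stop retrying once the node has stepped down. This check runs
        // between attempts, so doWork itself may still execute while the
        // node is secondary.
        if (!repl.isPrimary.load())
            throw StepdownError("Stepped down while " + taskDescription);

        // Stop retrying if the term changed: step-up recovery may have run
        // or be running, and retrying here would risk duplicating its work.
        if (repl.term.load() != initialTerm)
            throw StepdownError("Term changed while " + taskDescription);

        try {
            doWork();  // must be idempotent: it can execute more than once
            return;
        } catch (const std::exception& ex) {
            std::cerr << "retrying '" << taskDescription << "': " << ex.what() << '\n';
        }
    }
}

int main() {
    MockReplCoordinator repl;
    int attempts = 0;
    retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(repl, "demo task", [&] {
        if (++attempts < 3)
            throw std::runtime_error("transient failure");
    });
    std::cout << "succeeded after " << attempts << " attempts\n";
    return 0;
}
```

The key point the rename advertises is the same one the new doc comment makes: the loop only guarantees that retries eventually stop after a stepdown or term change; an in-flight doWork may still run while the node is secondary, which is why doWork must be idempotent.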