summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2020-03-26 17:21:23 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-27 00:34:36 +0000
commitc60e8ce46af84d273648bc49c495e44f08ea1fb0 (patch)
treecee32f1c2ba989391d2c995a79e7283156d2fde5
parent98299f2f7eb295361cee2aea4dc03b952483d715 (diff)
downloadmongo-c60e8ce46af84d273648bc49c495e44f08ea1fb0.tar.gz
SERVER-47012 Make it more obvious that retryIdempotentWorkUntilSuccess will stop retrying if the node has stepped down
-rw-r--r--src/mongo/db/s/migration_util.cpp44
1 files changed, 27 insertions, 17 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index e85fbd1d5c5..75567a53488 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -145,14 +145,20 @@ static std::shared_ptr<ThreadPool> getMigrationUtilExecutor() {
}
/**
- * Runs doWork until it doesn't throw an error.
+ * Runs doWork until it doesn't throw an error, the node is shutting down, the node has stepped
+ * down, or the node has stepped down and up.
+ *
+ * Note that it is not guaranteed that 'doWork' will not be executed while the node is secondary
+ * or after the node has stepped down and up, only that 'doWork' will eventually stop being retried
+ * if one of those events has happened.
*
* Requirements:
* - doWork must be idempotent.
*/
-void retryIdempotentWorkUntilSuccess(OperationContext* opCtx,
- StringData taskDescription,
- std::function<void(OperationContext*)> doWork) {
+void retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
+ OperationContext* opCtx,
+ StringData taskDescription,
+ std::function<void(OperationContext*)> doWork) {
const std::string newClientName = "{}-{}"_format(getThreadName(), taskDescription);
const auto initialTerm = repl::ReplicationCoordinator::get(opCtx)->getTerm();
@@ -162,13 +168,17 @@ void retryIdempotentWorkUntilSuccess(OperationContext* opCtx,
shutdown(waitForShutdown());
}
- // If the term changed, that means that the step up recovery could have run or is running
- // so stop retrying in order to avoid duplicate work.
+ // If the node is no longer primary, stop retrying.
uassert(ErrorCodes::InterruptedDueToReplStateChange,
"Stepped down while {}"_format(taskDescription),
repl::ReplicationCoordinator::get(opCtx)->getMemberState() ==
- repl::MemberState::RS_PRIMARY &&
- initialTerm == repl::ReplicationCoordinator::get(opCtx)->getTerm());
+ repl::MemberState::RS_PRIMARY);
+
+ // If the term changed, that means that the step up recovery could have run or is running
+ // so stop retrying in order to avoid duplicate work.
+ uassert(ErrorCodes::InterruptedDueToReplStateChange,
+ "Term changed while {}"_format(taskDescription),
+ initialTerm == repl::ReplicationCoordinator::get(opCtx)->getTerm());
try {
auto newClient = opCtx->getServiceContext()->makeClient(newClientName);
@@ -494,7 +504,7 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
}
void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "persist migrate commit decision", [&](OperationContext* newOpCtx) {
hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(newOpCtx);
@@ -515,7 +525,7 @@ void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
}
void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) {
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "persist migrate abort decision", [&](OperationContext* newOpCtx) {
hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(newOpCtx);
@@ -543,7 +553,7 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
false /*multi*/);
deleteOp.setDeletes({query});
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "cancel range deletion on recipient", [&](OperationContext* newOpCtx) {
hangInDeleteRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx);
@@ -565,7 +575,7 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
const UUID& deletionTaskId,
const WriteConcernOptions& writeConcern) {
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "cancel local range deletion", [&](OperationContext* newOpCtx) {
hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx);
PersistentTaskStore<RangeDeletionTask> store(newOpCtx,
@@ -594,7 +604,7 @@ void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
updateEntry.setUpsert(false);
updateOp.setUpdates({updateEntry});
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "ready remote range deletion", [&](OperationContext* newOpCtx) {
hangInReadyRangeDeletionOnRecipientInterruptible.pauseWhileSet(newOpCtx);
@@ -631,7 +641,7 @@ void advanceTransactionOnRecipient(OperationContext* opCtx,
<< WriteConcernOptions::Majority << "lsid" << lsid.toBSON()
<< "txnNumber" << currentTxnNumber + 1);
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "advance migration txn number", [&](OperationContext* newOpCtx) {
hangInAdvanceTxnNumInterruptible.pauseWhileSet(newOpCtx);
sendToRecipient(newOpCtx, recipientId, updateOp, passthroughFields);
@@ -649,7 +659,7 @@ void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& mi
auto query = QUERY(RangeDeletionTask::kIdFieldName << migrationId);
auto update = BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << ""));
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "ready local range deletion", [&](OperationContext* newOpCtx) {
hangInReadyRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx);
store.update(newOpCtx, query, update);
@@ -682,7 +692,7 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
const auto ensureChunkVersionIsGreaterThanRequestBSON =
ensureChunkVersionIsGreaterThanRequest.toBSON({});
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "ensureChunkVersionIsGreaterThan", [&](OperationContext* newOpCtx) {
hangInEnsureChunkVersionIsGreaterThanInterruptible.pauseWhileSet(newOpCtx);
@@ -714,7 +724,7 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
}
void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) {
- retryIdempotentWorkUntilSuccess(
+ retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
opCtx, "refreshFilteringMetadataUntilSuccess", [&nss](OperationContext* newOpCtx) {
hangInRefreshFilteringMetadataUntilSuccessInterruptible.pauseWhileSet(newOpCtx);