summary | refs | log | tree | commit | diff
path: root/src/mongo/db/s/migration_util.cpp
diff options
context:
space:
mode:
author: Marcos José Grillo Ramírez <marcos.grillo@mongodb.com> 2021-01-08 18:31:31 +0100
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-01-08 18:59:59 +0000
commit: 40aa110c655b6a3b562881c63d14a83c0848b3a0 (patch)
tree: c1256aceb44955b2fb11722f46e88d3afb9a9dd6 /src/mongo/db/s/migration_util.cpp
parent: 9e1f0ea4f371a8101f96c84d2ecd3811d68cafb6 (diff)
download: mongo-40aa110c655b6a3b562881c63d14a83c0848b3a0.tar.gz
SERVER-50890 Improve failover management when completing migrations
Diffstat (limited to 'src/mongo/db/s/migration_util.cpp')
-rw-r--r-- src/mongo/db/s/migration_util.cpp 79
1 files changed, 36 insertions, 43 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 01651ece011..26d1abcbb48 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -583,15 +583,18 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx,
}
}
-void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
+void persistCommitDecision(OperationContext* opCtx,
+ const MigrationCoordinatorDocument& migrationDoc) {
+ invariant(migrationDoc.getDecision() &&
+ *migrationDoc.getDecision() == DecisionEnum::kCommitted);
+
hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(opCtx);
PersistentTaskStore<MigrationCoordinatorDocument> store(
NamespaceString::kMigrationCoordinatorsNamespace);
- store.update(
- opCtx,
- QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
- BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "committed")));
+ store.upsert(opCtx,
+ QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationDoc.getId()),
+ migrationDoc.toBSON());
if (hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.shouldFail()) {
hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
@@ -600,25 +603,23 @@ void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) {
}
}
-void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) {
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "persist migrate abort decision", [&](OperationContext* newOpCtx) {
- hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(newOpCtx);
-
- PersistentTaskStore<MigrationCoordinatorDocument> store(
- NamespaceString::kMigrationCoordinatorsNamespace);
- store.update(newOpCtx,
- QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId),
- BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName
- << "aborted")));
-
- if (hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.shouldFail()) {
- hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.pauseWhileSet(
- newOpCtx);
- uasserted(ErrorCodes::InternalError,
- "simulate an error response when persisting migrate abort decision");
- }
- });
+void persistAbortDecision(OperationContext* opCtx,
+ const MigrationCoordinatorDocument& migrationDoc) {
+ invariant(migrationDoc.getDecision() && *migrationDoc.getDecision() == DecisionEnum::kAborted);
+
+ hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(opCtx);
+
+ PersistentTaskStore<MigrationCoordinatorDocument> store(
+ NamespaceString::kMigrationCoordinatorsNamespace);
+ store.upsert(opCtx,
+ QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationDoc.getId()),
+ migrationDoc.toBSON());
+
+ if (hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response when persisting migrate abort decision");
+ }
}
void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
@@ -646,20 +647,15 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx,
void deleteRangeDeletionTaskLocally(OperationContext* opCtx,
const UUID& deletionTaskId,
const WriteConcernOptions& writeConcern) {
- retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown(
- opCtx, "cancel local range deletion", [&](OperationContext* newOpCtx) {
- hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx);
- PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
- store.remove(
- newOpCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern);
-
- if (hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) {
- hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(
- newOpCtx);
- uasserted(ErrorCodes::InternalError,
- "simulate an error response when deleting range deletion locally");
- }
- });
+ hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(opCtx);
+ PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace);
+ store.remove(opCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern);
+
+ if (hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) {
+ hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(opCtx);
+ uasserted(ErrorCodes::InternalError,
+ "simulate an error response when deleting range deletion locally");
+ }
}
void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx,
@@ -891,9 +887,6 @@ void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss)
if (doc.getDecision()) {
// The decision is already known.
- coordinator.setMigrationDecision((*doc.getDecision()) == DecisionEnum::kCommitted
- ? MigrationCoordinator::Decision::kCommitted
- : MigrationCoordinator::Decision::kAborted);
coordinator.completeMigration(opCtx);
return true;
}
@@ -957,9 +950,9 @@ void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss)
}
if (currentMetadata.keyBelongsToMe(doc.getRange().getMin())) {
- coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted);
+ coordinator.setMigrationDecision(DecisionEnum::kAborted);
} else {
- coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted);
+ coordinator.setMigrationDecision(DecisionEnum::kCommitted);
}
coordinator.completeMigration(opCtx);