diff options
author | Marcos José Grillo Ramírez <marcos.grillo@mongodb.com> | 2021-01-08 18:31:31 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-08 18:59:59 +0000 |
commit | 40aa110c655b6a3b562881c63d14a83c0848b3a0 (patch) | |
tree | c1256aceb44955b2fb11722f46e88d3afb9a9dd6 /src/mongo/db/s/migration_util.cpp | |
parent | 9e1f0ea4f371a8101f96c84d2ecd3811d68cafb6 (diff) | |
download | mongo-40aa110c655b6a3b562881c63d14a83c0848b3a0.tar.gz |
SERVER-50890 Improve failover management when completing migrations
Diffstat (limited to 'src/mongo/db/s/migration_util.cpp')
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 79 |
1 files changed, 36 insertions, 43 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 01651ece011..26d1abcbb48 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -583,15 +583,18 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx, } } -void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) { +void persistCommitDecision(OperationContext* opCtx, + const MigrationCoordinatorDocument& migrationDoc) { + invariant(migrationDoc.getDecision() && + *migrationDoc.getDecision() == DecisionEnum::kCommitted); + hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(opCtx); PersistentTaskStore<MigrationCoordinatorDocument> store( NamespaceString::kMigrationCoordinatorsNamespace); - store.update( - opCtx, - QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId), - BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "committed"))); + store.upsert(opCtx, + QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationDoc.getId()), + migrationDoc.toBSON()); if (hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.shouldFail()) { hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx); @@ -600,25 +603,23 @@ void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) { } } -void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) { - retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown( - opCtx, "persist migrate abort decision", [&](OperationContext* newOpCtx) { - hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(newOpCtx); - - PersistentTaskStore<MigrationCoordinatorDocument> store( - NamespaceString::kMigrationCoordinatorsNamespace); - store.update(newOpCtx, - QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId), - BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName - << "aborted"))); - - if (hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.shouldFail()) { - hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.pauseWhileSet( - newOpCtx); - uasserted(ErrorCodes::InternalError, - "simulate an error response when persisting migrate abort decision"); - } - }); +void persistAbortDecision(OperationContext* opCtx, + const MigrationCoordinatorDocument& migrationDoc) { + invariant(migrationDoc.getDecision() && *migrationDoc.getDecision() == DecisionEnum::kAborted); + + hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(opCtx); + + PersistentTaskStore<MigrationCoordinatorDocument> store( + NamespaceString::kMigrationCoordinatorsNamespace); + store.upsert(opCtx, + QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationDoc.getId()), + migrationDoc.toBSON()); + + if (hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.shouldFail()) { + hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx); + uasserted(ErrorCodes::InternalError, + "simulate an error response when persisting migrate abort decision"); + } } void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx, @@ -646,20 +647,15 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx, void deleteRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& deletionTaskId, const WriteConcernOptions& writeConcern) { - retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown( - opCtx, "cancel local range deletion", [&](OperationContext* newOpCtx) { - hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx); - PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); - store.remove( - newOpCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern); - - if (hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) { - hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet( - newOpCtx); - uasserted(ErrorCodes::InternalError, - "simulate an error response when deleting range deletion locally"); - } - }); + hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(opCtx); + PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); + store.remove(opCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern); + + if (hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) { + hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(opCtx); + uasserted(ErrorCodes::InternalError, + "simulate an error response when deleting range deletion locally"); + } } void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx, @@ -891,9 +887,6 @@ void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss) if (doc.getDecision()) { // The decision is already known. - coordinator.setMigrationDecision((*doc.getDecision()) == DecisionEnum::kCommitted - ? MigrationCoordinator::Decision::kCommitted - : MigrationCoordinator::Decision::kAborted); coordinator.completeMigration(opCtx); return true; } @@ -957,9 +950,9 @@ void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss) } if (currentMetadata.keyBelongsToMe(doc.getRange().getMin())) { - coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted); + coordinator.setMigrationDecision(DecisionEnum::kAborted); } else { - coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted); + coordinator.setMigrationDecision(DecisionEnum::kCommitted); } coordinator.completeMigration(opCtx); |