diff options
author | Marcos José Grillo Ramírez <marcos.grillo@mongodb.com> | 2021-01-08 18:31:31 +0100 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-08 18:59:59 +0000 |
commit | 40aa110c655b6a3b562881c63d14a83c0848b3a0 (patch) | |
tree | c1256aceb44955b2fb11722f46e88d3afb9a9dd6 /src/mongo/db/s | |
parent | 9e1f0ea4f371a8101f96c84d2ecd3811d68cafb6 (diff) | |
download | mongo-40aa110c655b6a3b562881c63d14a83c0848b3a0.tar.gz |
SERVER-50890 Improve failover management when completing migrations
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/migration_coordinator.cpp | 48 | ||||
-rw-r--r-- | src/mongo/db/s/migration_coordinator.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 115 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 2 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 79 | ||||
-rw-r--r-- | src/mongo/db/s/migration_util.h | 6 |
6 files changed, 128 insertions, 129 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp index d72311a3369..4cc266fa0e3 100644 --- a/src/mongo/db/s/migration_coordinator.cpp +++ b/src/mongo/db/s/migration_coordinator.cpp @@ -105,10 +105,8 @@ TxnNumber MigrationCoordinator::getTxnNumber() const { } void MigrationCoordinator::startMigration(OperationContext* opCtx) { - LOGV2_DEBUG(23889, - 2, - "Persisting migration coordinator doc", - "migrationDoc"_attr = _migrationInfo.toBSON()); + LOGV2_DEBUG( + 23889, 2, "Persisting migration coordinator doc", "migrationDoc"_attr = _migrationInfo); migrationutil::persistMigrationCoordinatorLocally(opCtx, _migrationInfo); LOGV2_DEBUG(23890, @@ -127,19 +125,20 @@ void MigrationCoordinator::startMigration(OperationContext* opCtx) { opCtx, donorDeletionTask, WriteConcerns::kMajorityWriteConcern); } -void MigrationCoordinator::setMigrationDecision(Decision decision) { +void MigrationCoordinator::setMigrationDecision(DecisionEnum decision) { LOGV2_DEBUG(23891, 2, "MigrationCoordinator setting migration decision to {decision}", "MigrationCoordinator setting migration decision", - "decision"_attr = (decision == Decision::kCommitted ? "committed" : "aborted"), + "decision"_attr = (decision == DecisionEnum::kCommitted ? "committed" : "aborted"), "migrationId"_attr = _migrationInfo.getId()); - _decision = decision; + _migrationInfo.setDecision(decision); } boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(OperationContext* opCtx) { - if (!_decision) { + auto decision = _migrationInfo.getDecision(); + if (!decision) { LOGV2( 23892, "Migration completed without setting a decision. This node might have " @@ -153,17 +152,17 @@ boost::optional<SemiFuture<void>> MigrationCoordinator::completeMigration(Operat LOGV2(23893, "MigrationCoordinator delivering decision {decision} to self and to recipient", "MigrationCoordinator delivering decision to self and to recipient", - "decision"_attr = (_decision == Decision::kCommitted ? "committed" : "aborted"), + "decision"_attr = (decision == DecisionEnum::kCommitted ? "committed" : "aborted"), "migrationId"_attr = _migrationInfo.getId()); boost::optional<SemiFuture<void>> cleanupCompleteFuture = boost::none; - switch (*_decision) { - case Decision::kAborted: + switch (*decision) { + case DecisionEnum::kAborted: _abortMigrationOnDonorAndRecipient(opCtx); hangBeforeForgettingMigrationAfterAbortDecision.pauseWhileSet(); break; - case Decision::kCommitted: + case DecisionEnum::kCommitted: cleanupCompleteFuture = _commitMigrationOnDonorAndRecipient(opCtx); hangBeforeForgettingMigrationAfterCommitDecision.pauseWhileSet(); break; @@ -180,7 +179,7 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient( LOGV2_DEBUG( 23894, 2, "Making commit decision durable", "migrationId"_attr = _migrationInfo.getId()); - migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId()); + migrationutil::persistCommitDecision(opCtx, _migrationInfo); LOGV2_DEBUG( 23895, @@ -233,7 +232,17 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* LOGV2_DEBUG( 23899, 2, "Making abort decision durable", "migrationId"_attr = _migrationInfo.getId()); - migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId()); + migrationutil::persistAbortDecision(opCtx, _migrationInfo); + + hangBeforeSendingAbortDecision.pauseWhileSet(); + + // Ensure removing the local range deletion document to prevent incoming migrations with + // overlapping ranges to hang. + LOGV2_DEBUG(23901, + 2, + "Deleting range deletion task on donor", + "migrationId"_attr = _migrationInfo.getId()); + migrationutil::deleteRangeDeletionTaskLocally(opCtx, _migrationInfo.getId()); try { LOGV2_DEBUG(23900, @@ -254,7 +263,8 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& exShardNotFound) { LOGV2_DEBUG(4620231, 1, - "Failed to advance transaction number on recipient shard for abort", + "Failed to advance transaction number on recipient shard for abort and/or " + "marking range deletion task on recipient as ready for processing", "namespace"_attr = _migrationInfo.getNss(), "migrationId"_attr = _migrationInfo.getId(), "recipientShardId"_attr = _migrationInfo.getRecipientShardId(), @@ -262,14 +272,6 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext* "error"_attr = exShardNotFound); } - hangBeforeSendingAbortDecision.pauseWhileSet(); - - LOGV2_DEBUG(23901, - 2, - "Deleting range deletion task on donor", - "migrationId"_attr = _migrationInfo.getId()); - migrationutil::deleteRangeDeletionTaskLocally(opCtx, _migrationInfo.getId()); - LOGV2_DEBUG(23902, 2, "Marking range deletion task on recipient as ready for processing", diff --git a/src/mongo/db/s/migration_coordinator.h b/src/mongo/db/s/migration_coordinator.h index 742de204d38..e560dff9d0b 100644 --- a/src/mongo/db/s/migration_coordinator.h +++ b/src/mongo/db/s/migration_coordinator.h @@ -42,8 +42,6 @@ namespace migrationutil { */ class MigrationCoordinator { public: - enum class Decision { kAborted, kCommitted }; - MigrationCoordinator(MigrationSessionId sessionId, ShardId donorShard, ShardId recipientShard, @@ -80,7 +78,7 @@ public: * * This method is non-blocking and does not perform any I/O. */ - void setMigrationDecision(Decision decision); + void setMigrationDecision(DecisionEnum decision); /** * If a decision has been set, makes the decision durable, then communicates the decision by @@ -110,9 +108,6 @@ private: */ void _abortMigrationOnDonorAndRecipient(OperationContext* opCtx); - // The decision of the migration commit against the config server. - boost::optional<Decision> _decision; - MigrationCoordinatorDocument _migrationInfo; bool _waitForDelete = false; }; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 3a32c4400c1..a1ae8f973ed 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -500,7 +500,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig() { "updatedCollectionVersion"_attr = refreshedMetadata.getCollVersion(), "migrationId"_attr = _coordinator->getMigrationId()); - _coordinator->setMigrationDecision(migrationutil::MigrationCoordinator::Decision::kCommitted); + _coordinator->setMigrationDecision(DecisionEnum::kCommitted); hangBeforeLeavingCriticalSection.pauseWhileSet(); @@ -567,17 +567,7 @@ void MigrationSourceManager::cleanupOnError() { << _args.getFromShardId() << "to" << _args.getToShardId()), ShardingCatalogClient::kMajorityWriteConcern); - try { - _cleanup(true); - } catch (const DBException& ex) { - LOGV2_WARNING(22022, - "Failed to clean up migration with request parameters " - "{chunkMigrationRequestParameters} due to: {error}", - "Failed to clean up migration", - "chunkMigrationRequestParameters"_attr = redact(_args.toString()), - "error"_attr = redact(ex), - "migrationId"_attr = _coordinator->getMigrationId()); - } + _cleanup(true); } void MigrationSourceManager::abortDueToConflictingIndexOperation(OperationContext* opCtx) { @@ -651,7 +641,7 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk( }); } -void MigrationSourceManager::_cleanup(bool completeMigration) { +void MigrationSourceManager::_cleanup(bool completeMigration) noexcept { invariant(_state != kDone); auto cloneDriver = [&]() { @@ -692,49 +682,66 @@ void MigrationSourceManager::_cleanup(bool completeMigration) { cloneDriver->cancelClone(_opCtx); } - if (_state == kCriticalSection || _state == kCloneCompleted || _state == kCommittingOnConfig) { - _stats.totalCriticalSectionTimeMillis.addAndFetch(_cloneAndCommitTimer.millis()); - - // NOTE: The order of the operations below is important and the comments explain the - // reasoning behind it - - // Wait for the updates to the cache of the routing table to be fully written to disk before - // clearing the 'minOpTime recovery' document. This way, we ensure that all nodes from a - // shard, which donated a chunk will always be at the shard version of the last migration it - // performed. - // - // If the metadata is not persisted before clearing the 'inMigration' flag below, it is - // possible that the persisted metadata is rolled back after step down, but the write which - // cleared the 'inMigration' flag is not, a secondary node will report itself at an older - // shard version. - CatalogCacheLoader::get(_opCtx).waitForCollectionFlush(_opCtx, getNss()); - - // Clear the 'minOpTime recovery' document so that the next time a node from this shard - // becomes a primary, it won't have to recover the config server optime. - ShardingStateRecovery::endMetadataOp(_opCtx); - } - - if (completeMigration && _state >= kCloning) { - invariant(_coordinator); - if (_state < kCommittingOnConfig) { - _coordinator->setMigrationDecision( - migrationutil::MigrationCoordinator::Decision::kAborted); - } - // This can be called on an exception path after the OperationContext has been interrupted, - // so use a new OperationContext. Note, it's valid to call getServiceContext on an - // interrupted OperationContext. - auto newClient = _opCtx->getServiceContext()->makeClient("MigrationCoordinator"); - { - stdx::lock_guard<Client> lk(*newClient.get()); - newClient->setSystemOperationKillableByStepdown(lk); + try { + if (_state >= kCloning) { + invariant(_coordinator); + if (_state < kCommittingOnConfig) { + _coordinator->setMigrationDecision(DecisionEnum::kAborted); + } + + auto newClient = _opCtx->getServiceContext()->makeClient("MigrationCoordinator"); + { + stdx::lock_guard<Client> lk(*newClient.get()); + newClient->setSystemOperationKillableByStepdown(lk); + } + AlternativeClientRegion acr(newClient); + auto newOpCtxPtr = cc().makeOperationContext(); + auto newOpCtx = newOpCtxPtr.get(); + + if (_state >= kCriticalSection && _state <= kCommittingOnConfig) { + _stats.totalCriticalSectionTimeMillis.addAndFetch(_cloneAndCommitTimer.millis()); + + // NOTE: The order of the operations below is important and the comments explain the + // reasoning behind it. + // + // Wait for the updates to the cache of the routing table to be fully written to + // disk before clearing the 'minOpTime recovery' document. This way, we ensure that + // all nodes from a shard, which donated a chunk will always be at the shard version + // of the last migration it performed. + // + // If the metadata is not persisted before clearing the 'inMigration' flag below, it + // is possible that the persisted metadata is rolled back after step down, but the + // write which cleared the 'inMigration' flag is not, a secondary node will report + // itself at an older shard version. + CatalogCacheLoader::get(newOpCtx).waitForCollectionFlush(newOpCtx, getNss()); + + // Clear the 'minOpTime recovery' document so that the next time a node from this + // shard becomes a primary, it won't have to recover the config server optime. + ShardingStateRecovery::endMetadataOp(newOpCtx); + } + if (completeMigration) { + // This can be called on an exception path after the OperationContext has been + // interrupted, so use a new OperationContext. Note, it's valid to call + // getServiceContext on an interrupted OperationContext. + _cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx); + } } - AlternativeClientRegion acr(newClient); - auto newOpCtxPtr = cc().makeOperationContext(); - auto newOpCtx = newOpCtxPtr.get(); - _cleanupCompleteFuture = _coordinator->completeMigration(newOpCtx); - } - _state = kDone; + _state = kDone; + } catch (const DBException& ex) { + LOGV2_WARNING(5089001, + "Failed to complete the migration {migrationId} with " + "{chunkMigrationRequestParameters} due to: {error}", + "Failed to complete the migration", + "chunkMigrationRequestParameters"_attr = redact(_args.toString()), + "error"_attr = redact(ex), + "migrationId"_attr = _coordinator->getMigrationId()); + // Something went really wrong when completing the migration just unset the metadata and let + // the next op to recover. + UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); + AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX); + CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata(_opCtx); + } } BSONObj MigrationSourceManager::getMigrationStatusReport() const { diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 3bbae9988af..d032047c4fb 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -214,7 +214,7 @@ private: * Called when any of the states fails. May only be called once and will put the migration * manager into the kDone state. */ - void _cleanup(bool completeMigration); + void _cleanup(bool completeMigration) noexcept; // This is the opCtx of the moveChunk request that constructed the MigrationSourceManager. // The caller must guarantee it outlives the MigrationSourceManager. diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 01651ece011..26d1abcbb48 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -583,15 +583,18 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx, } } -void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) { +void persistCommitDecision(OperationContext* opCtx, + const MigrationCoordinatorDocument& migrationDoc) { + invariant(migrationDoc.getDecision() && + *migrationDoc.getDecision() == DecisionEnum::kCommitted); + hangInPersistMigrateCommitDecisionInterruptible.pauseWhileSet(opCtx); PersistentTaskStore<MigrationCoordinatorDocument> store( NamespaceString::kMigrationCoordinatorsNamespace); - store.update( - opCtx, - QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId), - BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "committed"))); + store.upsert(opCtx, + QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationDoc.getId()), + migrationDoc.toBSON()); if (hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.shouldFail()) { hangInPersistMigrateCommitDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx); @@ -600,25 +603,23 @@ void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) { } } -void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) { - retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown( - opCtx, "persist migrate abort decision", [&](OperationContext* newOpCtx) { - hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(newOpCtx); - - PersistentTaskStore<MigrationCoordinatorDocument> store( - NamespaceString::kMigrationCoordinatorsNamespace); - store.update(newOpCtx, - QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId), - BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName - << "aborted"))); - - if (hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.shouldFail()) { - hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.pauseWhileSet( - newOpCtx); - uasserted(ErrorCodes::InternalError, - "simulate an error response when persisting migrate abort decision"); - } - }); +void persistAbortDecision(OperationContext* opCtx, + const MigrationCoordinatorDocument& migrationDoc) { + invariant(migrationDoc.getDecision() && *migrationDoc.getDecision() == DecisionEnum::kAborted); + + hangInPersistMigrateAbortDecisionInterruptible.pauseWhileSet(opCtx); + + PersistentTaskStore<MigrationCoordinatorDocument> store( + NamespaceString::kMigrationCoordinatorsNamespace); + store.upsert(opCtx, + QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationDoc.getId()), + migrationDoc.toBSON()); + + if (hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.shouldFail()) { + hangInPersistMigrateAbortDecisionThenSimulateErrorUninterruptible.pauseWhileSet(opCtx); + uasserted(ErrorCodes::InternalError, + "simulate an error response when persisting migrate abort decision"); + } } void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx, @@ -646,20 +647,15 @@ void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx, void deleteRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& deletionTaskId, const WriteConcernOptions& writeConcern) { - retryIdempotentWorkAsPrimaryUntilSuccessOrStepdown( - opCtx, "cancel local range deletion", [&](OperationContext* newOpCtx) { - hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(newOpCtx); - PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); - store.remove( - newOpCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern); - - if (hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) { - hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet( - newOpCtx); - uasserted(ErrorCodes::InternalError, - "simulate an error response when deleting range deletion locally"); - } - }); + hangInDeleteRangeDeletionLocallyInterruptible.pauseWhileSet(opCtx); + PersistentTaskStore<RangeDeletionTask> store(NamespaceString::kRangeDeletionNamespace); + store.remove(opCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern); + + if (hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.shouldFail()) { + hangInDeleteRangeDeletionLocallyThenSimulateErrorUninterruptible.pauseWhileSet(opCtx); + uasserted(ErrorCodes::InternalError, + "simulate an error response when deleting range deletion locally"); + } } void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx, @@ -891,9 +887,6 @@ void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss) if (doc.getDecision()) { // The decision is already known. - coordinator.setMigrationDecision((*doc.getDecision()) == DecisionEnum::kCommitted - ? MigrationCoordinator::Decision::kCommitted - : MigrationCoordinator::Decision::kAborted); coordinator.completeMigration(opCtx); return true; } @@ -957,9 +950,9 @@ void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss) } if (currentMetadata.keyBelongsToMe(doc.getRange().getMin())) { - coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted); + coordinator.setMigrationDecision(DecisionEnum::kAborted); } else { - coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted); + coordinator.setMigrationDecision(DecisionEnum::kCommitted); } coordinator.completeMigration(opCtx); diff --git a/src/mongo/db/s/migration_util.h b/src/mongo/db/s/migration_util.h index facef4118e2..09ca6b7c1e0 100644 --- a/src/mongo/db/s/migration_util.h +++ b/src/mongo/db/s/migration_util.h @@ -145,13 +145,15 @@ void persistRangeDeletionTaskLocally(OperationContext* opCtx, * Updates the migration coordinator document to set the decision field to "committed" and waits for * majority writeConcern. */ -void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId); +void persistCommitDecision(OperationContext* opCtx, + const MigrationCoordinatorDocument& migrationDoc); /** * Updates the migration coordinator document to set the decision field to "aborted" and waits for * majority writeConcern. */ -void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId); +void persistAbortDecision(OperationContext* opCtx, + const MigrationCoordinatorDocument& migrationDoc); /** * Deletes the range deletion task document with the specified id from config.rangeDeletions and |