diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2021-03-25 21:53:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-09 21:17:52 +0000 |
commit | aab15521427af6233739f5fa65df52ac9d9e95f0 (patch) | |
tree | d7948fd852ef07bdf61a817dc21f65a4bf83ade5 /src | |
parent | d23bccf28cd3ec616eee1d753060834c11cf5bee (diff) | |
download | mongo-aab15521427af6233739f5fa65df52ac9d9e95f0.tar.gz |
SERVER-55780 Simplify cancellation for tenant migration donors
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 304 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.h | 30 |
2 files changed, 136 insertions, 198 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index c75d034f49c..2ff7c4683bc 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -79,41 +79,29 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn const int kMaxRecipientKeyDocsFindAttempts = 10; -bool shouldStopCreatingTTLIndex(Status status, const CancellationToken& token) { - return status.isOK() || token.isCanceled(); +bool shouldStopInsertingDonorStateDoc(Status status) { + return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress; } -bool shouldStopInsertingDonorStateDoc(Status status, const CancellationToken& token) { - return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress || - token.isCanceled(); -} - -bool shouldStopUpdatingDonorStateDoc(Status status, const CancellationToken& token) { - return status.isOK() || token.isCanceled(); -} - -bool shouldStopSendingRecipientCommand(Status status, const CancellationToken& token) { +bool shouldStopSendingRecipientCommand(Status status) { return status.isOK() || !(ErrorCodes::isRetriableError(status) || - status == ErrorCodes::FailedToSatisfyReadPreference) || - token.isCanceled(); + // Returned if findHost() is unable to target the recipient in 15 seconds, which may + // happen after a failover. + status == ErrorCodes::FailedToSatisfyReadPreference); } -bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status, const CancellationToken& token) { +bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status) { // TODO (SERVER-54926): Convert HostUnreachable error in // _fetchAndStoreRecipientClusterTimeKeyDocs to specific error. return status.isOK() || !ErrorCodes::isRetriableError(status) || - status.code() == ErrorCodes::HostUnreachable || token.isCanceled(); + status.code() == ErrorCodes::HostUnreachable; } - -void checkIfReceivedDonorAbortMigration(const CancellationToken& serviceToken, - const CancellationToken& instanceToken) { - // If only the instance token was canceled, then we must have gotten donorAbortMigration. - uassert(ErrorCodes::TenantMigrationAborted, - "Migration aborted due to receiving donorAbortMigration.", - !instanceToken.isCanceled() || serviceToken.isCanceled()); +void checkForTokenInterrupt(const CancellationToken& token) { + uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled()); } + template <class Promise> void setPromiseFromStatusIfNotReady(WithLock lk, Promise& promise, Status status) { if (promise.getFuture().isReady()) { @@ -170,9 +158,9 @@ ExecutorFuture<void> TenantMigrationDonorService::createStateDocumentTTLIndex( result); uassertStatusOK(getStatusFromCommandResult(result)); }) - .until([token](Status status) { return shouldStopCreatingTTLIndex(status, token); }) + .until([](Status status) { return status.isOK(); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, CancellationToken::uncancelable()); + .on(**executor, token); } ExecutorFuture<void> TenantMigrationDonorService::createExternalKeysTTLIndex( @@ -196,9 +184,9 @@ ExecutorFuture<void> TenantMigrationDonorService::createExternalKeysTTLIndex( result); uassertStatusOK(getStatusFromCommandResult(result)); }) - .until([token](Status status) { return shouldStopCreatingTTLIndex(status, token); }) + .until([](Status status) { return status.isOK(); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, CancellationToken::uncancelable()); + .on(**executor, token); } ExecutorFuture<void> TenantMigrationDonorService::_rebuildService( @@ -445,11 +433,11 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([token](StatusWith<repl::OpTime> swOpTime) { - return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(), token); + .until([](StatusWith<repl::OpTime> swOpTime) { + return shouldStopInsertingDonorStateDoc(swOpTime.getStatus()); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, CancellationToken::uncancelable()); + .on(**executor, token); } ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDoc( @@ -554,11 +542,9 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState invariant(updateOpTime); return updateOpTime.get(); }) - .until([token](StatusWith<repl::OpTime> swOpTime) { - return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token); - }) + .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, CancellationToken::uncancelable()); + .on(**executor, token); } ExecutorFuture<repl::OpTime> @@ -595,17 +581,17 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([token](StatusWith<repl::OpTime> swOpTime) { - return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token); - }) + .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, CancellationToken::uncancelable()); + .on(**executor, token); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern( - std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime) { + std::shared_ptr<executor::ScopedTaskExecutor> executor, + repl::OpTime opTime, + const CancellationToken& token) { return WaitForMajorityService::get(_serviceContext) - .waitUntilMajority(std::move(opTime), CancellationToken::uncancelable()) + .waitUntilMajority(std::move(opTime), token) .thenRunOn(**executor) .then([this, self = shared_from_this()] { stdx::lock_guard<Latch> lg(_mutex); @@ -661,7 +647,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi }); }); }) - .until([token](Status status) { return shouldStopSendingRecipientCommand(status, token); }) + .until([token](Status status) { return shouldStopSendingRecipientCommand(status); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, token); } @@ -721,7 +707,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget SemiFuture<void> TenantMigrationDonorService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, - const CancellationToken& serviceToken) noexcept { + const CancellationToken& token) noexcept { { stdx::lock_guard<Latch> lg(_mutex); if (!_stateDoc.getMigrationStart()) { @@ -731,7 +717,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet(); - _abortMigrationSource = CancellationSource(serviceToken); + _abortMigrationSource = CancellationSource(token); + { stdx::lock_guard<Latch> lg(_mutex); setPromiseOkIfNotReady(lg, _migrationCancelablePromise); @@ -742,30 +729,31 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingDonatingCount(); return ExecutorFuture(**executor) - .then([this, self = shared_from_this(), executor, serviceToken] { - return _enterAbortingIndexBuildsState( - executor, serviceToken, _abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor] { + return _enterAbortingIndexBuildsState(executor, _abortMigrationSource.token()); }) - .then([this, self = shared_from_this(), serviceToken] { - _abortIndexBuilds(serviceToken, _abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor] { + _abortIndexBuilds(_abortMigrationSource.token()); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS] { return _fetchAndStoreRecipientClusterTimeKeyDocs( - executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token()); + executor, recipientTargeterRS, _abortMigrationSource.token()); }) - .then([this, self = shared_from_this(), executor, serviceToken] { - return _enterDataSyncState(executor, serviceToken, _abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor] { + return _enterDataSyncState(executor, _abortMigrationSource.token()); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS] { return _waitForRecipientToBecomeConsistentAndEnterBlockingState( - executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token()); + executor, recipientTargeterRS, _abortMigrationSource.token()); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS] { return _waitForRecipientToReachBlockTimestampAndEnterCommittedState( - executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token()); + executor, recipientTargeterRS, _abortMigrationSource.token()); }) - .onError([this, self = shared_from_this(), executor, serviceToken](Status status) { - return _handleErrorOrEnterAbortedState(executor, serviceToken, status); + // Note from here on the migration cannot be aborted, so only the token from the primary + // only service should be used. + .onError([this, self = shared_from_this(), executor, token](Status status) { + return _handleErrorOrEnterAbortedState(executor, token, status); }) .onCompletion([this, self = shared_from_this()](Status status) { LOGV2(5006601, @@ -787,13 +775,18 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } } }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + .then([this, self = shared_from_this(), executor, token, recipientTargeterRS] { return _waitForForgetMigrationThenMarkMigrationGarbageCollectable( - executor, recipientTargeterRS, serviceToken); + executor, recipientTargeterRS, token); }) .onCompletion([this, self = shared_from_this(), + token, scopedCounter{std::move(scopedOutstandingMigrationCounter)}](Status status) { + // Don't set the completion promise if the instance has been canceled. We assume + // whatever canceled the token will also set the promise with an appropriate error. + checkForTokenInterrupt(token); + stdx::lock_guard<Latch> lg(_mutex); LOGV2(4920400, @@ -808,9 +801,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexBuildsState( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken) { + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) { { stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kUninitialized) { @@ -819,12 +810,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexB } // Enter "abortingIndexBuilds" state. - return _insertStateDoc(executor, instanceToken) - .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { - // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should - // use its base PrimaryOnlyService's cancellation source to pass tokens - // in calls to WaitForMajorityService::waitUntilMajority. - return _waitForMajorityWriteConcern(executor, std::move(opTime)); + return _insertStateDoc(executor, token) + .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), token); }) .then([this, self = shared_from_this()] { auto opCtxHolder = cc().makeOperationContext(); @@ -833,8 +821,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexB }); } -void TenantMigrationDonorService::Instance::_abortIndexBuilds( - const CancellationToken& serviceToken, const CancellationToken& instanceToken) { +void TenantMigrationDonorService::Instance::_abortIndexBuilds(const CancellationToken& token) { + checkForTokenInterrupt(token); + { stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kAbortingIndexBuilds) { @@ -842,8 +831,6 @@ void TenantMigrationDonorService::Instance::_abortIndexBuilds( } } - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - // Before starting data sync, abort any in-progress index builds. No new index // builds can start while we are doing this because the mtab prevents it. { @@ -858,8 +845,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken) { + const CancellationToken& token) { { stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kAbortingIndexBuilds) { @@ -867,18 +853,10 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs } } - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - - return AsyncTry([this, - self = shared_from_this(), - executor, - recipientTargeterRS, - serviceToken, - instanceToken] { - return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken) + return AsyncTry([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, token) .thenRunOn(**executor) - .then([this, self = shared_from_this(), executor, serviceToken, instanceToken]( - HostAndPort host) { + .then([this, self = shared_from_this(), executor, token](HostAndPort host) { pauseTenantMigrationBeforeFetchingKeys.pauseWhileSet(); const auto nss = NamespaceString::kKeysCollectionNamespace; @@ -938,10 +916,10 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs { stdx::lock_guard<Latch> lg(_mutex); - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - uassert(ErrorCodes::Interrupted, - "Donor service interrupted", - !serviceToken.isCanceled()); + // Note the fetcher cannot be canceled via token, so this check for + // interrupt is required otherwise stepdown/shutdown could block waiting + // for the fetcher to complete. + checkForTokenInterrupt(token); _recipientKeysFetcher = fetcher; } @@ -961,17 +939,13 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs return keyDocs; }) - .then([this, self = shared_from_this(), executor, serviceToken, instanceToken]( - auto keyDocs) { - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); + .then([this, self = shared_from_this(), executor, token](auto keyDocs) { + checkForTokenInterrupt(token); return tenant_migration_util::storeExternalClusterTimeKeyDocs( std::move(keyDocs)); }) - .then([this, self = shared_from_this(), serviceToken, instanceToken]( - repl::OpTime lastKeyOpTime) { - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - + .then([this, self = shared_from_this(), token](repl::OpTime lastKeyOpTime) { pauseTenantMigrationDonorBeforeWaitingForKeysToReplicate.pauseWhileSet(); auto votingMembersWriteConcern = @@ -981,21 +955,16 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs auto writeConcernFuture = repl::ReplicationCoordinator::get(_serviceContext) ->awaitReplicationAsyncNoWTimeout( lastKeyOpTime, votingMembersWriteConcern); - return future_util::withCancellation(std::move(writeConcernFuture), - instanceToken); + return future_util::withCancellation(std::move(writeConcernFuture), token); }); }) - .until([instanceToken](Status status) { - return shouldStopFetchingRecipientClusterTimeKeyDocs(status, instanceToken); - }) + .until([](Status status) { return shouldStopFetchingRecipientClusterTimeKeyDocs(status); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, CancellationToken::uncancelable()); + .on(**executor, token); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken) { + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) { pauseTenantMigrationAfterFetchingAndStoringKeys.pauseWhileSet(); { stdx::lock_guard<Latch> lg(_mutex); @@ -1004,17 +973,12 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState( } } - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - pauseTenantMigrationBeforeLeavingAbortingIndexBuildsState.pauseWhileSet(); // Enter "dataSync" state. - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, instanceToken) - .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { - // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should - // use its base PrimaryOnlyService's cancellation source to pass tokens - // in calls to WaitForMajorityService::waitUntilMajority. - return _waitForMajorityWriteConcern(executor, std::move(opTime)); + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, token) + .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), token); }); } @@ -1022,8 +986,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnterBlockingState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken) { + const CancellationToken& token) { { stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) { @@ -1031,28 +994,17 @@ TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnt } } - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - - return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, instanceToken) + return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, token) .then([this, self = shared_from_this()] { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx); }) - .then([this, self = shared_from_this(), executor, serviceToken, instanceToken] { - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - + .then([this, self = shared_from_this(), executor, token] { // Enter "blocking" state. - return _updateStateDoc( - executor, TenantMigrationDonorStateEnum::kBlocking, instanceToken) - .then([this, self = shared_from_this(), executor, serviceToken, instanceToken]( - repl::OpTime opTime) { - // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should - // use its base PrimaryOnlyService's cancellation source to pass tokens - // in calls to WaitForMajorityService::waitUntilMajority. - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - - return _waitForMajorityWriteConcern(executor, std::move(opTime)); + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking, token) + .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), token); }); }); } @@ -1061,36 +1013,30 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAndEnterCommittedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken) { + const CancellationToken& token) { { stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kBlocking) { return ExecutorFuture(**executor); } - } - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - - { - stdx::lock_guard<Latch> lg(_mutex); invariant(_stateDoc.getBlockTimestamp()); } // Source to cancel the timeout if the operation completed in time. CancellationSource cancelTimeoutSource; + CancellationSource recipientSyncDataSource(token); auto deadlineReachedFuture = (*executor)->sleepFor(Milliseconds(repl::tenantMigrationBlockingStateTimeoutMS.load()), cancelTimeoutSource.token()); - std::vector<ExecutorFuture<void>> futures; - futures.push_back(std::move(deadlineReachedFuture)); - futures.push_back(_sendRecipientSyncDataCommand(executor, recipientTargeterRS, instanceToken)); - - return whenAny(std::move(futures)) + return whenAny(std::move(deadlineReachedFuture), + _sendRecipientSyncDataCommand( + executor, recipientTargeterRS, recipientSyncDataSource.token())) .thenRunOn(**executor) - .then([this, cancelTimeoutSource, self = shared_from_this()](auto result) mutable { + .then([this, self = shared_from_this(), cancelTimeoutSource, recipientSyncDataSource]( + auto result) mutable { const auto& [status, idx] = result; if (idx == 0) { @@ -1098,7 +1044,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd "Tenant migration blocking stage timeout expired", "timeoutMs"_attr = repl::tenantMigrationGarbageCollectionDelayMS.load()); // Deadline reached, cancel the pending '_sendRecipientSyncDataCommand()'... - _abortMigrationSource.cancel(); + recipientSyncDataSource.cancel(); // ...and return error. uasserted(ErrorCodes::ExceededTimeLimit, "Blocking state timeout expired"); } else if (idx == 1) { @@ -1132,32 +1078,25 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd uasserted(ErrorCodes::InternalError, "simulate a tenant migration error"); } }) - .then([this, self = shared_from_this(), executor, serviceToken, instanceToken] { - checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - + .then([this, self = shared_from_this(), executor, token] { // Enter "commit" state. - return _updateStateDoc( - executor, TenantMigrationDonorStateEnum::kCommitted, serviceToken) - .then( - [this, self = shared_from_this(), executor, serviceToken](repl::OpTime opTime) { - // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should - // use its base PrimaryOnlyService's cancellation source to pass tokens - // in calls to WaitForMajorityService::waitUntilMajority. - return _waitForMajorityWriteConcern(executor, std::move(opTime)) - .then([this, self = shared_from_this()] { - stdx::lock_guard<Latch> lg(_mutex); - // If interrupt is called at some point during execution, it is - // possible that interrupt() will fulfill the promise before we - // do. - setPromiseOkIfNotReady(lg, _decisionPromise); - }); - }); + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted, token) + .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), token) + .then([this, self = shared_from_this()] { + stdx::lock_guard<Latch> lg(_mutex); + // If interrupt is called at some point during execution, it is + // possible that interrupt() will fulfill the promise before we + // do. + setPromiseOkIfNotReady(lg, _decisionPromise); + }); + }); }); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterAbortedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& serviceToken, + const CancellationToken& token, Status status) { { stdx::lock_guard<Latch> lg(_mutex); @@ -1167,6 +1106,10 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA } } + if (_abortMigrationSource.token().isCanceled()) { + status = Status(ErrorCodes::TenantMigrationAborted, "Aborted due to donorAbortMigration."); + } + auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker( _serviceContext, _tenantId); if (status == ErrorCodes::ConflictingOperationInProgress || !mtab) { @@ -1186,9 +1129,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA } else { // Enter "abort" state. _abortReason.emplace(status); - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted, serviceToken) - .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime)) + return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted, token) + .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), token) .then([this, self = shared_from_this()] { stdx::lock_guard<Latch> lg(_mutex); // If interrupt is called at some point during execution, it is @@ -1203,7 +1146,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationGarbageCollectable( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& serviceToken) { + const CancellationToken& token) { auto expiredAt = [&]() { stdx::lock_guard<Latch> lg(_mutex); return _stateDoc.getExpireAt(); @@ -1220,15 +1163,14 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG // Wait for the donorForgetMigration command. // If donorAbortMigration has already canceled work, the abortMigrationSource would be // canceled and continued usage of the source would lead to incorrect behavior. Thus, we - // need to use the serviceToken after the migration has reached a decision state in - // order to continue work, such as sending donorForgetMigration, successfully. + // need to use the token after the migration has reached a decision state in order to continue + // work, such as sending donorForgetMigration, successfully. return std::move(_receiveDonorForgetMigrationPromise.getFuture()) .thenRunOn(**executor) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { - return _sendRecipientForgetMigrationCommand( - executor, recipientTargeterRS, serviceToken); + .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS, token); }) - .then([this, self = shared_from_this(), executor, serviceToken] { + .then([this, self = shared_from_this(), executor, token] { // Note marking the keys as garbage collectable is not atomic with marking the // state document garbage collectable, so an interleaved failover can lead the // keys to be deleted before the state document has an expiration date. This is @@ -1238,13 +1180,13 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG executor, _donorService->getInstanceCleanupExecutor(), _migrationUuid, - serviceToken); + token); }) - .then([this, self = shared_from_this(), executor, serviceToken] { - return _markStateDocAsGarbageCollectable(executor, serviceToken); + .then([this, self = shared_from_this(), executor, token] { + return _markStateDocAsGarbageCollectable(executor, token); }) - .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime)); + .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), token); }); } diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 487ffc5bf06..a596261e0f7 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -159,11 +159,9 @@ public: ExecutorFuture<void> _enterAbortingIndexBuildsState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken); + const CancellationToken& token); - void _abortIndexBuilds(const CancellationToken& serviceToken, - const CancellationToken& instanceToken); + void _abortIndexBuilds(const CancellationToken& token); /** * Fetches all key documents from the recipient's admin.system.keys collection, stores @@ -172,35 +170,31 @@ public: ExecutorFuture<void> _fetchAndStoreRecipientClusterTimeKeyDocs( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken); + const CancellationToken& token); ExecutorFuture<void> _enterDataSyncState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken); + const CancellationToken& token); ExecutorFuture<void> _waitForRecipientToBecomeConsistentAndEnterBlockingState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken); + const CancellationToken& token); ExecutorFuture<void> _waitForRecipientToReachBlockTimestampAndEnterCommittedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& serviceToken, - const CancellationToken& instanceToken); + const CancellationToken& token); ExecutorFuture<void> _handleErrorOrEnterAbortedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& serviceToken, + const CancellationToken& token, Status status); ExecutorFuture<void> _waitForForgetMigrationThenMarkMigrationGarbageCollectable( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancellationToken& serviceToken); + const CancellationToken& token); /** * Makes a task executor for executing commands against the recipient. If the server @@ -238,7 +232,9 @@ public: * Waits for given opTime to be majority committed. */ ExecutorFuture<void> _waitForMajorityWriteConcern( - std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime); + std::shared_ptr<executor::ScopedTaskExecutor> executor, + repl::OpTime opTime, + const CancellationToken& token); /** * Sends the given command to the recipient replica set. @@ -323,8 +319,8 @@ public: // abort. SharedPromise<void> _decisionPromise; - // This CancellationSource is instantiated from CancellationToken that is passed into run(). - // It allows for manual cancellation of work from the instance. + // Used for logical interrupts that require aborting the migration but not unconditionally + // interrupting the instance, e.g. receiving donorAbortMigration. CancellationSource _abortMigrationSource; }; |