summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2021-03-25 21:53:17 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-09 21:17:52 +0000
commitaab15521427af6233739f5fa65df52ac9d9e95f0 (patch)
treed7948fd852ef07bdf61a817dc21f65a4bf83ade5
parentd23bccf28cd3ec616eee1d753060834c11cf5bee (diff)
downloadmongo-aab15521427af6233739f5fa65df52ac9d9e95f0.tar.gz
SERVER-55780 Simplify cancellation for tenant migration donors
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp304
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.h30
2 files changed, 136 insertions, 198 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index c75d034f49c..2ff7c4683bc 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -79,41 +79,29 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn
const int kMaxRecipientKeyDocsFindAttempts = 10;
-bool shouldStopCreatingTTLIndex(Status status, const CancellationToken& token) {
- return status.isOK() || token.isCanceled();
+bool shouldStopInsertingDonorStateDoc(Status status) {
+ return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress;
}
-bool shouldStopInsertingDonorStateDoc(Status status, const CancellationToken& token) {
- return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress ||
- token.isCanceled();
-}
-
-bool shouldStopUpdatingDonorStateDoc(Status status, const CancellationToken& token) {
- return status.isOK() || token.isCanceled();
-}
-
-bool shouldStopSendingRecipientCommand(Status status, const CancellationToken& token) {
+bool shouldStopSendingRecipientCommand(Status status) {
return status.isOK() ||
!(ErrorCodes::isRetriableError(status) ||
- status == ErrorCodes::FailedToSatisfyReadPreference) ||
- token.isCanceled();
+ // Returned if findHost() is unable to target the recipient in 15 seconds, which may
+ // happen after a failover.
+ status == ErrorCodes::FailedToSatisfyReadPreference);
}
-bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status, const CancellationToken& token) {
+bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status) {
// TODO (SERVER-54926): Convert HostUnreachable error in
// _fetchAndStoreRecipientClusterTimeKeyDocs to specific error.
return status.isOK() || !ErrorCodes::isRetriableError(status) ||
- status.code() == ErrorCodes::HostUnreachable || token.isCanceled();
+ status.code() == ErrorCodes::HostUnreachable;
}
-
-void checkIfReceivedDonorAbortMigration(const CancellationToken& serviceToken,
- const CancellationToken& instanceToken) {
- // If only the instance token was canceled, then we must have gotten donorAbortMigration.
- uassert(ErrorCodes::TenantMigrationAborted,
- "Migration aborted due to receiving donorAbortMigration.",
- !instanceToken.isCanceled() || serviceToken.isCanceled());
+void checkForTokenInterrupt(const CancellationToken& token) {
+ uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
}
+
template <class Promise>
void setPromiseFromStatusIfNotReady(WithLock lk, Promise& promise, Status status) {
if (promise.getFuture().isReady()) {
@@ -170,9 +158,9 @@ ExecutorFuture<void> TenantMigrationDonorService::createStateDocumentTTLIndex(
result);
uassertStatusOK(getStatusFromCommandResult(result));
})
- .until([token](Status status) { return shouldStopCreatingTTLIndex(status, token); })
+ .until([](Status status) { return status.isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancellationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::createExternalKeysTTLIndex(
@@ -196,9 +184,9 @@ ExecutorFuture<void> TenantMigrationDonorService::createExternalKeysTTLIndex(
result);
uassertStatusOK(getStatusFromCommandResult(result));
})
- .until([token](Status status) { return shouldStopCreatingTTLIndex(status, token); })
+ .until([](Status status) { return status.isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancellationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::_rebuildService(
@@ -445,11 +433,11 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([token](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(), token);
+ .until([](StatusWith<repl::OpTime> swOpTime) {
+ return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
})
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancellationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDoc(
@@ -554,11 +542,9 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
invariant(updateOpTime);
return updateOpTime.get();
})
- .until([token](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token);
- })
+ .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancellationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<repl::OpTime>
@@ -595,17 +581,17 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable(
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([token](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token);
- })
+ .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancellationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern(
- std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime) {
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ repl::OpTime opTime,
+ const CancellationToken& token) {
return WaitForMajorityService::get(_serviceContext)
- .waitUntilMajority(std::move(opTime), CancellationToken::uncancelable())
+ .waitUntilMajority(std::move(opTime), token)
.thenRunOn(**executor)
.then([this, self = shared_from_this()] {
stdx::lock_guard<Latch> lg(_mutex);
@@ -661,7 +647,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi
});
});
})
- .until([token](Status status) { return shouldStopSendingRecipientCommand(status, token); })
+ .until([token](Status status) { return shouldStopSendingRecipientCommand(status); })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -721,7 +707,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const CancellationToken& serviceToken) noexcept {
+ const CancellationToken& token) noexcept {
{
stdx::lock_guard<Latch> lg(_mutex);
if (!_stateDoc.getMigrationStart()) {
@@ -731,7 +717,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet();
- _abortMigrationSource = CancellationSource(serviceToken);
+ _abortMigrationSource = CancellationSource(token);
+
{
stdx::lock_guard<Latch> lg(_mutex);
setPromiseOkIfNotReady(lg, _migrationCancelablePromise);
@@ -742,30 +729,31 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingDonatingCount();
return ExecutorFuture(**executor)
- .then([this, self = shared_from_this(), executor, serviceToken] {
- return _enterAbortingIndexBuildsState(
- executor, serviceToken, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor] {
+ return _enterAbortingIndexBuildsState(executor, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), serviceToken] {
- _abortIndexBuilds(serviceToken, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor] {
+ _abortIndexBuilds(_abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
return _fetchAndStoreRecipientClusterTimeKeyDocs(
- executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token());
+ executor, recipientTargeterRS, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, serviceToken] {
- return _enterDataSyncState(executor, serviceToken, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor] {
+ return _enterDataSyncState(executor, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
return _waitForRecipientToBecomeConsistentAndEnterBlockingState(
- executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token());
+ executor, recipientTargeterRS, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
return _waitForRecipientToReachBlockTimestampAndEnterCommittedState(
- executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token());
+ executor, recipientTargeterRS, _abortMigrationSource.token());
})
- .onError([this, self = shared_from_this(), executor, serviceToken](Status status) {
- return _handleErrorOrEnterAbortedState(executor, serviceToken, status);
+ // Note from here on the migration cannot be aborted, so only the token from the primary
+ // only service should be used.
+ .onError([this, self = shared_from_this(), executor, token](Status status) {
+ return _handleErrorOrEnterAbortedState(executor, token, status);
})
.onCompletion([this, self = shared_from_this()](Status status) {
LOGV2(5006601,
@@ -787,13 +775,18 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
}
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ .then([this, self = shared_from_this(), executor, token, recipientTargeterRS] {
return _waitForForgetMigrationThenMarkMigrationGarbageCollectable(
- executor, recipientTargeterRS, serviceToken);
+ executor, recipientTargeterRS, token);
})
.onCompletion([this,
self = shared_from_this(),
+ token,
scopedCounter{std::move(scopedOutstandingMigrationCounter)}](Status status) {
+ // Don't set the completion promise if the instance has been canceled. We assume
+ // whatever canceled the token will also set the promise with an appropriate error.
+ checkForTokenInterrupt(token);
+
stdx::lock_guard<Latch> lg(_mutex);
LOGV2(4920400,
@@ -808,9 +801,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexBuildsState(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken) {
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kUninitialized) {
@@ -819,12 +810,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexB
}
// Enter "abortingIndexBuilds" state.
- return _insertStateDoc(executor, instanceToken)
- .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
- // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
- // use its base PrimaryOnlyService's cancellation source to pass tokens
- // in calls to WaitForMajorityService::waitUntilMajority.
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _insertStateDoc(executor, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
})
.then([this, self = shared_from_this()] {
auto opCtxHolder = cc().makeOperationContext();
@@ -833,8 +821,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexB
});
}
-void TenantMigrationDonorService::Instance::_abortIndexBuilds(
- const CancellationToken& serviceToken, const CancellationToken& instanceToken) {
+void TenantMigrationDonorService::Instance::_abortIndexBuilds(const CancellationToken& token) {
+ checkForTokenInterrupt(token);
+
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kAbortingIndexBuilds) {
@@ -842,8 +831,6 @@ void TenantMigrationDonorService::Instance::_abortIndexBuilds(
}
}
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
// Before starting data sync, abort any in-progress index builds. No new index
// builds can start while we are doing this because the mtab prevents it.
{
@@ -858,8 +845,7 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken) {
+ const CancellationToken& token) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kAbortingIndexBuilds) {
@@ -867,18 +853,10 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
}
}
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- return AsyncTry([this,
- self = shared_from_this(),
- executor,
- recipientTargeterRS,
- serviceToken,
- instanceToken] {
- return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken)
+ return AsyncTry([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, token)
.thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken](
- HostAndPort host) {
+ .then([this, self = shared_from_this(), executor, token](HostAndPort host) {
pauseTenantMigrationBeforeFetchingKeys.pauseWhileSet();
const auto nss = NamespaceString::kKeysCollectionNamespace;
@@ -938,10 +916,10 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
{
stdx::lock_guard<Latch> lg(_mutex);
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
- uassert(ErrorCodes::Interrupted,
- "Donor service interrupted",
- !serviceToken.isCanceled());
+ // Note the fetcher cannot be canceled via token, so this check for
+ // interrupt is required otherwise stepdown/shutdown could block waiting
+ // for the fetcher to complete.
+ checkForTokenInterrupt(token);
_recipientKeysFetcher = fetcher;
}
@@ -961,17 +939,13 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
return keyDocs;
})
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken](
- auto keyDocs) {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
+ .then([this, self = shared_from_this(), executor, token](auto keyDocs) {
+ checkForTokenInterrupt(token);
return tenant_migration_util::storeExternalClusterTimeKeyDocs(
std::move(keyDocs));
})
- .then([this, self = shared_from_this(), serviceToken, instanceToken](
- repl::OpTime lastKeyOpTime) {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
+ .then([this, self = shared_from_this(), token](repl::OpTime lastKeyOpTime) {
pauseTenantMigrationDonorBeforeWaitingForKeysToReplicate.pauseWhileSet();
auto votingMembersWriteConcern =
@@ -981,21 +955,16 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
auto writeConcernFuture = repl::ReplicationCoordinator::get(_serviceContext)
->awaitReplicationAsyncNoWTimeout(
lastKeyOpTime, votingMembersWriteConcern);
- return future_util::withCancellation(std::move(writeConcernFuture),
- instanceToken);
+ return future_util::withCancellation(std::move(writeConcernFuture), token);
});
})
- .until([instanceToken](Status status) {
- return shouldStopFetchingRecipientClusterTimeKeyDocs(status, instanceToken);
- })
+ .until([](Status status) { return shouldStopFetchingRecipientClusterTimeKeyDocs(status); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancellationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken) {
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token) {
pauseTenantMigrationAfterFetchingAndStoringKeys.pauseWhileSet();
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -1004,17 +973,12 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState(
}
}
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
pauseTenantMigrationBeforeLeavingAbortingIndexBuildsState.pauseWhileSet();
// Enter "dataSync" state.
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, instanceToken)
- .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
- // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
- // use its base PrimaryOnlyService's cancellation source to pass tokens
- // in calls to WaitForMajorityService::waitUntilMajority.
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
});
}
@@ -1022,8 +986,7 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnterBlockingState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken) {
+ const CancellationToken& token) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) {
@@ -1031,28 +994,17 @@ TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnt
}
}
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, instanceToken)
+ return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, token)
.then([this, self = shared_from_this()] {
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx);
})
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken] {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
+ .then([this, self = shared_from_this(), executor, token] {
// Enter "blocking" state.
- return _updateStateDoc(
- executor, TenantMigrationDonorStateEnum::kBlocking, instanceToken)
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken](
- repl::OpTime opTime) {
- // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
- // use its base PrimaryOnlyService's cancellation source to pass tokens
- // in calls to WaitForMajorityService::waitUntilMajority.
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
});
});
}
@@ -1061,36 +1013,30 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAndEnterCommittedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken) {
+ const CancellationToken& token) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kBlocking) {
return ExecutorFuture(**executor);
}
- }
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- {
- stdx::lock_guard<Latch> lg(_mutex);
invariant(_stateDoc.getBlockTimestamp());
}
// Source to cancel the timeout if the operation completed in time.
CancellationSource cancelTimeoutSource;
+ CancellationSource recipientSyncDataSource(token);
auto deadlineReachedFuture =
(*executor)->sleepFor(Milliseconds(repl::tenantMigrationBlockingStateTimeoutMS.load()),
cancelTimeoutSource.token());
- std::vector<ExecutorFuture<void>> futures;
- futures.push_back(std::move(deadlineReachedFuture));
- futures.push_back(_sendRecipientSyncDataCommand(executor, recipientTargeterRS, instanceToken));
-
- return whenAny(std::move(futures))
+ return whenAny(std::move(deadlineReachedFuture),
+ _sendRecipientSyncDataCommand(
+ executor, recipientTargeterRS, recipientSyncDataSource.token()))
.thenRunOn(**executor)
- .then([this, cancelTimeoutSource, self = shared_from_this()](auto result) mutable {
+ .then([this, self = shared_from_this(), cancelTimeoutSource, recipientSyncDataSource](
+ auto result) mutable {
const auto& [status, idx] = result;
if (idx == 0) {
@@ -1098,7 +1044,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd
"Tenant migration blocking stage timeout expired",
"timeoutMs"_attr = repl::tenantMigrationGarbageCollectionDelayMS.load());
// Deadline reached, cancel the pending '_sendRecipientSyncDataCommand()'...
- _abortMigrationSource.cancel();
+ recipientSyncDataSource.cancel();
// ...and return error.
uasserted(ErrorCodes::ExceededTimeLimit, "Blocking state timeout expired");
} else if (idx == 1) {
@@ -1132,32 +1078,25 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd
uasserted(ErrorCodes::InternalError, "simulate a tenant migration error");
}
})
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken] {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
+ .then([this, self = shared_from_this(), executor, token] {
// Enter "commit" state.
- return _updateStateDoc(
- executor, TenantMigrationDonorStateEnum::kCommitted, serviceToken)
- .then(
- [this, self = shared_from_this(), executor, serviceToken](repl::OpTime opTime) {
- // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
- // use its base PrimaryOnlyService's cancellation source to pass tokens
- // in calls to WaitForMajorityService::waitUntilMajority.
- return _waitForMajorityWriteConcern(executor, std::move(opTime))
- .then([this, self = shared_from_this()] {
- stdx::lock_guard<Latch> lg(_mutex);
- // If interrupt is called at some point during execution, it is
- // possible that interrupt() will fulfill the promise before we
- // do.
- setPromiseOkIfNotReady(lg, _decisionPromise);
- });
- });
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token)
+ .then([this, self = shared_from_this()] {
+ stdx::lock_guard<Latch> lg(_mutex);
+ // If interrupt is called at some point during execution, it is
+ // possible that interrupt() will fulfill the promise before we
+ // do.
+ setPromiseOkIfNotReady(lg, _decisionPromise);
+ });
+ });
});
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterAbortedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& serviceToken,
+ const CancellationToken& token,
Status status) {
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -1167,6 +1106,10 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA
}
}
+ if (_abortMigrationSource.token().isCanceled()) {
+ status = Status(ErrorCodes::TenantMigrationAborted, "Aborted due to donorAbortMigration.");
+ }
+
auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
_serviceContext, _tenantId);
if (status == ErrorCodes::ConflictingOperationInProgress || !mtab) {
@@ -1186,9 +1129,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA
} else {
// Enter "abort" state.
_abortReason.emplace(status);
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted, serviceToken)
- .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime))
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token)
.then([this, self = shared_from_this()] {
stdx::lock_guard<Latch> lg(_mutex);
// If interrupt is called at some point during execution, it is
@@ -1203,7 +1146,7 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationGarbageCollectable(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& serviceToken) {
+ const CancellationToken& token) {
auto expiredAt = [&]() {
stdx::lock_guard<Latch> lg(_mutex);
return _stateDoc.getExpireAt();
@@ -1220,15 +1163,14 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
// Wait for the donorForgetMigration command.
// If donorAbortMigration has already canceled work, the abortMigrationSource would be
// canceled and continued usage of the source would lead to incorrect behavior. Thus, we
- // need to use the serviceToken after the migration has reached a decision state in
- // order to continue work, such as sending donorForgetMigration, successfully.
+ // need to use the token after the migration has reached a decision state in order to continue
+ // work, such as sending donorForgetMigration, successfully.
return std::move(_receiveDonorForgetMigrationPromise.getFuture())
.thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
- return _sendRecipientForgetMigrationCommand(
- executor, recipientTargeterRS, serviceToken);
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS, token);
})
- .then([this, self = shared_from_this(), executor, serviceToken] {
+ .then([this, self = shared_from_this(), executor, token] {
// Note marking the keys as garbage collectable is not atomic with marking the
// state document garbage collectable, so an interleaved failover can lead the
// keys to be deleted before the state document has an expiration date. This is
@@ -1238,13 +1180,13 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
executor,
_donorService->getInstanceCleanupExecutor(),
_migrationUuid,
- serviceToken);
+ token);
})
- .then([this, self = shared_from_this(), executor, serviceToken] {
- return _markStateDocAsGarbageCollectable(executor, serviceToken);
+ .then([this, self = shared_from_this(), executor, token] {
+ return _markStateDocAsGarbageCollectable(executor, token);
})
- .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
});
}
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 487ffc5bf06..a596261e0f7 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -159,11 +159,9 @@ public:
ExecutorFuture<void> _enterAbortingIndexBuildsState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken);
+ const CancellationToken& token);
- void _abortIndexBuilds(const CancellationToken& serviceToken,
- const CancellationToken& instanceToken);
+ void _abortIndexBuilds(const CancellationToken& token);
/**
* Fetches all key documents from the recipient's admin.system.keys collection, stores
@@ -172,35 +170,31 @@ public:
ExecutorFuture<void> _fetchAndStoreRecipientClusterTimeKeyDocs(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken);
+ const CancellationToken& token);
ExecutorFuture<void> _enterDataSyncState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken);
+ const CancellationToken& token);
ExecutorFuture<void> _waitForRecipientToBecomeConsistentAndEnterBlockingState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken);
+ const CancellationToken& token);
ExecutorFuture<void> _waitForRecipientToReachBlockTimestampAndEnterCommittedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& serviceToken,
- const CancellationToken& instanceToken);
+ const CancellationToken& token);
ExecutorFuture<void> _handleErrorOrEnterAbortedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& serviceToken,
+ const CancellationToken& token,
Status status);
ExecutorFuture<void> _waitForForgetMigrationThenMarkMigrationGarbageCollectable(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancellationToken& serviceToken);
+ const CancellationToken& token);
/**
* Makes a task executor for executing commands against the recipient. If the server
@@ -238,7 +232,9 @@ public:
* Waits for given opTime to be majority committed.
*/
ExecutorFuture<void> _waitForMajorityWriteConcern(
- std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime);
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ repl::OpTime opTime,
+ const CancellationToken& token);
/**
* Sends the given command to the recipient replica set.
@@ -323,8 +319,8 @@ public:
// abort.
SharedPromise<void> _decisionPromise;
- // This CancellationSource is instantiated from CancellationToken that is passed into run().
- // It allows for manual cancellation of work from the instance.
+ // Used for logical interrupts that require aborting the migration but not unconditionally
+ // interrupting the instance, e.g. receiving donorAbortMigration.
CancellationSource _abortMigrationSource;
};