summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/tenant_migration_donor_service.cpp
diff options
context:
space:
mode:
author: Jack Mulrow <jack.mulrow@mongodb.com> 2021-03-25 21:53:17 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-04-11 14:52:08 +0000
commit: 4613a9d07b377b1b6a711671a252776dee688892 (patch)
tree: 971a6bba4baf3a699afcc77dbbca3a2a3a9eb703 /src/mongo/db/repl/tenant_migration_donor_service.cpp
parent: 5b9f84421440daef351224597fd8ecc594c2f70e (diff)
download: mongo-4613a9d07b377b1b6a711671a252776dee688892.tar.gz
SERVER-55780 Simplify cancellation for tenant migration donors
(cherry picked from commit aab15521427af6233739f5fa65df52ac9d9e95f0)
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_donor_service.cpp')
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_service.cpp 304
1 files changed, 123 insertions, 181 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index a8cc8daa2c1..47330bea141 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -79,41 +79,29 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn
const int kMaxRecipientKeyDocsFindAttempts = 10;
-bool shouldStopCreatingTTLIndex(Status status, const CancelationToken& token) {
- return status.isOK() || token.isCanceled();
+bool shouldStopInsertingDonorStateDoc(Status status) {
+ return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress;
}
-bool shouldStopInsertingDonorStateDoc(Status status, const CancelationToken& token) {
- return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress ||
- token.isCanceled();
-}
-
-bool shouldStopUpdatingDonorStateDoc(Status status, const CancelationToken& token) {
- return status.isOK() || token.isCanceled();
-}
-
-bool shouldStopSendingRecipientCommand(Status status, const CancelationToken& token) {
+bool shouldStopSendingRecipientCommand(Status status) {
return status.isOK() ||
!(ErrorCodes::isRetriableError(status) ||
- status == ErrorCodes::FailedToSatisfyReadPreference) ||
- token.isCanceled();
+ // Returned if findHost() is unable to target the recipient in 15 seconds, which may
+ // happen after a failover.
+ status == ErrorCodes::FailedToSatisfyReadPreference);
}
-bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status, const CancelationToken& token) {
+bool shouldStopFetchingRecipientClusterTimeKeyDocs(Status status) {
// TODO (SERVER-54926): Convert HostUnreachable error in
// _fetchAndStoreRecipientClusterTimeKeyDocs to specific error.
return status.isOK() || !ErrorCodes::isRetriableError(status) ||
- status.code() == ErrorCodes::HostUnreachable || token.isCanceled();
+ status.code() == ErrorCodes::HostUnreachable;
}
-
-void checkIfReceivedDonorAbortMigration(const CancelationToken& serviceToken,
- const CancelationToken& instanceToken) {
- // If only the instance token was canceled, then we must have gotten donorAbortMigration.
- uassert(ErrorCodes::TenantMigrationAborted,
- "Migration aborted due to receiving donorAbortMigration.",
- !instanceToken.isCanceled() || serviceToken.isCanceled());
+void checkForTokenInterrupt(const CancelationToken& token) {
+ uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
}
+
template <class Promise>
void setPromiseFromStatusIfNotReady(WithLock lk, Promise& promise, Status status) {
if (promise.getFuture().isReady()) {
@@ -170,9 +158,9 @@ ExecutorFuture<void> TenantMigrationDonorService::createStateDocumentTTLIndex(
result);
uassertStatusOK(getStatusFromCommandResult(result));
})
- .until([token](Status status) { return shouldStopCreatingTTLIndex(status, token); })
+ .until([](Status status) { return status.isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancelationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::createExternalKeysTTLIndex(
@@ -196,9 +184,9 @@ ExecutorFuture<void> TenantMigrationDonorService::createExternalKeysTTLIndex(
result);
uassertStatusOK(getStatusFromCommandResult(result));
})
- .until([token](Status status) { return shouldStopCreatingTTLIndex(status, token); })
+ .until([](Status status) { return status.isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancelationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::_rebuildService(
@@ -446,11 +434,11 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([token](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(), token);
+ .until([](StatusWith<repl::OpTime> swOpTime) {
+ return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
})
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancelationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDoc(
@@ -555,11 +543,9 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
invariant(updateOpTime);
return updateOpTime.get();
})
- .until([token](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token);
- })
+ .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancelationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<repl::OpTime>
@@ -596,17 +582,17 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable(
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([token](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token);
- })
+ .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancelationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWriteConcern(
- std::shared_ptr<executor::ScopedTaskExecutor> executor, repl::OpTime opTime) {
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ repl::OpTime opTime,
+ const CancelationToken& token) {
return WaitForMajorityService::get(_serviceContext)
- .waitUntilMajority(std::move(opTime), CancelationToken::uncancelable())
+ .waitUntilMajority(std::move(opTime), token)
.thenRunOn(**executor)
.then([this, self = shared_from_this()] {
stdx::lock_guard<Latch> lg(_mutex);
@@ -662,7 +648,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipi
});
});
})
- .until([token](Status status) { return shouldStopSendingRecipientCommand(status, token); })
+ .until([token](Status status) { return shouldStopSendingRecipientCommand(status); })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -722,7 +708,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const CancelationToken& serviceToken) noexcept {
+ const CancelationToken& token) noexcept {
{
stdx::lock_guard<Latch> lg(_mutex);
if (!_stateDoc.getMigrationStart()) {
@@ -732,7 +718,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet();
- _abortMigrationSource = CancelationSource(serviceToken);
+ _abortMigrationSource = CancelationSource(token);
+
{
stdx::lock_guard<Latch> lg(_mutex);
setPromiseOkIfNotReady(lg, _migrationCancelablePromise);
@@ -743,30 +730,31 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingDonatingCount();
return ExecutorFuture(**executor)
- .then([this, self = shared_from_this(), executor, serviceToken] {
- return _enterAbortingIndexBuildsState(
- executor, serviceToken, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor] {
+ return _enterAbortingIndexBuildsState(executor, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), serviceToken] {
- _abortIndexBuilds(serviceToken, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor] {
+ _abortIndexBuilds(_abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
return _fetchAndStoreRecipientClusterTimeKeyDocs(
- executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token());
+ executor, recipientTargeterRS, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, serviceToken] {
- return _enterDataSyncState(executor, serviceToken, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor] {
+ return _enterDataSyncState(executor, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
return _waitForRecipientToBecomeConsistentAndEnterBlockingState(
- executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token());
+ executor, recipientTargeterRS, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
return _waitForRecipientToReachBlockTimestampAndEnterCommittedState(
- executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token());
+ executor, recipientTargeterRS, _abortMigrationSource.token());
})
- .onError([this, self = shared_from_this(), executor, serviceToken](Status status) {
- return _handleErrorOrEnterAbortedState(executor, serviceToken, status);
+ // Note from here on the migration cannot be aborted, so only the token from the primary
+ // only service should be used.
+ .onError([this, self = shared_from_this(), executor, token](Status status) {
+ return _handleErrorOrEnterAbortedState(executor, token, status);
})
.onCompletion([this, self = shared_from_this()](Status status) {
LOGV2(5006601,
@@ -788,13 +776,18 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
}
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ .then([this, self = shared_from_this(), executor, token, recipientTargeterRS] {
return _waitForForgetMigrationThenMarkMigrationGarbageCollectable(
- executor, recipientTargeterRS, serviceToken);
+ executor, recipientTargeterRS, token);
})
.onCompletion([this,
self = shared_from_this(),
+ token,
scopedCounter{std::move(scopedOutstandingMigrationCounter)}](Status status) {
+ // Don't set the completion promise if the instance has been canceled. We assume
+ // whatever canceled the token will also set the promise with an appropriate error.
+ checkForTokenInterrupt(token);
+
stdx::lock_guard<Latch> lg(_mutex);
LOGV2(4920400,
@@ -809,9 +802,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexBuildsState(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancelationToken& serviceToken,
- const CancelationToken& instanceToken) {
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancelationToken& token) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kUninitialized) {
@@ -820,12 +811,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexB
}
// Enter "abortingIndexBuilds" state.
- return _insertStateDoc(executor, instanceToken)
- .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
- // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
- // use its base PrimaryOnlyService's cancelation source to pass tokens
- // in calls to WaitForMajorityService::waitUntilMajority.
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _insertStateDoc(executor, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
})
.then([this, self = shared_from_this()] {
auto opCtxHolder = cc().makeOperationContext();
@@ -834,8 +822,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterAbortingIndexB
});
}
-void TenantMigrationDonorService::Instance::_abortIndexBuilds(
- const CancelationToken& serviceToken, const CancelationToken& instanceToken) {
+void TenantMigrationDonorService::Instance::_abortIndexBuilds(const CancelationToken& token) {
+ checkForTokenInterrupt(token);
+
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kAbortingIndexBuilds) {
@@ -843,8 +832,6 @@ void TenantMigrationDonorService::Instance::_abortIndexBuilds(
}
}
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
// Before starting data sync, abort any in-progress index builds. No new index
// builds can start while we are doing this because the mtab prevents it.
{
@@ -859,8 +846,7 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancelationToken& serviceToken,
- const CancelationToken& instanceToken) {
+ const CancelationToken& token) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kAbortingIndexBuilds) {
@@ -868,18 +854,10 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
}
}
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- return AsyncTry([this,
- self = shared_from_this(),
- executor,
- recipientTargeterRS,
- serviceToken,
- instanceToken] {
- return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken)
+ return AsyncTry([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, token)
.thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken](
- HostAndPort host) {
+ .then([this, self = shared_from_this(), executor, token](HostAndPort host) {
pauseTenantMigrationBeforeFetchingKeys.pauseWhileSet();
const auto nss = NamespaceString::kKeysCollectionNamespace;
@@ -939,10 +917,10 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
{
stdx::lock_guard<Latch> lg(_mutex);
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
- uassert(ErrorCodes::Interrupted,
- "Donor service interrupted",
- !serviceToken.isCanceled());
+ // Note the fetcher cannot be canceled via token, so this check for
+ // interrupt is required otherwise stepdown/shutdown could block waiting
+ // for the fetcher to complete.
+ checkForTokenInterrupt(token);
_recipientKeysFetcher = fetcher;
}
@@ -962,17 +940,13 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
return keyDocs;
})
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken](
- auto keyDocs) {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
+ .then([this, self = shared_from_this(), executor, token](auto keyDocs) {
+ checkForTokenInterrupt(token);
return tenant_migration_util::storeExternalClusterTimeKeyDocs(
std::move(keyDocs));
})
- .then([this, self = shared_from_this(), serviceToken, instanceToken](
- repl::OpTime lastKeyOpTime) {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
+ .then([this, self = shared_from_this(), token](repl::OpTime lastKeyOpTime) {
pauseTenantMigrationDonorBeforeWaitingForKeysToReplicate.pauseWhileSet();
auto votingMembersWriteConcern =
@@ -982,21 +956,16 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
auto writeConcernFuture = repl::ReplicationCoordinator::get(_serviceContext)
->awaitReplicationAsyncNoWTimeout(
lastKeyOpTime, votingMembersWriteConcern);
- return future_util::withCancelation(std::move(writeConcernFuture),
- instanceToken);
+ return future_util::withCancelation(std::move(writeConcernFuture), token);
});
})
- .until([instanceToken](Status status) {
- return shouldStopFetchingRecipientClusterTimeKeyDocs(status, instanceToken);
- })
+ .until([](Status status) { return shouldStopFetchingRecipientClusterTimeKeyDocs(status); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, CancelationToken::uncancelable());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancelationToken& serviceToken,
- const CancelationToken& instanceToken) {
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancelationToken& token) {
pauseTenantMigrationAfterFetchingAndStoringKeys.pauseWhileSet();
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -1005,17 +974,12 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_enterDataSyncState(
}
}
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
pauseTenantMigrationBeforeLeavingAbortingIndexBuildsState.pauseWhileSet();
// Enter "dataSync" state.
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, instanceToken)
- .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
- // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
- // use its base PrimaryOnlyService's cancelation source to pass tokens
- // in calls to WaitForMajorityService::waitUntilMajority.
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kDataSync, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
});
}
@@ -1023,8 +987,7 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnterBlockingState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancelationToken& serviceToken,
- const CancelationToken& instanceToken) {
+ const CancelationToken& token) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) {
@@ -1032,28 +995,17 @@ TenantMigrationDonorService::Instance::_waitForRecipientToBecomeConsistentAndEnt
}
}
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, instanceToken)
+ return _sendRecipientSyncDataCommand(executor, recipientTargeterRS, token)
.then([this, self = shared_from_this()] {
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx);
})
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken] {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
+ .then([this, self = shared_from_this(), executor, token] {
// Enter "blocking" state.
- return _updateStateDoc(
- executor, TenantMigrationDonorStateEnum::kBlocking, instanceToken)
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken](
- repl::OpTime opTime) {
- // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
- // use its base PrimaryOnlyService's cancelation source to pass tokens
- // in calls to WaitForMajorityService::waitUntilMajority.
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
});
});
}
@@ -1062,36 +1014,30 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAndEnterCommittedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancelationToken& serviceToken,
- const CancelationToken& instanceToken) {
+ const CancelationToken& token) {
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kBlocking) {
return ExecutorFuture(**executor);
}
- }
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
- {
- stdx::lock_guard<Latch> lg(_mutex);
invariant(_stateDoc.getBlockTimestamp());
}
// Source to cancel the timeout if the operation completed in time.
CancelationSource cancelTimeoutSource;
+ CancelationSource recipientSyncDataSource(token);
auto deadlineReachedFuture =
(*executor)->sleepFor(Milliseconds(repl::tenantMigrationBlockingStateTimeoutMS.load()),
cancelTimeoutSource.token());
- std::vector<ExecutorFuture<void>> futures;
- futures.push_back(std::move(deadlineReachedFuture));
- futures.push_back(_sendRecipientSyncDataCommand(executor, recipientTargeterRS, instanceToken));
-
- return whenAny(std::move(futures))
+ return whenAny(std::move(deadlineReachedFuture),
+ _sendRecipientSyncDataCommand(
+ executor, recipientTargeterRS, recipientSyncDataSource.token()))
.thenRunOn(**executor)
- .then([this, cancelTimeoutSource, self = shared_from_this()](auto result) mutable {
+ .then([this, self = shared_from_this(), cancelTimeoutSource, recipientSyncDataSource](
+ auto result) mutable {
const auto& [status, idx] = result;
if (idx == 0) {
@@ -1099,7 +1045,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd
"Tenant migration blocking stage timeout expired",
"timeoutMs"_attr = repl::tenantMigrationGarbageCollectionDelayMS.load());
// Deadline reached, cancel the pending '_sendRecipientSyncDataCommand()'...
- _abortMigrationSource.cancel();
+ recipientSyncDataSource.cancel();
// ...and return error.
uasserted(ErrorCodes::ExceededTimeLimit, "Blocking state timeout expired");
} else if (idx == 1) {
@@ -1133,32 +1079,25 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd
uasserted(ErrorCodes::InternalError, "simulate a tenant migration error");
}
})
- .then([this, self = shared_from_this(), executor, serviceToken, instanceToken] {
- checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
-
+ .then([this, self = shared_from_this(), executor, token] {
// Enter "commit" state.
- return _updateStateDoc(
- executor, TenantMigrationDonorStateEnum::kCommitted, serviceToken)
- .then(
- [this, self = shared_from_this(), executor, serviceToken](repl::OpTime opTime) {
- // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
- // use its base PrimaryOnlyService's cancelation source to pass tokens
- // in calls to WaitForMajorityService::waitUntilMajority.
- return _waitForMajorityWriteConcern(executor, std::move(opTime))
- .then([this, self = shared_from_this()] {
- stdx::lock_guard<Latch> lg(_mutex);
- // If interrupt is called at some point during execution, it is
- // possible that interrupt() will fulfill the promise before we
- // do.
- setPromiseOkIfNotReady(lg, _decisionPromise);
- });
- });
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token)
+ .then([this, self = shared_from_this()] {
+ stdx::lock_guard<Latch> lg(_mutex);
+ // If interrupt is called at some point during execution, it is
+ // possible that interrupt() will fulfill the promise before we
+ // do.
+ setPromiseOkIfNotReady(lg, _decisionPromise);
+ });
+ });
});
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterAbortedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancelationToken& serviceToken,
+ const CancelationToken& token,
Status status) {
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -1168,6 +1107,10 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA
}
}
+ if (_abortMigrationSource.token().isCanceled()) {
+ status = Status(ErrorCodes::TenantMigrationAborted, "Aborted due to donorAbortMigration.");
+ }
+
auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
_serviceContext, _tenantId);
if (status == ErrorCodes::ConflictingOperationInProgress || !mtab) {
@@ -1187,9 +1130,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA
} else {
// Enter "abort" state.
_abortReason.emplace(status);
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted, serviceToken)
- .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime))
+ return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted, token)
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token)
.then([this, self = shared_from_this()] {
stdx::lock_guard<Latch> lg(_mutex);
// If interrupt is called at some point during execution, it is
@@ -1204,7 +1147,7 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationGarbageCollectable(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancelationToken& serviceToken) {
+ const CancelationToken& token) {
auto expiredAt = [&]() {
stdx::lock_guard<Latch> lg(_mutex);
return _stateDoc.getExpireAt();
@@ -1221,15 +1164,14 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
// Wait for the donorForgetMigration command.
// If donorAbortMigration has already canceled work, the abortMigrationSource would be
// canceled and continued usage of the source would lead to incorrect behavior. Thus, we
- // need to use the serviceToken after the migration has reached a decision state in
- // order to continue work, such as sending donorForgetMigration, successfully.
+ // need to use the token after the migration has reached a decision state in order to continue
+ // work, such as sending donorForgetMigration, successfully.
return std::move(_receiveDonorForgetMigrationPromise.getFuture())
.thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
- return _sendRecipientForgetMigrationCommand(
- executor, recipientTargeterRS, serviceToken);
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS, token);
})
- .then([this, self = shared_from_this(), executor, serviceToken] {
+ .then([this, self = shared_from_this(), executor, token] {
// Note marking the keys as garbage collectable is not atomic with marking the
// state document garbage collectable, so an interleaved failover can lead the
// keys to be deleted before the state document has an expiration date. This is
@@ -1239,13 +1181,13 @@ TenantMigrationDonorService::Instance::_waitForForgetMigrationThenMarkMigrationG
executor,
_donorService->getInstanceCleanupExecutor(),
_migrationUuid,
- serviceToken);
+ token);
})
- .then([this, self = shared_from_this(), executor, serviceToken] {
- return _markStateDocAsGarbageCollectable(executor, serviceToken);
+ .then([this, self = shared_from_this(), executor, token] {
+ return _markStateDocAsGarbageCollectable(executor, token);
})
- .then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime));
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
});
}