author     Jason Zhang <jason.zhang@mongodb.com>             2021-01-28 17:58:14 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-02-08 23:29:36 +0000
commit     3120087175678ec7a61a6d12cd9326ba8cfa2d45 (patch)
tree       70fa55d784662f05ae747d1724b872cec92832df
parent     8e177a8c5eef5aedc0c4fdf08d6e764389907cb6 (diff)
download   mongo-3120087175678ec7a61a6d12cd9326ba8cfa2d45.tar.gz
SERVER-54114 SERVER-54039 Pass cancelation token throughout TenantMigrationDonorService
-rw-r--r--  jstests/replsets/tenant_migration_abort_forget_retry.js  | 132
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.cpp     | 208
-rw-r--r--  src/mongo/db/repl/tenant_migration_donor_service.h       |  21
3 files changed, 220 insertions(+), 141 deletions(-)
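
The change threads cancelation tokens explicitly through TenantMigrationDonorService: run() now receives a service-wide token (serviceToken), derives the instance-level _abortMigrationSource from it, and passes the appropriate token into each helper. The pattern rests on one asymmetry: canceling a parent source cancels every child source derived from its token, while canceling a child leaves the parent untouched. That is how checkIfReceivedDonorAbortMigration distinguishes a donorAbortMigration command (only the child canceled) from service-wide cancelation. A minimal standalone C++ sketch of that asymmetry follows; Source is a stand-in type, not the real mongo CancelationSource/CancelationToken API:

    #include <cassert>
    #include <vector>

    // Stand-in cancelation source: cancel() marks this source and every
    // child source derived from it, mirroring how a CancelationSource
    // constructed from a parent token behaves.
    class Source {
    public:
        Source() = default;
        explicit Source(Source& parent) {
            parent._children.push_back(this);
        }
        void cancel() {
            _canceled = true;
            for (Source* child : _children)
                child->cancel();
        }
        bool isCanceled() const {
            return _canceled;
        }

    private:
        bool _canceled = false;
        std::vector<Source*> _children;
    };

    // Mirrors checkIfReceivedDonorAbortMigration: if only the instance
    // token is canceled, the cancelation came from donorAbortMigration.
    bool receivedDonorAbortMigration(const Source& service, const Source& instance) {
        return instance.isCanceled() && !service.isCanceled();
    }

    int main() {
        {
            Source serviceSource;                        // token passed into run()
            Source abortMigrationSource(serviceSource);  // _abortMigrationSource
            abortMigrationSource.cancel();               // donorAbortMigration arrives
            assert(receivedDonorAbortMigration(serviceSource, abortMigrationSource));
        }
        {
            Source serviceSource;
            Source abortMigrationSource(serviceSource);
            serviceSource.cancel();                      // service-wide cancelation
            assert(abortMigrationSource.isCanceled());   // the child is canceled too...
            assert(!receivedDonorAbortMigration(serviceSource, abortMigrationSource));
        }
    }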
diff --git a/jstests/replsets/tenant_migration_abort_forget_retry.js b/jstests/replsets/tenant_migration_abort_forget_retry.js
index ddb9ad43358..40b97901c71 100644
--- a/jstests/replsets/tenant_migration_abort_forget_retry.js
+++ b/jstests/replsets/tenant_migration_abort_forget_retry.js
@@ -1,7 +1,8 @@
/**
- * Starts a tenant migration that aborts, and then issues a donorForgetMigration command. Finally,
- * starts a second tenant migration with the same tenantId as the aborted migration, and expects
- * this second migration to go through.
+ * Starts a tenant migration that aborts, either due to the
+ * abortTenantMigrationBeforeLeavingBlockingState failpoint or due to receiving donorAbortMigration,
+ * and then issues a donorForgetMigration command. Finally, starts a second tenant migration with
+ * the same tenantId as the aborted migration, and expects this second migration to go through.
*
* @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_windows_tls]
*/
@@ -10,8 +11,17 @@
"use strict";
load("jstests/libs/fail_point_util.js");
+load("jstests/libs/parallelTester.js");
load("jstests/libs/uuid_util.js");
load("jstests/replsets/libs/tenant_migration_test.js");
+load("jstests/replsets/libs/tenant_migration_util.js");
+
+const kTenantIdPrefix = "testTenantId";
+let testNum = 0;
+
+function makeTenantId() {
+ return kTenantIdPrefix + testNum++;
+}
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
if (!tenantMigrationTest.isFeatureFlagEnabled()) {
@@ -19,36 +29,90 @@ if (!tenantMigrationTest.isFeatureFlagEnabled()) {
return;
}
-const donorPrimary = tenantMigrationTest.getDonorPrimary();
-
-const tenantId = "testTenantId";
-
-const migrationId1 = extractUUIDFromObject(UUID());
-const migrationId2 = extractUUIDFromObject(UUID());
-
-// Start a migration with the "abortTenantMigrationBeforeLeavingBlockingState" failPoint enabled.
-// The migration will abort as a result, and a status of "kAborted" should be returned.
-jsTestLog("Starting a migration that is expected to abort. migrationId: " + migrationId1 +
- ", tenantId: " + tenantId);
-const abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationBeforeLeavingBlockingState");
-const abortRes = assert.commandWorked(
- tenantMigrationTest.runMigration({migrationIdString: migrationId1, tenantId},
- false /* retryOnRetryableErrors */,
- false /* automaticForgetMigration */));
-assert.eq(abortRes.state, TenantMigrationTest.State.kAborted);
-abortFp.off();
-
-// Forget the aborted migration.
-jsTestLog("Forgetting aborted migration with migrationId: " + migrationId1);
-assert.commandWorked(tenantMigrationTest.forgetMigration(migrationId1));
-
-// Try running a new migration with the same tenantId. It should succeed, since the previous
-// migration with the same tenantId was aborted.
-jsTestLog("Attempting to run a new migration with the same tenantId. New migrationId: " +
- migrationId2 + ", tenantId: " + tenantId);
-const commitRes = assert.commandWorked(
- tenantMigrationTest.runMigration({migrationIdString: migrationId2, tenantId}));
-assert.eq(commitRes.state, TenantMigrationTest.State.kCommitted);
+(() => {
+ const migrationId1 = extractUUIDFromObject(UUID());
+ const migrationId2 = extractUUIDFromObject(UUID());
+ const tenantId = makeTenantId();
-tenantMigrationTest.stop();
+ // Start a migration with the "abortTenantMigrationBeforeLeavingBlockingState" failPoint
+ // enabled. The migration will abort as a result, and a status of "kAborted" should be returned.
+ jsTestLog(
+ "Starting a migration that is expected to abort due to setting abortTenantMigrationBeforeLeavingBlockingState failpoint. migrationId: " +
+ migrationId1 + ", tenantId: " + tenantId);
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
+ const abortFp =
+ configureFailPoint(donorPrimary, "abortTenantMigrationBeforeLeavingBlockingState");
+ const abortRes = assert.commandWorked(
+ tenantMigrationTest.runMigration({migrationIdString: migrationId1, tenantId: tenantId},
+ false /* retryOnRetryableErrors */,
+ false /* automaticForgetMigration */));
+ assert.eq(abortRes.state, TenantMigrationTest.State.kAborted);
+ abortFp.off();
+
+ // Forget the aborted migration.
+ jsTestLog("Forgetting aborted migration with migrationId: " + migrationId1);
+ assert.commandWorked(tenantMigrationTest.forgetMigration(migrationId1));
+
+ // Try running a new migration with the same tenantId. It should succeed, since the previous
+ // migration with the same tenantId was aborted.
+ jsTestLog("Attempting to run a new migration with the same tenantId. New migrationId: " +
+ migrationId2 + ", tenantId: " + tenantId);
+ const commitRes = assert.commandWorked(
+ tenantMigrationTest.runMigration({migrationIdString: migrationId2, tenantId: tenantId}));
+ assert.eq(commitRes.state, TenantMigrationTest.State.kCommitted);
+})();
+
+(() => {
+ const migrationId1 = extractUUIDFromObject(UUID());
+ const migrationId2 = extractUUIDFromObject(UUID());
+ const tenantId = makeTenantId();
+
+ jsTestLog(
+ "Starting a migration that is expected to abort in blocking state due to receiving donorAbortMigration. migrationId: " +
+ migrationId1 + ", tenantId: " + tenantId);
+
+ const donorPrimary = tenantMigrationTest.getDonorPrimary();
+ let fp = configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeLeavingBlockingState");
+ assert.commandWorked(
+ tenantMigrationTest.startMigration({migrationIdString: migrationId1, tenantId: tenantId}));
+
+ fp.wait();
+
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst());
+ const tryAbortThread = new Thread(TenantMigrationUtil.tryAbortMigrationAsync,
+ {migrationIdString: migrationId1, tenantId: tenantId},
+ donorRstArgs,
+ TenantMigrationUtil.runTenantMigrationCommand);
+ tryAbortThread.start();
+
+ // Wait for donorAbortMigration command to start.
+ assert.soon(() => {
+ const res = assert.commandWorked(donorPrimary.adminCommand(
+ {currentOp: true, desc: "tenant donor migration", tenantId: tenantId}));
+ return res.inprog[0].receivedCancelation;
+ });
+
+ fp.off();
+
+ tryAbortThread.join();
+ assert.commandWorked(tryAbortThread.returnData());
+
+ const stateRes = assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(
+ {migrationIdString: migrationId1, tenantId: tenantId}));
+ assert.eq(stateRes.state, TenantMigrationTest.State.kAborted);
+
+ // Forget the aborted migration.
+ jsTestLog("Forgetting aborted migration with migrationId: " + migrationId1);
+ assert.commandWorked(tenantMigrationTest.forgetMigration(migrationId1));
+
+ // Try running a new migration with the same tenantId. It should succeed, since the previous
+ // migration with the same tenantId was aborted.
+ jsTestLog("Attempting to run a new migration with the same tenantId. New migrationId: " +
+ migrationId2 + ", tenantId: " + tenantId);
+ const commitRes = assert.commandWorked(
+ tenantMigrationTest.runMigration({migrationIdString: migrationId2, tenantId: tenantId}));
+ assert.eq(commitRes.state, TenantMigrationTest.State.kCommitted);
})();
+
+tenantMigrationTest.stop();
+})();
\ No newline at end of file
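
The second test case added above performs a four-step handshake: park the migration at the pauseTenantMigrationBeforeLeavingBlockingState failpoint, send donorAbortMigration from a parallel thread, poll currentOp until receivedCancelation (which now reports _abortMigrationSource.token().isCanceled()) flips true, then release the failpoint so the migration observes the cancelation and aborts. A rough C++ sketch of that handshake, with std::thread and atomics standing in for the jstest primitives (failpoints, Thread, assert.soon):

    #include <atomic>
    #include <cassert>
    #include <chrono>
    #include <thread>

    std::atomic<bool> failpointEnabled{true};      // pauseTenantMigrationBeforeLeavingBlockingState
    std::atomic<bool> receivedCancelation{false};  // surfaced via currentOp in the real test

    void migration() {
        // ... blocking-state work up to the failpoint ...
        while (failpointEnabled.load())            // migration parked at the failpoint
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        // once released, the migration observes the cancelation and aborts
    }

    int main() {
        std::thread migrationThread(migration);
        std::thread tryAbortThread([] {            // tryAbortMigrationAsync on a parallel Thread
            receivedCancelation.store(true);       // donorAbortMigration cancels the source
        });

        while (!receivedCancelation.load())        // assert.soon polling currentOp
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        failpointEnabled.store(false);             // fp.off()

        tryAbortThread.join();
        migrationThread.join();
        assert(receivedCancelation.load());
    }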
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index fc58789db3a..af57ada365c 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -88,12 +88,12 @@ bool shouldStopSendingRecipientCommand(Status status, const CancelationToken& to
return status.isOK() || !ErrorCodes::isRetriableError(status) || token.isCanceled();
}
-void checkIfReceivedDonorAbortMigration(const CancelationToken& parent,
- const CancelationToken& instance) {
+void checkIfReceivedDonorAbortMigration(const CancelationToken& serviceToken,
+ const CancelationToken& instanceToken) {
// If only the instance token was canceled, then we must have gotten donorAbortMigration.
uassert(ErrorCodes::TenantMigrationAborted,
"Migration aborted due to receiving donorAbortMigration.",
- !instance.isCanceled() || parent.isCanceled());
+ !instanceToken.isCanceled() || serviceToken.isCanceled());
}
} // namespace
@@ -235,7 +235,7 @@ boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrent
BSONObjBuilder bob;
bob.append("desc", "tenant donor migration");
bob.append("migrationCompleted", _completionPromise.getFuture().isReady());
- bob.append("receivedCancelation", _instanceCancelationSource.token().isCanceled());
+ bob.append("receivedCancelation", _abortMigrationSource.token().isCanceled());
bob.append("instanceID", _stateDoc.getId().toBSON());
bob.append("tenantId", _stateDoc.getTenantId());
bob.append("recipientConnectionString", _stateDoc.getRecipientConnectionString());
@@ -283,7 +283,7 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx)
}
void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() {
- _instanceCancelationSource.cancel();
+ _abortMigrationSource.cancel();
stdx::lock_guard<Latch> lg(_mutex);
if (auto fetcher = _recipientKeysFetcher.lock()) {
@@ -319,9 +319,9 @@ ExecutorFuture<void>
TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancelationToken& token) {
- return recipientTargeterRS
- ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token())
+ const CancelationToken& serviceToken,
+ const CancelationToken& instanceToken) {
+ return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken)
.thenRunOn(**executor)
.then([this, self = shared_from_this(), executor](HostAndPort host) {
const auto nss = NamespaceString::kKeysCollectionNamespace;
@@ -396,16 +396,17 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs
return keyDocs;
})
- .then([this, self = shared_from_this(), executor, token](auto keyDocs) {
- checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
+ .then(
+ [this, self = shared_from_this(), executor, serviceToken, instanceToken](auto keyDocs) {
+ checkIfReceivedDonorAbortMigration(serviceToken, instanceToken);
- tenant_migration_util::storeExternalClusterTimeKeyDocsAndRefreshCache(
- executor, std::move(keyDocs), _instanceCancelationSource.token());
- });
+ tenant_migration_util::storeExternalClusterTimeKeyDocsAndRefreshCache(
+ executor, std::move(keyDocs), instanceToken);
+ });
}
ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDoc(
- std::shared_ptr<executor::ScopedTaskExecutor> executor) {
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token) {
invariant(_stateDoc.getState() == TenantMigrationDonorStateEnum::kUninitialized);
_stateDoc.setState(TenantMigrationDonorStateEnum::kDataSync);
@@ -430,9 +431,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(),
- _instanceCancelationSource.token());
+ .until([token](StatusWith<repl::OpTime> swOpTime) {
+ return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(), token);
})
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, CancelationToken::uncancelable());
@@ -440,7 +440,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState
ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDoc(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const TenantMigrationDonorStateEnum nextState) {
+ const TenantMigrationDonorStateEnum nextState,
+ const CancelationToken& token) {
const auto originalStateDocBson = _stateDoc.toBSON();
return AsyncTry([this, self = shared_from_this(), executor, nextState, originalStateDocBson] {
@@ -525,9 +526,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
invariant(updateOpTime);
return updateOpTime.get();
})
- .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(),
- _instanceCancelationSource.token());
+ .until([token](StatusWith<repl::OpTime> swOpTime) {
+ return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token);
})
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, CancelationToken::uncancelable());
@@ -535,7 +535,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState
ExecutorFuture<repl::OpTime>
TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable(
- std::shared_ptr<executor::ScopedTaskExecutor> executor) {
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token) {
_stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()});
return AsyncTry([this, self = shared_from_this()] {
@@ -560,9 +560,8 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable(
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(),
- _instanceCancelationSource.token());
+ .until([token](StatusWith<repl::OpTime> swOpTime) {
+ return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token);
})
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, CancelationToken::uncancelable());
@@ -598,44 +597,46 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWrit
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const BSONObj& cmdObj) {
- return AsyncTry([this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj] {
- return recipientTargeterRS
- ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token())
- .thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor, cmdObj](auto recipientHost) {
- executor::RemoteCommandRequest request(std::move(recipientHost),
- NamespaceString::kAdminDb.toString(),
- std::move(cmdObj),
- rpc::makeEmptyMetadata(),
- nullptr,
- kRecipientSyncDataTimeout);
- request.sslMode = _sslMode;
-
- return (_recipientCmdExecutor)
- ->scheduleRemoteCommand(std::move(request),
- _instanceCancelationSource.token())
- .then([this, self = shared_from_this()](const auto& response) -> Status {
- if (!response.isOK()) {
- return response.status;
- }
- auto commandStatus = getStatusFromCommandResult(response.data);
- commandStatus.addContext(
- "Tenant migration recipient command failed");
- return commandStatus;
- });
- });
- })
- .until([this, self = shared_from_this()](Status status) {
- return shouldStopSendingRecipientCommand(status, _instanceCancelationSource.token());
- })
+ const BSONObj& cmdObj,
+ const CancelationToken& token) {
+ return AsyncTry(
+ [this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj, token] {
+ return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, token)
+ .thenRunOn(**executor)
+ .then([this, self = shared_from_this(), executor, cmdObj, token](
+ auto recipientHost) {
+ executor::RemoteCommandRequest request(
+ std::move(recipientHost),
+ NamespaceString::kAdminDb.toString(),
+ std::move(cmdObj),
+ rpc::makeEmptyMetadata(),
+ nullptr,
+ kRecipientSyncDataTimeout);
+ request.sslMode = _sslMode;
+
+ return (_recipientCmdExecutor)
+ ->scheduleRemoteCommand(std::move(request), token)
+ .then([this,
+ self = shared_from_this()](const auto& response) -> Status {
+ if (!response.isOK()) {
+ return response.status;
+ }
+ auto commandStatus = getStatusFromCommandResult(response.data);
+ commandStatus.addContext(
+ "Tenant migration recipient command failed");
+ return commandStatus;
+ });
+ });
+ })
+ .until([token](Status status) { return shouldStopSendingRecipientCommand(status, token); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, _instanceCancelationSource.token());
+ .on(**executor, token);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) {
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
+ const CancelationToken& token) {
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
@@ -658,12 +659,13 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa
return request.toBSON(BSONObj());
}();
- return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj);
+ return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj, token);
}
ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) {
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
+ const CancelationToken& token) {
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
@@ -681,24 +683,24 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
commonData.setRecipientCertificateForDonor(_stateDoc.getRecipientCertificateForDonor());
request.setMigrationRecipientCommonData(commonData);
- return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()));
+ return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token);
}
SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const CancelationToken& token) noexcept {
- _instanceCancelationSource = CancelationSource(token);
+ const CancelationToken& serviceToken) noexcept {
+ _abortMigrationSource = CancelationSource(serviceToken);
auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>(
_recipientUri.getSetName(), _recipientUri.getServers());
return ExecutorFuture<void>(**executor)
- .then([this, self = shared_from_this(), executor, token] {
+ .then([this, self = shared_from_this(), executor, serviceToken] {
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kUninitialized) {
return ExecutorFuture<void>(**executor, Status::OK());
}
// Enter "dataSync" state.
- return _insertStateDoc(executor)
+ return _insertStateDoc(executor, _abortMigrationSource.token())
.then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
// TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
// use its base PrimaryOnlyService's cancelation source to pass tokens
@@ -711,17 +713,18 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
pauseTenantMigrationAfterPersitingInitialDonorStateDoc.pauseWhileSet(opCtx);
});
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
- checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token());
- return _fetchAndStoreRecipientClusterTimeKeyDocs(executor, recipientTargeterRS, token);
+ return _fetchAndStoreRecipientClusterTimeKeyDocs(
+ executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token());
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) {
return ExecutorFuture<void>(**executor, Status::OK());
}
- checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
+ checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token());
// Before starting data sync, abort any in-progress index builds. No new index
// builds can start while we are doing this because the mtab prevents it.
@@ -733,42 +736,42 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
opCtx, _stateDoc.getTenantId(), "tenant migration");
}
- return _sendRecipientSyncDataCommand(executor, recipientTargeterRS)
+ return _sendRecipientSyncDataCommand(
+ executor, recipientTargeterRS, _abortMigrationSource.token())
.then([this, self = shared_from_this()] {
auto opCtxHolder = cc().makeOperationContext();
auto opCtx = opCtxHolder.get();
pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx);
})
- .then([this, self = shared_from_this(), executor, token] {
- checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
+ .then([this, self = shared_from_this(), executor, serviceToken] {
+ checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token());
// Enter "blocking" state.
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking)
- .then([this, self = shared_from_this(), executor, token](
+ return _updateStateDoc(executor,
+ TenantMigrationDonorStateEnum::kBlocking,
+ _abortMigrationSource.token())
+ .then([this, self = shared_from_this(), executor, serviceToken](
repl::OpTime opTime) {
// TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
// use its base PrimaryOnlyService's cancelation source to pass tokens
// in calls to WaitForMajorityService::waitUntilMajority.
- checkIfReceivedDonorAbortMigration(token,
- _instanceCancelationSource.token());
+ checkIfReceivedDonorAbortMigration(serviceToken,
+ _abortMigrationSource.token());
return _waitForMajorityWriteConcern(executor, std::move(opTime));
});
});
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kBlocking) {
return ExecutorFuture<void>(**executor, Status::OK());
}
- checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
+ checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token());
invariant(_stateDoc.getBlockTimestamp());
// Source to cancel the timeout if the operation completed in time.
CancelationSource cancelTimeoutSource;
- // Source to cancel if the timeout expires before completion, as a child of parent
- // token.
- CancelationSource recipientSyncDataCommandCancelSource(token);
auto deadlineReachedFuture = (*executor)->sleepFor(
Milliseconds(repl::tenantMigrationBlockingStateTimeoutMS.load()),
@@ -776,13 +779,12 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::vector<ExecutorFuture<void>> futures;
futures.push_back(std::move(deadlineReachedFuture));
- futures.push_back(_sendRecipientSyncDataCommand(executor, recipientTargeterRS));
+ futures.push_back(_sendRecipientSyncDataCommand(
+ executor, recipientTargeterRS, _abortMigrationSource.token()));
return whenAny(std::move(futures))
.thenRunOn(**executor)
- .then([cancelTimeoutSource,
- recipientSyncDataCommandCancelSource,
- self = shared_from_this()](auto result) mutable {
+ .then([this, cancelTimeoutSource, self = shared_from_this()](auto result) mutable {
const auto& [status, idx] = result;
if (idx == 0) {
@@ -791,7 +793,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
"timeoutMs"_attr =
repl::tenantMigrationGarbageCollectionDelayMS.load());
// Deadline reached, cancel the pending '_sendRecipientSyncDataCommand()'...
- recipientSyncDataCommandCancelSource.cancel();
+ _abortMigrationSource.cancel();
// ...and return error.
uasserted(ErrorCodes::ExceededTimeLimit, "Blocking state timeout expired");
} else if (idx == 1) {
@@ -829,12 +831,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
"simulate a tenant migration error");
});
})
- .then([this, self = shared_from_this(), executor, token] {
- checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token());
+ .then([this, self = shared_from_this(), executor, serviceToken] {
+ checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token());
// Enter "commit" state.
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted)
- .then([this, self = shared_from_this(), executor, token](
+ return _updateStateDoc(
+ executor, TenantMigrationDonorStateEnum::kCommitted, serviceToken)
+ .then([this, self = shared_from_this(), executor, serviceToken](
repl::OpTime opTime) {
// TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should
// use its base PrimaryOnlyService's cancelation source to pass tokens
@@ -852,7 +855,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
});
});
})
- .onError([this, self = shared_from_this(), executor](Status status) {
+ .onError([this, self = shared_from_this(), executor, serviceToken](Status status) {
if (_stateDoc.getState() == TenantMigrationDonorStateEnum::kAborted) {
// The migration was resumed on stepup and it was already aborted.
return ExecutorFuture<void>(**executor, Status::OK());
@@ -871,7 +874,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
} else {
// Enter "abort" state.
_abortReason.emplace(status);
- return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted)
+ return _updateStateDoc(
+ executor, TenantMigrationDonorStateEnum::kAborted, serviceToken)
.then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
return _waitForMajorityWriteConcern(executor, std::move(opTime))
.then([this, self = shared_from_this()] {
@@ -893,7 +897,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
"status"_attr = status,
"abortReason"_attr = _abortReason);
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
if (_stateDoc.getExpireAt()) {
// The migration state has already been marked as garbage collectable. Set the
// donorForgetMigration promise here since the Instance's destructor has an
@@ -903,13 +907,19 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
// Wait for the donorForgetMigration command.
+ // If donorAbortMigration has already canceled work, _abortMigrationSource will be
+ // canceled, and continuing to use it would lead to incorrect behavior. Once the
+ // migration has reached a decision state, we therefore use the serviceToken for any
+ // remaining work, such as sending donorForgetMigration.
return std::move(_receiveDonorForgetMigrationPromise.getFuture())
.thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
- return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS);
- })
- .then([this, self = shared_from_this(), executor] {
- return _markStateDocAsGarbageCollectable(executor);
+ .then(
+ [this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] {
+ return _sendRecipientForgetMigrationCommand(
+ executor, recipientTargeterRS, serviceToken);
+ })
+ .then([this, self = shared_from_this(), executor, serviceToken] {
+ return _markStateDocAsGarbageCollectable(executor, serviceToken);
})
.then([this, self = shared_from_this(), executor](repl::OpTime opTime) {
return _waitForMajorityWriteConcern(executor, std::move(opTime));
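
One behavioral consequence of the .cpp changes above: the blocking-state timeout no longer cancels a dedicated recipientSyncDataCommandCancelSource; it cancels the shared _abortMigrationSource, which both aborts the pending recipientSyncData call and fails every later checkIfReceivedDonorAbortMigration check. A simplified sketch of that deadline-versus-work race, with std::async and an atomic flag standing in for the executor's whenAny and the cancelation source:

    #include <atomic>
    #include <chrono>
    #include <future>
    #include <iostream>
    #include <thread>

    int main() {
        // Stands in for _abortMigrationSource; shared by the timeout path
        // and donorAbortMigration in the real service.
        std::atomic<bool> abortSourceCanceled{false};

        // recipientSyncData: cooperative work that polls its token.
        std::future<bool> syncData = std::async(std::launch::async, [&] {
            for (int i = 0; i < 100; ++i) {
                if (abortSourceCanceled.load())
                    return false;  // canceled mid-flight
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
            return true;           // finished before the deadline
        });

        // The deadline branch of whenAny: if the work is still pending when
        // the blocking-state timeout elapses, cancel the shared source.
        if (syncData.wait_for(std::chrono::milliseconds(50)) != std::future_status::ready) {
            abortSourceCanceled.store(true);  // _abortMigrationSource.cancel()
            std::cout << "Blocking state timeout expired\n";
        }
        std::cout << (syncData.get() ? "committed" : "aborted") << "\n";
    }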
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 3c33846cc54..df3d9072528 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -162,14 +162,15 @@ public:
ExecutorFuture<void> _fetchAndStoreRecipientClusterTimeKeyDocs(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const CancelationToken& token);
+ const CancelationToken& serviceToken,
+ const CancelationToken& instanceToken);
/**
* Inserts the state document to _stateDocumentsNS and returns the opTime for the insert
* oplog entry.
*/
ExecutorFuture<repl::OpTime> _insertStateDoc(
- std::shared_ptr<executor::ScopedTaskExecutor> executor);
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token);
/**
* Updates the state document to have the given state. Then, persists the updated document
@@ -179,14 +180,15 @@ public:
*/
ExecutorFuture<repl::OpTime> _updateStateDoc(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const TenantMigrationDonorStateEnum nextState);
+ const TenantMigrationDonorStateEnum nextState,
+ const CancelationToken& token);
/**
* Sets the "expireAt" time for the state document to be garbage collected, and returns the
* the opTime for the write.
*/
ExecutorFuture<repl::OpTime> _markStateDocAsGarbageCollectable(
- std::shared_ptr<executor::ScopedTaskExecutor> executor);
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token);
/**
* Waits for given opTime to be majority committed.
@@ -200,21 +202,24 @@ public:
ExecutorFuture<void> _sendCommandToRecipient(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
- const BSONObj& cmdObj);
+ const BSONObj& cmdObj,
+ const CancelationToken& token);
/**
* Sends the recipientSyncData command to the recipient replica set.
*/
ExecutorFuture<void> _sendRecipientSyncDataCommand(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS);
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
+ const CancelationToken& token);
/**
* Sends the recipientForgetMigration command to the recipient replica set.
*/
ExecutorFuture<void> _sendRecipientForgetMigrationCommand(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
- std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS);
+ std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS,
+ const CancelationToken& token);
ThreadPool::Limits _getRecipientCmdThreadPoolLimits() const {
ThreadPool::Limits recipientCmdThreadPoolLimits;
@@ -261,7 +266,7 @@ public:
// This CancelationSource is instantiated from the CancelationToken that is passed into run().
// It allows for manual cancelation of work from the instance.
- CancelationSource _instanceCancelationSource;
+ CancelationSource _abortMigrationSource;
};
private:
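
Taken together, the header changes trade an implicit dependency (each helper reading the member cancelation source) for explicit CancelationToken parameters, so every call site in run() states which token, service-wide or abort-scoped, governs that step. A small sketch of the two shapes, using a stand-in Token type rather than the real signatures:

    #include <functional>

    struct Token {  // stand-in, not the real mongo CancelationToken
        std::function<bool()> canceled;
        bool isCanceled() const { return canceled(); }
    };

    class Instance {
    public:
        // Before: the helper silently observed the member source's token,
        // so the governing token was invisible at call sites.
        void stepImplicit() {
            if (_memberToken.isCanceled()) return;
            // ... work ...
        }

        // After: the caller passes the governing token explicitly; run() can
        // hand post-decision steps the serviceToken and abortable steps the
        // _abortMigrationSource token.
        void stepExplicit(const Token& token) {
            if (token.isCanceled()) return;
            // ... work ...
        }

    private:
        Token _memberToken{[] { return false; }};
    };

    int main() {
        Instance instance;
        Token serviceToken{[] { return false; }};
        instance.stepImplicit();
        instance.stepExplicit(serviceToken);
    }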