diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2021-01-28 17:58:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-08 23:29:36 +0000 |
commit | 3120087175678ec7a61a6d12cd9326ba8cfa2d45 (patch) | |
tree | 70fa55d784662f05ae747d1724b872cec92832df | |
parent | 8e177a8c5eef5aedc0c4fdf08d6e764389907cb6 (diff) | |
download | mongo-3120087175678ec7a61a6d12cd9326ba8cfa2d45.tar.gz |
SERVER-54114 SERVER-54039 Pass cancelation token throughout TenantMigrationDonorService
-rw-r--r-- | jstests/replsets/tenant_migration_abort_forget_retry.js | 132 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 208 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.h | 21 |
3 files changed, 220 insertions, 141 deletions
diff --git a/jstests/replsets/tenant_migration_abort_forget_retry.js b/jstests/replsets/tenant_migration_abort_forget_retry.js index ddb9ad43358..40b97901c71 100644 --- a/jstests/replsets/tenant_migration_abort_forget_retry.js +++ b/jstests/replsets/tenant_migration_abort_forget_retry.js @@ -1,7 +1,8 @@ /** - * Starts a tenant migration that aborts, and then issues a donorForgetMigration command. Finally, - * starts a second tenant migration with the same tenantId as the aborted migration, and expects - * this second migration to go through. + * Starts a tenant migration that aborts, either due to the + * abortTenantMigrationBeforeLeavingBlockingState failpoint or due to receiving donorAbortMigration, + * and then issues a donorForgetMigration command. Finally, starts a second tenant migration with + * the same tenantId as the aborted migration, and expects this second migration to go through. * * @tags: [requires_fcv_49, requires_majority_read_concern, incompatible_with_windows_tls] */ @@ -10,8 +11,17 @@ "use strict"; load("jstests/libs/fail_point_util.js"); +load("jstests/libs/parallelTester.js"); load("jstests/libs/uuid_util.js"); load("jstests/replsets/libs/tenant_migration_test.js"); +load("jstests/replsets/libs/tenant_migration_util.js"); + +const kTenantIdPrefix = "testTenantId"; +let testNum = 0; + +function makeTenantId() { + return kTenantIdPrefix + testNum++; +} const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); if (!tenantMigrationTest.isFeatureFlagEnabled()) { @@ -19,36 +29,90 @@ if (!tenantMigrationTest.isFeatureFlagEnabled()) { return; } -const donorPrimary = tenantMigrationTest.getDonorPrimary(); - -const tenantId = "testTenantId"; - -const migrationId1 = extractUUIDFromObject(UUID()); -const migrationId2 = extractUUIDFromObject(UUID()); - -// Start a migration with the "abortTenantMigrationBeforeLeavingBlockingState" failPoint enabled. -// The migration will abort as a result, and a status of "kAborted" should be returned. -jsTestLog("Starting a migration that is expected to abort. migrationId: " + migrationId1 + - ", tenantId: " + tenantId); -const abortFp = configureFailPoint(donorPrimary, "abortTenantMigrationBeforeLeavingBlockingState"); -const abortRes = assert.commandWorked( - tenantMigrationTest.runMigration({migrationIdString: migrationId1, tenantId}, - false /* retryOnRetryableErrors */, - false /* automaticForgetMigration */)); -assert.eq(abortRes.state, TenantMigrationTest.State.kAborted); -abortFp.off(); - -// Forget the aborted migration. -jsTestLog("Forgetting aborted migration with migrationId: " + migrationId1); -assert.commandWorked(tenantMigrationTest.forgetMigration(migrationId1)); - -// Try running a new migration with the same tenantId. It should succeed, since the previous -// migration with the same tenantId was aborted. -jsTestLog("Attempting to run a new migration with the same tenantId. New migrationId: " + - migrationId2 + ", tenantId: " + tenantId); -const commitRes = assert.commandWorked( - tenantMigrationTest.runMigration({migrationIdString: migrationId2, tenantId})); -assert.eq(commitRes.state, TenantMigrationTest.State.kCommitted); +(() => { + const migrationId1 = extractUUIDFromObject(UUID()); + const migrationId2 = extractUUIDFromObject(UUID()); + const tenantId = makeTenantId(); -tenantMigrationTest.stop(); + // Start a migration with the "abortTenantMigrationBeforeLeavingBlockingState" failPoint + // enabled. The migration will abort as a result, and a status of "kAborted" should be returned. + jsTestLog( + "Starting a migration that is expected to abort due to setting abortTenantMigrationBeforeLeavingBlockingState failpoint. migrationId: " + + migrationId1 + ", tenantId: " + tenantId); + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + const abortFp = + configureFailPoint(donorPrimary, "abortTenantMigrationBeforeLeavingBlockingState"); + const abortRes = assert.commandWorked( + tenantMigrationTest.runMigration({migrationIdString: migrationId1, tenantId: tenantId}, + false /* retryOnRetryableErrors */, + false /* automaticForgetMigration */)); + assert.eq(abortRes.state, TenantMigrationTest.State.kAborted); + abortFp.off(); + + // Forget the aborted migration. + jsTestLog("Forgetting aborted migration with migrationId: " + migrationId1); + assert.commandWorked(tenantMigrationTest.forgetMigration(migrationId1)); + + // Try running a new migration with the same tenantId. It should succeed, since the previous + // migration with the same tenantId was aborted. + jsTestLog("Attempting to run a new migration with the same tenantId. New migrationId: " + + migrationId2 + ", tenantId: " + tenantId); + const commitRes = assert.commandWorked( + tenantMigrationTest.runMigration({migrationIdString: migrationId2, tenantId: tenantId})); + assert.eq(commitRes.state, TenantMigrationTest.State.kCommitted); +})(); + +(() => { + const migrationId1 = extractUUIDFromObject(UUID()); + const migrationId2 = extractUUIDFromObject(UUID()); + const tenantId = makeTenantId(); + + jsTestLog( + "Starting a migration that is expected to abort in blocking state due to receiving donorAbortMigration. migrationId: " + + migrationId1 + ", tenantId: " + tenantId); + + const donorPrimary = tenantMigrationTest.getDonorPrimary(); + let fp = configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeLeavingBlockingState"); + assert.commandWorked( + tenantMigrationTest.startMigration({migrationIdString: migrationId1, tenantId: tenantId})); + + fp.wait(); + + const donorRstArgs = TenantMigrationUtil.createRstArgs(tenantMigrationTest.getDonorRst()); + const tryAbortThread = new Thread(TenantMigrationUtil.tryAbortMigrationAsync, + {migrationIdString: migrationId1, tenantId: tenantId}, + donorRstArgs, + TenantMigrationUtil.runTenantMigrationCommand); + tryAbortThread.start(); + + // Wait for donorAbortMigration command to start. + assert.soon(() => { + const res = assert.commandWorked(donorPrimary.adminCommand( + {currentOp: true, desc: "tenant donor migration", tenantId: tenantId})); + return res.inprog[0].receivedCancelation; + }); + + fp.off(); + + tryAbortThread.join(); + assert.commandWorked(tryAbortThread.returnData()); + + const stateRes = assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete( + {migrationIdString: migrationId1, tenantId: tenantId})); + assert.eq(stateRes.state, TenantMigrationTest.State.kAborted); + + // Forget the aborted migration. + jsTestLog("Forgetting aborted migration with migrationId: " + migrationId1); + assert.commandWorked(tenantMigrationTest.forgetMigration(migrationId1)); + + // Try running a new migration with the same tenantId. It should succeed, since the previous + // migration with the same tenantId was aborted. + jsTestLog("Attempting to run a new migration with the same tenantId. New migrationId: " + + migrationId2 + ", tenantId: " + tenantId); + const commitRes = assert.commandWorked( + tenantMigrationTest.runMigration({migrationIdString: migrationId2, tenantId: tenantId})); + assert.eq(commitRes.state, TenantMigrationTest.State.kCommitted); })(); + +tenantMigrationTest.stop(); +})();
\ No newline at end of file diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index fc58789db3a..af57ada365c 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -88,12 +88,12 @@ bool shouldStopSendingRecipientCommand(Status status, const CancelationToken& to return status.isOK() || !ErrorCodes::isRetriableError(status) || token.isCanceled(); } -void checkIfReceivedDonorAbortMigration(const CancelationToken& parent, - const CancelationToken& instance) { +void checkIfReceivedDonorAbortMigration(const CancelationToken& serviceToken, + const CancelationToken& instanceToken) { // If only the instance token was canceled, then we must have gotten donorAbortMigration. uassert(ErrorCodes::TenantMigrationAborted, "Migration aborted due to receiving donorAbortMigration.", - !instance.isCanceled() || parent.isCanceled()); + !instanceToken.isCanceled() || serviceToken.isCanceled()); } } // namespace @@ -235,7 +235,7 @@ boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrent BSONObjBuilder bob; bob.append("desc", "tenant donor migration"); bob.append("migrationCompleted", _completionPromise.getFuture().isReady()); - bob.append("receivedCancelation", _instanceCancelationSource.token().isCanceled()); + bob.append("receivedCancelation", _abortMigrationSource.token().isCanceled()); bob.append("instanceID", _stateDoc.getId().toBSON()); bob.append("tenantId", _stateDoc.getTenantId()); bob.append("recipientConnectionString", _stateDoc.getRecipientConnectionString()); @@ -283,7 +283,7 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx) } void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() { - _instanceCancelationSource.cancel(); + _abortMigrationSource.cancel(); stdx::lock_guard<Latch> lg(_mutex); if (auto fetcher = _recipientKeysFetcher.lock()) { @@ -319,9 +319,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancelationToken& token) { - return recipientTargeterRS - ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token()) + const CancelationToken& serviceToken, + const CancelationToken& instanceToken) { + return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken) .thenRunOn(**executor) .then([this, self = shared_from_this(), executor](HostAndPort host) { const auto nss = NamespaceString::kKeysCollectionNamespace; @@ -396,16 +396,17 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs return keyDocs; }) - .then([this, self = shared_from_this(), executor, token](auto keyDocs) { - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + .then( + [this, self = shared_from_this(), executor, serviceToken, instanceToken](auto keyDocs) { + checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - tenant_migration_util::storeExternalClusterTimeKeyDocsAndRefreshCache( - executor, std::move(keyDocs), _instanceCancelationSource.token()); - }); + tenant_migration_util::storeExternalClusterTimeKeyDocsAndRefreshCache( + executor, std::move(keyDocs), instanceToken); + }); } ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDoc( - std::shared_ptr<executor::ScopedTaskExecutor> executor) { + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token) { invariant(_stateDoc.getState() == TenantMigrationDonorStateEnum::kUninitialized); _stateDoc.setState(TenantMigrationDonorStateEnum::kDataSync); @@ -430,9 +431,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) { - return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(), - _instanceCancelationSource.token()); + .until([token](StatusWith<repl::OpTime> swOpTime) { + return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(), token); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, CancelationToken::uncancelable()); @@ -440,7 +440,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDoc( std::shared_ptr<executor::ScopedTaskExecutor> executor, - const TenantMigrationDonorStateEnum nextState) { + const TenantMigrationDonorStateEnum nextState, + const CancelationToken& token) { const auto originalStateDocBson = _stateDoc.toBSON(); return AsyncTry([this, self = shared_from_this(), executor, nextState, originalStateDocBson] { @@ -525,9 +526,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState invariant(updateOpTime); return updateOpTime.get(); }) - .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) { - return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), - _instanceCancelationSource.token()); + .until([token](StatusWith<repl::OpTime> swOpTime) { + return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, CancelationToken::uncancelable()); @@ -535,7 +535,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( - std::shared_ptr<executor::ScopedTaskExecutor> executor) { + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token) { _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); return AsyncTry([this, self = shared_from_this()] { @@ -560,9 +560,8 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) { - return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), - _instanceCancelationSource.token()); + .until([token](StatusWith<repl::OpTime> swOpTime) { + return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, CancelationToken::uncancelable()); @@ -598,44 +597,46 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWrit ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const BSONObj& cmdObj) { - return AsyncTry([this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj] { - return recipientTargeterRS - ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token()) - .thenRunOn(**executor) - .then([this, self = shared_from_this(), executor, cmdObj](auto recipientHost) { - executor::RemoteCommandRequest request(std::move(recipientHost), - NamespaceString::kAdminDb.toString(), - std::move(cmdObj), - rpc::makeEmptyMetadata(), - nullptr, - kRecipientSyncDataTimeout); - request.sslMode = _sslMode; - - return (_recipientCmdExecutor) - ->scheduleRemoteCommand(std::move(request), - _instanceCancelationSource.token()) - .then([this, self = shared_from_this()](const auto& response) -> Status { - if (!response.isOK()) { - return response.status; - } - auto commandStatus = getStatusFromCommandResult(response.data); - commandStatus.addContext( - "Tenant migration recipient command failed"); - return commandStatus; - }); - }); - }) - .until([this, self = shared_from_this()](Status status) { - return shouldStopSendingRecipientCommand(status, _instanceCancelationSource.token()); - }) + const BSONObj& cmdObj, + const CancelationToken& token) { + return AsyncTry( + [this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj, token] { + return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, token) + .thenRunOn(**executor) + .then([this, self = shared_from_this(), executor, cmdObj, token]( + auto recipientHost) { + executor::RemoteCommandRequest request( + std::move(recipientHost), + NamespaceString::kAdminDb.toString(), + std::move(cmdObj), + rpc::makeEmptyMetadata(), + nullptr, + kRecipientSyncDataTimeout); + request.sslMode = _sslMode; + + return (_recipientCmdExecutor) + ->scheduleRemoteCommand(std::move(request), token) + .then([this, + self = shared_from_this()](const auto& response) -> Status { + if (!response.isOK()) { + return response.status; + } + auto commandStatus = getStatusFromCommandResult(response.data); + commandStatus.addContext( + "Tenant migration recipient command failed"); + return commandStatus; + }); + }); + }) + .until([token](Status status) { return shouldStopSendingRecipientCommand(status, token); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, _instanceCancelationSource.token()); + .on(**executor, token); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) { + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token) { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); @@ -658,12 +659,13 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa return request.toBSON(BSONObj()); }(); - return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj); + return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj, token); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) { + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token) { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); @@ -681,24 +683,24 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget commonData.setRecipientCertificateForDonor(_stateDoc.getRecipientCertificateForDonor()); request.setMigrationRecipientCommonData(commonData); - return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj())); + return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token); } SemiFuture<void> TenantMigrationDonorService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, - const CancelationToken& token) noexcept { - _instanceCancelationSource = CancelationSource(token); + const CancelationToken& serviceToken) noexcept { + _abortMigrationSource = CancelationSource(serviceToken); auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>( _recipientUri.getSetName(), _recipientUri.getServers()); return ExecutorFuture<void>(**executor) - .then([this, self = shared_from_this(), executor, token] { + .then([this, self = shared_from_this(), executor, serviceToken] { if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kUninitialized) { return ExecutorFuture<void>(**executor, Status::OK()); } // Enter "dataSync" state. - return _insertStateDoc(executor) + return _insertStateDoc(executor, _abortMigrationSource.token()) .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should // use its base PrimaryOnlyService's cancelation source to pass tokens @@ -711,17 +713,18 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( pauseTenantMigrationAfterPersitingInitialDonorStateDoc.pauseWhileSet(opCtx); }); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); - return _fetchAndStoreRecipientClusterTimeKeyDocs(executor, recipientTargeterRS, token); + return _fetchAndStoreRecipientClusterTimeKeyDocs( + executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token()); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) { return ExecutorFuture<void>(**executor, Status::OK()); } - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); // Before starting data sync, abort any in-progress index builds. No new index // builds can start while we are doing this because the mtab prevents it. @@ -733,42 +736,42 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( opCtx, _stateDoc.getTenantId(), "tenant migration"); } - return _sendRecipientSyncDataCommand(executor, recipientTargeterRS) + return _sendRecipientSyncDataCommand( + executor, recipientTargeterRS, _abortMigrationSource.token()) .then([this, self = shared_from_this()] { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx); }) - .then([this, self = shared_from_this(), executor, token] { - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + .then([this, self = shared_from_this(), executor, serviceToken] { + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); // Enter "blocking" state. - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking) - .then([this, self = shared_from_this(), executor, token]( + return _updateStateDoc(executor, + TenantMigrationDonorStateEnum::kBlocking, + _abortMigrationSource.token()) + .then([this, self = shared_from_this(), executor, serviceToken]( repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should // use its base PrimaryOnlyService's cancelation source to pass tokens // in calls to WaitForMajorityService::waitUntilMajority. - checkIfReceivedDonorAbortMigration(token, - _instanceCancelationSource.token()); + checkIfReceivedDonorAbortMigration(serviceToken, + _abortMigrationSource.token()); return _waitForMajorityWriteConcern(executor, std::move(opTime)); }); }); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kBlocking) { return ExecutorFuture<void>(**executor, Status::OK()); } - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); invariant(_stateDoc.getBlockTimestamp()); // Source to cancel the timeout if the operation completed in time. CancelationSource cancelTimeoutSource; - // Source to cancel if the timeout expires before completion, as a child of parent - // token. - CancelationSource recipientSyncDataCommandCancelSource(token); auto deadlineReachedFuture = (*executor)->sleepFor( Milliseconds(repl::tenantMigrationBlockingStateTimeoutMS.load()), @@ -776,13 +779,12 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( std::vector<ExecutorFuture<void>> futures; futures.push_back(std::move(deadlineReachedFuture)); - futures.push_back(_sendRecipientSyncDataCommand(executor, recipientTargeterRS)); + futures.push_back(_sendRecipientSyncDataCommand( + executor, recipientTargeterRS, _abortMigrationSource.token())); return whenAny(std::move(futures)) .thenRunOn(**executor) - .then([cancelTimeoutSource, - recipientSyncDataCommandCancelSource, - self = shared_from_this()](auto result) mutable { + .then([this, cancelTimeoutSource, self = shared_from_this()](auto result) mutable { const auto& [status, idx] = result; if (idx == 0) { @@ -791,7 +793,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "timeoutMs"_attr = repl::tenantMigrationGarbageCollectionDelayMS.load()); // Deadline reached, cancel the pending '_sendRecipientSyncDataCommand()'... - recipientSyncDataCommandCancelSource.cancel(); + _abortMigrationSource.cancel(); // ...and return error. uasserted(ErrorCodes::ExceededTimeLimit, "Blocking state timeout expired"); } else if (idx == 1) { @@ -829,12 +831,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "simulate a tenant migration error"); }); }) - .then([this, self = shared_from_this(), executor, token] { - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + .then([this, self = shared_from_this(), executor, serviceToken] { + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); // Enter "commit" state. - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted) - .then([this, self = shared_from_this(), executor, token]( + return _updateStateDoc( + executor, TenantMigrationDonorStateEnum::kCommitted, serviceToken) + .then([this, self = shared_from_this(), executor, serviceToken]( repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should // use its base PrimaryOnlyService's cancelation source to pass tokens @@ -852,7 +855,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( }); }); }) - .onError([this, self = shared_from_this(), executor](Status status) { + .onError([this, self = shared_from_this(), executor, serviceToken](Status status) { if (_stateDoc.getState() == TenantMigrationDonorStateEnum::kAborted) { // The migration was resumed on stepup and it was already aborted. return ExecutorFuture<void>(**executor, Status::OK()); @@ -871,7 +874,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } else { // Enter "abort" state. _abortReason.emplace(status); - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted) + return _updateStateDoc( + executor, TenantMigrationDonorStateEnum::kAborted, serviceToken) .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { return _waitForMajorityWriteConcern(executor, std::move(opTime)) .then([this, self = shared_from_this()] { @@ -893,7 +897,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "status"_attr = status, "abortReason"_attr = _abortReason); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { if (_stateDoc.getExpireAt()) { // The migration state has already been marked as garbage collectable. Set the // donorForgetMigration promise here since the Instance's destructor has an @@ -903,13 +907,19 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } // Wait for the donorForgetMigration command. + // If donorAbortMigration has already canceled work, the abortMigrationSource would be + // canceled and continued usage of the source would lead to incorrect behavior. Thus, we + // need to use the serviceToken after the migration has reached a decision state in + // order to continue work, such as sending donorForgetMigration, successfully. return std::move(_receiveDonorForgetMigrationPromise.getFuture()) .thenRunOn(**executor) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { - return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS); - }) - .then([this, self = shared_from_this(), executor] { - return _markStateDocAsGarbageCollectable(executor); + .then( + [this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + return _sendRecipientForgetMigrationCommand( + executor, recipientTargeterRS, serviceToken); + }) + .then([this, self = shared_from_this(), executor, serviceToken] { + return _markStateDocAsGarbageCollectable(executor, serviceToken); }) .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { return _waitForMajorityWriteConcern(executor, std::move(opTime)); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 3c33846cc54..df3d9072528 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -162,14 +162,15 @@ public: ExecutorFuture<void> _fetchAndStoreRecipientClusterTimeKeyDocs( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancelationToken& token); + const CancelationToken& serviceToken, + const CancelationToken& instanceToken); /** * Inserts the state document to _stateDocumentsNS and returns the opTime for the insert * oplog entry. */ ExecutorFuture<repl::OpTime> _insertStateDoc( - std::shared_ptr<executor::ScopedTaskExecutor> executor); + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token); /** * Updates the state document to have the given state. Then, persists the updated document @@ -179,14 +180,15 @@ public: */ ExecutorFuture<repl::OpTime> _updateStateDoc( std::shared_ptr<executor::ScopedTaskExecutor> executor, - const TenantMigrationDonorStateEnum nextState); + const TenantMigrationDonorStateEnum nextState, + const CancelationToken& token); /** * Sets the "expireAt" time for the state document to be garbage collected, and returns the * the opTime for the write. */ ExecutorFuture<repl::OpTime> _markStateDocAsGarbageCollectable( - std::shared_ptr<executor::ScopedTaskExecutor> executor); + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token); /** * Waits for given opTime to be majority committed. @@ -200,21 +202,24 @@ public: ExecutorFuture<void> _sendCommandToRecipient( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const BSONObj& cmdObj); + const BSONObj& cmdObj, + const CancelationToken& token); /** * Sends the recipientSyncData command to the recipient replica set. */ ExecutorFuture<void> _sendRecipientSyncDataCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS); + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token); /** * Sends the recipientForgetMigration command to the recipient replica set. */ ExecutorFuture<void> _sendRecipientForgetMigrationCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS); + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token); ThreadPool::Limits _getRecipientCmdThreadPoolLimits() const { ThreadPool::Limits recipientCmdThreadPoolLimits; @@ -261,7 +266,7 @@ public: // This CancelationSource is instantiated from CancelationToken that is passed into run(). // It allows for manual cancelation of work from the instance. - CancelationSource _instanceCancelationSource; + CancelationSource _abortMigrationSource; }; private: |