diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2021-01-28 17:58:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-08 23:29:36 +0000 |
commit | 3120087175678ec7a61a6d12cd9326ba8cfa2d45 (patch) | |
tree | 70fa55d784662f05ae747d1724b872cec92832df /src/mongo/db/repl | |
parent | 8e177a8c5eef5aedc0c4fdf08d6e764389907cb6 (diff) | |
download | mongo-3120087175678ec7a61a6d12cd9326ba8cfa2d45.tar.gz |
SERVER-54114 SERVER-54039 Pass cancelation token throughout TenantMigrationDonorService
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 208 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.h | 21 |
2 files changed, 122 insertions, 107 deletions
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index fc58789db3a..af57ada365c 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -88,12 +88,12 @@ bool shouldStopSendingRecipientCommand(Status status, const CancelationToken& to return status.isOK() || !ErrorCodes::isRetriableError(status) || token.isCanceled(); } -void checkIfReceivedDonorAbortMigration(const CancelationToken& parent, - const CancelationToken& instance) { +void checkIfReceivedDonorAbortMigration(const CancelationToken& serviceToken, + const CancelationToken& instanceToken) { // If only the instance token was canceled, then we must have gotten donorAbortMigration. uassert(ErrorCodes::TenantMigrationAborted, "Migration aborted due to receiving donorAbortMigration.", - !instance.isCanceled() || parent.isCanceled()); + !instanceToken.isCanceled() || serviceToken.isCanceled()); } } // namespace @@ -235,7 +235,7 @@ boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrent BSONObjBuilder bob; bob.append("desc", "tenant donor migration"); bob.append("migrationCompleted", _completionPromise.getFuture().isReady()); - bob.append("receivedCancelation", _instanceCancelationSource.token().isCanceled()); + bob.append("receivedCancelation", _abortMigrationSource.token().isCanceled()); bob.append("instanceID", _stateDoc.getId().toBSON()); bob.append("tenantId", _stateDoc.getTenantId()); bob.append("recipientConnectionString", _stateDoc.getRecipientConnectionString()); @@ -283,7 +283,7 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx) } void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() { - _instanceCancelationSource.cancel(); + _abortMigrationSource.cancel(); stdx::lock_guard<Latch> lg(_mutex); if (auto fetcher = _recipientKeysFetcher.lock()) { @@ -319,9 +319,9 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancelationToken& token) { - return recipientTargeterRS - ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token()) + const CancelationToken& serviceToken, + const CancelationToken& instanceToken) { + return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, instanceToken) .thenRunOn(**executor) .then([this, self = shared_from_this(), executor](HostAndPort host) { const auto nss = NamespaceString::kKeysCollectionNamespace; @@ -396,16 +396,17 @@ TenantMigrationDonorService::Instance::_fetchAndStoreRecipientClusterTimeKeyDocs return keyDocs; }) - .then([this, self = shared_from_this(), executor, token](auto keyDocs) { - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + .then( + [this, self = shared_from_this(), executor, serviceToken, instanceToken](auto keyDocs) { + checkIfReceivedDonorAbortMigration(serviceToken, instanceToken); - tenant_migration_util::storeExternalClusterTimeKeyDocsAndRefreshCache( - executor, std::move(keyDocs), _instanceCancelationSource.token()); - }); + tenant_migration_util::storeExternalClusterTimeKeyDocsAndRefreshCache( + executor, std::move(keyDocs), instanceToken); + }); } ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertStateDoc( - std::shared_ptr<executor::ScopedTaskExecutor> executor) { + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token) { invariant(_stateDoc.getState() == TenantMigrationDonorStateEnum::kUninitialized); _stateDoc.setState(TenantMigrationDonorStateEnum::kDataSync); @@ -430,9 +431,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) { - return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(), - _instanceCancelationSource.token()); + .until([token](StatusWith<repl::OpTime> swOpTime) { + return shouldStopInsertingDonorStateDoc(swOpTime.getStatus(), token); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, CancelationToken::uncancelable()); @@ -440,7 +440,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_insertState ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateStateDoc( std::shared_ptr<executor::ScopedTaskExecutor> executor, - const TenantMigrationDonorStateEnum nextState) { + const TenantMigrationDonorStateEnum nextState, + const CancelationToken& token) { const auto originalStateDocBson = _stateDoc.toBSON(); return AsyncTry([this, self = shared_from_this(), executor, nextState, originalStateDocBson] { @@ -525,9 +526,8 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState invariant(updateOpTime); return updateOpTime.get(); }) - .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) { - return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), - _instanceCancelationSource.token()); + .until([token](StatusWith<repl::OpTime> swOpTime) { + return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, CancelationToken::uncancelable()); @@ -535,7 +535,7 @@ ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_updateState ExecutorFuture<repl::OpTime> TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( - std::shared_ptr<executor::ScopedTaskExecutor> executor) { + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token) { _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + Milliseconds{repl::tenantMigrationGarbageCollectionDelayMS.load()}); return AsyncTry([this, self = shared_from_this()] { @@ -560,9 +560,8 @@ TenantMigrationDonorService::Instance::_markStateDocAsGarbageCollectable( return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([this, self = shared_from_this()](StatusWith<repl::OpTime> swOpTime) { - return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), - _instanceCancelationSource.token()); + .until([token](StatusWith<repl::OpTime> swOpTime) { + return shouldStopUpdatingDonorStateDoc(swOpTime.getStatus(), token); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, CancelationToken::uncancelable()); @@ -598,44 +597,46 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_waitForMajorityWrit ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendCommandToRecipient( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const BSONObj& cmdObj) { - return AsyncTry([this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj] { - return recipientTargeterRS - ->findHost(kPrimaryOnlyReadPreference, _instanceCancelationSource.token()) - .thenRunOn(**executor) - .then([this, self = shared_from_this(), executor, cmdObj](auto recipientHost) { - executor::RemoteCommandRequest request(std::move(recipientHost), - NamespaceString::kAdminDb.toString(), - std::move(cmdObj), - rpc::makeEmptyMetadata(), - nullptr, - kRecipientSyncDataTimeout); - request.sslMode = _sslMode; - - return (_recipientCmdExecutor) - ->scheduleRemoteCommand(std::move(request), - _instanceCancelationSource.token()) - .then([this, self = shared_from_this()](const auto& response) -> Status { - if (!response.isOK()) { - return response.status; - } - auto commandStatus = getStatusFromCommandResult(response.data); - commandStatus.addContext( - "Tenant migration recipient command failed"); - return commandStatus; - }); - }); - }) - .until([this, self = shared_from_this()](Status status) { - return shouldStopSendingRecipientCommand(status, _instanceCancelationSource.token()); - }) + const BSONObj& cmdObj, + const CancelationToken& token) { + return AsyncTry( + [this, self = shared_from_this(), executor, recipientTargeterRS, cmdObj, token] { + return recipientTargeterRS->findHost(kPrimaryOnlyReadPreference, token) + .thenRunOn(**executor) + .then([this, self = shared_from_this(), executor, cmdObj, token]( + auto recipientHost) { + executor::RemoteCommandRequest request( + std::move(recipientHost), + NamespaceString::kAdminDb.toString(), + std::move(cmdObj), + rpc::makeEmptyMetadata(), + nullptr, + kRecipientSyncDataTimeout); + request.sslMode = _sslMode; + + return (_recipientCmdExecutor) + ->scheduleRemoteCommand(std::move(request), token) + .then([this, + self = shared_from_this()](const auto& response) -> Status { + if (!response.isOK()) { + return response.status; + } + auto commandStatus = getStatusFromCommandResult(response.data); + commandStatus.addContext( + "Tenant migration recipient command failed"); + return commandStatus; + }); + }); + }) + .until([token](Status status) { return shouldStopSendingRecipientCommand(status, token); }) .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, _instanceCancelationSource.token()); + .on(**executor, token); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDataCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) { + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token) { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); @@ -658,12 +659,13 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientSyncDa return request.toBSON(BSONObj()); }(); - return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj); + return _sendCommandToRecipient(executor, recipientTargeterRS, cmdObj, token); } ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForgetMigrationCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS) { + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token) { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); @@ -681,24 +683,24 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget commonData.setRecipientCertificateForDonor(_stateDoc.getRecipientCertificateForDonor()); request.setMigrationRecipientCommonData(commonData); - return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj())); + return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token); } SemiFuture<void> TenantMigrationDonorService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, - const CancelationToken& token) noexcept { - _instanceCancelationSource = CancelationSource(token); + const CancelationToken& serviceToken) noexcept { + _abortMigrationSource = CancelationSource(serviceToken); auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>( _recipientUri.getSetName(), _recipientUri.getServers()); return ExecutorFuture<void>(**executor) - .then([this, self = shared_from_this(), executor, token] { + .then([this, self = shared_from_this(), executor, serviceToken] { if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kUninitialized) { return ExecutorFuture<void>(**executor, Status::OK()); } // Enter "dataSync" state. - return _insertStateDoc(executor) + return _insertStateDoc(executor, _abortMigrationSource.token()) .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should // use its base PrimaryOnlyService's cancelation source to pass tokens @@ -711,17 +713,18 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( pauseTenantMigrationAfterPersitingInitialDonorStateDoc.pauseWhileSet(opCtx); }); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); - return _fetchAndStoreRecipientClusterTimeKeyDocs(executor, recipientTargeterRS, token); + return _fetchAndStoreRecipientClusterTimeKeyDocs( + executor, recipientTargeterRS, serviceToken, _abortMigrationSource.token()); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kDataSync) { return ExecutorFuture<void>(**executor, Status::OK()); } - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); // Before starting data sync, abort any in-progress index builds. No new index // builds can start while we are doing this because the mtab prevents it. @@ -733,42 +736,42 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( opCtx, _stateDoc.getTenantId(), "tenant migration"); } - return _sendRecipientSyncDataCommand(executor, recipientTargeterRS) + return _sendRecipientSyncDataCommand( + executor, recipientTargeterRS, _abortMigrationSource.token()) .then([this, self = shared_from_this()] { auto opCtxHolder = cc().makeOperationContext(); auto opCtx = opCtxHolder.get(); pauseTenantMigrationBeforeLeavingDataSyncState.pauseWhileSet(opCtx); }) - .then([this, self = shared_from_this(), executor, token] { - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + .then([this, self = shared_from_this(), executor, serviceToken] { + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); // Enter "blocking" state. - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kBlocking) - .then([this, self = shared_from_this(), executor, token]( + return _updateStateDoc(executor, + TenantMigrationDonorStateEnum::kBlocking, + _abortMigrationSource.token()) + .then([this, self = shared_from_this(), executor, serviceToken]( repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should // use its base PrimaryOnlyService's cancelation source to pass tokens // in calls to WaitForMajorityService::waitUntilMajority. - checkIfReceivedDonorAbortMigration(token, - _instanceCancelationSource.token()); + checkIfReceivedDonorAbortMigration(serviceToken, + _abortMigrationSource.token()); return _waitForMajorityWriteConcern(executor, std::move(opTime)); }); }); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS, token] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { if (_stateDoc.getState() > TenantMigrationDonorStateEnum::kBlocking) { return ExecutorFuture<void>(**executor, Status::OK()); } - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); invariant(_stateDoc.getBlockTimestamp()); // Source to cancel the timeout if the operation completed in time. CancelationSource cancelTimeoutSource; - // Source to cancel if the timeout expires before completion, as a child of parent - // token. - CancelationSource recipientSyncDataCommandCancelSource(token); auto deadlineReachedFuture = (*executor)->sleepFor( Milliseconds(repl::tenantMigrationBlockingStateTimeoutMS.load()), @@ -776,13 +779,12 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( std::vector<ExecutorFuture<void>> futures; futures.push_back(std::move(deadlineReachedFuture)); - futures.push_back(_sendRecipientSyncDataCommand(executor, recipientTargeterRS)); + futures.push_back(_sendRecipientSyncDataCommand( + executor, recipientTargeterRS, _abortMigrationSource.token())); return whenAny(std::move(futures)) .thenRunOn(**executor) - .then([cancelTimeoutSource, - recipientSyncDataCommandCancelSource, - self = shared_from_this()](auto result) mutable { + .then([this, cancelTimeoutSource, self = shared_from_this()](auto result) mutable { const auto& [status, idx] = result; if (idx == 0) { @@ -791,7 +793,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "timeoutMs"_attr = repl::tenantMigrationGarbageCollectionDelayMS.load()); // Deadline reached, cancel the pending '_sendRecipientSyncDataCommand()'... - recipientSyncDataCommandCancelSource.cancel(); + _abortMigrationSource.cancel(); // ...and return error. uasserted(ErrorCodes::ExceededTimeLimit, "Blocking state timeout expired"); } else if (idx == 1) { @@ -829,12 +831,13 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "simulate a tenant migration error"); }); }) - .then([this, self = shared_from_this(), executor, token] { - checkIfReceivedDonorAbortMigration(token, _instanceCancelationSource.token()); + .then([this, self = shared_from_this(), executor, serviceToken] { + checkIfReceivedDonorAbortMigration(serviceToken, _abortMigrationSource.token()); // Enter "commit" state. - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kCommitted) - .then([this, self = shared_from_this(), executor, token]( + return _updateStateDoc( + executor, TenantMigrationDonorStateEnum::kCommitted, serviceToken) + .then([this, self = shared_from_this(), executor, serviceToken]( repl::OpTime opTime) { // TODO (SERVER-53389): TenantMigration{Donor, Recipient}Service should // use its base PrimaryOnlyService's cancelation source to pass tokens @@ -852,7 +855,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( }); }); }) - .onError([this, self = shared_from_this(), executor](Status status) { + .onError([this, self = shared_from_this(), executor, serviceToken](Status status) { if (_stateDoc.getState() == TenantMigrationDonorStateEnum::kAborted) { // The migration was resumed on stepup and it was already aborted. return ExecutorFuture<void>(**executor, Status::OK()); @@ -871,7 +874,8 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } else { // Enter "abort" state. _abortReason.emplace(status); - return _updateStateDoc(executor, TenantMigrationDonorStateEnum::kAborted) + return _updateStateDoc( + executor, TenantMigrationDonorStateEnum::kAborted, serviceToken) .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { return _waitForMajorityWriteConcern(executor, std::move(opTime)) .then([this, self = shared_from_this()] { @@ -893,7 +897,7 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( "status"_attr = status, "abortReason"_attr = _abortReason); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { if (_stateDoc.getExpireAt()) { // The migration state has already been marked as garbage collectable. Set the // donorForgetMigration promise here since the Instance's destructor has an @@ -903,13 +907,19 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } // Wait for the donorForgetMigration command. + // If donorAbortMigration has already canceled work, the abortMigrationSource would be + // canceled and continued usage of the source would lead to incorrect behavior. Thus, we + // need to use the serviceToken after the migration has reached a decision state in + // order to continue work, such as sending donorForgetMigration, successfully. return std::move(_receiveDonorForgetMigrationPromise.getFuture()) .thenRunOn(**executor) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { - return _sendRecipientForgetMigrationCommand(executor, recipientTargeterRS); - }) - .then([this, self = shared_from_this(), executor] { - return _markStateDocAsGarbageCollectable(executor); + .then( + [this, self = shared_from_this(), executor, recipientTargeterRS, serviceToken] { + return _sendRecipientForgetMigrationCommand( + executor, recipientTargeterRS, serviceToken); + }) + .then([this, self = shared_from_this(), executor, serviceToken] { + return _markStateDocAsGarbageCollectable(executor, serviceToken); }) .then([this, self = shared_from_this(), executor](repl::OpTime opTime) { return _waitForMajorityWriteConcern(executor, std::move(opTime)); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 3c33846cc54..df3d9072528 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -162,14 +162,15 @@ public: ExecutorFuture<void> _fetchAndStoreRecipientClusterTimeKeyDocs( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const CancelationToken& token); + const CancelationToken& serviceToken, + const CancelationToken& instanceToken); /** * Inserts the state document to _stateDocumentsNS and returns the opTime for the insert * oplog entry. */ ExecutorFuture<repl::OpTime> _insertStateDoc( - std::shared_ptr<executor::ScopedTaskExecutor> executor); + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token); /** * Updates the state document to have the given state. Then, persists the updated document @@ -179,14 +180,15 @@ public: */ ExecutorFuture<repl::OpTime> _updateStateDoc( std::shared_ptr<executor::ScopedTaskExecutor> executor, - const TenantMigrationDonorStateEnum nextState); + const TenantMigrationDonorStateEnum nextState, + const CancelationToken& token); /** * Sets the "expireAt" time for the state document to be garbage collected, and returns the * the opTime for the write. */ ExecutorFuture<repl::OpTime> _markStateDocAsGarbageCollectable( - std::shared_ptr<executor::ScopedTaskExecutor> executor); + std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token); /** * Waits for given opTime to be majority committed. @@ -200,21 +202,24 @@ public: ExecutorFuture<void> _sendCommandToRecipient( std::shared_ptr<executor::ScopedTaskExecutor> executor, std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, - const BSONObj& cmdObj); + const BSONObj& cmdObj, + const CancelationToken& token); /** * Sends the recipientSyncData command to the recipient replica set. */ ExecutorFuture<void> _sendRecipientSyncDataCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS); + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token); /** * Sends the recipientForgetMigration command to the recipient replica set. */ ExecutorFuture<void> _sendRecipientForgetMigrationCommand( std::shared_ptr<executor::ScopedTaskExecutor> executor, - std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS); + std::shared_ptr<RemoteCommandTargeter> recipientTargeterRS, + const CancelationToken& token); ThreadPool::Limits _getRecipientCmdThreadPoolLimits() const { ThreadPool::Limits recipientCmdThreadPoolLimits; @@ -261,7 +266,7 @@ public: // This CancelationSource is instantiated from CancelationToken that is passed into run(). // It allows for manual cancelation of work from the instance. - CancelationSource _instanceCancelationSource; + CancelationSource _abortMigrationSource; }; private: |