diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2021-03-25 21:53:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-22 19:13:05 +0000 |
commit | 4565b6d14cf75272a6487e99aabcd05b4c290670 (patch) | |
tree | 20b7cfc287bd26731c9b344f74f5ec5356add1a6 /src | |
parent | bc7b5da3d0d91eeb945f103d81f2cebd97f7d9f3 (diff) | |
download | mongo-4565b6d14cf75272a6487e99aabcd05b4c290670.tar.gz |
SERVER-56248 Refactor logic for aborting a tenant migration donor
Diffstat (limited to 'src')
4 files changed, 59 insertions, 54 deletions
diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp index 4dee89efc4b..129aa787b40 100644 --- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp +++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp @@ -281,10 +281,6 @@ public: const auto& donor = donorPtr.get().get(); - // Ensure that we only are able to run donorAbortMigration after the donor has called - // run() and has inserted a majority committed state document for the migration. - donor->getMigrationCancelableFuture().get(opCtx); - donor->getInitialDonorStateDurableFuture().get(opCtx); donor->onReceiveDonorAbortMigration(); donor->getDecisionFuture().get(opCtx); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 69190f729ab..77e6f66e34c 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -333,7 +333,7 @@ boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrent bob.append("tenantId", _tenantId); bob.append("recipientConnectionString", _recipientConnectionString); bob.append("readPreference", _readPreference.toInnerBSON()); - bob.append("receivedCancellation", _abortMigrationSource.token().isCanceled()); + bob.append("receivedCancellation", _abortRequested); bob.append("lastDurableState", _durableState.state); if (_stateDoc.getMigrationStart()) { bob.appendDate("migrationStart", *_stateDoc.getMigrationStart()); @@ -385,9 +385,11 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx) } void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() { - _abortMigrationSource.cancel(); - stdx::lock_guard<Latch> lg(_mutex); + _abortRequested = true; + if (_abortMigrationSource) { + _abortMigrationSource->cancel(); + } if (auto fetcher = _recipientKeysFetcher.lock()) { fetcher->shutdown(); } @@ -405,7 +407,6 @@ void TenantMigrationDonorService::Instance::interrupt(Status status) { setPromiseErrorIfNotReady(lg, _receiveDonorForgetMigrationPromise, status); setPromiseErrorIfNotReady(lg, _completionPromise, status); setPromiseErrorIfNotReady(lg, _decisionPromise, status); - setPromiseErrorIfNotReady(lg, _migrationCancelablePromise, status); if (auto fetcher = _recipientKeysFetcher.lock()) { fetcher->shutdown(); @@ -713,9 +714,25 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token); } +CancellationToken TenantMigrationDonorService::Instance::_initAbortMigrationSource( + const CancellationToken& token) { + stdx::lock_guard<Latch> lg(_mutex); + invariant(!_abortMigrationSource); + _abortMigrationSource = CancellationSource(token); + + if (_abortRequested) { + // An abort was requested before the abort source was set up so immediately cancel it. + _abortMigrationSource->cancel(); + } + + return _abortMigrationSource->token(); +} + SemiFuture<void> TenantMigrationDonorService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept { + pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet(); + { stdx::lock_guard<Latch> lg(_mutex); if (!_stateDoc.getMigrationStart()) { @@ -723,45 +740,42 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } } - pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet(); - - _abortMigrationSource = CancellationSource(token); + auto abortToken = _initAbortMigrationSource(token); - { - stdx::lock_guard<Latch> lg(_mutex); - setPromiseOkIfNotReady(lg, _migrationCancelablePromise); - } auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>( _recipientUri.getSetName(), _recipientUri.getServers()); auto scopedOutstandingMigrationCounter = TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingDonatingCount(); return ExecutorFuture(**executor) - .then([this, self = shared_from_this(), executor] { - return _enterAbortingIndexBuildsState(executor, _abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor, token] { + // Note we do not use the abort migration token here because the donorAbortMigration + // command waits for a decision to be persisted which will not happen if inserting the + // initial state document fails. + return _enterAbortingIndexBuildsState(executor, token); }) - .then([this, self = shared_from_this(), executor] { - _abortIndexBuilds(_abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor, abortToken] { + _abortIndexBuilds(abortToken); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] { return _fetchAndStoreRecipientClusterTimeKeyDocs( - executor, recipientTargeterRS, _abortMigrationSource.token()); + executor, recipientTargeterRS, abortToken); }) - .then([this, self = shared_from_this(), executor] { - return _enterDataSyncState(executor, _abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor, abortToken] { + return _enterDataSyncState(executor, abortToken); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] { return _waitForRecipientToBecomeConsistentAndEnterBlockingState( - executor, recipientTargeterRS, _abortMigrationSource.token()); + executor, recipientTargeterRS, abortToken); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] { return _waitForRecipientToReachBlockTimestampAndEnterCommittedState( - executor, recipientTargeterRS, _abortMigrationSource.token()); + executor, recipientTargeterRS, abortToken); }) // Note from here on the migration cannot be aborted, so only the token from the primary // only service should be used. - .onError([this, self = shared_from_this(), executor, token](Status status) { - return _handleErrorOrEnterAbortedState(executor, token, status); + .onError([this, self = shared_from_this(), executor, token, abortToken](Status status) { + return _handleErrorOrEnterAbortedState(executor, token, abortToken, status); }) .onCompletion([this, self = shared_from_this()](Status status) { LOGV2(5006601, @@ -1105,6 +1119,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterAbortedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token, + const CancellationToken& abortToken, Status status) { { stdx::lock_guard<Latch> lg(_mutex); @@ -1114,7 +1129,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA } } - if (_abortMigrationSource.token().isCanceled()) { + if (abortToken.isCanceled()) { status = Status(ErrorCodes::TenantMigrationAborted, "Aborted due to donorAbortMigration."); } diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index a596261e0f7..842d2e8c0d5 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -121,22 +121,6 @@ public: } /** - * Returns a Future that will be resolved when a migration has called the run() method and - * instantiated the CancellationSource. - */ - SharedSemiFuture<void> getMigrationCancelableFuture() const { - return _migrationCancelablePromise.getFuture(); - } - - /** - * Returns a Future that will be resolved when the donor has majority-committed the write to - * insert the donor state doc for the migration. - */ - SharedSemiFuture<void> getInitialDonorStateDurableFuture() const { - return _initialDonorStateDurablePromise.getFuture(); - } - - /** * Kicks off work for the donorAbortMigration command. */ void onReceiveDonorAbortMigration(); @@ -189,6 +173,7 @@ public: ExecutorFuture<void> _handleErrorOrEnterAbortedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& token, + const CancellationToken& abortToken, Status status); ExecutorFuture<void> _waitForForgetMigrationThenMarkMigrationGarbageCollectable( @@ -267,6 +252,12 @@ public: return recipientCmdThreadPoolLimits; } + /* + * Initializes _abortMigrationSource and returns a token from it. The source will be + * immediately canceled if an abort has already been requested. + */ + CancellationToken _initAbortMigrationSource(const CancellationToken& token); + ServiceContext* const _serviceContext; const TenantMigrationDonorService* const _donorService; @@ -295,16 +286,13 @@ public: boost::optional<Status> _abortReason; - // Protects the durable state, state document, and the promises below. + // Protects the durable state, state document, abort requested boolean, and the promises + // below. mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationDonorService::_mutex"); // The latest majority-committed migration state. DurableState _durableState; - // Promise that is resolved when run() has been called and the CancellationSource has been - // instantiated. - SharedPromise<void> _migrationCancelablePromise; - // Promise that is resolved when the donor has majority-committed the write to insert the // donor state doc for the migration. SharedPromise<void> _initialDonorStateDurablePromise; @@ -319,9 +307,14 @@ public: // abort. SharedPromise<void> _decisionPromise; + // Set to true when a request to cancel the migration has been processed, e.g. after + // executing the donorAbortMigration command. + bool _abortRequested{false}; + // Used for logical interrupts that require aborting the migration but not unconditionally - // interrupting the instance, e.g. receiving donorAbortMigration. - CancellationSource _abortMigrationSource; + // interrupting the instance, e.g. receiving donorAbortMigration. Initialized in + // _initAbortMigrationSource(). + boost::optional<CancellationSource> _abortMigrationSource; }; private: diff --git a/src/mongo/db/repl/tenant_migration_donor_service_test.cpp b/src/mongo/db/repl/tenant_migration_donor_service_test.cpp index 9f25de63f3c..53588c3c158 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service_test.cpp @@ -176,7 +176,8 @@ TEST_F(TenantMigrationDonorServiceTest, CheckSettingMigrationStartDate) { // Advance the clock by some arbitrary amount of time so we are not starting at 0 seconds. _clkSource->advance(Milliseconds(10000)); - auto taskFp = globalFailPointRegistry().find("pauseTenantMigrationBeforeEnteringFutureChain"); + auto taskFp = + globalFailPointRegistry().find("pauseTenantMigrationAfterPersistingInitialDonorStateDoc"); auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn); const UUID migrationUUID = UUID::gen(); |