diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2021-03-25 21:53:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-03 00:49:07 +0000 |
commit | 48531fdbd58769200b279ed5e31c2111376903ac (patch) | |
tree | a7f006a94a5423fb676b5ae5ce55c55fa2acba59 | |
parent | 17d7453f95b4c1dfa439adb4532a82015c9a7d81 (diff) | |
download | mongo-48531fdbd58769200b279ed5e31c2111376903ac.tar.gz |
SERVER-56248 Refactor logic for aborting a tenant migration donor
(cherry picked from commit 4565b6d14cf75272a6487e99aabcd05b4c290670)
5 files changed, 120 insertions, 54 deletions
diff --git a/jstests/replsets/tenant_migration_donor_try_abort.js b/jstests/replsets/tenant_migration_donor_try_abort.js index 5aecd26c1a4..f003429e935 100644 --- a/jstests/replsets/tenant_migration_donor_try_abort.js +++ b/jstests/replsets/tenant_migration_donor_try_abort.js @@ -23,6 +23,58 @@ const kDelayMS = const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); (() => { + jsTestLog("Test sending donorAbortMigration before an instance's future chain begins."); + + const tmt = new TenantMigrationTest({name: jsTestName()}); + if (!tmt.isFeatureFlagEnabled()) { + jsTestLog("Skipping test because the tenant migrations feature flag is disabled"); + return; + } + + const donorPrimary = tmt.getDonorPrimary(); + let fp = configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeEnteringFutureChain"); + + const tenantId = kTenantId; + const migrationId = extractUUIDFromObject(UUID()); + const migrationOpts = { + migrationIdString: migrationId, + tenantId: tenantId, + recipientConnString: tmt.getRecipientConnString(), + }; + + const donorRstArgs = TenantMigrationUtil.createRstArgs(tmt.getDonorRst()); + + const startMigrationThread = + new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs); + startMigrationThread.start(); + + fp.wait(); + + const tryAbortThread = + new Thread(TenantMigrationUtil.tryAbortMigrationAsync, migrationOpts, donorRstArgs); + tryAbortThread.start(); + + // Wait for donorAbortMigration command to start. + assert.soon(() => { + const res = assert.commandWorked(donorPrimary.adminCommand( + {currentOp: true, desc: "tenant donor migration", tenantId: tenantId})); + return res.inprog[0].receivedCancelation; + }); + + fp.off(); + + startMigrationThread.join(); + tryAbortThread.join(); + let r = assert.commandWorked(tryAbortThread.returnData()); + + const stateRes = assert.commandWorked(tmt.waitForMigrationToComplete(migrationOpts)); + assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes)); + + tmt.stop(); +})(); + +(() => { jsTestLog( "Test sending donorAbortMigration during a tenant migration while recipientSyncData " + "command repeatedly fails with retryable errors."); @@ -58,6 +110,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes)); fp.off(); tenantMigrationTest.stop(); @@ -99,6 +152,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes)); fp.off(); tenantMigrationTest.stop(); @@ -160,6 +214,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes)); tenantMigrationTest.stop(); })(); @@ -199,6 +254,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes)); tenantMigrationTest.stop(); })(); @@ -251,6 +307,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tmt.waitForMigrationToComplete(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes)); tmt.stop(); })(); @@ -362,6 +419,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes)); tenantMigrationTest.stop(); })(); @@ -410,6 +468,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes)); tenantMigrationTest.stop(); })(); @@ -436,6 +495,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted); + assert.eq(stateRes.abortReason.code, ErrorCodes.InternalError, tojson(stateRes)); fp.off(); @@ -464,6 +524,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest(); const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts)); assert.eq(stateRes.state, TenantMigrationTest.DonorState.kCommitted); + assert.isnull(stateRes.abortReason, tojson(stateRes)); assert.commandFailedWithCode( tenantMigrationTest.tryAbortMigration({migrationIdString: migrationOpts.migrationIdString}), diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp index 4dee89efc4b..129aa787b40 100644 --- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp +++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp @@ -281,10 +281,6 @@ public: const auto& donor = donorPtr.get().get(); - // Ensure that we only are able to run donorAbortMigration after the donor has called - // run() and has inserted a majority committed state document for the migration. - donor->getMigrationCancelableFuture().get(opCtx); - donor->getInitialDonorStateDurableFuture().get(opCtx); donor->onReceiveDonorAbortMigration(); donor->getDecisionFuture().get(opCtx); diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index 0595a77f7a5..93a691e2862 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -329,11 +329,11 @@ boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrent BSONObjBuilder bob; bob.append("desc", "tenant donor migration"); bob.append("migrationCompleted", _completionPromise.getFuture().isReady()); - bob.append("receivedCancelation", _abortMigrationSource.token().isCanceled()); bob.append("instanceID", _migrationUuid.toBSON()); bob.append("tenantId", _tenantId); bob.append("recipientConnectionString", _recipientConnectionString); bob.append("readPreference", _readPreference.toInnerBSON()); + bob.append("receivedCancelation", _abortRequested); bob.append("lastDurableState", _durableState.state); if (_stateDoc.getMigrationStart()) { bob.appendDate("migrationStart", *_stateDoc.getMigrationStart()); @@ -386,9 +386,11 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx) } void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() { - _abortMigrationSource.cancel(); - stdx::lock_guard<Latch> lg(_mutex); + _abortRequested = true; + if (_abortMigrationSource) { + _abortMigrationSource->cancel(); + } if (auto fetcher = _recipientKeysFetcher.lock()) { fetcher->shutdown(); } @@ -406,7 +408,6 @@ void TenantMigrationDonorService::Instance::interrupt(Status status) { setPromiseErrorIfNotReady(lg, _receiveDonorForgetMigrationPromise, status); setPromiseErrorIfNotReady(lg, _completionPromise, status); setPromiseErrorIfNotReady(lg, _decisionPromise, status); - setPromiseErrorIfNotReady(lg, _migrationCancelablePromise, status); if (auto fetcher = _recipientKeysFetcher.lock()) { fetcher->shutdown(); @@ -714,9 +715,25 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token); } +CancelationToken TenantMigrationDonorService::Instance::_initAbortMigrationSource( + const CancelationToken& token) { + stdx::lock_guard<Latch> lg(_mutex); + invariant(!_abortMigrationSource); + _abortMigrationSource = CancelationSource(token); + + if (_abortRequested) { + // An abort was requested before the abort source was set up so immediately cancel it. + _abortMigrationSource->cancel(); + } + + return _abortMigrationSource->token(); +} + SemiFuture<void> TenantMigrationDonorService::Instance::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& token) noexcept { + pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet(); + { stdx::lock_guard<Latch> lg(_mutex); if (!_stateDoc.getMigrationStart()) { @@ -724,45 +741,42 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( } } - pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet(); - - _abortMigrationSource = CancelationSource(token); + auto abortToken = _initAbortMigrationSource(token); - { - stdx::lock_guard<Latch> lg(_mutex); - setPromiseOkIfNotReady(lg, _migrationCancelablePromise); - } auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>( _recipientUri.getSetName(), _recipientUri.getServers()); auto scopedOutstandingMigrationCounter = TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingDonatingCount(); return ExecutorFuture(**executor) - .then([this, self = shared_from_this(), executor] { - return _enterAbortingIndexBuildsState(executor, _abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor, token] { + // Note we do not use the abort migration token here because the donorAbortMigration + // command waits for a decision to be persisted which will not happen if inserting the + // initial state document fails. + return _enterAbortingIndexBuildsState(executor, token); }) - .then([this, self = shared_from_this(), executor] { - _abortIndexBuilds(_abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor, abortToken] { + _abortIndexBuilds(abortToken); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] { return _fetchAndStoreRecipientClusterTimeKeyDocs( - executor, recipientTargeterRS, _abortMigrationSource.token()); + executor, recipientTargeterRS, abortToken); }) - .then([this, self = shared_from_this(), executor] { - return _enterDataSyncState(executor, _abortMigrationSource.token()); + .then([this, self = shared_from_this(), executor, abortToken] { + return _enterDataSyncState(executor, abortToken); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] { return _waitForRecipientToBecomeConsistentAndEnterBlockingState( - executor, recipientTargeterRS, _abortMigrationSource.token()); + executor, recipientTargeterRS, abortToken); }) - .then([this, self = shared_from_this(), executor, recipientTargeterRS] { + .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] { return _waitForRecipientToReachBlockTimestampAndEnterCommittedState( - executor, recipientTargeterRS, _abortMigrationSource.token()); + executor, recipientTargeterRS, abortToken); }) // Note from here on the migration cannot be aborted, so only the token from the primary // only service should be used. - .onError([this, self = shared_from_this(), executor, token](Status status) { - return _handleErrorOrEnterAbortedState(executor, token, status); + .onError([this, self = shared_from_this(), executor, token, abortToken](Status status) { + return _handleErrorOrEnterAbortedState(executor, token, abortToken, status); }) .onCompletion([this, self = shared_from_this()](Status status) { LOGV2(5006601, @@ -1106,6 +1120,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterAbortedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancelationToken& token, + const CancelationToken& abortToken, Status status) { { stdx::lock_guard<Latch> lg(_mutex); @@ -1115,7 +1130,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA } } - if (_abortMigrationSource.token().isCanceled()) { + if (abortToken.isCanceled()) { status = Status(ErrorCodes::TenantMigrationAborted, "Aborted due to donorAbortMigration."); } diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 71a8d1ca32f..5087017f339 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -122,22 +122,6 @@ public: } /** - * Returns a Future that will be resolved when a migration has called the run() method and - * instantiated the CancelationSource. - */ - SharedSemiFuture<void> getMigrationCancelableFuture() const { - return _migrationCancelablePromise.getFuture(); - } - - /** - * Returns a Future that will be resolved when the donor has majority-committed the write to - * insert the donor state doc for the migration. - */ - SharedSemiFuture<void> getInitialDonorStateDurableFuture() const { - return _initialDonorStateDurablePromise.getFuture(); - } - - /** * Kicks off work for the donorAbortMigration command. */ void onReceiveDonorAbortMigration(); @@ -190,6 +174,7 @@ public: ExecutorFuture<void> _handleErrorOrEnterAbortedState( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancelationToken& token, + const CancelationToken& abortToken, Status status); ExecutorFuture<void> _waitForForgetMigrationThenMarkMigrationGarbageCollectable( @@ -268,6 +253,12 @@ public: return recipientCmdThreadPoolLimits; } + /* + * Initializes _abortMigrationSource and returns a token from it. The source will be + * immediately canceled if an abort has already been requested. + */ + CancelationToken _initAbortMigrationSource(const CancelationToken& token); + ServiceContext* const _serviceContext; const TenantMigrationDonorService* const _donorService; @@ -296,16 +287,13 @@ public: boost::optional<Status> _abortReason; - // Protects the durable state, state document, and the promises below. + // Protects the durable state, state document, abort requested boolean, and the promises + // below. mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationDonorService::_mutex"); // The latest majority-committed migration state. DurableState _durableState; - // Promise that is resolved when run() has been called and the CancelationSource has been - // instantiated. - SharedPromise<void> _migrationCancelablePromise; - // Promise that is resolved when the donor has majority-committed the write to insert the // donor state doc for the migration. SharedPromise<void> _initialDonorStateDurablePromise; @@ -320,9 +308,14 @@ public: // abort. SharedPromise<void> _decisionPromise; + // Set to true when a request to cancel the migration has been processed, e.g. after + // executing the donorAbortMigration command. + bool _abortRequested{false}; + // Used for logical interrupts that require aborting the migration but not unconditionally - // interrupting the instance, e.g. receiving donorAbortMigration. - CancelationSource _abortMigrationSource; + // interrupting the instance, e.g. receiving donorAbortMigration. Initialized in + // _initAbortMigrationSource(). + boost::optional<CancelationSource> _abortMigrationSource; }; private: diff --git a/src/mongo/db/repl/tenant_migration_donor_service_test.cpp b/src/mongo/db/repl/tenant_migration_donor_service_test.cpp index 9f25de63f3c..53588c3c158 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service_test.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service_test.cpp @@ -176,7 +176,8 @@ TEST_F(TenantMigrationDonorServiceTest, CheckSettingMigrationStartDate) { // Advance the clock by some arbitrary amount of time so we are not starting at 0 seconds. _clkSource->advance(Milliseconds(10000)); - auto taskFp = globalFailPointRegistry().find("pauseTenantMigrationBeforeEnteringFutureChain"); + auto taskFp = + globalFailPointRegistry().find("pauseTenantMigrationAfterPersistingInitialDonorStateDoc"); auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn); const UUID migrationUUID = UUID::gen(); |