summary refs log tree commit diff
diff options
context:
space:
mode:
author Jack Mulrow <jack.mulrow@mongodb.com> 2021-03-25 21:53:17 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-05-03 00:49:07 +0000
commit 48531fdbd58769200b279ed5e31c2111376903ac (patch)
tree a7f006a94a5423fb676b5ae5ce55c55fa2acba59
parent 17d7453f95b4c1dfa439adb4532a82015c9a7d81 (diff)
download mongo-48531fdbd58769200b279ed5e31c2111376903ac.tar.gz
SERVER-56248 Refactor logic for aborting a tenant migration donor
(cherry picked from commit 4565b6d14cf75272a6487e99aabcd05b4c290670)
-rw-r--r-- jstests/replsets/tenant_migration_donor_try_abort.js 61
-rw-r--r-- src/mongo/db/commands/tenant_migration_donor_cmds.cpp 4
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_service.cpp 67
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_service.h 39
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_service_test.cpp 3
5 files changed, 120 insertions, 54 deletions
diff --git a/jstests/replsets/tenant_migration_donor_try_abort.js b/jstests/replsets/tenant_migration_donor_try_abort.js
index 5aecd26c1a4..f003429e935 100644
--- a/jstests/replsets/tenant_migration_donor_try_abort.js
+++ b/jstests/replsets/tenant_migration_donor_try_abort.js
@@ -23,6 +23,58 @@ const kDelayMS =
const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
(() => {
+ jsTestLog("Test sending donorAbortMigration before an instance's future chain begins.");
+
+ const tmt = new TenantMigrationTest({name: jsTestName()});
+ if (!tmt.isFeatureFlagEnabled()) {
+ jsTestLog("Skipping test because the tenant migrations feature flag is disabled");
+ return;
+ }
+
+ const donorPrimary = tmt.getDonorPrimary();
+ let fp = configureFailPoint(donorPrimary, "pauseTenantMigrationBeforeEnteringFutureChain");
+
+ const tenantId = kTenantId;
+ const migrationId = extractUUIDFromObject(UUID());
+ const migrationOpts = {
+ migrationIdString: migrationId,
+ tenantId: tenantId,
+ recipientConnString: tmt.getRecipientConnString(),
+ };
+
+ const donorRstArgs = TenantMigrationUtil.createRstArgs(tmt.getDonorRst());
+
+ const startMigrationThread =
+ new Thread(TenantMigrationUtil.runMigrationAsync, migrationOpts, donorRstArgs);
+ startMigrationThread.start();
+
+ fp.wait();
+
+ const tryAbortThread =
+ new Thread(TenantMigrationUtil.tryAbortMigrationAsync, migrationOpts, donorRstArgs);
+ tryAbortThread.start();
+
+ // Wait for donorAbortMigration command to start.
+ assert.soon(() => {
+ const res = assert.commandWorked(donorPrimary.adminCommand(
+ {currentOp: true, desc: "tenant donor migration", tenantId: tenantId}));
+ return res.inprog[0].receivedCancelation;
+ });
+
+ fp.off();
+
+ startMigrationThread.join();
+ tryAbortThread.join();
+ let r = assert.commandWorked(tryAbortThread.returnData());
+
+ const stateRes = assert.commandWorked(tmt.waitForMigrationToComplete(migrationOpts));
+ assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes));
+
+ tmt.stop();
+})();
+
+(() => {
jsTestLog(
"Test sending donorAbortMigration during a tenant migration while recipientSyncData " +
"command repeatedly fails with retryable errors.");
@@ -58,6 +110,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes =
assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes));
fp.off();
tenantMigrationTest.stop();
@@ -99,6 +152,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes =
assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes));
fp.off();
tenantMigrationTest.stop();
@@ -160,6 +214,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes =
assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes));
tenantMigrationTest.stop();
})();
@@ -199,6 +254,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes =
assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes));
tenantMigrationTest.stop();
})();
@@ -251,6 +307,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes = assert.commandWorked(tmt.waitForMigrationToComplete(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes));
tmt.stop();
})();
@@ -362,6 +419,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes =
assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes));
tenantMigrationTest.stop();
})();
@@ -410,6 +468,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes =
assert.commandWorked(tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.TenantMigrationAborted, tojson(stateRes));
tenantMigrationTest.stop();
})();
@@ -436,6 +495,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kAborted);
+ assert.eq(stateRes.abortReason.code, ErrorCodes.InternalError, tojson(stateRes));
fp.off();
@@ -464,6 +524,7 @@ const migrationX509Options = TenantMigrationUtil.makeX509OptionsForTest();
const stateRes = assert.commandWorked(tenantMigrationTest.runMigration(migrationOpts));
assert.eq(stateRes.state, TenantMigrationTest.DonorState.kCommitted);
+ assert.isnull(stateRes.abortReason, tojson(stateRes));
assert.commandFailedWithCode(
tenantMigrationTest.tryAbortMigration({migrationIdString: migrationOpts.migrationIdString}),
diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
index 4dee89efc4b..129aa787b40 100644
--- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
@@ -281,10 +281,6 @@ public:
const auto& donor = donorPtr.get().get();
- // Ensure that we only are able to run donorAbortMigration after the donor has called
- // run() and has inserted a majority committed state document for the migration.
- donor->getMigrationCancelableFuture().get(opCtx);
- donor->getInitialDonorStateDurableFuture().get(opCtx);
donor->onReceiveDonorAbortMigration();
donor->getDecisionFuture().get(opCtx);
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 0595a77f7a5..93a691e2862 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -329,11 +329,11 @@ boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrent
BSONObjBuilder bob;
bob.append("desc", "tenant donor migration");
bob.append("migrationCompleted", _completionPromise.getFuture().isReady());
- bob.append("receivedCancelation", _abortMigrationSource.token().isCanceled());
bob.append("instanceID", _migrationUuid.toBSON());
bob.append("tenantId", _tenantId);
bob.append("recipientConnectionString", _recipientConnectionString);
bob.append("readPreference", _readPreference.toInnerBSON());
+ bob.append("receivedCancelation", _abortRequested);
bob.append("lastDurableState", _durableState.state);
if (_stateDoc.getMigrationStart()) {
bob.appendDate("migrationStart", *_stateDoc.getMigrationStart());
@@ -386,9 +386,11 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx)
}
void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() {
- _abortMigrationSource.cancel();
-
stdx::lock_guard<Latch> lg(_mutex);
+ _abortRequested = true;
+ if (_abortMigrationSource) {
+ _abortMigrationSource->cancel();
+ }
if (auto fetcher = _recipientKeysFetcher.lock()) {
fetcher->shutdown();
}
@@ -406,7 +408,6 @@ void TenantMigrationDonorService::Instance::interrupt(Status status) {
setPromiseErrorIfNotReady(lg, _receiveDonorForgetMigrationPromise, status);
setPromiseErrorIfNotReady(lg, _completionPromise, status);
setPromiseErrorIfNotReady(lg, _decisionPromise, status);
- setPromiseErrorIfNotReady(lg, _migrationCancelablePromise, status);
if (auto fetcher = _recipientKeysFetcher.lock()) {
fetcher->shutdown();
@@ -714,9 +715,25 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token);
}
+CancelationToken TenantMigrationDonorService::Instance::_initAbortMigrationSource(
+ const CancelationToken& token) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(!_abortMigrationSource);
+ _abortMigrationSource = CancelationSource(token);
+
+ if (_abortRequested) {
+ // An abort was requested before the abort source was set up so immediately cancel it.
+ _abortMigrationSource->cancel();
+ }
+
+ return _abortMigrationSource->token();
+}
+
SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancelationToken& token) noexcept {
+ pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet();
+
{
stdx::lock_guard<Latch> lg(_mutex);
if (!_stateDoc.getMigrationStart()) {
@@ -724,45 +741,42 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
}
- pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet();
-
- _abortMigrationSource = CancelationSource(token);
+ auto abortToken = _initAbortMigrationSource(token);
- {
- stdx::lock_guard<Latch> lg(_mutex);
- setPromiseOkIfNotReady(lg, _migrationCancelablePromise);
- }
auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>(
_recipientUri.getSetName(), _recipientUri.getServers());
auto scopedOutstandingMigrationCounter =
TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingDonatingCount();
return ExecutorFuture(**executor)
- .then([this, self = shared_from_this(), executor] {
- return _enterAbortingIndexBuildsState(executor, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor, token] {
+ // Note we do not use the abort migration token here because the donorAbortMigration
+ // command waits for a decision to be persisted which will not happen if inserting the
+ // initial state document fails.
+ return _enterAbortingIndexBuildsState(executor, token);
})
- .then([this, self = shared_from_this(), executor] {
- _abortIndexBuilds(_abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor, abortToken] {
+ _abortIndexBuilds(abortToken);
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] {
return _fetchAndStoreRecipientClusterTimeKeyDocs(
- executor, recipientTargeterRS, _abortMigrationSource.token());
+ executor, recipientTargeterRS, abortToken);
})
- .then([this, self = shared_from_this(), executor] {
- return _enterDataSyncState(executor, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor, abortToken] {
+ return _enterDataSyncState(executor, abortToken);
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] {
return _waitForRecipientToBecomeConsistentAndEnterBlockingState(
- executor, recipientTargeterRS, _abortMigrationSource.token());
+ executor, recipientTargeterRS, abortToken);
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] {
return _waitForRecipientToReachBlockTimestampAndEnterCommittedState(
- executor, recipientTargeterRS, _abortMigrationSource.token());
+ executor, recipientTargeterRS, abortToken);
})
// Note from here on the migration cannot be aborted, so only the token from the primary
// only service should be used.
- .onError([this, self = shared_from_this(), executor, token](Status status) {
- return _handleErrorOrEnterAbortedState(executor, token, status);
+ .onError([this, self = shared_from_this(), executor, token, abortToken](Status status) {
+ return _handleErrorOrEnterAbortedState(executor, token, abortToken, status);
})
.onCompletion([this, self = shared_from_this()](Status status) {
LOGV2(5006601,
@@ -1106,6 +1120,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd
ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterAbortedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancelationToken& token,
+ const CancelationToken& abortToken,
Status status) {
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -1115,7 +1130,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA
}
}
- if (_abortMigrationSource.token().isCanceled()) {
+ if (abortToken.isCanceled()) {
status = Status(ErrorCodes::TenantMigrationAborted, "Aborted due to donorAbortMigration.");
}
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index 71a8d1ca32f..5087017f339 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -122,22 +122,6 @@ public:
}
/**
- * Returns a Future that will be resolved when a migration has called the run() method and
- * instantiated the CancelationSource.
- */
- SharedSemiFuture<void> getMigrationCancelableFuture() const {
- return _migrationCancelablePromise.getFuture();
- }
-
- /**
- * Returns a Future that will be resolved when the donor has majority-committed the write to
- * insert the donor state doc for the migration.
- */
- SharedSemiFuture<void> getInitialDonorStateDurableFuture() const {
- return _initialDonorStateDurablePromise.getFuture();
- }
-
- /**
* Kicks off work for the donorAbortMigration command.
*/
void onReceiveDonorAbortMigration();
@@ -190,6 +174,7 @@ public:
ExecutorFuture<void> _handleErrorOrEnterAbortedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancelationToken& token,
+ const CancelationToken& abortToken,
Status status);
ExecutorFuture<void> _waitForForgetMigrationThenMarkMigrationGarbageCollectable(
@@ -268,6 +253,12 @@ public:
return recipientCmdThreadPoolLimits;
}
+ /*
+ * Initializes _abortMigrationSource and returns a token from it. The source will be
+ * immediately canceled if an abort has already been requested.
+ */
+ CancelationToken _initAbortMigrationSource(const CancelationToken& token);
+
ServiceContext* const _serviceContext;
const TenantMigrationDonorService* const _donorService;
@@ -296,16 +287,13 @@ public:
boost::optional<Status> _abortReason;
- // Protects the durable state, state document, and the promises below.
+ // Protects the durable state, state document, abort requested boolean, and the promises
+ // below.
mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationDonorService::_mutex");
// The latest majority-committed migration state.
DurableState _durableState;
- // Promise that is resolved when run() has been called and the CancelationSource has been
- // instantiated.
- SharedPromise<void> _migrationCancelablePromise;
-
// Promise that is resolved when the donor has majority-committed the write to insert the
// donor state doc for the migration.
SharedPromise<void> _initialDonorStateDurablePromise;
@@ -320,9 +308,14 @@ public:
// abort.
SharedPromise<void> _decisionPromise;
+ // Set to true when a request to cancel the migration has been processed, e.g. after
+ // executing the donorAbortMigration command.
+ bool _abortRequested{false};
+
// Used for logical interrupts that require aborting the migration but not unconditionally
- // interrupting the instance, e.g. receiving donorAbortMigration.
- CancelationSource _abortMigrationSource;
+ // interrupting the instance, e.g. receiving donorAbortMigration. Initialized in
+ // _initAbortMigrationSource().
+ boost::optional<CancelationSource> _abortMigrationSource;
};
private:
diff --git a/src/mongo/db/repl/tenant_migration_donor_service_test.cpp b/src/mongo/db/repl/tenant_migration_donor_service_test.cpp
index 9f25de63f3c..53588c3c158 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service_test.cpp
@@ -176,7 +176,8 @@ TEST_F(TenantMigrationDonorServiceTest, CheckSettingMigrationStartDate) {
// Advance the clock by some arbitrary amount of time so we are not starting at 0 seconds.
_clkSource->advance(Milliseconds(10000));
- auto taskFp = globalFailPointRegistry().find("pauseTenantMigrationBeforeEnteringFutureChain");
+ auto taskFp =
+ globalFailPointRegistry().find("pauseTenantMigrationAfterPersistingInitialDonorStateDoc");
auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn);
const UUID migrationUUID = UUID::gen();