summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2021-03-25 21:53:17 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-22 19:13:05 +0000
commit4565b6d14cf75272a6487e99aabcd05b4c290670 (patch)
tree20b7cfc287bd26731c9b344f74f5ec5356add1a6 /src
parentbc7b5da3d0d91eeb945f103d81f2cebd97f7d9f3 (diff)
downloadmongo-4565b6d14cf75272a6487e99aabcd05b4c290670.tar.gz
SERVER-56248 Refactor logic for aborting a tenant migration donor
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/tenant_migration_donor_cmds.cpp4
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.cpp67
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service.h39
-rw-r--r--src/mongo/db/repl/tenant_migration_donor_service_test.cpp3
4 files changed, 59 insertions, 54 deletions
diff --git a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
index 4dee89efc4b..129aa787b40 100644
--- a/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
+++ b/src/mongo/db/commands/tenant_migration_donor_cmds.cpp
@@ -281,10 +281,6 @@ public:
const auto& donor = donorPtr.get().get();
- // Ensure that we only are able to run donorAbortMigration after the donor has called
- // run() and has inserted a majority committed state document for the migration.
- donor->getMigrationCancelableFuture().get(opCtx);
- donor->getInitialDonorStateDurableFuture().get(opCtx);
donor->onReceiveDonorAbortMigration();
donor->getDecisionFuture().get(opCtx);
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp
index 69190f729ab..77e6f66e34c 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp
@@ -333,7 +333,7 @@ boost::optional<BSONObj> TenantMigrationDonorService::Instance::reportForCurrent
bob.append("tenantId", _tenantId);
bob.append("recipientConnectionString", _recipientConnectionString);
bob.append("readPreference", _readPreference.toInnerBSON());
- bob.append("receivedCancellation", _abortMigrationSource.token().isCanceled());
+ bob.append("receivedCancellation", _abortRequested);
bob.append("lastDurableState", _durableState.state);
if (_stateDoc.getMigrationStart()) {
bob.appendDate("migrationStart", *_stateDoc.getMigrationStart());
@@ -385,9 +385,11 @@ TenantMigrationDonorService::Instance::getDurableState(OperationContext* opCtx)
}
void TenantMigrationDonorService::Instance::onReceiveDonorAbortMigration() {
- _abortMigrationSource.cancel();
-
stdx::lock_guard<Latch> lg(_mutex);
+ _abortRequested = true;
+ if (_abortMigrationSource) {
+ _abortMigrationSource->cancel();
+ }
if (auto fetcher = _recipientKeysFetcher.lock()) {
fetcher->shutdown();
}
@@ -405,7 +407,6 @@ void TenantMigrationDonorService::Instance::interrupt(Status status) {
setPromiseErrorIfNotReady(lg, _receiveDonorForgetMigrationPromise, status);
setPromiseErrorIfNotReady(lg, _completionPromise, status);
setPromiseErrorIfNotReady(lg, _decisionPromise, status);
- setPromiseErrorIfNotReady(lg, _migrationCancelablePromise, status);
if (auto fetcher = _recipientKeysFetcher.lock()) {
fetcher->shutdown();
@@ -713,9 +714,25 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_sendRecipientForget
return _sendCommandToRecipient(executor, recipientTargeterRS, request.toBSON(BSONObj()), token);
}
+CancellationToken TenantMigrationDonorService::Instance::_initAbortMigrationSource(
+ const CancellationToken& token) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(!_abortMigrationSource);
+ _abortMigrationSource = CancellationSource(token);
+
+ if (_abortRequested) {
+ // An abort was requested before the abort source was set up so immediately cancel it.
+ _abortMigrationSource->cancel();
+ }
+
+ return _abortMigrationSource->token();
+}
+
SemiFuture<void> TenantMigrationDonorService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
+ pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet();
+
{
stdx::lock_guard<Latch> lg(_mutex);
if (!_stateDoc.getMigrationStart()) {
@@ -723,45 +740,42 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run(
}
}
- pauseTenantMigrationBeforeEnteringFutureChain.pauseWhileSet();
-
- _abortMigrationSource = CancellationSource(token);
+ auto abortToken = _initAbortMigrationSource(token);
- {
- stdx::lock_guard<Latch> lg(_mutex);
- setPromiseOkIfNotReady(lg, _migrationCancelablePromise);
- }
auto recipientTargeterRS = std::make_shared<RemoteCommandTargeterRS>(
_recipientUri.getSetName(), _recipientUri.getServers());
auto scopedOutstandingMigrationCounter =
TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingDonatingCount();
return ExecutorFuture(**executor)
- .then([this, self = shared_from_this(), executor] {
- return _enterAbortingIndexBuildsState(executor, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor, token] {
+ // Note we do not use the abort migration token here because the donorAbortMigration
+ // command waits for a decision to be persisted which will not happen if inserting the
+ // initial state document fails.
+ return _enterAbortingIndexBuildsState(executor, token);
})
- .then([this, self = shared_from_this(), executor] {
- _abortIndexBuilds(_abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor, abortToken] {
+ _abortIndexBuilds(abortToken);
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] {
return _fetchAndStoreRecipientClusterTimeKeyDocs(
- executor, recipientTargeterRS, _abortMigrationSource.token());
+ executor, recipientTargeterRS, abortToken);
})
- .then([this, self = shared_from_this(), executor] {
- return _enterDataSyncState(executor, _abortMigrationSource.token());
+ .then([this, self = shared_from_this(), executor, abortToken] {
+ return _enterDataSyncState(executor, abortToken);
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] {
return _waitForRecipientToBecomeConsistentAndEnterBlockingState(
- executor, recipientTargeterRS, _abortMigrationSource.token());
+ executor, recipientTargeterRS, abortToken);
})
- .then([this, self = shared_from_this(), executor, recipientTargeterRS] {
+ .then([this, self = shared_from_this(), executor, recipientTargeterRS, abortToken] {
return _waitForRecipientToReachBlockTimestampAndEnterCommittedState(
- executor, recipientTargeterRS, _abortMigrationSource.token());
+ executor, recipientTargeterRS, abortToken);
})
// Note from here on the migration cannot be aborted, so only the token from the primary
// only service should be used.
- .onError([this, self = shared_from_this(), executor, token](Status status) {
- return _handleErrorOrEnterAbortedState(executor, token, status);
+ .onError([this, self = shared_from_this(), executor, token, abortToken](Status status) {
+ return _handleErrorOrEnterAbortedState(executor, token, abortToken, status);
})
.onCompletion([this, self = shared_from_this()](Status status) {
LOGV2(5006601,
@@ -1105,6 +1119,7 @@ TenantMigrationDonorService::Instance::_waitForRecipientToReachBlockTimestampAnd
ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterAbortedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& token,
+ const CancellationToken& abortToken,
Status status) {
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -1114,7 +1129,7 @@ ExecutorFuture<void> TenantMigrationDonorService::Instance::_handleErrorOrEnterA
}
}
- if (_abortMigrationSource.token().isCanceled()) {
+ if (abortToken.isCanceled()) {
status = Status(ErrorCodes::TenantMigrationAborted, "Aborted due to donorAbortMigration.");
}
diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h
index a596261e0f7..842d2e8c0d5 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service.h
+++ b/src/mongo/db/repl/tenant_migration_donor_service.h
@@ -121,22 +121,6 @@ public:
}
/**
- * Returns a Future that will be resolved when a migration has called the run() method and
- * instantiated the CancellationSource.
- */
- SharedSemiFuture<void> getMigrationCancelableFuture() const {
- return _migrationCancelablePromise.getFuture();
- }
-
- /**
- * Returns a Future that will be resolved when the donor has majority-committed the write to
- * insert the donor state doc for the migration.
- */
- SharedSemiFuture<void> getInitialDonorStateDurableFuture() const {
- return _initialDonorStateDurablePromise.getFuture();
- }
-
- /**
* Kicks off work for the donorAbortMigration command.
*/
void onReceiveDonorAbortMigration();
@@ -189,6 +173,7 @@ public:
ExecutorFuture<void> _handleErrorOrEnterAbortedState(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& token,
+ const CancellationToken& abortToken,
Status status);
ExecutorFuture<void> _waitForForgetMigrationThenMarkMigrationGarbageCollectable(
@@ -267,6 +252,12 @@ public:
return recipientCmdThreadPoolLimits;
}
+ /*
+ * Initializes _abortMigrationSource and returns a token from it. The source will be
+ * immediately canceled if an abort has already been requested.
+ */
+ CancellationToken _initAbortMigrationSource(const CancellationToken& token);
+
ServiceContext* const _serviceContext;
const TenantMigrationDonorService* const _donorService;
@@ -295,16 +286,13 @@ public:
boost::optional<Status> _abortReason;
- // Protects the durable state, state document, and the promises below.
+ // Protects the durable state, state document, abort requested boolean, and the promises
+ // below.
mutable Mutex _mutex = MONGO_MAKE_LATCH("TenantMigrationDonorService::_mutex");
// The latest majority-committed migration state.
DurableState _durableState;
- // Promise that is resolved when run() has been called and the CancellationSource has been
- // instantiated.
- SharedPromise<void> _migrationCancelablePromise;
-
// Promise that is resolved when the donor has majority-committed the write to insert the
// donor state doc for the migration.
SharedPromise<void> _initialDonorStateDurablePromise;
@@ -319,9 +307,14 @@ public:
// abort.
SharedPromise<void> _decisionPromise;
+ // Set to true when a request to cancel the migration has been processed, e.g. after
+ // executing the donorAbortMigration command.
+ bool _abortRequested{false};
+
// Used for logical interrupts that require aborting the migration but not unconditionally
- // interrupting the instance, e.g. receiving donorAbortMigration.
- CancellationSource _abortMigrationSource;
+ // interrupting the instance, e.g. receiving donorAbortMigration. Initialized in
+ // _initAbortMigrationSource().
+ boost::optional<CancellationSource> _abortMigrationSource;
};
private:
diff --git a/src/mongo/db/repl/tenant_migration_donor_service_test.cpp b/src/mongo/db/repl/tenant_migration_donor_service_test.cpp
index 9f25de63f3c..53588c3c158 100644
--- a/src/mongo/db/repl/tenant_migration_donor_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_donor_service_test.cpp
@@ -176,7 +176,8 @@ TEST_F(TenantMigrationDonorServiceTest, CheckSettingMigrationStartDate) {
// Advance the clock by some arbitrary amount of time so we are not starting at 0 seconds.
_clkSource->advance(Milliseconds(10000));
- auto taskFp = globalFailPointRegistry().find("pauseTenantMigrationBeforeEnteringFutureChain");
+ auto taskFp =
+ globalFailPointRegistry().find("pauseTenantMigrationAfterPersistingInitialDonorStateDoc");
auto initialTimesEntered = taskFp->setMode(FailPoint::alwaysOn);
const UUID migrationUUID = UUID::gen();