author     Matt Broadstone <mbroadst@mongodb.com>            2022-05-10 20:26:39 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-05-10 21:58:00 +0000
commit     4a097ad4ab5c1cf6b3606578715aaf02b99420cb (patch)
tree       71b1e85a0ffea09f5d09dd6c3653d7286a1a8dc5 /src
parent     610ca5864154d97ddd18d225a2c148a96260eadc (diff)
download   mongo-4a097ad4ab5c1cf6b3606578715aaf02b99420cb.tar.gz
SERVER-66326 Simplify cancellation and abort flow in shard split
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp       | 130
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service_test.cpp  |  47
2 files changed, 76 insertions(+), 101 deletions(-)
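
The substance of the change in run(): _decisionPromise is no longer fulfilled from several separate branches (the early recipient-cleanup return, emplaceValue on validation failure, and the main path); the whole flow now lives in one _decisionPromise.setWith(...) chain, and _completionPromise.setWith(...) always derives completion from that decision. Below is a minimal standalone sketch of the "fulfill the promise exactly once, from one chain" pattern; it uses plain std::promise, and setWith, Decision, and validateConfig are illustrative stand-ins rather than the server's SharedPromise/ExecutorFuture API.

// Plain C++17 sketch of the "set the promise once, from one chain" pattern.
// setWith, Decision, and validateConfig are illustrative stand-ins, not the
// MongoDB future library.
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <string>

enum class State { kCommitted, kAborted };
struct Decision {
    State state;
    std::string reason;
};

// Stand-in for SharedPromise::setWith: run the callable and fulfill the
// promise with its result or its exception, exactly once.
template <typename T>
void setWith(std::promise<T>& p, const std::function<T()>& fn) {
    try {
        p.set_value(fn());
    } catch (...) {
        p.set_exception(std::current_exception());
    }
}

bool validateConfig() { return true; }  // hypothetical validation result

int main() {
    std::promise<Decision> decisionPromise;

    // Every branch, including validation failure, is a return value of the
    // same callable instead of a separate promise.emplaceValue(...) call.
    setWith<Decision>(decisionPromise, [] {
        if (!validateConfig()) {
            return Decision{State::kAborted, "invalid recipient config"};
        }
        return Decision{State::kCommitted, ""};
    });

    // Completion is always derived from the decision, never set independently.
    auto decision = decisionPromise.get_future().get();
    std::cout << (decision.state == State::kCommitted ? "committed" : "aborted") << '\n';
    return 0;
}
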
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 290bc98f144..8d70460b54f 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -334,18 +334,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
_markKilledExecutor->startup();
_cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor);
- const bool shouldRemoveStateDocumentOnRecipient = [&]() {
- stdx::lock_guard<Latch> lg(_mutex);
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- return serverless::shouldRemoveStateDocumentOnRecipient(opCtx.get(), _stateDoc);
- }();
+ _decisionPromise.setWith([&] {
+ const bool shouldRemoveStateDocumentOnRecipient = [&]() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ return serverless::shouldRemoveStateDocumentOnRecipient(opCtx.get(), _stateDoc);
+ }();
- if (shouldRemoveStateDocumentOnRecipient) {
- if (MONGO_unlikely(pauseShardSplitBeforeRecipientCleanup.shouldFail())) {
+ if (shouldRemoveStateDocumentOnRecipient) {
pauseShardSplitBeforeRecipientCleanup.pauseWhileSet();
- }
- _decisionPromise.setWith([&] {
return ExecutorFuture(**executor)
.then([this, executor, primaryToken, anchor = shared_from_this()] {
if (MONGO_unlikely(skipShardSplitRecipientCleanup.shouldFail())) {
@@ -358,41 +356,33 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
return DurableState{ShardSplitDonorStateEnum::kCommitted};
})
.unsafeToInlineFuture();
- });
-
- _completionPromise.setWith([&] {
- return _decisionPromise.getFuture().semi().ignoreValue().unsafeToInlineFuture();
- });
-
- return _completionPromise.getFuture().semi();
- }
-
- auto isConfigValidWithStatus = [&]() {
- stdx::lock_guard<Latch> lg(_mutex);
- auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
- invariant(replCoord);
- return serverless::validateRecipientNodesForShardSplit(_stateDoc, replCoord->getConfig());
- }();
- if (!isConfigValidWithStatus.isOK()) {
- LOGV2_ERROR(6395900,
- "Failed to validate recipient nodes for shard split.",
- "id"_attr = _migrationId,
- "status"_attr = isConfigValidWithStatus);
- _decisionPromise.emplaceValue(
- DurableState{ShardSplitDonorStateEnum::kAborted, isConfigValidWithStatus});
- _completionPromise.setFrom(
- _decisionPromise.getFuture().semi().ignoreValue().unsafeToInlineFuture());
- return _completionPromise.getFuture().semi();
- }
+ }
- _initiateTimeout(executor, abortToken);
+ auto isConfigValidWithStatus = [&]() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
+ invariant(replCoord);
+ return serverless::validateRecipientNodesForShardSplit(_stateDoc,
+ replCoord->getConfig());
+ }();
+
+ if (!isConfigValidWithStatus.isOK()) {
+ LOGV2_ERROR(6395900,
+ "Failed to validate recipient nodes for shard split.",
+ "id"_attr = _migrationId,
+ "status"_attr = isConfigValidWithStatus);
+ return ExecutorFuture(
+ **executor,
+ DurableState{ShardSplitDonorStateEnum::kAborted, isConfigValidWithStatus})
+ .unsafeToInlineFuture();
+ }
- LOGV2(6086506,
- "Starting shard split.",
- "id"_attr = _migrationId,
- "timeout"_attr = repl::shardSplitTimeoutMS.load());
+ _initiateTimeout(executor, abortToken);
+ LOGV2(6086506,
+ "Starting shard split.",
+ "id"_attr = _migrationId,
+ "timeout"_attr = repl::shardSplitTimeoutMS.load());
- _decisionPromise.setWith([&] {
return ExecutorFuture(**executor)
.then([this, executor, primaryToken, abortToken] {
// Note we do not use the abort split token here because the abortShardSplit
@@ -403,11 +393,8 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
.then([this, executor, abortToken] {
checkForTokenInterrupt(abortToken);
_cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
- if (MONGO_unlikely(pauseShardSplitAfterBlocking.shouldFail())) {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- pauseShardSplitAfterBlocking.pauseWhileSetAndNotCanceled(opCtx.get(),
- abortToken);
- }
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ pauseShardSplitAfterBlocking.pauseWhileSet(opCtx.get());
return _waitForRecipientToReachBlockTimestamp(executor, abortToken);
})
@@ -433,34 +420,33 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
.unsafeToInlineFuture();
});
- _completionPromise.setFrom(
- _decisionPromise.getFuture()
- .semi()
- .ignoreValue()
- .thenRunOn(**executor)
+ _completionPromise.setWith([&] {
+ return ExecutorFuture(**executor)
+ .then([&] { return _decisionPromise.getFuture().semi().ignoreValue(); })
.then([this, executor, primaryToken] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- if (MONGO_unlikely(pauseShardSplitAfterDecision.shouldFail())) {
- pauseShardSplitAfterDecision.pauseWhileSetAndNotCanceled(opCtx.get(),
- primaryToken);
- }
+ pauseShardSplitAfterDecision.pauseWhileSet(opCtx.get());
return _waitForForgetCmdThenMarkGarbageCollectable(executor, primaryToken);
})
- .onCompletion([this, anchor = shared_from_this(), primaryToken](Status status) {
+ .onCompletion([this, primaryToken, anchor = shared_from_this()](Status status) {
stdx::lock_guard<Latch> lg(_mutex);
- LOGV2(8423357,
- "Marked shard split as garbage collectable.",
- "id"_attr = _stateDoc.getId(),
- "expireAt"_attr = _stateDoc.getExpireAt());
+ // Propagate any errors from the donor stepping down.
+ if (primaryToken.isCanceled() ||
+ _stateDoc.getState() < ShardSplitDonorStateEnum::kCommitted) {
+ return status;
+ }
LOGV2(8423356,
"Shard split completed.",
"id"_attr = _stateDoc.getId(),
"status"_attr = status,
"abortReason"_attr = _abortReason);
+
+ return Status::OK();
})
- .unsafeToInlineFuture());
+ .unsafeToInlineFuture();
+ });
return _completionPromise.getFuture().semi();
}
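
The new _completionPromise chain ends in an onCompletion handler that decides whether an error should surface: while the primary-service token is canceled (stepdown/shutdown) or the split has not yet reached kCommitted, the incoming status is propagated; once the split has committed, completion reports success regardless of a late error. A small standalone sketch of just that branch, with Status and the state enum as simplified stand-ins:

// Simplified stand-ins for the real Status/ShardSplitDonorStateEnum types.
#include <iostream>
#include <string>

struct Status {
    bool ok;
    std::string reason;
};

enum class DonorState { kUninitialized, kBlocking, kCommitted, kAborted };

// Mirror of the onCompletion branch: propagate `status` only while the split
// can still legitimately fail (primary token canceled, or not yet committed).
Status finalCompletionStatus(bool primaryTokenCanceled, DonorState state, Status status) {
    if (primaryTokenCanceled || state < DonorState::kCommitted) {
        return status;  // e.g. InterruptedDueToReplStateChange after a stepdown
    }
    return Status{true, ""};  // split already committed: completion succeeds
}

int main() {
    // After a stepdown before commit, the error is propagated.
    Status a = finalCompletionStatus(true, DonorState::kBlocking,
                                     Status{false, "InterruptedDueToReplStateChange"});
    // After the split has committed, a late error is swallowed.
    Status b = finalCompletionStatus(false, DonorState::kCommitted,
                                     Status{false, "late error"});
    std::cout << a.ok << " " << b.ok << '\n';  // prints "0 1"
}
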
@@ -787,11 +773,11 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
whenAny(std::move(timeoutFuture),
completionFuture().semi().ignoreValue().thenRunOn(**executor))
.thenRunOn(**executor)
- .then([this, executor, anchor = shared_from_this()](auto result) {
+ .then([this, executor, abortToken, anchor = shared_from_this()](auto result) {
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() != ShardSplitDonorStateEnum::kCommitted &&
_stateDoc.getState() != ShardSplitDonorStateEnum::kAborted &&
- !_abortRequested) {
+ !abortToken.isCanceled()) {
LOGV2(6236500,
"Timeout expired, aborting shard split.",
"id"_attr = _migrationId,
@@ -827,10 +813,9 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
return ExecutorFuture(**executor, statusWithState);
}
- // There is no use to check the parent token the executor would not run if the parent token
- // is cancelled. At this point either the abortToken has been cancelled or a previous
- // operation failed. In either case we abort the migration.
- if (abortToken.isCanceled()) {
+ // Make sure we don't change the status if the abortToken is cancelled due to a POS instance
+ // interruption.
+ if (abortToken.isCanceled() && !primaryToken.isCanceled()) {
statusWithState =
Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit' command.");
}
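
The rewritten condition encodes the token hierarchy: the abort token is derived from the primary-service token, so a POS interruption cancels both, and only "abort token canceled while the primary token is not" means an explicit abortShardSplit request; only then is the status rewritten to TenantMigrationAborted. A tiny standalone sketch of that decision, with Status as a stand-in type:

#include <iostream>
#include <string>

// Stand-in Status type; in the server this would be mongo::Status with an
// ErrorCodes value.
struct Status {
    bool ok;
    std::string reason;
};

// abortToken is derived from primaryToken, so primary cancellation implies abort
// cancellation. Rewrite the status only for an explicit abort request.
Status statusToPersist(bool abortTokenCanceled, bool primaryTokenCanceled, Status incoming) {
    if (abortTokenCanceled && !primaryTokenCanceled) {
        return Status{false, "TenantMigrationAborted: aborted due to 'abortShardSplit' command"};
    }
    return incoming;  // e.g. keep InterruptedDueToReplStateChange from a stepdown
}

int main() {
    std::cout << statusToPersist(true, false, Status{true, ""}).reason << '\n';
    std::cout << statusToPersist(true, true,
                                 Status{false, "InterruptedDueToReplStateChange"})
                     .reason
              << '\n';
}
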
@@ -868,16 +853,13 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
ExecutorFuture<void>
ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectable(
const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
- LOGV2(6236603, "Waiting to receive 'forgetShardSplit' command.", "id"_attr = _migrationId);
- auto expiredAt = [&]() {
- stdx::lock_guard<Latch> lg(_mutex);
- return _stateDoc.getExpireAt();
- }();
-
- if (expiredAt) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_stateDoc.getExpireAt() || _stateDoc.getState() < ShardSplitDonorStateEnum::kCommitted) {
return ExecutorFuture(**executor);
}
+ LOGV2(6236603, "Waiting to receive 'forgetShardSplit' command.", "id"_attr = _migrationId);
+
return future_util::withCancellation(_forgetShardSplitReceivedPromise.getFuture(), primaryToken)
.thenRunOn(**executor)
.then([this, self = shared_from_this(), executor, primaryToken] {
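
_waitForForgetCmdThenMarkGarbageCollectable now takes the mutex once, returns a ready future when the document already has expireAt set or the split never committed, and otherwise waits for the forgetShardSplit command under the primary token via future_util::withCancellation. A plain C++17 sketch of that "wait for a signal or give up on cancellation" shape using a condition variable; the names are illustrative, not the server's future utilities:

// Plain C++17 analogue of waiting on a promise under a cancellation token.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

struct ForgetWaiter {
    std::mutex mtx;
    std::condition_variable cv;
    bool forgetReceived = false;
    bool canceled = false;

    // Blocks until forgetShardSplit arrives or the primary token is canceled.
    // Returns true if the forget command was actually received.
    bool waitForForgetOrCancellation() {
        std::unique_lock<std::mutex> lk(mtx);
        cv.wait(lk, [&] { return forgetReceived || canceled; });
        return forgetReceived;
    }

    void onForgetShardSplit() {
        { std::lock_guard<std::mutex> lk(mtx); forgetReceived = true; }
        cv.notify_all();
    }

    void onPrimaryTokenCanceled() {
        { std::lock_guard<std::mutex> lk(mtx); canceled = true; }
        cv.notify_all();
    }
};

int main() {
    ForgetWaiter waiter;
    std::thread commandThread([&] { waiter.onForgetShardSplit(); });
    bool received = waiter.waitForForgetOrCancellation();
    commandThread.join();
    std::cout << (received ? "marking garbage collectable" : "canceled before forget") << '\n';
}
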
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index d88360f6569..51d203ef6db 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -677,7 +677,7 @@ TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) {
test::shard_split::reconfigToAddRecipientNodes(
getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
- auto initialFuture = [&]() {
+ auto firstSplitInstance = [&]() {
FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
auto initialTimesEntered = fp.initialTimesEntered();
@@ -687,38 +687,31 @@ TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) {
ASSERT(serviceInstance.get());
fp->waitForTimesEntered(initialTimesEntered + 1);
- stepDown();
-
- return serviceInstance->decisionFuture();
+ return serviceInstance;
}();
- auto result = initialFuture.getNoThrow();
+ stepDown();
+ auto result = firstSplitInstance->completionFuture().getNoThrow();
ASSERT_FALSE(result.isOK());
- ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.getStatus().code());
-
- // verify that the state document exists
- ASSERT_OK(getStateDocument(opCtx.get(), _uuid).getStatus());
-
- auto fp = std::make_unique<FailPointEnableBlock>("pauseShardSplitAfterBlocking");
- auto initialTimesEntered = fp->initialTimesEntered();
+ ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.code());
- stepUp(opCtx.get());
-
- fp->failPoint()->waitForTimesEntered(initialTimesEntered + 1);
-
- // verify that the state document exists
- ASSERT_OK(getStateDocument(opCtx.get(), _uuid).getStatus());
- auto donor = ShardSplitDonorService::DonorStateMachine::lookup(
- opCtx.get(), _service, BSON("_id" << _uuid));
- ASSERT(donor);
-
- fp.reset();
+ auto secondSplitInstance = [&]() {
+ FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
+ stepUp(opCtx.get());
+ fp->waitForTimesEntered(fp.initialTimesEntered() + 1);
+
+ ASSERT_OK(getStateDocument(opCtx.get(), _uuid).getStatus());
+ auto serviceInstance = ShardSplitDonorService::DonorStateMachine::lookup(
+ opCtx.get(), _service, BSON("_id" << _uuid));
+ ASSERT(serviceInstance);
+ return *serviceInstance;
+ }();
- ASSERT_OK((*donor)->decisionFuture().getNoThrow().getStatus());
+ ASSERT_OK(secondSplitInstance->decisionFuture().getNoThrow().getStatus());
- (*donor)->tryForget();
- ASSERT_OK((*donor)->completionFuture().getNoThrow());
- ASSERT_TRUE((*donor)->isGarbageCollectable());
+ secondSplitInstance->tryForget();
+ ASSERT_OK(secondSplitInstance->completionFuture().getNoThrow());
+ ASSERT_TRUE(secondSplitInstance->isGarbageCollectable());
}
class ShardSplitPersistenceTest : public ShardSplitDonorServiceTest {
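
The reworked ResumeAfterStepdownTest leans twice on the same idiom: enable a fail point only for the scope of an immediately-invoked lambda, do the work that must happen while it is set, and return the service instance out of that scope so the fail point is released before stepDown()/stepUp() and the assertions run. A generic standalone sketch of that RAII-guard-inside-an-IIFE idiom (ScopedFailPoint is an illustrative stand-in for FailPointEnableBlock):

// Plain C++17 sketch of the scoped-fail-point + immediately-invoked-lambda idiom.
#include <iostream>
#include <memory>
#include <string>

// Illustrative stand-in for FailPointEnableBlock: enables a named fail point
// for the lifetime of the guard and disables it on destruction.
class ScopedFailPoint {
public:
    explicit ScopedFailPoint(std::string name) : _name(std::move(name)) {
        std::cout << "enable " << _name << '\n';
    }
    ~ScopedFailPoint() { std::cout << "disable " << _name << '\n'; }
private:
    std::string _name;
};

struct ServiceInstance {
    void runUntilBlocked() { std::cout << "instance reached the fail point\n"; }
};

int main() {
    // The instance escapes the scope; the fail point does not.
    auto instance = [&]() {
        ScopedFailPoint fp("pauseShardSplitAfterBlocking");
        auto svc = std::make_shared<ServiceInstance>();
        svc->runUntilBlocked();   // in the test: wait for the fail point to be hit
        return svc;
    }();

    // Equivalent of stepDown() + asserting on completionFuture(): by now the
    // fail point is already disabled, so the instance is free to make progress.
    std::cout << "stepDown and assert on the returned instance\n";
    (void)instance;
}
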