author     Matt Broadstone <mbroadst@mongodb.com>            2022-05-10 20:26:39 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-05-10 21:58:00 +0000
commit     4a097ad4ab5c1cf6b3606578715aaf02b99420cb (patch)
tree       71b1e85a0ffea09f5d09dd6c3653d7286a1a8dc5 /src
parent     610ca5864154d97ddd18d225a2c148a96260eadc (diff)
download   mongo-4a097ad4ab5c1cf6b3606578715aaf02b99420cb.tar.gz
SERVER-66326 Simplify cancellation and abort flow in shard split
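The heart of this change is the pair of cancellation tokens threaded through the donor state machine: primaryToken, canceled when the PrimaryOnlyService instance itself is interrupted (donor stepdown or shutdown), and abortToken, a child token that is additionally canceled by an explicit abortShardSplit command. The sketch below is a minimal standalone model of that hierarchy; the Token and Source classes are illustrative stand-ins for MongoDB's CancellationToken/CancellationSource, not the real API.

// Minimal standalone model of the donor's two-token cancellation hierarchy.
// Token/Source here are illustrative stand-ins, NOT MongoDB's real
// CancellationToken/CancellationSource implementation.
#include <atomic>
#include <cassert>
#include <memory>
#include <vector>

class Source;

class Token {
public:
    // Canceled if this source, or any ancestor source, was canceled.
    bool isCanceled() const {
        for (const auto& s : _states)
            if (s->load())
                return true;
        return false;
    }

private:
    friend class Source;
    using State = std::shared_ptr<std::atomic<bool>>;
    explicit Token(std::vector<State> states) : _states(std::move(states)) {}
    std::vector<State> _states;
};

class Source {
public:
    Source() : _state(std::make_shared<std::atomic<bool>>(false)) {}
    // A child source inherits its parent's cancellation state, so canceling
    // the parent also cancels every token minted from the child.
    explicit Source(const Token& parent) : Source() {
        _parents = parent._states;
    }
    Token token() const {
        auto states = _parents;
        states.push_back(_state);
        return Token(std::move(states));
    }
    void cancel() {
        _state->store(true);
    }

private:
    Token::State _state;
    std::vector<Token::State> _parents;
};

int main() {
    Source primarySource;                       // canceled on stepdown/shutdown
    Source abortSource(primarySource.token());  // also canceled by abortShardSplit
    auto primaryToken = primarySource.token();
    auto abortToken = abortSource.token();

    // A user-requested abort fires only the abort token.
    abortSource.cancel();
    assert(abortToken.isCanceled() && !primaryToken.isCanceled());

    // A stepdown fires the primary token and, through the hierarchy, the
    // abort token as well.
    Source primary2;
    Source abort2(primary2.token());
    primary2.cancel();
    assert(primary2.token().isCanceled() && abort2.token().isCanceled());
    return 0;
}

This distinction is exactly what the new check in _handleErrorOrEnterAbortedState relies on: only a user-requested abort (abort token canceled, primary token not) rewrites the status to TenantMigrationAborted, while an instance interruption propagates its original status.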
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp       | 130
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service_test.cpp  |  47
2 files changed, 76 insertions, 101 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 290bc98f144..8d70460b54f 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -334,18 +334,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
     _markKilledExecutor->startup();
     _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor);
 
-    const bool shouldRemoveStateDocumentOnRecipient = [&]() {
-        stdx::lock_guard<Latch> lg(_mutex);
-        auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-        return serverless::shouldRemoveStateDocumentOnRecipient(opCtx.get(), _stateDoc);
-    }();
+    _decisionPromise.setWith([&] {
+        const bool shouldRemoveStateDocumentOnRecipient = [&]() {
+            stdx::lock_guard<Latch> lg(_mutex);
+            auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+            return serverless::shouldRemoveStateDocumentOnRecipient(opCtx.get(), _stateDoc);
+        }();
 
-    if (shouldRemoveStateDocumentOnRecipient) {
-        if (MONGO_unlikely(pauseShardSplitBeforeRecipientCleanup.shouldFail())) {
+        if (shouldRemoveStateDocumentOnRecipient) {
             pauseShardSplitBeforeRecipientCleanup.pauseWhileSet();
-        }
 
-        _decisionPromise.setWith([&] {
             return ExecutorFuture(**executor)
                 .then([this, executor, primaryToken, anchor = shared_from_this()] {
                     if (MONGO_unlikely(skipShardSplitRecipientCleanup.shouldFail())) {
@@ -358,41 +356,33 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
                     return DurableState{ShardSplitDonorStateEnum::kCommitted};
                 })
                 .unsafeToInlineFuture();
-        });
-
-        _completionPromise.setWith([&] {
-            return _decisionPromise.getFuture().semi().ignoreValue().unsafeToInlineFuture();
-        });
-
-        return _completionPromise.getFuture().semi();
-    }
-
-    auto isConfigValidWithStatus = [&]() {
-        stdx::lock_guard<Latch> lg(_mutex);
-        auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
-        invariant(replCoord);
-        return serverless::validateRecipientNodesForShardSplit(_stateDoc, replCoord->getConfig());
-    }();
-    if (!isConfigValidWithStatus.isOK()) {
-        LOGV2_ERROR(6395900,
-                    "Failed to validate recipient nodes for shard split.",
-                    "id"_attr = _migrationId,
-                    "status"_attr = isConfigValidWithStatus);
-        _decisionPromise.emplaceValue(
-            DurableState{ShardSplitDonorStateEnum::kAborted, isConfigValidWithStatus});
-        _completionPromise.setFrom(
-            _decisionPromise.getFuture().semi().ignoreValue().unsafeToInlineFuture());
-        return _completionPromise.getFuture().semi();
-    }
+        }
 
-    _initiateTimeout(executor, abortToken);
+        auto isConfigValidWithStatus = [&]() {
+            stdx::lock_guard<Latch> lg(_mutex);
+            auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
+            invariant(replCoord);
+            return serverless::validateRecipientNodesForShardSplit(_stateDoc,
+                                                                   replCoord->getConfig());
+        }();
+
+        if (!isConfigValidWithStatus.isOK()) {
+            LOGV2_ERROR(6395900,
+                        "Failed to validate recipient nodes for shard split.",
+                        "id"_attr = _migrationId,
+                        "status"_attr = isConfigValidWithStatus);
+            return ExecutorFuture(
+                       **executor,
+                       DurableState{ShardSplitDonorStateEnum::kAborted, isConfigValidWithStatus})
+                .unsafeToInlineFuture();
+        }
 
-    LOGV2(6086506,
-          "Starting shard split.",
-          "id"_attr = _migrationId,
-          "timeout"_attr = repl::shardSplitTimeoutMS.load());
+        _initiateTimeout(executor, abortToken);
+        LOGV2(6086506,
+              "Starting shard split.",
+              "id"_attr = _migrationId,
+              "timeout"_attr = repl::shardSplitTimeoutMS.load());
 
-    _decisionPromise.setWith([&] {
         return ExecutorFuture(**executor)
             .then([this, executor, primaryToken, abortToken] {
                 // Note we do not use the abort split token here because the abortShardSplit
@@ -403,11 +393,8 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
             .then([this, executor, abortToken] {
                 checkForTokenInterrupt(abortToken);
                 _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
-                if (MONGO_unlikely(pauseShardSplitAfterBlocking.shouldFail())) {
-                    auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-                    pauseShardSplitAfterBlocking.pauseWhileSetAndNotCanceled(opCtx.get(),
-                                                                             abortToken);
-                }
+                auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+                pauseShardSplitAfterBlocking.pauseWhileSet(opCtx.get());
 
                 return _waitForRecipientToReachBlockTimestamp(executor, abortToken);
             })
@@ -433,34 +420,33 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
             .unsafeToInlineFuture();
     });
 
-    _completionPromise.setFrom(
-        _decisionPromise.getFuture()
-            .semi()
-            .ignoreValue()
-            .thenRunOn(**executor)
+    _completionPromise.setWith([&] {
+        return ExecutorFuture(**executor)
+            .then([&] { return _decisionPromise.getFuture().semi().ignoreValue(); })
             .then([this, executor, primaryToken] {
                 auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-                if (MONGO_unlikely(pauseShardSplitAfterDecision.shouldFail())) {
-                    pauseShardSplitAfterDecision.pauseWhileSetAndNotCanceled(opCtx.get(),
-                                                                             primaryToken);
-                }
+                pauseShardSplitAfterDecision.pauseWhileSet(opCtx.get());
 
                 return _waitForForgetCmdThenMarkGarbageCollectable(executor, primaryToken);
             })
-            .onCompletion([this, anchor = shared_from_this(), primaryToken](Status status) {
+            .onCompletion([this, primaryToken, anchor = shared_from_this()](Status status) {
                 stdx::lock_guard<Latch> lg(_mutex);
-                LOGV2(8423357,
-                      "Marked shard split as garbage collectable.",
-                      "id"_attr = _stateDoc.getId(),
-                      "expireAt"_attr = _stateDoc.getExpireAt());
+                // Propagate any errors from the donor stepping down.
+                if (primaryToken.isCanceled() ||
+                    _stateDoc.getState() < ShardSplitDonorStateEnum::kCommitted) {
+                    return status;
+                }
 
                 LOGV2(8423356,
                       "Shard split completed.",
                       "id"_attr = _stateDoc.getId(),
                       "status"_attr = status,
                       "abortReason"_attr = _abortReason);
+
+                return Status::OK();
             })
-            .unsafeToInlineFuture());
+            .unsafeToInlineFuture();
+    });
 
     return _completionPromise.getFuture().semi();
 }
@@ -787,11 +773,11 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
     whenAny(std::move(timeoutFuture),
             completionFuture().semi().ignoreValue().thenRunOn(**executor))
         .thenRunOn(**executor)
-        .then([this, executor, anchor = shared_from_this()](auto result) {
+        .then([this, executor, abortToken, anchor = shared_from_this()](auto result) {
             stdx::lock_guard<Latch> lg(_mutex);
             if (_stateDoc.getState() != ShardSplitDonorStateEnum::kCommitted &&
                 _stateDoc.getState() != ShardSplitDonorStateEnum::kAborted &&
-                !_abortRequested) {
+                !abortToken.isCanceled()) {
                 LOGV2(6236500,
                       "Timeout expired, aborting shard split.",
                       "id"_attr = _migrationId,
@@ -827,10 +813,9 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
         return ExecutorFuture(**executor, statusWithState);
     }
 
-    // There is no use to check the parent token the executor would not run if the parent token
-    // is cancelled. At this point either the abortToken has been cancelled or a previous
-    // operation failed. In either case we abort the migration.
-    if (abortToken.isCanceled()) {
+    // Make sure we don't change the status if the abortToken is cancelled due to a POS instance
+    // interruption.
+    if (abortToken.isCanceled() && !primaryToken.isCanceled()) {
         statusWithState = Status(ErrorCodes::TenantMigrationAborted,
                                  "Aborted due to 'abortShardSplit' command.");
     }
@@ -868,16 +853,13 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
 ExecutorFuture<void>
 ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectable(
     const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
-    LOGV2(6236603, "Waiting to receive 'forgetShardSplit' command.", "id"_attr = _migrationId);
-    auto expiredAt = [&]() {
-        stdx::lock_guard<Latch> lg(_mutex);
-        return _stateDoc.getExpireAt();
-    }();
-
-    if (expiredAt) {
+    stdx::lock_guard<Latch> lg(_mutex);
+    if (_stateDoc.getExpireAt() || _stateDoc.getState() < ShardSplitDonorStateEnum::kCommitted) {
         return ExecutorFuture(**executor);
     }
 
+    LOGV2(6236603, "Waiting to receive 'forgetShardSplit' command.", "id"_attr = _migrationId);
+
     return future_util::withCancellation(_forgetShardSplitReceivedPromise.getFuture(), primaryToken)
         .thenRunOn(**executor)
         .then([this, self = shared_from_this(), executor, primaryToken] {
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index d88360f6569..51d203ef6db 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -677,7 +677,7 @@ TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) {
     test::shard_split::reconfigToAddRecipientNodes(
         getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
 
-    auto initialFuture = [&]() {
+    auto firstSplitInstance = [&]() {
         FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
         auto initialTimesEntered = fp.initialTimesEntered();
 
@@ -687,38 +687,31 @@ TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) {
         ASSERT(serviceInstance.get());
 
         fp->waitForTimesEntered(initialTimesEntered + 1);
-        stepDown();
-
-        return serviceInstance->decisionFuture();
+        return serviceInstance;
     }();
 
-    auto result = initialFuture.getNoThrow();
+    stepDown();
+    auto result = firstSplitInstance->completionFuture().getNoThrow();
     ASSERT_FALSE(result.isOK());
-    ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.getStatus().code());
-
-    // verify that the state document exists
-    ASSERT_OK(getStateDocument(opCtx.get(), _uuid).getStatus());
-
-    auto fp = std::make_unique<FailPointEnableBlock>("pauseShardSplitAfterBlocking");
-    auto initialTimesEntered = fp->initialTimesEntered();
+    ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.code());
 
-    stepUp(opCtx.get());
-
-    fp->failPoint()->waitForTimesEntered(initialTimesEntered + 1);
-
-    // verify that the state document exists
-    ASSERT_OK(getStateDocument(opCtx.get(), _uuid).getStatus());
-    auto donor = ShardSplitDonorService::DonorStateMachine::lookup(
-        opCtx.get(), _service, BSON("_id" << _uuid));
-    ASSERT(donor);
-
-    fp.reset();
+    auto secondSplitInstance = [&]() {
+        FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
+        stepUp(opCtx.get());
+        fp->waitForTimesEntered(fp.initialTimesEntered() + 1);
+
+        ASSERT_OK(getStateDocument(opCtx.get(), _uuid).getStatus());
+        auto serviceInstance = ShardSplitDonorService::DonorStateMachine::lookup(
+            opCtx.get(), _service, BSON("_id" << _uuid));
+        ASSERT(serviceInstance);
+        return *serviceInstance;
+    }();
 
-    ASSERT_OK((*donor)->decisionFuture().getNoThrow().getStatus());
+    ASSERT_OK(secondSplitInstance->decisionFuture().getNoThrow().getStatus());
 
-    (*donor)->tryForget();
-    ASSERT_OK((*donor)->completionFuture().getNoThrow());
-    ASSERT_TRUE((*donor)->isGarbageCollectable());
+    secondSplitInstance->tryForget();
+    ASSERT_OK(secondSplitInstance->completionFuture().getNoThrow());
+    ASSERT_TRUE(secondSplitInstance->isGarbageCollectable());
 }
 
 class ShardSplitPersistenceTest : public ShardSplitDonorServiceTest {
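A closing note on the promise idiom the patch leans on: run() now funnels every outcome — recipient-side cleanup, an invalid recipient config, and the full split chain — through a single _decisionPromise.setWith(...) call, and _completionPromise.setWith(...) likewise wraps the follow-up chain, replacing the scattered emplaceValue/setFrom call sites. setWith runs a callable and feeds its result, or the exception it throws, into the promise, so every early return resolves the same future. Below is a minimal standalone sketch of the idiom, built on std::promise; the Promise class is an illustrative stand-in, not MongoDB's SharedPromise implementation.

// Minimal stand-in for a promise with setWith semantics: run a callable and
// resolve the promise with its value or with the exception it throws.
#include <exception>
#include <future>
#include <iostream>
#include <string>

template <typename T>
class Promise {
public:
    template <typename F>
    void setWith(F&& fn) {
        try {
            _promise.set_value(fn());  // value path
        } catch (...) {
            _promise.set_exception(std::current_exception());  // error path
        }
    }
    std::future<T> getFuture() {
        return _promise.get_future();
    }

private:
    std::promise<T> _promise;
};

int main() {
    Promise<std::string> decision;
    bool configValid = false;  // stand-in for validateRecipientNodesForShardSplit

    // Every branch returns through the same callable, so waiters on the
    // decision future observe one consistent completion path.
    decision.setWith([&]() -> std::string {
        if (!configValid) {
            return "kAborted";  // early exit still resolves the same promise
        }
        return "kCommitted";
    });

    std::cout << decision.getFuture().get() << "\n";  // prints "kAborted"
    return 0;
}

The test change follows the same spirit: each FailPointEnableBlock now lives inside an immediately invoked lambda, so the failpoint is guaranteed to be released before stepDown() and the final assertions run, instead of being managed by hand through a unique_ptr.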