diff options
Diffstat (limited to 'src/mongo/db/serverless/shard_split_donor_service.cpp')
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service.cpp | 37 |
1 files changed, 11 insertions, 26 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 5d59e965d3e..26e17a3e2b5 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -71,7 +71,6 @@ MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeSplitConfigRemoval); MONGO_FAIL_POINT_DEFINE(skipShardSplitRecipientCleanup); MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeLeavingBlockingState); MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterUpdatingToCommittedState); -MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeSendingStepUpToRecipients); MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterReceivingAbortCmd); const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); @@ -753,8 +752,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfi ExecutorFuture<void> remoteAdminCommand(TaskExecutorPtr executor, const CancellationToken& token, const HostAndPort remoteNode, - const BSONObj& command, - std::function<bool(Status)> untilCondition = nullptr) { + const BSONObj& command) { return AsyncTry([executor, token, remoteNode, command] { executor::RemoteCommandRequest request(remoteNode, "admin", command, nullptr); auto hasWriteConcern = command.hasField(WriteConcernOptions::kWriteConcernField); @@ -769,27 +767,15 @@ ExecutorFuture<void> remoteAdminCommand(TaskExecutorPtr executor, return status; }); }) - .until([untilCondition](Status status) { - if (untilCondition) { - return untilCondition(status); - } - - return status.isOK() || - (!ErrorCodes::isRetriableError(status) && - !ErrorCodes::isNetworkTimeoutError(status)); - }) - .withBackoffBetweenIterations(kExponentialBackoff) + .until([](Status status) { return status.isOK(); }) .on(executor, token); } ExecutorFuture<void> sendStepUpToRecipient(TaskExecutorPtr executor, const CancellationToken& token, const HostAndPort recipientPrimary) { - return remoteAdminCommand(executor, - token, - recipientPrimary, - BSON("replSetStepUp" << 1 << "skipDryRun" << true), - [](Status status) { return status.isOK(); }); + return remoteAdminCommand( + executor, token, recipientPrimary, BSON("replSetStepUp" << 1 << "skipDryRun" << true)); } ExecutorFuture<void> waitForMajorityWriteOnRecipient(TaskExecutorPtr executor, @@ -823,10 +809,7 @@ ShardSplitDonorService::DonorStateMachine::_waitForSplitAcceptanceAndEnterCommit return ExecutorFuture(**executor) .then([&]() { return _splitAcceptancePromise.getFuture(); }) - .then([this, executor, primaryToken](const HostAndPort& recipientPrimary) { - // only cancel operations on stepdown from here out - _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor); - + .then([this, executor, abortToken](const HostAndPort& recipientPrimary) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); if (MONGO_unlikely(pauseShardSplitBeforeLeavingBlockingState.shouldFail())) { pauseShardSplitBeforeLeavingBlockingState.execute([&](const BSONObj& data) { @@ -861,19 +844,21 @@ ShardSplitDonorService::DonorStateMachine::_waitForSplitAcceptanceAndEnterCommit ? *_splitAcceptanceTaskExecutorForTest : **executor; - pauseShardSplitBeforeSendingStepUpToRecipients.pauseWhileSet(); - return sendStepUpToRecipient(remoteCommandExecutor, primaryToken, recipientPrimary) - .then([this, remoteCommandExecutor, primaryToken, recipientPrimary]() { + return sendStepUpToRecipient(remoteCommandExecutor, abortToken, recipientPrimary) + .then([this, remoteCommandExecutor, abortToken, recipientPrimary]() { LOGV2(8423365, "Waiting for majority commit on recipient primary", "id"_attr = _migrationId); return waitForMajorityWriteOnRecipient( - remoteCommandExecutor, primaryToken, recipientPrimary); + remoteCommandExecutor, abortToken, recipientPrimary); }); }) .thenRunOn(**executor) .then([this, executor, primaryToken]() { + // only cancel operations on stepdown from here out + _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor); + LOGV2(6142503, "Entering 'committed' state.", "id"_attr = _stateDoc.getId()); auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); pauseShardSplitAfterUpdatingToCommittedState.pauseWhileSet(opCtx.get()); |