diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_donor_service.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service.cpp | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 99b6a0af97d..d4c2ade3eb4 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -228,7 +228,8 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_runUntilBlockin executor, abortToken); }) .then([this, executor, abortToken] { - return _awaitAllRecipientsDoneApplying(executor, abortToken); + return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( + executor, abortToken); }) .then([this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); }); }) @@ -482,14 +483,14 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted); } } +} - if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) { - _critSecWasAcquired.getFuture().wait(opCtx); - } +SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionAcquired() { + return _critSecWasAcquired.getFuture(); +} - if (coordinatorState >= CoordinatorStateEnum::kCommitting) { - _critSecWasPromoted.getFuture().wait(opCtx); - } +SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionPromoted() { + return _critSecWasPromoted.getFuture(); } void ReshardingDonorService::DonorStateMachine:: @@ -578,20 +579,22 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: }); } -ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_awaitAllRecipientsDoneApplying( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken) { +ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: + _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) { if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) { return ExecutorFuture<void>(**executor, Status::OK()); } return future_util::withCancellation(_allRecipientsDoneApplying.getFuture(), abortToken) - .thenRunOn(**executor); + .thenRunOn(**executor) + .then([this] { _transitionState(DonorStateEnum::kPreparingToBlockWrites); }); } void ReshardingDonorService::DonorStateMachine:: _writeTransactionOplogEntryThenTransitionToBlockingWrites() { - if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) { + if (_donorCtx.getState() > DonorStateEnum::kPreparingToBlockWrites) { stdx::lock_guard<Latch> lk(_mutex); ensureFulfilledPromise(lk, _critSecWasAcquired); return; |