summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_donor_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_donor_service.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp27
1 files changed, 15 insertions, 12 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 99b6a0af97d..d4c2ade3eb4 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -228,7 +228,8 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_runUntilBlockin
executor, abortToken);
})
.then([this, executor, abortToken] {
- return _awaitAllRecipientsDoneApplying(executor, abortToken);
+ return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
+ executor, abortToken);
})
.then([this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); });
})
@@ -482,14 +483,14 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted);
}
}
+}
- if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) {
- _critSecWasAcquired.getFuture().wait(opCtx);
- }
+SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionAcquired() {
+ return _critSecWasAcquired.getFuture();
+}
- if (coordinatorState >= CoordinatorStateEnum::kCommitting) {
- _critSecWasPromoted.getFuture().wait(opCtx);
- }
+SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionPromoted() {
+ return _critSecWasPromoted.getFuture();
}
void ReshardingDonorService::DonorStateMachine::
@@ -578,20 +579,22 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
});
}
-ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_awaitAllRecipientsDoneApplying(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
+ _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) {
return ExecutorFuture<void>(**executor, Status::OK());
}
return future_util::withCancellation(_allRecipientsDoneApplying.getFuture(), abortToken)
- .thenRunOn(**executor);
+ .thenRunOn(**executor)
+ .then([this] { _transitionState(DonorStateEnum::kPreparingToBlockWrites); });
}
void ReshardingDonorService::DonorStateMachine::
_writeTransactionOplogEntryThenTransitionToBlockingWrites() {
- if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) {
+ if (_donorCtx.getState() > DonorStateEnum::kPreparingToBlockWrites) {
stdx::lock_guard<Latch> lk(_mutex);
ensureFulfilledPromise(lk, _critSecWasAcquired);
return;