diff options
8 files changed, 78 insertions, 19 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 75521306201..0ae230e4d90 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -110,6 +110,15 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx, ReshardingDonorDocument>( opCtx, reshardingFields.getReshardingUUID())) { donorStateMachine->get()->onReshardingFieldsChanges(opCtx, reshardingFields); + + const auto coordinatorState = reshardingFields.getState(); + if (coordinatorState == CoordinatorStateEnum::kBlockingWrites) { + (*donorStateMachine)->awaitCriticalSectionAcquired().wait(opCtx); + } else if (coordinatorState == CoordinatorStateEnum::kCommitting) { + (*donorStateMachine)->awaitCriticalSectionAcquired().wait(opCtx); + (*donorStateMachine)->awaitCriticalSectionPromoted().wait(opCtx); + } + return; } diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 99b6a0af97d..d4c2ade3eb4 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -228,7 +228,8 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_runUntilBlockin executor, abortToken); }) .then([this, executor, abortToken] { - return _awaitAllRecipientsDoneApplying(executor, abortToken); + return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( + executor, abortToken); }) .then([this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); }); }) @@ -482,14 +483,14 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted); } } +} - if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) { - _critSecWasAcquired.getFuture().wait(opCtx); - } +SharedSemiFuture<void> 
ReshardingDonorService::DonorStateMachine::awaitCriticalSectionAcquired() { + return _critSecWasAcquired.getFuture(); +} - if (coordinatorState >= CoordinatorStateEnum::kCommitting) { - _critSecWasPromoted.getFuture().wait(opCtx); - } +SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionPromoted() { + return _critSecWasPromoted.getFuture(); } void ReshardingDonorService::DonorStateMachine:: @@ -578,20 +579,22 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: }); } -ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_awaitAllRecipientsDoneApplying( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor, - const CancellationToken& abortToken) { +ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: + _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) { if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) { return ExecutorFuture<void>(**executor, Status::OK()); } return future_util::withCancellation(_allRecipientsDoneApplying.getFuture(), abortToken) - .thenRunOn(**executor); + .thenRunOn(**executor) + .then([this] { _transitionState(DonorStateEnum::kPreparingToBlockWrites); }); } void ReshardingDonorService::DonorStateMachine:: _writeTransactionOplogEntryThenTransitionToBlockingWrites() { - if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) { + if (_donorCtx.getState() > DonorStateEnum::kPreparingToBlockWrites) { stdx::lock_guard<Latch> lk(_mutex); ensureFulfilledPromise(lk, _critSecWasAcquired); return; diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index d08e254abe5..9f50173f29b 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -96,6 +96,10 @@ public: void 
onReshardingFieldsChanges(OperationContext* opCtx, const TypeCollectionReshardingFields& reshardingFields); + SharedSemiFuture<void> awaitCriticalSectionAcquired(); + + SharedSemiFuture<void> awaitCriticalSectionPromoted(); + SharedSemiFuture<void> awaitFinalOplogEntriesWritten(); /** @@ -154,7 +158,7 @@ private: const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& abortToken); - ExecutorFuture<void> _awaitAllRecipientsDoneApplying( + ExecutorFuture<void> _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const CancellationToken& abortToken); diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp index 8a0e4b17c56..9c28d0f9d46 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -32,6 +32,7 @@ #include "mongo/platform/basic.h" #include <boost/optional/optional_io.hpp> +#include <utility> #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbdirectclient.h" @@ -176,6 +177,13 @@ public: void notifyToStartBlockingWrites(OperationContext* opCtx, DonorStateMachine& donor, const ReshardingDonorDocument& donorDoc) { + notifyToStartBlockingWritesNoWait(opCtx, donor, donorDoc); + ASSERT_OK(donor.awaitCriticalSectionAcquired().waitNoThrow(opCtx)); + } + + void notifyToStartBlockingWritesNoWait(OperationContext* opCtx, + DonorStateMachine& donor, + const ReshardingDonorDocument& donorDoc) { _onReshardingFieldsChanges(opCtx, donor, donorDoc, CoordinatorStateEnum::kBlockingWrites); } @@ -183,6 +191,7 @@ public: DonorStateMachine& donor, const ReshardingDonorDocument& donorDoc) { _onReshardingFieldsChanges(opCtx, donor, donorDoc, CoordinatorStateEnum::kCommitting); + ASSERT_OK(donor.awaitCriticalSectionPromoted().waitNoThrow(opCtx)); } void 
checkStateDocumentRemoved(OperationContext* opCtx) { @@ -318,9 +327,18 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) { const std::vector<DonorStateEnum> donorStates{DonorStateEnum::kDonatingInitialData, DonorStateEnum::kDonatingOplogEntries, + DonorStateEnum::kPreparingToBlockWrites, DonorStateEnum::kBlockingWrites, DonorStateEnum::kDone}; + const std::vector<std::pair<DonorStateEnum, bool>> donorStateTransitions{ + {DonorStateEnum::kDonatingInitialData, false}, + {DonorStateEnum::kDonatingOplogEntries, false}, + {DonorStateEnum::kPreparingToBlockWrites, false}, + {DonorStateEnum::kBlockingWrites, false}, + {DonorStateEnum::kBlockingWrites, true}, + {DonorStateEnum::kDone, true}}; + for (bool isAlsoRecipient : {false, true}) { LOGV2(5641801, "Running case", @@ -335,7 +353,10 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) { auto opCtx = makeOperationContext(); auto prevState = DonorStateEnum::kUnused; - for (const auto state : donorStates) { + for (const auto& [state, critSecHeld] : donorStateTransitions) { + // The kBlockingWrites state is interrupted twice so we don't unset the guard until after + // the second time. + bool shouldUnsetPrevState = !(state == DonorStateEnum::kBlockingWrites && critSecHeld); auto donor = [&] { if (prevState == DonorStateEnum::kUnused) { createSourceCollection(opCtx.get(), doc); @@ -351,7 +372,9 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) { // Allow the transition to prevState to succeed on this primary-only service // instance. 
- stateTransitionsGuard.unset(prevState); + if (shouldUnsetPrevState) { + stateTransitionsGuard.unset(prevState); + } return *maybeDonor; } }(); @@ -363,8 +386,25 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) { notifyRecipientsDoneCloning(opCtx.get(), *donor, doc); break; } + case DonorStateEnum::kPreparingToBlockWrites: { + notifyToStartBlockingWritesNoWait(opCtx.get(), *donor, doc); + break; + } case DonorStateEnum::kBlockingWrites: { - notifyToStartBlockingWrites(opCtx.get(), *donor, doc); + // A shard version refresh cannot be triggered once the critical section has + // been acquired. We intentionally test the DonorStateEnum::kBlockingWrites + // transition being triggered two different ways: + // + // - The first transition would wait for the RecoverRefreshThread to + // notify the donor about the CoordinatorStateEnum::kBlockingWrites state. + // + // - The second transition would rely on the donor having already written down + // DonorStateEnum::kPreparingToBlockWrites as a result of the + // RecoverRefreshThread having already notified the donor about the + // CoordinatorStateEnum::kBlockingWrites state before. 
+ if (!critSecHeld) { + notifyToStartBlockingWrites(opCtx.get(), *donor, doc); + } break; } case DonorStateEnum::kDone: { diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 71c8542268d..e0eed9fdac3 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -507,7 +507,7 @@ void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept { return; invariant(checkState(*_currentOp->donorState, - {DonorStateEnum::kDonatingOplogEntries, + {DonorStateEnum::kPreparingToBlockWrites, DonorStateEnum::kBlockingWrites, DonorStateEnum::kError})); diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp index c90ec1e4c9d..638840361b4 100644 --- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp @@ -253,7 +253,7 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) { // Update metrics for donor const auto kWritesDuringCriticalSection = 7; - getMetrics()->setDonorState(DonorStateEnum::kDonatingOplogEntries); + getMetrics()->setDonorState(DonorStateEnum::kPreparingToBlockWrites); getMetrics()->enterCriticalSection(getGlobalServiceContext()->getFastClockSource()->now()); getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection); advanceTime(Seconds(elapsedTime)); @@ -436,7 +436,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) { } TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) { - const auto kDonorState = DonorStateEnum::kDonatingOplogEntries; + const auto kDonorState = DonorStateEnum::kPreparingToBlockWrites; startOperation(ReshardingMetrics::Role::kDonor); advanceTime(Seconds(2)); getMetrics()->setDonorState(kDonorState); diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp 
index 2f26434f6cb..e512fde17f3 100644 --- a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp +++ b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp @@ -67,11 +67,13 @@ template <class StateEnum> void StateTransitionController<StateEnum>::_notifyNewStateAndWaitUntilUnpaused( OperationContext* opCtx, StateEnum newState) { stdx::unique_lock lk(_mutex); + auto guard = makeGuard([this, prevState = _state] { _state = prevState; }); _state = newState; _waitUntilUnpausedCond.notify_all(); opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] { return _pauseDuringTransition.count(newState) == 0; }); + guard.dismiss(); } template <class StateEnum> diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl index 86cf678e49b..7d4086c971f 100644 --- a/src/mongo/s/resharding/common_types.idl +++ b/src/mongo/s/resharding/common_types.idl @@ -59,6 +59,7 @@ enums: kPreparingToDonate: "preparing-to-donate" kDonatingInitialData: "donating-initial-data" kDonatingOplogEntries: "donating-oplog-entries" + kPreparingToBlockWrites: "preparing-to-block-writes" kError: "error" kBlockingWrites: "blocking-writes" kDone: "done" |