summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp9
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp27
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h6
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service_test.cpp46
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp2
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics_test.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_service_test_helpers.cpp2
-rw-r--r--src/mongo/s/resharding/common_types.idl1
8 files changed, 78 insertions, 19 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 75521306201..0ae230e4d90 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -110,6 +110,15 @@ void processReshardingFieldsForDonorCollection(OperationContext* opCtx,
ReshardingDonorDocument>(
opCtx, reshardingFields.getReshardingUUID())) {
donorStateMachine->get()->onReshardingFieldsChanges(opCtx, reshardingFields);
+
+ const auto coordinatorState = reshardingFields.getState();
+ if (coordinatorState == CoordinatorStateEnum::kBlockingWrites) {
+ (*donorStateMachine)->awaitCriticalSectionAcquired().wait(opCtx);
+ } else if (coordinatorState == CoordinatorStateEnum::kCommitting) {
+ (*donorStateMachine)->awaitCriticalSectionAcquired().wait(opCtx);
+ (*donorStateMachine)->awaitCriticalSectionPromoted().wait(opCtx);
+ }
+
return;
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 99b6a0af97d..d4c2ade3eb4 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -228,7 +228,8 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_runUntilBlockin
executor, abortToken);
})
.then([this, executor, abortToken] {
- return _awaitAllRecipientsDoneApplying(executor, abortToken);
+ return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
+ executor, abortToken);
})
.then([this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); });
})
@@ -482,14 +483,14 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted);
}
}
+}
- if (coordinatorState >= CoordinatorStateEnum::kBlockingWrites) {
- _critSecWasAcquired.getFuture().wait(opCtx);
- }
+SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionAcquired() {
+ return _critSecWasAcquired.getFuture();
+}
- if (coordinatorState >= CoordinatorStateEnum::kCommitting) {
- _critSecWasPromoted.getFuture().wait(opCtx);
- }
+SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitCriticalSectionPromoted() {
+ return _critSecWasPromoted.getFuture();
}
void ReshardingDonorService::DonorStateMachine::
@@ -578,20 +579,22 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
});
}
-ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_awaitAllRecipientsDoneApplying(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
- const CancellationToken& abortToken) {
+ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
+ _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) {
return ExecutorFuture<void>(**executor, Status::OK());
}
return future_util::withCancellation(_allRecipientsDoneApplying.getFuture(), abortToken)
- .thenRunOn(**executor);
+ .thenRunOn(**executor)
+ .then([this] { _transitionState(DonorStateEnum::kPreparingToBlockWrites); });
}
void ReshardingDonorService::DonorStateMachine::
_writeTransactionOplogEntryThenTransitionToBlockingWrites() {
- if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) {
+ if (_donorCtx.getState() > DonorStateEnum::kPreparingToBlockWrites) {
stdx::lock_guard<Latch> lk(_mutex);
ensureFulfilledPromise(lk, _critSecWasAcquired);
return;
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index d08e254abe5..9f50173f29b 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -96,6 +96,10 @@ public:
void onReshardingFieldsChanges(OperationContext* opCtx,
const TypeCollectionReshardingFields& reshardingFields);
+ SharedSemiFuture<void> awaitCriticalSectionAcquired();
+
+ SharedSemiFuture<void> awaitCriticalSectionPromoted();
+
SharedSemiFuture<void> awaitFinalOplogEntriesWritten();
/**
@@ -154,7 +158,7 @@ private:
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken);
- ExecutorFuture<void> _awaitAllRecipientsDoneApplying(
+ ExecutorFuture<void> _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken);
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index 8a0e4b17c56..9c28d0f9d46 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/platform/basic.h"
#include <boost/optional/optional_io.hpp>
+#include <utility>
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
@@ -176,6 +177,13 @@ public:
void notifyToStartBlockingWrites(OperationContext* opCtx,
DonorStateMachine& donor,
const ReshardingDonorDocument& donorDoc) {
+ notifyToStartBlockingWritesNoWait(opCtx, donor, donorDoc);
+ ASSERT_OK(donor.awaitCriticalSectionAcquired().waitNoThrow(opCtx));
+ }
+
+ void notifyToStartBlockingWritesNoWait(OperationContext* opCtx,
+ DonorStateMachine& donor,
+ const ReshardingDonorDocument& donorDoc) {
_onReshardingFieldsChanges(opCtx, donor, donorDoc, CoordinatorStateEnum::kBlockingWrites);
}
@@ -183,6 +191,7 @@ public:
DonorStateMachine& donor,
const ReshardingDonorDocument& donorDoc) {
_onReshardingFieldsChanges(opCtx, donor, donorDoc, CoordinatorStateEnum::kCommitting);
+ ASSERT_OK(donor.awaitCriticalSectionPromoted().waitNoThrow(opCtx));
}
void checkStateDocumentRemoved(OperationContext* opCtx) {
@@ -318,9 +327,18 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl
TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {
const std::vector<DonorStateEnum> donorStates{DonorStateEnum::kDonatingInitialData,
DonorStateEnum::kDonatingOplogEntries,
+ DonorStateEnum::kPreparingToBlockWrites,
DonorStateEnum::kBlockingWrites,
DonorStateEnum::kDone};
+ const std::vector<std::pair<DonorStateEnum, bool>> donorStateTransitions{
+ {DonorStateEnum::kDonatingInitialData, false},
+ {DonorStateEnum::kDonatingOplogEntries, false},
+ {DonorStateEnum::kPreparingToBlockWrites, false},
+ {DonorStateEnum::kBlockingWrites, false},
+ {DonorStateEnum::kBlockingWrites, true},
+ {DonorStateEnum::kDone, true}};
+
for (bool isAlsoRecipient : {false, true}) {
LOGV2(5641801,
"Running case",
@@ -335,7 +353,10 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {
auto opCtx = makeOperationContext();
auto prevState = DonorStateEnum::kUnused;
- for (const auto state : donorStates) {
+ for (const auto& [state, critSecHeld] : donorStateTransitions) {
+ // The kBlockingWrite state is interrupted twice so we don't unset the guard until after
+ // the second time.
+ bool shouldUnsetPrevState = !(state == DonorStateEnum::kBlockingWrites && critSecHeld);
auto donor = [&] {
if (prevState == DonorStateEnum::kUnused) {
createSourceCollection(opCtx.get(), doc);
@@ -351,7 +372,9 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {
// Allow the transition to prevState to succeed on this primary-only service
// instance.
- stateTransitionsGuard.unset(prevState);
+ if (shouldUnsetPrevState) {
+ stateTransitionsGuard.unset(prevState);
+ }
return *maybeDonor;
}
}();
@@ -363,8 +386,25 @@ TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {
notifyRecipientsDoneCloning(opCtx.get(), *donor, doc);
break;
}
+ case DonorStateEnum::kPreparingToBlockWrites: {
+ notifyToStartBlockingWritesNoWait(opCtx.get(), *donor, doc);
+ break;
+ }
case DonorStateEnum::kBlockingWrites: {
- notifyToStartBlockingWrites(opCtx.get(), *donor, doc);
+ // A shard version refresh cannot be triggered once the critical section has
+ // been acquired. We intentionally test the DonorStateEnum::kBlockingWrites
+ // transition being triggered two different ways:
+ //
+ // - The first transition would wait for the RecoverRefreshThread to
+ // notify the donor about the CoordinatorStateEnum::kBlockingWrites state.
+ //
+ // - The second transition would rely on the donor having already written down
+ // DonorStateEnum::kPreparingToBlockWrites as a result of the
+ // RecoverRefreshThread having already been notified the donor about the
+ // CoordinatorStateEnum::kBlockingWrites state before.
+ if (!critSecHeld) {
+ notifyToStartBlockingWrites(opCtx.get(), *donor, doc);
+ }
break;
}
case DonorStateEnum::kDone: {
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 71c8542268d..e0eed9fdac3 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -507,7 +507,7 @@ void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
return;
invariant(checkState(*_currentOp->donorState,
- {DonorStateEnum::kDonatingOplogEntries,
+ {DonorStateEnum::kPreparingToBlockWrites,
DonorStateEnum::kBlockingWrites,
DonorStateEnum::kError}));
diff --git a/src/mongo/db/s/resharding/resharding_metrics_test.cpp b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
index c90ec1e4c9d..638840361b4 100644
--- a/src/mongo/db/s/resharding/resharding_metrics_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics_test.cpp
@@ -253,7 +253,7 @@ TEST_F(ReshardingMetricsTest, TestDonorAndRecipientMetrics) {
// Update metrics for donor
const auto kWritesDuringCriticalSection = 7;
- getMetrics()->setDonorState(DonorStateEnum::kDonatingOplogEntries);
+ getMetrics()->setDonorState(DonorStateEnum::kPreparingToBlockWrites);
getMetrics()->enterCriticalSection(getGlobalServiceContext()->getFastClockSource()->now());
getMetrics()->onWriteDuringCriticalSection(kWritesDuringCriticalSection);
advanceTime(Seconds(elapsedTime));
@@ -436,7 +436,7 @@ TEST_F(ReshardingMetricsTest, EstimatedRemainingOperationTime) {
}
TEST_F(ReshardingMetricsTest, CurrentOpReportForDonor) {
- const auto kDonorState = DonorStateEnum::kDonatingOplogEntries;
+ const auto kDonorState = DonorStateEnum::kPreparingToBlockWrites;
startOperation(ReshardingMetrics::Role::kDonor);
advanceTime(Seconds(2));
getMetrics()->setDonorState(kDonorState);
diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
index 2f26434f6cb..e512fde17f3 100644
--- a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
+++ b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
@@ -67,11 +67,13 @@ template <class StateEnum>
void StateTransitionController<StateEnum>::_notifyNewStateAndWaitUntilUnpaused(
OperationContext* opCtx, StateEnum newState) {
stdx::unique_lock lk(_mutex);
+ auto guard = makeGuard([this, prevState = _state] { _state = prevState; });
_state = newState;
_waitUntilUnpausedCond.notify_all();
opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] {
return _pauseDuringTransition.count(newState) == 0;
});
+ guard.dismiss();
}
template <class StateEnum>
diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl
index 86cf678e49b..7d4086c971f 100644
--- a/src/mongo/s/resharding/common_types.idl
+++ b/src/mongo/s/resharding/common_types.idl
@@ -59,6 +59,7 @@ enums:
kPreparingToDonate: "preparing-to-donate"
kDonatingInitialData: "donating-initial-data"
kDonatingOplogEntries: "donating-oplog-entries"
+ kPreparingToBlockWrites: "preparing-to-block-writes"
kError: "error"
kBlockingWrites: "blocking-writes"
kDone: "done"