diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2021-03-11 20:52:05 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-08 17:33:32 +0000 |
commit | 60f2db4bd2f1bad10f1c92f3eb0b626eca933477 (patch) | |
tree | 339b11ffe910e0ae7b6301b8ab3338a8646708c1 /src | |
parent | 4b8f97ab352255b4d9cb265f3f5f5665998a6c46 (diff) | |
download | mongo-60f2db4bd2f1bad10f1c92f3eb0b626eca933477.tar.gz |
SERVER-51606 Handle recovery from resharding donors
Diffstat (limited to 'src')
9 files changed, 513 insertions, 269 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp index 2cc932b201e..3722b55eee9 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp @@ -136,41 +136,6 @@ boost::optional<Status> getAbortReasonIfExists( return boost::none; } - -template <class TState, class TParticipant> -bool allParticipantsDoneWithAbortReason(WithLock lk, - TState expectedState, - const std::vector<TParticipant>& participants) { - for (const auto& shard : participants) { - if (!(shard.getMutableState().getState() == expectedState && - shard.getMutableState().getAbortReason().is_initialized())) { - return false; - } - } - return true; -} - -/** - * Fulfills allParticipantsDoneAbortingSp if all participants have reported to the coordinator that - * they have finished aborting locally. - */ -void checkAllParticipantsAborted(WithLock lk, - SharedPromise<void>& allParticipantsDoneAbortingSp, - const ReshardingCoordinatorDocument& updatedStateDoc) { - if (allParticipantsDoneAbortingSp.getFuture().isReady()) { - return; - } - - bool allDonorsAborted = allParticipantsDoneWithAbortReason( - lk, DonorStateEnum::kDone, updatedStateDoc.getDonorShards()); - bool allRecipientsAborted = allParticipantsDoneWithAbortReason( - lk, RecipientStateEnum::kDone, updatedStateDoc.getRecipientShards()); - - if (allDonorsAborted && allRecipientsAborted) { - allParticipantsDoneAbortingSp.emplaceValue(); - } -} - } // namespace ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default; @@ -181,18 +146,16 @@ ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() { invariant(_allRecipientsFinishedCloning.getFuture().isReady()); invariant(_allRecipientsFinishedApplying.getFuture().isReady()); invariant(_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady()); - invariant(_allRecipientsRenamedCollection.getFuture().isReady()); - invariant(_allDonorsDroppedOriginalCollection.getFuture().isReady()); + invariant(_allRecipientsDone.getFuture().isReady()); + invariant(_allDonorsDone.getFuture().isReady()); } void ReshardingCoordinatorObserver::onReshardingParticipantTransition( const ReshardingCoordinatorDocument& updatedStateDoc) { stdx::lock_guard<Latch> lk(_mutex); - if (auto abortReason = getAbortReasonIfExists(updatedStateDoc)) { _onAbortOrStepdown(lk, abortReason.get()); - checkAllParticipantsAborted(lk, _allParticipantsDoneAborting, updatedStateDoc); - return; + // Don't exit early since the coordinator waits for all participants to report state 'done'. } if (!stateTransistionsComplete(lk, @@ -222,12 +185,11 @@ void ReshardingCoordinatorObserver::onReshardingParticipantTransition( } if (!stateTransistionsComplete( - lk, _allRecipientsRenamedCollection, RecipientStateEnum::kDone, updatedStateDoc)) { + lk, _allRecipientsDone, RecipientStateEnum::kDone, updatedStateDoc)) { return; } - if (!stateTransistionsComplete( - lk, _allDonorsDroppedOriginalCollection, DonorStateEnum::kDone, updatedStateDoc)) { + if (!stateTransistionsComplete(lk, _allDonorsDone, DonorStateEnum::kDone, updatedStateDoc)) { return; } } @@ -253,25 +215,25 @@ ReshardingCoordinatorObserver::awaitAllRecipientsInStrictConsistency() { } SharedSemiFuture<ReshardingCoordinatorDocument> -ReshardingCoordinatorObserver::awaitAllDonorsDroppedOriginalCollection() { - return _allDonorsDroppedOriginalCollection.getFuture(); +ReshardingCoordinatorObserver::awaitAllDonorsDone() { + return _allDonorsDone.getFuture(); } SharedSemiFuture<ReshardingCoordinatorDocument> -ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() { - return _allRecipientsRenamedCollection.getFuture(); -} - -SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllParticipantsDoneAborting() { - return _allParticipantsDoneAborting.getFuture(); +ReshardingCoordinatorObserver::awaitAllRecipientsDone() { + return _allRecipientsDone.getFuture(); } void ReshardingCoordinatorObserver::interrupt(Status status) { stdx::lock_guard<Latch> lk(_mutex); _onAbortOrStepdown(lk, status); - if (!_allParticipantsDoneAborting.getFuture().isReady()) { - _allParticipantsDoneAborting.setError(status); + if (!_allRecipientsDone.getFuture().isReady()) { + _allRecipientsDone.setError(status); + } + + if (!_allDonorsDone.getFuture().isReady()) { + _allDonorsDone.setError(status); } } @@ -300,14 +262,6 @@ void ReshardingCoordinatorObserver::_onAbortOrStepdown(WithLock, Status status) if (!_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady()) { _allRecipientsReportedStrictConsistencyTimestamp.setError(status); } - - if (!_allRecipientsRenamedCollection.getFuture().isReady()) { - _allRecipientsRenamedCollection.setError(status); - } - - if (!_allDonorsDroppedOriginalCollection.getFuture().isReady()) { - _allDonorsDroppedOriginalCollection.setError(status); - } } } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h index b0c6eedfe95..5860945df66 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h @@ -92,22 +92,16 @@ public: SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsInStrictConsistency(); /** - * Fulfills the '_allRecipientsRenamedCollection' promise when the last recipient writes + * Fulfills the '_allRecipientsDone' promise when the last recipient writes * that it is in 'done' state. */ - SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsRenamedCollection(); + SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsDone(); /** - * Fulfills the '_allDonorsDroppedOriginalCollection' promise when the last donor writes that it + * Fulfills the '_allDonorsDone' promise when the last donor writes that it * is in 'done' state. */ - SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllDonorsDroppedOriginalCollection(); - - /** - * Fulfills the '_allParticipantsDoneAborting' promise when the last recipient or donor writes - * that it is in 'kDone' with an abortReason. - */ - SharedSemiFuture<void> awaitAllParticipantsDoneAborting(); + SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllDonorsDone(); /** * Checks if all recipients are in steady state. Otherwise, sets an error state so that @@ -142,11 +136,8 @@ private: * {_allRecipientsFinishedCloning, RecipientStateEnum::kApplying} * {_allRecipientsFinishedApplying, RecipientStateEnum::kSteadyState} * {_allRecipientsReportedStrictConsistencyTimestamp, RecipientStateEnum::kStrictConsistency} - * {_allRecipientsRenamedCollection, RecipientStateEnum::kDone} - * {_allDonorsDroppedOriginalCollection, DonorStateEnum::kDone} - * {_allParticipantsDoneAborting, - * DonorStateEnum::kDone with abortReason AND - * RecipientStateEnum::kDone with abortReason} + * {_allRecipientsDone, RecipientStateEnum::kDone} + * {_allDonorsDone, DonorStateEnum::kDone} */ SharedPromise<ReshardingCoordinatorDocument> _allDonorsReportedMinFetchTimestamp; @@ -157,11 +148,9 @@ private: SharedPromise<ReshardingCoordinatorDocument> _allRecipientsReportedStrictConsistencyTimestamp; - SharedPromise<ReshardingCoordinatorDocument> _allRecipientsRenamedCollection; - - SharedPromise<ReshardingCoordinatorDocument> _allDonorsDroppedOriginalCollection; + SharedPromise<ReshardingCoordinatorDocument> _allRecipientsDone; - SharedPromise<void> _allParticipantsDoneAborting; + SharedPromise<ReshardingCoordinatorDocument> _allDonorsDone; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp index ffb44e7abdd..a9f5fc29524 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp @@ -59,15 +59,18 @@ protected: DonorStateEnum donorState, boost::optional<Timestamp> timestamp = boost::none, boost::optional<Status> abortReason = boost::none) { - return {makeDonorShard(ShardId{"s1"}, donorState, timestamp, abortReason), + // The mock state here is simulating only one donor shard having errored locally. + return {makeDonorShard(ShardId{"s1"}, donorState, timestamp), makeDonorShard(ShardId{"s2"}, donorState, timestamp, abortReason), - makeDonorShard(ShardId{"s3"}, donorState, timestamp, abortReason)}; + makeDonorShard(ShardId{"s3"}, donorState, timestamp)}; } std::vector<RecipientShardEntry> makeMockRecipientsInState( RecipientStateEnum recipientState, boost::optional<Timestamp> timestamp = boost::none, boost::optional<Status> abortReason = boost::none) { + // TODO SERVER-55511: Make the mock state here simulate only one recipient shard errored + // locally. return {makeRecipientShard(ShardId{"s1"}, recipientState, abortReason), makeRecipientShard(ShardId{"s2"}, recipientState, abortReason), makeRecipientShard(ShardId{"s3"}, recipientState, abortReason)}; @@ -160,27 +163,30 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) { TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) { auto reshardingObserver = std::make_shared<ReshardingCoordinatorObserver>(); - auto fut = reshardingObserver->awaitAllParticipantsDoneAborting(); + auto fut = reshardingObserver->awaitAllDonorsDone(); ASSERT_FALSE(fut.isReady()); auto abortReason = Status{ErrorCodes::InternalError, "We gotta abort"}; - // All participants have an abortReason, but not all are in state kDone yet. - auto donorShards = makeMockDonorsInState(DonorStateEnum::kDone, Timestamp(1, 1), abortReason); - std::vector<RecipientShardEntry> recipientShards0{ - {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kError, abortReason)}, - {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kDone, abortReason)}, - {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kDone, abortReason)}}; + // All recipients and donors are done (including the donor who caused the abort) except a single + // donor who hasn't seen there was an error yet. + auto recipientShards = makeMockRecipientsInState(RecipientStateEnum::kDone, Timestamp(1, 1)); + std::vector<DonorShardEntry> donorShards0{ + {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)}, + {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDonatingOplogEntries, Timestamp(1, 1))}, + {makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}}; auto coordinatorDoc0 = - makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards, abortReason); + makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards0, abortReason); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0); ASSERT_FALSE(fut.isReady()); - // All participants in state kDone with abortReason. - auto recipientShards1 = - makeMockRecipientsInState(RecipientStateEnum::kDone, boost::none, abortReason); + // All participants are done. + std::vector<DonorShardEntry> donorShards1{ + {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)}, + {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDone, Timestamp(1, 1))}, + {makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}}; auto coordinatorDoc1 = - makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards, abortReason); + makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards1, abortReason); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); ASSERT_TRUE(fut.isReady()); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 3a323652fe8..35df4d6b38b 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -993,7 +993,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishRe // The shared_ptr maintaining the ReshardingCoordinatorService Instance object gets // deleted from the PrimaryOnlyService's map. Thus, shared_from_this() is necessary to // keep 'this' pointer alive for the remaining callbacks. - return _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(executor); + return _awaitAllParticipantShardsDone(executor); }) .onError([this, self = shared_from_this(), executor](Status status) { { @@ -1088,9 +1088,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_onAbort( // Wait for all participants to acknowledge the operation reached an unrecoverable // error. - future_util::withCancellation( - _reshardingCoordinatorObserver->awaitAllParticipantsDoneAborting(), - _ctHolder->getStepdownToken()) + future_util::withCancellation(_awaitAllParticipantShardsDone(executor), + _ctHolder->getStepdownToken()) .get(); } @@ -1368,20 +1367,18 @@ Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisi return Status::OK(); }; -ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator:: - _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { +ExecutorFuture<void> +ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsDone( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { if (_coordinatorDoc.getState() > CoordinatorStateEnum::kDecisionPersisted) { return ExecutorFuture<void>(**executor, Status::OK()); } std::vector<ExecutorFuture<ReshardingCoordinatorDocument>> futures; futures.emplace_back( - _reshardingCoordinatorObserver->awaitAllRecipientsRenamedCollection().thenRunOn( - **executor)); + _reshardingCoordinatorObserver->awaitAllRecipientsDone().thenRunOn(**executor)); futures.emplace_back( - _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection().thenRunOn( - **executor)); + _reshardingCoordinatorObserver->awaitAllDonorsDone().thenRunOn(**executor)); // We only allow the stepdown token to cancel operations after progressing past // kDecisionPersisted. diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index 6a675fe9121..9f3df91fc42 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -313,13 +313,13 @@ private: /** * Waits on _reshardingCoordinatorObserver to notify that: * 1. All recipient shards have renamed the temporary collection to the original collection - * namespace, and + * namespace or have finished aborting, and * 2. All donor shards that were not also recipient shards have dropped the original - * collection. + * collection or have finished aborting. * * Transitions to 'kDone'. */ - ExecutorFuture<void> _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection( + ExecutorFuture<void> _awaitAllParticipantShardsDone( const std::shared_ptr<executor::ScopedTaskExecutor>& executor); /** diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index e7d3b6d3ed0..62ebb7305d8 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -40,6 +40,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/db/op_observer.h" +#include "mongo/db/ops/delete.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/wait_for_majority_service.h" @@ -51,10 +52,12 @@ #include "mongo/logv2/log.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/grid.h" +#include "mongo/util/future_util.h" namespace mongo { MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsBeforePreparingToMirror); +MONGO_FAIL_POINT_DEFINE(removeDonorDocFailpoint); using namespace fmt::literals; @@ -99,6 +102,18 @@ Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss, } /** + * Returns whether it is possible for the donor to be in 'state' when resharding will indefinitely + * abort. + */ +bool inPotentialAbortScenario(const DonorStateEnum& state) { + // Regardless of whether resharding will abort or commit, the donor will eventually reach state + // kDone. + // Additionally, if the donor is in state kError, it is guaranteed that the coordinator will + // eventually begin the abort process. + return state == DonorStateEnum::kError || state == DonorStateEnum::kDone; +} + +/** * Fulfills the promise if it is not already. Otherwise, does nothing. */ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) { @@ -113,6 +128,34 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp, Status error) } } +/** + * Automatically retries the callable until there is an error encountered that resharding cannot + * recover from or the cancelToken is canceled. + */ +template <typename Callable> +ExecutorFuture<void> withAutomaticRetry(std::shared_ptr<executor::TaskExecutor> executor, + CancellationToken cancelToken, + Callable&& callable) { + return AsyncTry<Callable>(std::move(callable)) + .until([cancelToken](Status status) { + if (status.isA<ErrorCategory::RetriableError>() || + status.isA<ErrorCategory::CursorInvalidatedError>() || + status == ErrorCodes::Interrupted || + status.isA<ErrorCategory::CancellationError>() || + status.isA<ErrorCategory::NotPrimaryError>()) { + // Retry on errors from stray killCursors and killOp commands being run. Also retry + // for notPrimary and cancellation errors to ensure the loop is not prematurely + // canceled if the errors originate from a remote shard instead of this shard - if + // there is a failover/stepdown, the cancelToken will eventually be canceled and + // bypass this .until() block altogether. + return false; + } + + return true; + }) + .on(std::move(executor), cancelToken); +} + class ExternalStateImpl : public ReshardingDonorService::DonorStateMachineExternalState { public: ShardId myShardId(ServiceContext* serviceContext) const override { @@ -168,96 +211,149 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine( ReshardingDonorService::DonorStateMachine::~DonorStateMachine() { stdx::lock_guard<Latch> lg(_mutex); - invariant(_allRecipientsDoneCloning.getFuture().isReady()); - invariant(_allRecipientsDoneApplying.getFuture().isReady()); - invariant(_coordinatorHasDecisionPersisted.getFuture().isReady()); invariant(_completionPromise.getFuture().isReady()); } -SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( - std::shared_ptr<executor::ScopedTaskExecutor> executor, - const CancellationToken& token) noexcept { - return ExecutorFuture<void>(**executor) - .then( - [this] { _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData(); }) - .then([this, executor] { - return _awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries(executor); - }) - .then([this, executor] { - return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(executor); - }) - .then([this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); }) - .then([this, executor] { - return _awaitCoordinatorHasDecisionPersistedThenTransitionToDropping(executor); - }) - .then([this] { _dropOriginalCollection(); }) - .then([this, executor] { - auto opCtx = cc().makeOperationContext(); - return _updateCoordinator(opCtx.get(), executor); - }) - .onError([this, executor](Status status) { - Status error = status; - { - stdx::lock_guard<Latch> lk(_mutex); - if (_abortStatus) - error = *_abortStatus; +ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_runUntilBlockingWritesOrErrored( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) noexcept { + return withAutomaticRetry( + **executor, + abortToken, + [this, executor, abortToken] { + return ExecutorFuture(**executor) + .then([this] { + _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData(); + }) + .then([this, executor, abortToken] { + return _awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries( + executor, abortToken); + }) + .then([this, executor, abortToken] { + return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( + executor, abortToken); + }) + .then( + [this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); }); + }) + .onError([this, executor, abortToken](Status status) { + if (abortToken.isCanceled()) { + return ExecutorFuture<void>(**executor, status); } LOGV2(4956400, "Resharding operation donor state machine failed", "namespace"_attr = _metadata.getSourceNss(), "reshardingUUID"_attr = _metadata.getReshardingUUID(), - "error"_attr = error); + "error"_attr = status); + + return withAutomaticRetry(**executor, abortToken, [this, status] { + // It is illegal to transition into kError if state has already surpassed + // kPreparingToBlockWrites. + invariant(_donorCtx.getState() < DonorStateEnum::kBlockingWrites); + _transitionToError(status); + + // Intentionally swallow the error - by transitioning to kError, the donor + // effectively recovers from encountering the error and should continue running in + // the future chain. + }); + }); +} - _transitionToError(error); - auto opCtx = cc().makeOperationContext(); - return _updateCoordinator(opCtx.get(), executor) - .then([this] { - // TODO SERVER-52838: Ensure all local collections that may have been created - // for resharding are removed, with the exception of the - // ReshardingDonorDocument, before transitioning to kDone. - _transitionState(DonorStateEnum::kDone); - }) - .then([this, executor] { - auto opCtx = cc().makeOperationContext(); - return _updateCoordinator(opCtx.get(), executor); - }) - .then([this, error] { return error; }); - }) - .onCompletion([this, self = shared_from_this()](Status status) { +ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_notifyCoordinatorAndAwaitDecision( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) noexcept { + if (_donorCtx.getState() == DonorStateEnum::kDone) { + return ExecutorFuture(**executor); + } + + return withAutomaticRetry(**executor, + abortToken, + [this, executor] { + auto opCtx = cc().makeOperationContext(); + return _updateCoordinator(opCtx.get(), executor); + }) + .then([this, abortToken] { + return future_util::withCancellation(_coordinatorHasDecisionPersisted.getFuture(), + abortToken); + }); +} + +ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_finishReshardingOperation( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& stepdownToken, + bool aborted) noexcept { + return withAutomaticRetry(**executor, stepdownToken, [this, executor, stepdownToken, aborted] { + if (!aborted) { + // If a failover occured after the donor transitioned to done locally, but before it + // notified the coordinator, it will already be in state done here. Otherwise, it must + // be in blocking-writes before transitioning to done. + invariant(_donorCtx.getState() == DonorStateEnum::kBlockingWrites || + _donorCtx.getState() == DonorStateEnum::kDone); + + _dropOriginalCollectionThenTransitionToDone(); + } else if (_donorCtx.getState() != DonorStateEnum::kDone) { + // If aborted, the donor must be allowed to transition to done from any state. + _transitionState(DonorStateEnum::kDone); + } + + auto opCtx = cc().makeOperationContext(); + return _updateCoordinator(opCtx.get(), executor).then([this] { { - stdx::lock_guard<Latch> lg(_mutex); - if (_completionPromise.getFuture().isReady()) { - // interrupt() was called before we got here. - return; - } + auto opCtx = cc().makeOperationContext(); + removeDonorDocFailpoint.pauseWhileSet(opCtx.get()); } + _removeDonorDocument(); + }); + }); +} - if (status.isOK()) { - // The shared_ptr stored in the PrimaryOnlyService's map for the - // ReshardingDonorService Instance is removed when the donor state document tied to - // the instance is deleted. It is necessary to use shared_from_this() to extend the - // lifetime so the code can safely finish executing. - _removeDonorDocument(); - stdx::lock_guard<Latch> lg(_mutex); - if (!_completionPromise.getFuture().isReady()) { - _completionPromise.emplaceValue(); - } - } else { - stdx::lock_guard<Latch> lg(_mutex); - if (!_completionPromise.getFuture().isReady()) { - _completionPromise.setError(status); - } +SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( + std::shared_ptr<executor::ScopedTaskExecutor> executor, + const CancellationToken& stepdownToken) noexcept { + auto abortToken = _initAbortSource(stepdownToken); + + return _runUntilBlockingWritesOrErrored(executor, abortToken) + .then([this, executor, abortToken] { + return _notifyCoordinatorAndAwaitDecision(executor, abortToken); + }) + .onCompletion([executor, stepdownToken, abortToken](Status status) { + if (stepdownToken.isCanceled()) { + // Propagate any errors from the donor stepping down. + return ExecutorFuture<bool>(**executor, status); } + + if (!status.isOK() && !abortToken.isCanceled()) { + // Propagate any errors from the donor failing to notify the coordinator. + return ExecutorFuture<bool>(**executor, status); + } + + return ExecutorFuture(**executor, abortToken.isCanceled()); }) + .then([this, executor, stepdownToken](bool aborted) { + return _finishReshardingOperation(executor, stepdownToken, aborted); + }) + .onError([this, stepdownToken](Status status) { + if (stepdownToken.isCanceled()) { + // The operation will continue on a new DonorStateMachine. + return status; + } + + LOGV2_FATAL(5160600, + "Unrecoverable error occurred past the point donor was prepared to " + "complete the resharding operation", + "error"_attr = redact(status)); + }) + // The shared_ptr stored in the PrimaryOnlyService's map for the ReshardingDonorService + // Instance is removed when the donor state document tied to the instance is deleted. It is + // necessary to use shared_from_this() to extend the lifetime so the all earlier code can + // safely finish executing. + .onCompletion([anchor = shared_from_this()](Status status) { return status; }) .semi(); } void ReshardingDonorService::DonorStateMachine::interrupt(Status status) { - // Resolve any unresolved promises to avoid hanging. stdx::lock_guard<Latch> lk(_mutex); - _abortStatus.emplace(status); - _onAbortOrStepdown(lk, status); if (!_completionPromise.getFuture().isReady()) { _completionPromise.setError(status); } @@ -276,15 +372,13 @@ boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCur void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( OperationContext* opCtx, const TypeCollectionReshardingFields& reshardingFields) { - stdx::lock_guard<Latch> lk(_mutex); if (reshardingFields.getAbortReason()) { - auto status = getStatusFromAbortReason(reshardingFields); - _abortStatus.emplace(status); - _onAbortOrStepdown(lk, status); - _critSec.reset(); + auto abortReason = getStatusFromAbortReason(reshardingFields); + _onAbortEncountered(abortReason); return; } + stdx::lock_guard<Latch> lk(_mutex); auto coordinatorState = reshardingFields.getState(); if (coordinatorState >= CoordinatorStateEnum::kApplying) { ensureFulfilledPromise(lk, _allRecipientsDoneCloning); @@ -308,9 +402,14 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( void ReshardingDonorService::DonorStateMachine:: _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() { if (_donorCtx.getState() > DonorStateEnum::kPreparingToDonate) { - invariant(_donorCtx.getMinFetchTimestamp()); - invariant(_donorCtx.getBytesToClone()); - invariant(_donorCtx.getDocumentsToClone()); + if (!inPotentialAbortScenario(_donorCtx.getState())) { + // The invariants won't hold if an unrecoverable error is encountered before the donor + // makes enough progress to transition to kDonatingInitialData and then a failover + // occurs. + invariant(_donorCtx.getMinFetchTimestamp()); + invariant(_donorCtx.getBytesToClone()); + invariant(_donorCtx.getDocumentsToClone()); + } return; } @@ -360,14 +459,17 @@ void ReshardingDonorService::DonorStateMachine:: ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: _awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) { if (_donorCtx.getState() > DonorStateEnum::kDonatingInitialData) { return ExecutorFuture<void>(**executor, Status::OK()); } auto opCtx = cc().makeOperationContext(); return _updateCoordinator(opCtx.get(), executor) - .then([this] { return _allRecipientsDoneCloning.getFuture(); }) + .then([this, abortToken] { + return future_util::withCancellation(_allRecipientsDoneCloning.getFuture(), abortToken); + }) .thenRunOn(**executor) .then([this]() { _transitionState(DonorStateEnum::kDonatingOplogEntries); }) .onCompletion([=](Status s) { @@ -379,14 +481,15 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) { if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) { return ExecutorFuture<void>(**executor, Status::OK()); } - return _allRecipientsDoneApplying.getFuture().thenRunOn(**executor).then([this]() { - _transitionState(DonorStateEnum::kPreparingToBlockWrites); - }); + return future_util::withCancellation(_allRecipientsDoneApplying.getFuture(), abortToken) + .thenRunOn(**executor) + .then([this]() { _transitionState(DonorStateEnum::kPreparingToBlockWrites); }); } void ReshardingDonorService::DonorStateMachine:: @@ -468,20 +571,8 @@ SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitFinalOplo return _finalOplogEntriesWritten.getFuture(); } -ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: - _awaitCoordinatorHasDecisionPersistedThenTransitionToDropping( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { +void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenTransitionToDone() { if (_donorCtx.getState() > DonorStateEnum::kBlockingWrites) { - return ExecutorFuture<void>(**executor, Status::OK()); - } - - return _coordinatorHasDecisionPersisted.getFuture().thenRunOn(**executor).then([this]() { - _transitionState(DonorStateEnum::kDropping); - }); -} - -void ReshardingDonorService::DonorStateMachine::_dropOriginalCollection() { - if (_donorCtx.getState() > DonorStateEnum::kDropping) { return; } @@ -557,10 +648,12 @@ BSONObj ReshardingDonorService::DonorStateMachine::_makeQueryForCoordinatorUpdat {DonorStateEnum::kDonatingInitialData, {DonorStateEnum::kUnused}}, {DonorStateEnum::kError, {DonorStateEnum::kUnused, DonorStateEnum::kDonatingInitialData}}, + {DonorStateEnum::kBlockingWrites, {DonorStateEnum::kDonatingInitialData}}, {DonorStateEnum::kDone, {DonorStateEnum::kUnused, DonorStateEnum::kDonatingInitialData, - DonorStateEnum::kError}}, + DonorStateEnum::kError, + DonorStateEnum::kBlockingWrites}}, }; auto it = validPreviousStateMap.find(newState); @@ -651,30 +744,80 @@ void ReshardingDonorService::DonorStateMachine::_updateDonorDocument( void ReshardingDonorService::DonorStateMachine::_removeDonorDocument() { auto opCtx = cc().makeOperationContext(); - PersistentTaskStore<ReshardingDonorDocument> store( - NamespaceString::kDonorReshardingOperationsNamespace); - store.remove( - opCtx.get(), - BSON(ReshardingDonorDocument::kReshardingUUIDFieldName << _metadata.getReshardingUUID()), - kNoWaitWriteConcern); + + const auto& nss = NamespaceString::kDonorReshardingOperationsNamespace; + writeConflictRetry(opCtx.get(), "DonorStateMachine::_removeDonorDocument", nss.toString(), [&] { + AutoGetCollection coll(opCtx.get(), nss, MODE_IX); + + if (!coll) { + return; + } + + WriteUnitOfWork wuow(opCtx.get()); + + opCtx->recoveryUnit()->onCommit([this](boost::optional<Timestamp> unusedCommitTime) { + stdx::lock_guard<Latch> lk(_mutex); + if (_abortReason) { + ensureFulfilledPromise(lk, _completionPromise, _abortReason.get()); + } else { + ensureFulfilledPromise(lk, _completionPromise); + } + }); + + deleteObjects(opCtx.get(), + *coll, + nss, + BSON(ReshardingDonorDocument::kReshardingUUIDFieldName + << _metadata.getReshardingUUID()), + true /* justOne */); + + wuow.commit(); + }); } -void ReshardingDonorService::DonorStateMachine::_onAbortOrStepdown(WithLock, Status status) { - if (!_allRecipientsDoneCloning.getFuture().isReady()) { - _allRecipientsDoneCloning.setError(status); +CancellationToken ReshardingDonorService::DonorStateMachine::_initAbortSource( + const CancellationToken& stepdownToken) { + { + stdx::lock_guard<Latch> lk(_mutex); + _abortSource = CancellationSource(stepdownToken); } - if (!_allRecipientsDoneApplying.getFuture().isReady()) { - _allRecipientsDoneApplying.setError(status); + if (auto future = _coordinatorHasDecisionPersisted.getFuture(); future.isReady()) { + if (auto status = future.getNoThrow(); !status.isOK()) { + // onReshardingFieldsChanges() missed canceling _abortSource because _initAbortSource() + // hadn't been called yet. We used an error status stored in + // _coordinatorHasDecisionPersisted as an indication that an abort had been received. + // Canceling _abortSource immediately allows callers to use the returned abortToken as a + // definitive means of checking whether the operation has been aborted. + _abortSource->cancel(); + } } - if (!_finalOplogEntriesWritten.getFuture().isReady()) { - _finalOplogEntriesWritten.setError(status); - } + return _abortSource->token(); +} - if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) { - _coordinatorHasDecisionPersisted.setError(status); +void ReshardingDonorService::DonorStateMachine::_onAbortEncountered(const Status& abortReason) { + auto abortSource = [&]() -> boost::optional<CancellationSource> { + stdx::lock_guard<Latch> lk(_mutex); + _abortReason = abortReason; + invariant(!_abortReason->isOK()); + + if (_abortSource) { + return _abortSource; + } else { + // run() hasn't been called, notify the operation should be aborted by setting an + // error. + invariant(!_coordinatorHasDecisionPersisted.getFuture().isReady()); + _coordinatorHasDecisionPersisted.setError(_abortReason.get()); + return boost::none; + } + }(); + + if (abortSource) { + abortSource->cancel(); } + + _critSec.reset(); } } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 50d5a457faa..6d6ddd44c00 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -79,7 +79,7 @@ public: ~DonorStateMachine(); SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor, - const CancellationToken& token) noexcept override; + const CancellationToken& stepdownToken) noexcept override; void interrupt(Status status) override; @@ -107,25 +107,49 @@ private: DonorStateMachine(const ReshardingDonorDocument& donorDoc, std::unique_ptr<DonorStateMachineExternalState> externalState); + /** + * Runs up until the donor is either in state kBlockingWrites or encountered an error. + */ + ExecutorFuture<void> _runUntilBlockingWritesOrErrored( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) noexcept; + + /** + * Notifies the coordinator if the donor is in kBlockingWrites or kError and waits for + * _coordinatorHasDecisionPersisted to be fulfilled (success) or for the abortToken to be + * canceled (failure or stepdown). + */ + ExecutorFuture<void> _notifyCoordinatorAndAwaitDecision( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken) noexcept; + + /** + * Finishes the work left remaining on the donor after the coordinator persists its decision to + * abort or complete resharding. + */ + ExecutorFuture<void> _finishReshardingOperation( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& stepdownToken, + bool aborted) noexcept; + // The following functions correspond to the actions to take at a particular donor state. void _transitionToPreparingToDonate(); void _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData(); ExecutorFuture<void> _awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken); ExecutorFuture<void> _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const CancellationToken& abortToken); void _writeTransactionOplogEntryThenTransitionToBlockingWrites(); - ExecutorFuture<void> _awaitCoordinatorHasDecisionPersistedThenTransitionToDropping( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); - // Drops the original collection and throws if the returned status is not either Status::OK() // or NamespaceNotFound. - void _dropOriginalCollection(); + void _dropOriginalCollectionThenTransitionToDone(); // Transitions the on-disk and in-memory state to 'newState'. void _transitionState(DonorStateEnum newState); @@ -151,9 +175,15 @@ private: // Removes the local donor document from disk. void _removeDonorDocument(); - // Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors - // (abort resharding). - void _onAbortOrStepdown(WithLock lk, Status status); + // Initializes the _abortSource and generates a token from it to return back the caller. If an + // abort was reported prior to the initialization, automatically cancels the _abortSource before + // returning the token. + // + // Should only be called once per lifetime. + CancellationToken _initAbortSource(const CancellationToken& stepdownToken); + + // Initiates the cancellation of the resharding operation. + void _onAbortEncountered(const Status& abortReason); // The in-memory representation of the immutable portion of the document in // config.localReshardingOperations.donor. @@ -169,13 +199,20 @@ private: // Protects the state below Mutex _mutex = MONGO_MAKE_LATCH("DonorStateMachine::_mutex"); - // Contains the status with which the operation was aborted. - boost::optional<Status> _abortStatus; + // Canceled by 2 different sources: (1) This DonorStateMachine when it learns of an + // unrecoverable error (2) The primary-only service instance driving this DonorStateMachine that + // cancels the parent CancellationSource upon stepdown/failover. + boost::optional<CancellationSource> _abortSource; + + // Holds the unrecoverable error reported by the coordinator that caused the entire resharding + // operation to fail. + boost::optional<Status> _abortReason; boost::optional<ReshardingCriticalSection> _critSec; // Each promise below corresponds to a state on the donor state machine. They are listed in - // ascending order, such that the first promise below will be the first promise fulfilled. + // ascending order, such that the first promise below will be the first promise fulfilled - + // fulfillment order is not necessarily maintained if the operation gets aborted. SharedPromise<void> _allRecipientsDoneCloning; SharedPromise<void> _allRecipientsDoneApplying; diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp index d128f581a8c..ae24821b064 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -55,7 +55,7 @@ namespace mongo { namespace { class OpObserverForTest; -class PauseDuringStateTransition; +class PauseDuringStateTransitions; class DonorStateTransitionController { public: @@ -68,7 +68,7 @@ public: private: friend OpObserverForTest; - friend PauseDuringStateTransition; + friend PauseDuringStateTransitions; void setPauseDuringTransition(DonorStateEnum state) { stdx::lock_guard lk(_mutex); @@ -97,24 +97,33 @@ private: DonorStateEnum _state = DonorStateEnum::kUnused; }; -class PauseDuringStateTransition { +class PauseDuringStateTransitions { public: - PauseDuringStateTransition(DonorStateTransitionController* controller, DonorStateEnum state) - : _controller{controller}, _state{state} { - _controller->setPauseDuringTransition(_state); + PauseDuringStateTransitions(DonorStateTransitionController* controller, + std::vector<DonorStateEnum> states) + : _controller{controller}, _states{std::move(states)} { + for (auto state : _states) { + _controller->setPauseDuringTransition(state); + } + } + + ~PauseDuringStateTransitions() { + for (auto state : _states) { + _controller->unsetPauseDuringTransition(state); + } } - ~PauseDuringStateTransition() { - _controller->unsetPauseDuringTransition(_state); + void wait(DonorStateEnum state) { + _controller->waitUntilStateIsReached(state); } - void wait() { - _controller->waitUntilStateIsReached(_state); + void unset(DonorStateEnum state) { + _controller->unsetPauseDuringTransition(state); } private: DonorStateTransitionController* const _controller; - const DonorStateEnum _state; + const std::vector<DonorStateEnum> _states; }; class OpObserverForTest : public OpObserverNoop { @@ -185,6 +194,11 @@ public: _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>(_controller)); } + void stepUp() { + auto opCtx = cc().makeOperationContext(); + PrimaryOnlyServiceMongoDTest::stepUp(opCtx.get()); + } + DonorStateTransitionController* controller() { return _controller.get(); } @@ -210,6 +224,15 @@ public: return doc; } + void createOriginalCollection(OperationContext* opCtx, + const ReshardingDonorDocument& donorDoc) { + CollectionOptions options; + options.uuid = donorDoc.getSourceUUID(); + OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( + opCtx); + resharding::data_copy::ensureCollectionExists(opCtx, donorDoc.getSourceNss(), options); + } + void notifyRecipientsDoneCloning(OperationContext* opCtx, DonorStateMachine& donor, const ReshardingDonorDocument& donorDoc) { @@ -268,15 +291,15 @@ TEST_F(ReshardingDonorServiceTest, CanTransitionThroughEachStateToCompletion) { } TEST_F(ReshardingDonorServiceTest, WritesNoOpOplogEntryToGenerateMinFetchTimestamp) { - boost::optional<PauseDuringStateTransition> donatingInitialDataTransitionGuard = - PauseDuringStateTransition{controller(), DonorStateEnum::kDonatingInitialData}; + boost::optional<PauseDuringStateTransitions> donatingInitialDataTransitionGuard = + PauseDuringStateTransitions{controller(), {DonorStateEnum::kDonatingInitialData}}; auto doc = makeStateDocument(); auto opCtx = makeOperationContext(); DonorStateMachine::insertStateDocument(opCtx.get(), doc); auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); - donatingInitialDataTransitionGuard->wait(); + donatingInitialDataTransitionGuard->wait(DonorStateEnum::kDonatingInitialData); stepDown(); donatingInitialDataTransitionGuard.reset(); @@ -301,8 +324,8 @@ TEST_F(ReshardingDonorServiceTest, WritesNoOpOplogEntryToGenerateMinFetchTimesta } TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBlocked) { - boost::optional<PauseDuringStateTransition> blockingWritesTransitionGuard = - PauseDuringStateTransition{controller(), DonorStateEnum::kBlockingWrites}; + boost::optional<PauseDuringStateTransitions> blockingWritesTransitionGuard = + PauseDuringStateTransitions{controller(), {DonorStateEnum::kBlockingWrites}}; auto doc = makeStateDocument(); auto opCtx = makeOperationContext(); @@ -312,7 +335,7 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl notifyRecipientsDoneCloning(opCtx.get(), *donor, doc); notifyToStartBlockingWrites(opCtx.get(), *donor, doc); - blockingWritesTransitionGuard->wait(); + blockingWritesTransitionGuard->wait(DonorStateEnum::kBlockingWrites); stepDown(); blockingWritesTransitionGuard.reset(); @@ -348,17 +371,80 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl << cursor->nextSafe(); } -TEST_F(ReshardingDonorServiceTest, DropsSourceCollectionWhenDone) { +TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) { + const std::vector<DonorStateEnum> _donorStates{DonorStateEnum::kDonatingInitialData, + DonorStateEnum::kDonatingOplogEntries, + DonorStateEnum::kPreparingToBlockWrites, + DonorStateEnum::kBlockingWrites, + DonorStateEnum::kDone}; + boost::optional<PauseDuringStateTransitions> stateTransitionsGuard = + PauseDuringStateTransitions{controller(), _donorStates}; auto doc = makeStateDocument(); - auto opCtx = makeOperationContext(); + { + auto opCtx = makeOperationContext(); + DonorStateMachine::insertStateDocument(opCtx.get(), doc); + } + auto prevState = DonorStateEnum::kUnused; + for (const auto state : _donorStates) { + { + auto opCtx = makeOperationContext(); + auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); + + if (prevState != DonorStateEnum::kUnused) { + // Allow the transition to prevState to succeed on this primary-only service + // instance. + stateTransitionsGuard->unset(prevState); + } + + // Signal a change in the coordinator's state for donor state transitions dependent + // on it. + switch (state) { + case DonorStateEnum::kDonatingOplogEntries: { + notifyRecipientsDoneCloning(opCtx.get(), *donor, doc); + break; + } + case DonorStateEnum::kPreparingToBlockWrites: { + notifyToStartBlockingWrites(opCtx.get(), *donor, doc); + break; + } + case DonorStateEnum::kDone: { + notifyReshardingOutcomeDecided(opCtx.get(), *donor, doc, Status::OK()); + break; + } + default: + break; + } + + // Step down before the transition to state can complete. + stateTransitionsGuard->wait(state); + stepDown(); + + ASSERT_EQ(donor->getCompletionFuture().getNoThrow(), + ErrorCodes::InterruptedDueToReplStateChange); + + prevState = state; + } + + stepUp(); + } + + // Finally complete the operation and ensure its success. { - OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( - opCtx.get()); - CollectionOptions options; - options.uuid = doc.getSourceUUID(); - resharding::data_copy::ensureCollectionExists(opCtx.get(), doc.getSourceNss(), options); + auto opCtx = makeOperationContext(); + auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); + + stateTransitionsGuard->unset(DonorStateEnum::kDone); + notifyReshardingOutcomeDecided(opCtx.get(), *donor, doc, Status::OK()); + ASSERT_OK(donor->getCompletionFuture().getNoThrow()); } +} + +TEST_F(ReshardingDonorServiceTest, DropsSourceCollectionWhenDone) { + auto doc = makeStateDocument(); + auto opCtx = makeOperationContext(); + + createOriginalCollection(opCtx.get(), doc); DonorStateMachine::insertStateDocument(opCtx.get(), doc); auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); @@ -381,17 +467,50 @@ TEST_F(ReshardingDonorServiceTest, DropsSourceCollectionWhenDone) { } } -TEST_F(ReshardingDonorServiceTest, RetainsSourceCollectionOnError) { +TEST_F(ReshardingDonorServiceTest, CompletesWithStepdownAfterError) { + boost::optional<PauseDuringStateTransitions> stateTransitionsGuard = + PauseDuringStateTransitions{controller(), {DonorStateEnum::kDone}}; auto doc = makeStateDocument(); - auto opCtx = makeOperationContext(); + { + auto opCtx = makeOperationContext(); + + createOriginalCollection(opCtx.get(), doc); + + DonorStateMachine::insertStateDocument(opCtx.get(), doc); + auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); + + notifyRecipientsDoneCloning(opCtx.get(), *donor, doc); + notifyReshardingOutcomeDecided(opCtx.get(), *donor, doc, {ErrorCodes::InternalError, ""}); + stateTransitionsGuard->wait(DonorStateEnum::kDone); + stepDown(); + + ASSERT_EQ(donor->getCompletionFuture().getNoThrow(), + ErrorCodes::InterruptedDueToReplStateChange); + } + stepUp(); { - CollectionOptions options; - options.uuid = doc.getSourceUUID(); - OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection( - opCtx.get()); - resharding::data_copy::ensureCollectionExists(opCtx.get(), doc.getSourceNss(), options); + auto opCtx = makeOperationContext(); + auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); + + stateTransitionsGuard->unset(DonorStateEnum::kDone); + + notifyReshardingOutcomeDecided(opCtx.get(), *donor, doc, {ErrorCodes::InternalError, ""}); + ASSERT_EQ(donor->getCompletionFuture().getNoThrow(), ErrorCodes::InternalError); + { + // Verify original collection still exists even with stepdown. + AutoGetCollection coll(opCtx.get(), doc.getSourceNss(), MODE_IS); + ASSERT_TRUE(bool(coll)); + ASSERT_EQ(coll->uuid(), doc.getSourceUUID()); + } } +} + +TEST_F(ReshardingDonorServiceTest, RetainsSourceCollectionOnError) { + auto doc = makeStateDocument(); + auto opCtx = makeOperationContext(); + + createOriginalCollection(opCtx.get(), doc); DonorStateMachine::insertStateDocument(opCtx.get(), doc); auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl index bfbcb63d3a3..ee3d88aab52 100644 --- a/src/mongo/s/resharding/common_types.idl +++ b/src/mongo/s/resharding/common_types.idl @@ -60,10 +60,9 @@ enums: kDonatingInitialData: "donating-initial-data" kDonatingOplogEntries: "donating-oplog-entries" kPreparingToBlockWrites: "preparing-to-block-writes" + kError: "error" kBlockingWrites: "blocking-writes" - kDropping: "dropping" kDone: "done" - kError: "error" RecipientState: description: "The current state of a recipient shard for a resharding operation." @@ -75,10 +74,10 @@ enums: kCloning: "cloning" kApplying: "applying" kSteadyState: "steady-state" + kError: "error" kStrictConsistency: "strict-consistency" kRenaming: "renaming" kDone: "done" - kError: "error" ReshardingOperationStatus: description: "The status of the current or most recent resharding operation." |