summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author Haley Connelly <haley.connelly@mongodb.com> 2021-03-11 20:52:05 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-04-08 17:33:32 +0000
commit 60f2db4bd2f1bad10f1c92f3eb0b626eca933477 (patch)
tree 339b11ffe910e0ae7b6301b8ab3338a8646708c1 /src
parent 4b8f97ab352255b4d9cb265f3f5f5665998a6c46 (diff)
download mongo-60f2db4bd2f1bad10f1c92f3eb0b626eca933477.tar.gz
SERVER-51606 Handle recovery from resharding donors
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_observer.cpp 76
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_observer.h 27
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp 34
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.cpp 19
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.h 6
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service.cpp 371
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service.h 63
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service_test.cpp 181
-rw-r--r-- src/mongo/s/resharding/common_types.idl 5
9 files changed, 513 insertions, 269 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
index 2cc932b201e..3722b55eee9 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
@@ -136,41 +136,6 @@ boost::optional<Status> getAbortReasonIfExists(
return boost::none;
}
-
-template <class TState, class TParticipant>
-bool allParticipantsDoneWithAbortReason(WithLock lk,
- TState expectedState,
- const std::vector<TParticipant>& participants) {
- for (const auto& shard : participants) {
- if (!(shard.getMutableState().getState() == expectedState &&
- shard.getMutableState().getAbortReason().is_initialized())) {
- return false;
- }
- }
- return true;
-}
-
-/**
- * Fulfills allParticipantsDoneAbortingSp if all participants have reported to the coordinator that
- * they have finished aborting locally.
- */
-void checkAllParticipantsAborted(WithLock lk,
- SharedPromise<void>& allParticipantsDoneAbortingSp,
- const ReshardingCoordinatorDocument& updatedStateDoc) {
- if (allParticipantsDoneAbortingSp.getFuture().isReady()) {
- return;
- }
-
- bool allDonorsAborted = allParticipantsDoneWithAbortReason(
- lk, DonorStateEnum::kDone, updatedStateDoc.getDonorShards());
- bool allRecipientsAborted = allParticipantsDoneWithAbortReason(
- lk, RecipientStateEnum::kDone, updatedStateDoc.getRecipientShards());
-
- if (allDonorsAborted && allRecipientsAborted) {
- allParticipantsDoneAbortingSp.emplaceValue();
- }
-}
-
} // namespace
ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default;
@@ -181,18 +146,16 @@ ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() {
invariant(_allRecipientsFinishedCloning.getFuture().isReady());
invariant(_allRecipientsFinishedApplying.getFuture().isReady());
invariant(_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady());
- invariant(_allRecipientsRenamedCollection.getFuture().isReady());
- invariant(_allDonorsDroppedOriginalCollection.getFuture().isReady());
+ invariant(_allRecipientsDone.getFuture().isReady());
+ invariant(_allDonorsDone.getFuture().isReady());
}
void ReshardingCoordinatorObserver::onReshardingParticipantTransition(
const ReshardingCoordinatorDocument& updatedStateDoc) {
stdx::lock_guard<Latch> lk(_mutex);
-
if (auto abortReason = getAbortReasonIfExists(updatedStateDoc)) {
_onAbortOrStepdown(lk, abortReason.get());
- checkAllParticipantsAborted(lk, _allParticipantsDoneAborting, updatedStateDoc);
- return;
+ // Don't exit early since the coordinator waits for all participants to report state 'done'.
}
if (!stateTransistionsComplete(lk,
@@ -222,12 +185,11 @@ void ReshardingCoordinatorObserver::onReshardingParticipantTransition(
}
if (!stateTransistionsComplete(
- lk, _allRecipientsRenamedCollection, RecipientStateEnum::kDone, updatedStateDoc)) {
+ lk, _allRecipientsDone, RecipientStateEnum::kDone, updatedStateDoc)) {
return;
}
- if (!stateTransistionsComplete(
- lk, _allDonorsDroppedOriginalCollection, DonorStateEnum::kDone, updatedStateDoc)) {
+ if (!stateTransistionsComplete(lk, _allDonorsDone, DonorStateEnum::kDone, updatedStateDoc)) {
return;
}
}
@@ -253,25 +215,25 @@ ReshardingCoordinatorObserver::awaitAllRecipientsInStrictConsistency() {
}
SharedSemiFuture<ReshardingCoordinatorDocument>
-ReshardingCoordinatorObserver::awaitAllDonorsDroppedOriginalCollection() {
- return _allDonorsDroppedOriginalCollection.getFuture();
+ReshardingCoordinatorObserver::awaitAllDonorsDone() {
+ return _allDonorsDone.getFuture();
}
SharedSemiFuture<ReshardingCoordinatorDocument>
-ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() {
- return _allRecipientsRenamedCollection.getFuture();
-}
-
-SharedSemiFuture<void> ReshardingCoordinatorObserver::awaitAllParticipantsDoneAborting() {
- return _allParticipantsDoneAborting.getFuture();
+ReshardingCoordinatorObserver::awaitAllRecipientsDone() {
+ return _allRecipientsDone.getFuture();
}
void ReshardingCoordinatorObserver::interrupt(Status status) {
stdx::lock_guard<Latch> lk(_mutex);
_onAbortOrStepdown(lk, status);
- if (!_allParticipantsDoneAborting.getFuture().isReady()) {
- _allParticipantsDoneAborting.setError(status);
+ if (!_allRecipientsDone.getFuture().isReady()) {
+ _allRecipientsDone.setError(status);
+ }
+
+ if (!_allDonorsDone.getFuture().isReady()) {
+ _allDonorsDone.setError(status);
}
}
@@ -300,14 +262,6 @@ void ReshardingCoordinatorObserver::_onAbortOrStepdown(WithLock, Status status)
if (!_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady()) {
_allRecipientsReportedStrictConsistencyTimestamp.setError(status);
}
-
- if (!_allRecipientsRenamedCollection.getFuture().isReady()) {
- _allRecipientsRenamedCollection.setError(status);
- }
-
- if (!_allDonorsDroppedOriginalCollection.getFuture().isReady()) {
- _allDonorsDroppedOriginalCollection.setError(status);
- }
}
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
index b0c6eedfe95..5860945df66 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
@@ -92,22 +92,16 @@ public:
SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsInStrictConsistency();
/**
- * Fulfills the '_allRecipientsRenamedCollection' promise when the last recipient writes
+ * Fulfills the '_allRecipientsDone' promise when the last recipient writes
* that it is in 'done' state.
*/
- SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsRenamedCollection();
+ SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsDone();
/**
- * Fulfills the '_allDonorsDroppedOriginalCollection' promise when the last donor writes that it
+ * Fulfills the '_allDonorsDone' promise when the last donor writes that it
* is in 'done' state.
*/
- SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllDonorsDroppedOriginalCollection();
-
- /**
- * Fulfills the '_allParticipantsDoneAborting' promise when the last recipient or donor writes
- * that it is in 'kDone' with an abortReason.
- */
- SharedSemiFuture<void> awaitAllParticipantsDoneAborting();
+ SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllDonorsDone();
/**
* Checks if all recipients are in steady state. Otherwise, sets an error state so that
@@ -142,11 +136,8 @@ private:
* {_allRecipientsFinishedCloning, RecipientStateEnum::kApplying}
* {_allRecipientsFinishedApplying, RecipientStateEnum::kSteadyState}
* {_allRecipientsReportedStrictConsistencyTimestamp, RecipientStateEnum::kStrictConsistency}
- * {_allRecipientsRenamedCollection, RecipientStateEnum::kDone}
- * {_allDonorsDroppedOriginalCollection, DonorStateEnum::kDone}
- * {_allParticipantsDoneAborting,
- * DonorStateEnum::kDone with abortReason AND
- * RecipientStateEnum::kDone with abortReason}
+ * {_allRecipientsDone, RecipientStateEnum::kDone}
+ * {_allDonorsDone, DonorStateEnum::kDone}
*/
SharedPromise<ReshardingCoordinatorDocument> _allDonorsReportedMinFetchTimestamp;
@@ -157,11 +148,9 @@ private:
SharedPromise<ReshardingCoordinatorDocument> _allRecipientsReportedStrictConsistencyTimestamp;
- SharedPromise<ReshardingCoordinatorDocument> _allRecipientsRenamedCollection;
-
- SharedPromise<ReshardingCoordinatorDocument> _allDonorsDroppedOriginalCollection;
+ SharedPromise<ReshardingCoordinatorDocument> _allRecipientsDone;
- SharedPromise<void> _allParticipantsDoneAborting;
+ SharedPromise<ReshardingCoordinatorDocument> _allDonorsDone;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
index ffb44e7abdd..a9f5fc29524 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
@@ -59,15 +59,18 @@ protected:
DonorStateEnum donorState,
boost::optional<Timestamp> timestamp = boost::none,
boost::optional<Status> abortReason = boost::none) {
- return {makeDonorShard(ShardId{"s1"}, donorState, timestamp, abortReason),
+ // The mock state here is simulating only one donor shard having errored locally.
+ return {makeDonorShard(ShardId{"s1"}, donorState, timestamp),
makeDonorShard(ShardId{"s2"}, donorState, timestamp, abortReason),
- makeDonorShard(ShardId{"s3"}, donorState, timestamp, abortReason)};
+ makeDonorShard(ShardId{"s3"}, donorState, timestamp)};
}
std::vector<RecipientShardEntry> makeMockRecipientsInState(
RecipientStateEnum recipientState,
boost::optional<Timestamp> timestamp = boost::none,
boost::optional<Status> abortReason = boost::none) {
+ // TODO SERVER-55511: Make the mock state here simulate only one recipient shard errored
+ // locally.
return {makeRecipientShard(ShardId{"s1"}, recipientState, abortReason),
makeRecipientShard(ShardId{"s2"}, recipientState, abortReason),
makeRecipientShard(ShardId{"s3"}, recipientState, abortReason)};
@@ -160,27 +163,30 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) {
TEST_F(ReshardingCoordinatorObserverTest, participantsDoneAborting) {
auto reshardingObserver = std::make_shared<ReshardingCoordinatorObserver>();
- auto fut = reshardingObserver->awaitAllParticipantsDoneAborting();
+ auto fut = reshardingObserver->awaitAllDonorsDone();
ASSERT_FALSE(fut.isReady());
auto abortReason = Status{ErrorCodes::InternalError, "We gotta abort"};
- // All participants have an abortReason, but not all are in state kDone yet.
- auto donorShards = makeMockDonorsInState(DonorStateEnum::kDone, Timestamp(1, 1), abortReason);
- std::vector<RecipientShardEntry> recipientShards0{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kError, abortReason)},
- {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kDone, abortReason)},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kDone, abortReason)}};
+ // All recipients and donors are done (including the donor who caused the abort) except a single
+ // donor who hasn't seen there was an error yet.
+ auto recipientShards = makeMockRecipientsInState(RecipientStateEnum::kDone, Timestamp(1, 1));
+ std::vector<DonorShardEntry> donorShards0{
+ {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)},
+ {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDonatingOplogEntries, Timestamp(1, 1))},
+ {makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}};
auto coordinatorDoc0 =
- makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards, abortReason);
+ makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards0, abortReason);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
ASSERT_FALSE(fut.isReady());
- // All participants in state kDone with abortReason.
- auto recipientShards1 =
- makeMockRecipientsInState(RecipientStateEnum::kDone, boost::none, abortReason);
+ // All participants are done.
+ std::vector<DonorShardEntry> donorShards1{
+ {makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDone, Timestamp(1, 1), abortReason)},
+ {makeDonorShard(ShardId{"s2"}, DonorStateEnum::kDone, Timestamp(1, 1))},
+ {makeDonorShard(ShardId{"s3"}, DonorStateEnum::kDone, Timestamp(1, 1))}};
auto coordinatorDoc1 =
- makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards, abortReason);
+ makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards1, abortReason);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_TRUE(fut.isReady());
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 3a323652fe8..35df4d6b38b 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -993,7 +993,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishRe
// The shared_ptr maintaining the ReshardingCoordinatorService Instance object gets
// deleted from the PrimaryOnlyService's map. Thus, shared_from_this() is necessary to
// keep 'this' pointer alive for the remaining callbacks.
- return _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(executor);
+ return _awaitAllParticipantShardsDone(executor);
})
.onError([this, self = shared_from_this(), executor](Status status) {
{
@@ -1088,9 +1088,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_onAbort(
// Wait for all participants to acknowledge the operation reached an unrecoverable
// error.
- future_util::withCancellation(
- _reshardingCoordinatorObserver->awaitAllParticipantsDoneAborting(),
- _ctHolder->getStepdownToken())
+ future_util::withCancellation(_awaitAllParticipantShardsDone(executor),
+ _ctHolder->getStepdownToken())
.get();
}
@@ -1368,20 +1367,18 @@ Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisi
return Status::OK();
};
-ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::
- _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsDone(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kDecisionPersisted) {
return ExecutorFuture<void>(**executor, Status::OK());
}
std::vector<ExecutorFuture<ReshardingCoordinatorDocument>> futures;
futures.emplace_back(
- _reshardingCoordinatorObserver->awaitAllRecipientsRenamedCollection().thenRunOn(
- **executor));
+ _reshardingCoordinatorObserver->awaitAllRecipientsDone().thenRunOn(**executor));
futures.emplace_back(
- _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection().thenRunOn(
- **executor));
+ _reshardingCoordinatorObserver->awaitAllDonorsDone().thenRunOn(**executor));
// We only allow the stepdown token to cancel operations after progressing past
// kDecisionPersisted.
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index 6a675fe9121..9f3df91fc42 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -313,13 +313,13 @@ private:
/**
* Waits on _reshardingCoordinatorObserver to notify that:
* 1. All recipient shards have renamed the temporary collection to the original collection
- * namespace, and
+ * namespace or have finished aborting, and
* 2. All donor shards that were not also recipient shards have dropped the original
- * collection.
+ * collection or have finished aborting.
*
* Transitions to 'kDone'.
*/
- ExecutorFuture<void> _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(
+ ExecutorFuture<void> _awaitAllParticipantShardsDone(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
/**
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index e7d3b6d3ed0..62ebb7305d8 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/ops/delete.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/wait_for_majority_service.h"
@@ -51,10 +52,12 @@
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/grid.h"
+#include "mongo/util/future_util.h"
namespace mongo {
MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsBeforePreparingToMirror);
+MONGO_FAIL_POINT_DEFINE(removeDonorDocFailpoint);
using namespace fmt::literals;
@@ -99,6 +102,18 @@ Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss,
}
/**
+ * Returns whether it is possible for the donor to be in 'state' when resharding will inevitably
+ * abort.
+ */
+bool inPotentialAbortScenario(const DonorStateEnum& state) {
+ // Regardless of whether resharding will abort or commit, the donor will eventually reach state
+ // kDone.
+ // Additionally, if the donor is in state kError, it is guaranteed that the coordinator will
+ // eventually begin the abort process.
+ return state == DonorStateEnum::kError || state == DonorStateEnum::kDone;
+}
+
+/**
* Fulfills the promise if it is not already. Otherwise, does nothing.
*/
void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) {
@@ -113,6 +128,34 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp, Status error)
}
}
+/**
+ * Automatically retries the callable until there is an error encountered that resharding cannot
+ * recover from or the cancelToken is canceled.
+ */
+template <typename Callable>
+ExecutorFuture<void> withAutomaticRetry(std::shared_ptr<executor::TaskExecutor> executor,
+ CancellationToken cancelToken,
+ Callable&& callable) {
+ return AsyncTry<Callable>(std::move(callable))
+ .until([cancelToken](Status status) {
+ if (status.isA<ErrorCategory::RetriableError>() ||
+ status.isA<ErrorCategory::CursorInvalidatedError>() ||
+ status == ErrorCodes::Interrupted ||
+ status.isA<ErrorCategory::CancellationError>() ||
+ status.isA<ErrorCategory::NotPrimaryError>()) {
+ // Retry on errors from stray killCursors and killOp commands being run. Also retry
+ // for notPrimary and cancellation errors to ensure the loop is not prematurely
+ // canceled if the errors originate from a remote shard instead of this shard - if
+ // there is a failover/stepdown, the cancelToken will eventually be canceled and
+ // bypass this .until() block altogether.
+ return false;
+ }
+
+ return true;
+ })
+ .on(std::move(executor), cancelToken);
+}
+
class ExternalStateImpl : public ReshardingDonorService::DonorStateMachineExternalState {
public:
ShardId myShardId(ServiceContext* serviceContext) const override {
@@ -168,96 +211,149 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine(
ReshardingDonorService::DonorStateMachine::~DonorStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
- invariant(_allRecipientsDoneCloning.getFuture().isReady());
- invariant(_allRecipientsDoneApplying.getFuture().isReady());
- invariant(_coordinatorHasDecisionPersisted.getFuture().isReady());
invariant(_completionPromise.getFuture().isReady());
}
-SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
- std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const CancellationToken& token) noexcept {
- return ExecutorFuture<void>(**executor)
- .then(
- [this] { _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData(); })
- .then([this, executor] {
- return _awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries(executor);
- })
- .then([this, executor] {
- return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(executor);
- })
- .then([this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); })
- .then([this, executor] {
- return _awaitCoordinatorHasDecisionPersistedThenTransitionToDropping(executor);
- })
- .then([this] { _dropOriginalCollection(); })
- .then([this, executor] {
- auto opCtx = cc().makeOperationContext();
- return _updateCoordinator(opCtx.get(), executor);
- })
- .onError([this, executor](Status status) {
- Status error = status;
- {
- stdx::lock_guard<Latch> lk(_mutex);
- if (_abortStatus)
- error = *_abortStatus;
+ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_runUntilBlockingWritesOrErrored(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) noexcept {
+ return withAutomaticRetry(
+ **executor,
+ abortToken,
+ [this, executor, abortToken] {
+ return ExecutorFuture(**executor)
+ .then([this] {
+ _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData();
+ })
+ .then([this, executor, abortToken] {
+ return _awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries(
+ executor, abortToken);
+ })
+ .then([this, executor, abortToken] {
+ return _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
+ executor, abortToken);
+ })
+ .then(
+ [this] { _writeTransactionOplogEntryThenTransitionToBlockingWrites(); });
+ })
+ .onError([this, executor, abortToken](Status status) {
+ if (abortToken.isCanceled()) {
+ return ExecutorFuture<void>(**executor, status);
}
LOGV2(4956400,
"Resharding operation donor state machine failed",
"namespace"_attr = _metadata.getSourceNss(),
"reshardingUUID"_attr = _metadata.getReshardingUUID(),
- "error"_attr = error);
+ "error"_attr = status);
+
+ return withAutomaticRetry(**executor, abortToken, [this, status] {
+ // It is illegal to transition into kError if state has already surpassed
+ // kPreparingToBlockWrites.
+ invariant(_donorCtx.getState() < DonorStateEnum::kBlockingWrites);
+ _transitionToError(status);
+
+ // Intentionally swallow the error - by transitioning to kError, the donor
+ // effectively recovers from encountering the error and should continue running in
+ // the future chain.
+ });
+ });
+}
- _transitionToError(error);
- auto opCtx = cc().makeOperationContext();
- return _updateCoordinator(opCtx.get(), executor)
- .then([this] {
- // TODO SERVER-52838: Ensure all local collections that may have been created
- // for resharding are removed, with the exception of the
- // ReshardingDonorDocument, before transitioning to kDone.
- _transitionState(DonorStateEnum::kDone);
- })
- .then([this, executor] {
- auto opCtx = cc().makeOperationContext();
- return _updateCoordinator(opCtx.get(), executor);
- })
- .then([this, error] { return error; });
- })
- .onCompletion([this, self = shared_from_this()](Status status) {
+ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_notifyCoordinatorAndAwaitDecision(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) noexcept {
+ if (_donorCtx.getState() == DonorStateEnum::kDone) {
+ return ExecutorFuture(**executor);
+ }
+
+ return withAutomaticRetry(**executor,
+ abortToken,
+ [this, executor] {
+ auto opCtx = cc().makeOperationContext();
+ return _updateCoordinator(opCtx.get(), executor);
+ })
+ .then([this, abortToken] {
+ return future_util::withCancellation(_coordinatorHasDecisionPersisted.getFuture(),
+ abortToken);
+ });
+}
+
+ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_finishReshardingOperation(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& stepdownToken,
+ bool aborted) noexcept {
+ return withAutomaticRetry(**executor, stepdownToken, [this, executor, stepdownToken, aborted] {
+ if (!aborted) {
+ // If a failover occurred after the donor transitioned to done locally, but before it
+ // notified the coordinator, it will already be in state done here. Otherwise, it must
+ // be in blocking-writes before transitioning to done.
+ invariant(_donorCtx.getState() == DonorStateEnum::kBlockingWrites ||
+ _donorCtx.getState() == DonorStateEnum::kDone);
+
+ _dropOriginalCollectionThenTransitionToDone();
+ } else if (_donorCtx.getState() != DonorStateEnum::kDone) {
+ // If aborted, the donor must be allowed to transition to done from any state.
+ _transitionState(DonorStateEnum::kDone);
+ }
+
+ auto opCtx = cc().makeOperationContext();
+ return _updateCoordinator(opCtx.get(), executor).then([this] {
{
- stdx::lock_guard<Latch> lg(_mutex);
- if (_completionPromise.getFuture().isReady()) {
- // interrupt() was called before we got here.
- return;
- }
+ auto opCtx = cc().makeOperationContext();
+ removeDonorDocFailpoint.pauseWhileSet(opCtx.get());
}
+ _removeDonorDocument();
+ });
+ });
+}
- if (status.isOK()) {
- // The shared_ptr stored in the PrimaryOnlyService's map for the
- // ReshardingDonorService Instance is removed when the donor state document tied to
- // the instance is deleted. It is necessary to use shared_from_this() to extend the
- // lifetime so the code can safely finish executing.
- _removeDonorDocument();
- stdx::lock_guard<Latch> lg(_mutex);
- if (!_completionPromise.getFuture().isReady()) {
- _completionPromise.emplaceValue();
- }
- } else {
- stdx::lock_guard<Latch> lg(_mutex);
- if (!_completionPromise.getFuture().isReady()) {
- _completionPromise.setError(status);
- }
+SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& stepdownToken) noexcept {
+ auto abortToken = _initAbortSource(stepdownToken);
+
+ return _runUntilBlockingWritesOrErrored(executor, abortToken)
+ .then([this, executor, abortToken] {
+ return _notifyCoordinatorAndAwaitDecision(executor, abortToken);
+ })
+ .onCompletion([executor, stepdownToken, abortToken](Status status) {
+ if (stepdownToken.isCanceled()) {
+ // Propagate any errors from the donor stepping down.
+ return ExecutorFuture<bool>(**executor, status);
}
+
+ if (!status.isOK() && !abortToken.isCanceled()) {
+ // Propagate any errors from the donor failing to notify the coordinator.
+ return ExecutorFuture<bool>(**executor, status);
+ }
+
+ return ExecutorFuture(**executor, abortToken.isCanceled());
})
+ .then([this, executor, stepdownToken](bool aborted) {
+ return _finishReshardingOperation(executor, stepdownToken, aborted);
+ })
+ .onError([this, stepdownToken](Status status) {
+ if (stepdownToken.isCanceled()) {
+ // The operation will continue on a new DonorStateMachine.
+ return status;
+ }
+
+ LOGV2_FATAL(5160600,
+ "Unrecoverable error occurred past the point donor was prepared to "
+ "complete the resharding operation",
+ "error"_attr = redact(status));
+ })
+ // The shared_ptr stored in the PrimaryOnlyService's map for the ReshardingDonorService
+ // Instance is removed when the donor state document tied to the instance is deleted. It is
+ // necessary to use shared_from_this() to extend the lifetime so that all earlier code can
+ // safely finish executing.
+ .onCompletion([anchor = shared_from_this()](Status status) { return status; })
.semi();
}
void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {
- // Resolve any unresolved promises to avoid hanging.
stdx::lock_guard<Latch> lk(_mutex);
- _abortStatus.emplace(status);
- _onAbortOrStepdown(lk, status);
if (!_completionPromise.getFuture().isReady()) {
_completionPromise.setError(status);
}
@@ -276,15 +372,13 @@ boost::optional<BSONObj> ReshardingDonorService::DonorStateMachine::reportForCur
void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
OperationContext* opCtx, const TypeCollectionReshardingFields& reshardingFields) {
- stdx::lock_guard<Latch> lk(_mutex);
if (reshardingFields.getAbortReason()) {
- auto status = getStatusFromAbortReason(reshardingFields);
- _abortStatus.emplace(status);
- _onAbortOrStepdown(lk, status);
- _critSec.reset();
+ auto abortReason = getStatusFromAbortReason(reshardingFields);
+ _onAbortEncountered(abortReason);
return;
}
+ stdx::lock_guard<Latch> lk(_mutex);
auto coordinatorState = reshardingFields.getState();
if (coordinatorState >= CoordinatorStateEnum::kApplying) {
ensureFulfilledPromise(lk, _allRecipientsDoneCloning);
@@ -308,9 +402,14 @@ void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
void ReshardingDonorService::DonorStateMachine::
_onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() {
if (_donorCtx.getState() > DonorStateEnum::kPreparingToDonate) {
- invariant(_donorCtx.getMinFetchTimestamp());
- invariant(_donorCtx.getBytesToClone());
- invariant(_donorCtx.getDocumentsToClone());
+ if (!inPotentialAbortScenario(_donorCtx.getState())) {
+ // The invariants won't hold if an unrecoverable error is encountered before the donor
+ // makes enough progress to transition to kDonatingInitialData and then a failover
+ // occurs.
+ invariant(_donorCtx.getMinFetchTimestamp());
+ invariant(_donorCtx.getBytesToClone());
+ invariant(_donorCtx.getDocumentsToClone());
+ }
return;
}
@@ -360,14 +459,17 @@ void ReshardingDonorService::DonorStateMachine::
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
_awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
if (_donorCtx.getState() > DonorStateEnum::kDonatingInitialData) {
return ExecutorFuture<void>(**executor, Status::OK());
}
auto opCtx = cc().makeOperationContext();
return _updateCoordinator(opCtx.get(), executor)
- .then([this] { return _allRecipientsDoneCloning.getFuture(); })
+ .then([this, abortToken] {
+ return future_util::withCancellation(_allRecipientsDoneCloning.getFuture(), abortToken);
+ })
.thenRunOn(**executor)
.then([this]() { _transitionState(DonorStateEnum::kDonatingOplogEntries); })
.onCompletion([=](Status s) {
@@ -379,14 +481,15 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
_awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) {
if (_donorCtx.getState() > DonorStateEnum::kDonatingOplogEntries) {
return ExecutorFuture<void>(**executor, Status::OK());
}
- return _allRecipientsDoneApplying.getFuture().thenRunOn(**executor).then([this]() {
- _transitionState(DonorStateEnum::kPreparingToBlockWrites);
- });
+ return future_util::withCancellation(_allRecipientsDoneApplying.getFuture(), abortToken)
+ .thenRunOn(**executor)
+ .then([this]() { _transitionState(DonorStateEnum::kPreparingToBlockWrites); });
}
void ReshardingDonorService::DonorStateMachine::
@@ -468,20 +571,8 @@ SharedSemiFuture<void> ReshardingDonorService::DonorStateMachine::awaitFinalOplo
return _finalOplogEntriesWritten.getFuture();
}
-ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
- _awaitCoordinatorHasDecisionPersistedThenTransitionToDropping(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenTransitionToDone() {
if (_donorCtx.getState() > DonorStateEnum::kBlockingWrites) {
- return ExecutorFuture<void>(**executor, Status::OK());
- }
-
- return _coordinatorHasDecisionPersisted.getFuture().thenRunOn(**executor).then([this]() {
- _transitionState(DonorStateEnum::kDropping);
- });
-}
-
-void ReshardingDonorService::DonorStateMachine::_dropOriginalCollection() {
- if (_donorCtx.getState() > DonorStateEnum::kDropping) {
return;
}
@@ -557,10 +648,12 @@ BSONObj ReshardingDonorService::DonorStateMachine::_makeQueryForCoordinatorUpdat
{DonorStateEnum::kDonatingInitialData, {DonorStateEnum::kUnused}},
{DonorStateEnum::kError,
{DonorStateEnum::kUnused, DonorStateEnum::kDonatingInitialData}},
+ {DonorStateEnum::kBlockingWrites, {DonorStateEnum::kDonatingInitialData}},
{DonorStateEnum::kDone,
{DonorStateEnum::kUnused,
DonorStateEnum::kDonatingInitialData,
- DonorStateEnum::kError}},
+ DonorStateEnum::kError,
+ DonorStateEnum::kBlockingWrites}},
};
auto it = validPreviousStateMap.find(newState);
@@ -651,30 +744,80 @@ void ReshardingDonorService::DonorStateMachine::_updateDonorDocument(
void ReshardingDonorService::DonorStateMachine::_removeDonorDocument() {
auto opCtx = cc().makeOperationContext();
- PersistentTaskStore<ReshardingDonorDocument> store(
- NamespaceString::kDonorReshardingOperationsNamespace);
- store.remove(
- opCtx.get(),
- BSON(ReshardingDonorDocument::kReshardingUUIDFieldName << _metadata.getReshardingUUID()),
- kNoWaitWriteConcern);
+
+ const auto& nss = NamespaceString::kDonorReshardingOperationsNamespace;
+ writeConflictRetry(opCtx.get(), "DonorStateMachine::_removeDonorDocument", nss.toString(), [&] {
+ AutoGetCollection coll(opCtx.get(), nss, MODE_IX);
+
+ if (!coll) {
+ return;
+ }
+
+ WriteUnitOfWork wuow(opCtx.get());
+
+ opCtx->recoveryUnit()->onCommit([this](boost::optional<Timestamp> unusedCommitTime) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (_abortReason) {
+ ensureFulfilledPromise(lk, _completionPromise, _abortReason.get());
+ } else {
+ ensureFulfilledPromise(lk, _completionPromise);
+ }
+ });
+
+ deleteObjects(opCtx.get(),
+ *coll,
+ nss,
+ BSON(ReshardingDonorDocument::kReshardingUUIDFieldName
+ << _metadata.getReshardingUUID()),
+ true /* justOne */);
+
+ wuow.commit();
+ });
}
-void ReshardingDonorService::DonorStateMachine::_onAbortOrStepdown(WithLock, Status status) {
- if (!_allRecipientsDoneCloning.getFuture().isReady()) {
- _allRecipientsDoneCloning.setError(status);
+CancellationToken ReshardingDonorService::DonorStateMachine::_initAbortSource(
+ const CancellationToken& stepdownToken) {
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _abortSource = CancellationSource(stepdownToken);
}
- if (!_allRecipientsDoneApplying.getFuture().isReady()) {
- _allRecipientsDoneApplying.setError(status);
+ if (auto future = _coordinatorHasDecisionPersisted.getFuture(); future.isReady()) {
+ if (auto status = future.getNoThrow(); !status.isOK()) {
+ // onReshardingFieldsChanges() missed canceling _abortSource because _initAbortSource()
+ // hadn't been called yet. We used an error status stored in
+ // _coordinatorHasDecisionPersisted as an indication that an abort had been received.
+ // Canceling _abortSource immediately allows callers to use the returned abortToken as a
+ // definitive means of checking whether the operation has been aborted.
+ _abortSource->cancel();
+ }
}
- if (!_finalOplogEntriesWritten.getFuture().isReady()) {
- _finalOplogEntriesWritten.setError(status);
- }
+ return _abortSource->token();
+}
- if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) {
- _coordinatorHasDecisionPersisted.setError(status);
+void ReshardingDonorService::DonorStateMachine::_onAbortEncountered(const Status& abortReason) {
+ auto abortSource = [&]() -> boost::optional<CancellationSource> {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _abortReason = abortReason;
+ invariant(!_abortReason->isOK());
+
+ if (_abortSource) {
+ return _abortSource;
+ } else {
+ // run() hasn't been called yet, so signal that the operation should be aborted by
+ // setting an error on _coordinatorHasDecisionPersisted.
+ invariant(!_coordinatorHasDecisionPersisted.getFuture().isReady());
+ _coordinatorHasDecisionPersisted.setError(_abortReason.get());
+ return boost::none;
+ }
+ }();
+
+ if (abortSource) {
+ abortSource->cancel();
}
+
+ _critSec.reset();
}
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 50d5a457faa..6d6ddd44c00 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -79,7 +79,7 @@ public:
~DonorStateMachine();
SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
- const CancellationToken& token) noexcept override;
+ const CancellationToken& stepdownToken) noexcept override;
void interrupt(Status status) override;
@@ -107,25 +107,49 @@ private:
DonorStateMachine(const ReshardingDonorDocument& donorDoc,
std::unique_ptr<DonorStateMachineExternalState> externalState);
+ /**
+ * Runs until the donor either reaches state kBlockingWrites or encounters an error.
+ */
+ ExecutorFuture<void> _runUntilBlockingWritesOrErrored(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) noexcept;
+
+ /**
+ * Notifies the coordinator if the donor is in kBlockingWrites or kError and waits for
+ * _coordinatorHasDecisionPersisted to be fulfilled (success) or for the abortToken to be
+ * canceled (failure or stepdown).
+ */
+ ExecutorFuture<void> _notifyCoordinatorAndAwaitDecision(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken) noexcept;
+
+ /**
+ * Finishes the remaining work on the donor after the coordinator persists its decision to
+ * abort or complete resharding.
+ */
+ ExecutorFuture<void> _finishReshardingOperation(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& stepdownToken,
+ bool aborted) noexcept;
+
// The following functions correspond to the actions to take at a particular donor state.
void _transitionToPreparingToDonate();
void _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData();
ExecutorFuture<void> _awaitAllRecipientsDoneCloningThenTransitionToDonatingOplogEntries(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
ExecutorFuture<void> _awaitAllRecipientsDoneApplyingThenTransitionToPreparingToBlockWrites(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
+ const CancellationToken& abortToken);
void _writeTransactionOplogEntryThenTransitionToBlockingWrites();
- ExecutorFuture<void> _awaitCoordinatorHasDecisionPersistedThenTransitionToDropping(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
// Drops the original collection and throws if the returned status is not either Status::OK()
// or NamespaceNotFound.
- void _dropOriginalCollection();
+ void _dropOriginalCollectionThenTransitionToDone();
// Transitions the on-disk and in-memory state to 'newState'.
void _transitionState(DonorStateEnum newState);
@@ -151,9 +175,15 @@ private:
// Removes the local donor document from disk.
void _removeDonorDocument();
- // Does work necessary for both recoverable errors (failover/stepdown) and unrecoverable errors
- // (abort resharding).
- void _onAbortOrStepdown(WithLock lk, Status status);
+ // Initializes the _abortSource and generates a token from it to return to the caller. If an
+ // abort was reported prior to the initialization, automatically cancels the _abortSource before
+ // returning the token.
+ //
+ // Should only be called once per lifetime.
+ CancellationToken _initAbortSource(const CancellationToken& stepdownToken);
+
+ // Initiates the cancellation of the resharding operation.
+ void _onAbortEncountered(const Status& abortReason);
// The in-memory representation of the immutable portion of the document in
// config.localReshardingOperations.donor.
@@ -169,13 +199,20 @@ private:
// Protects the state below
Mutex _mutex = MONGO_MAKE_LATCH("DonorStateMachine::_mutex");
- // Contains the status with which the operation was aborted.
- boost::optional<Status> _abortStatus;
+ // Canceled by 2 different sources: (1) This DonorStateMachine when it learns of an
+ // unrecoverable error (2) The primary-only service instance driving this DonorStateMachine that
+ // cancels the parent CancellationSource upon stepdown/failover.
+ boost::optional<CancellationSource> _abortSource;
+
+ // Holds the unrecoverable error reported by the coordinator that caused the entire resharding
+ // operation to fail.
+ boost::optional<Status> _abortReason;
boost::optional<ReshardingCriticalSection> _critSec;
// Each promise below corresponds to a state on the donor state machine. They are listed in
- // ascending order, such that the first promise below will be the first promise fulfilled.
+ // ascending order, such that the first promise below will be the first promise fulfilled -
+ // fulfillment order is not necessarily maintained if the operation gets aborted.
SharedPromise<void> _allRecipientsDoneCloning;
SharedPromise<void> _allRecipientsDoneApplying;
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index d128f581a8c..ae24821b064 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -55,7 +55,7 @@ namespace mongo {
namespace {
class OpObserverForTest;
-class PauseDuringStateTransition;
+class PauseDuringStateTransitions;
class DonorStateTransitionController {
public:
@@ -68,7 +68,7 @@ public:
private:
friend OpObserverForTest;
- friend PauseDuringStateTransition;
+ friend PauseDuringStateTransitions;
void setPauseDuringTransition(DonorStateEnum state) {
stdx::lock_guard lk(_mutex);
@@ -97,24 +97,33 @@ private:
DonorStateEnum _state = DonorStateEnum::kUnused;
};
-class PauseDuringStateTransition {
+class PauseDuringStateTransitions {
public:
- PauseDuringStateTransition(DonorStateTransitionController* controller, DonorStateEnum state)
- : _controller{controller}, _state{state} {
- _controller->setPauseDuringTransition(_state);
+ PauseDuringStateTransitions(DonorStateTransitionController* controller,
+ std::vector<DonorStateEnum> states)
+ : _controller{controller}, _states{std::move(states)} {
+ for (auto state : _states) {
+ _controller->setPauseDuringTransition(state);
+ }
+ }
+
+ ~PauseDuringStateTransitions() {
+ for (auto state : _states) {
+ _controller->unsetPauseDuringTransition(state);
+ }
}
- ~PauseDuringStateTransition() {
- _controller->unsetPauseDuringTransition(_state);
+ void wait(DonorStateEnum state) {
+ _controller->waitUntilStateIsReached(state);
}
- void wait() {
- _controller->waitUntilStateIsReached(_state);
+ void unset(DonorStateEnum state) {
+ _controller->unsetPauseDuringTransition(state);
}
private:
DonorStateTransitionController* const _controller;
- const DonorStateEnum _state;
+ const std::vector<DonorStateEnum> _states;
};
class OpObserverForTest : public OpObserverNoop {
@@ -185,6 +194,11 @@ public:
_opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>(_controller));
}
+ void stepUp() {
+ auto opCtx = cc().makeOperationContext();
+ PrimaryOnlyServiceMongoDTest::stepUp(opCtx.get());
+ }
+
DonorStateTransitionController* controller() {
return _controller.get();
}
@@ -210,6 +224,15 @@ public:
return doc;
}
+ void createOriginalCollection(OperationContext* opCtx,
+ const ReshardingDonorDocument& donorDoc) {
+ CollectionOptions options;
+ options.uuid = donorDoc.getSourceUUID();
+ OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
+ opCtx);
+ resharding::data_copy::ensureCollectionExists(opCtx, donorDoc.getSourceNss(), options);
+ }
+
void notifyRecipientsDoneCloning(OperationContext* opCtx,
DonorStateMachine& donor,
const ReshardingDonorDocument& donorDoc) {
@@ -268,15 +291,15 @@ TEST_F(ReshardingDonorServiceTest, CanTransitionThroughEachStateToCompletion) {
}
TEST_F(ReshardingDonorServiceTest, WritesNoOpOplogEntryToGenerateMinFetchTimestamp) {
- boost::optional<PauseDuringStateTransition> donatingInitialDataTransitionGuard =
- PauseDuringStateTransition{controller(), DonorStateEnum::kDonatingInitialData};
+ boost::optional<PauseDuringStateTransitions> donatingInitialDataTransitionGuard =
+ PauseDuringStateTransitions{controller(), {DonorStateEnum::kDonatingInitialData}};
auto doc = makeStateDocument();
auto opCtx = makeOperationContext();
DonorStateMachine::insertStateDocument(opCtx.get(), doc);
auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
- donatingInitialDataTransitionGuard->wait();
+ donatingInitialDataTransitionGuard->wait(DonorStateEnum::kDonatingInitialData);
stepDown();
donatingInitialDataTransitionGuard.reset();
@@ -301,8 +324,8 @@ TEST_F(ReshardingDonorServiceTest, WritesNoOpOplogEntryToGenerateMinFetchTimesta
}
TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBlocked) {
- boost::optional<PauseDuringStateTransition> blockingWritesTransitionGuard =
- PauseDuringStateTransition{controller(), DonorStateEnum::kBlockingWrites};
+ boost::optional<PauseDuringStateTransitions> blockingWritesTransitionGuard =
+ PauseDuringStateTransitions{controller(), {DonorStateEnum::kBlockingWrites}};
auto doc = makeStateDocument();
auto opCtx = makeOperationContext();
@@ -312,7 +335,7 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl
notifyRecipientsDoneCloning(opCtx.get(), *donor, doc);
notifyToStartBlockingWrites(opCtx.get(), *donor, doc);
- blockingWritesTransitionGuard->wait();
+ blockingWritesTransitionGuard->wait(DonorStateEnum::kBlockingWrites);
stepDown();
blockingWritesTransitionGuard.reset();
@@ -348,17 +371,80 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl
<< cursor->nextSafe();
}
-TEST_F(ReshardingDonorServiceTest, DropsSourceCollectionWhenDone) {
+TEST_F(ReshardingDonorServiceTest, StepDownStepUpEachTransition) {
+ const std::vector<DonorStateEnum> _donorStates{DonorStateEnum::kDonatingInitialData,
+ DonorStateEnum::kDonatingOplogEntries,
+ DonorStateEnum::kPreparingToBlockWrites,
+ DonorStateEnum::kBlockingWrites,
+ DonorStateEnum::kDone};
+ boost::optional<PauseDuringStateTransitions> stateTransitionsGuard =
+ PauseDuringStateTransitions{controller(), _donorStates};
auto doc = makeStateDocument();
- auto opCtx = makeOperationContext();
+ {
+ auto opCtx = makeOperationContext();
+ DonorStateMachine::insertStateDocument(opCtx.get(), doc);
+ }
+ auto prevState = DonorStateEnum::kUnused;
+ for (const auto state : _donorStates) {
+ {
+ auto opCtx = makeOperationContext();
+ auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
+
+ if (prevState != DonorStateEnum::kUnused) {
+ // Allow the transition to prevState to succeed on this primary-only service
+ // instance.
+ stateTransitionsGuard->unset(prevState);
+ }
+
+ // Signal a change in the coordinator's state for donor state transitions dependent
+ // on it.
+ switch (state) {
+ case DonorStateEnum::kDonatingOplogEntries: {
+ notifyRecipientsDoneCloning(opCtx.get(), *donor, doc);
+ break;
+ }
+ case DonorStateEnum::kPreparingToBlockWrites: {
+ notifyToStartBlockingWrites(opCtx.get(), *donor, doc);
+ break;
+ }
+ case DonorStateEnum::kDone: {
+ notifyReshardingOutcomeDecided(opCtx.get(), *donor, doc, Status::OK());
+ break;
+ }
+ default:
+ break;
+ }
+
+ // Step down before the transition to state can complete.
+ stateTransitionsGuard->wait(state);
+ stepDown();
+
+ ASSERT_EQ(donor->getCompletionFuture().getNoThrow(),
+ ErrorCodes::InterruptedDueToReplStateChange);
+
+ prevState = state;
+ }
+
+ stepUp();
+ }
+
+ // Finally complete the operation and ensure its success.
{
- OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
- opCtx.get());
- CollectionOptions options;
- options.uuid = doc.getSourceUUID();
- resharding::data_copy::ensureCollectionExists(opCtx.get(), doc.getSourceNss(), options);
+ auto opCtx = makeOperationContext();
+ auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
+
+ stateTransitionsGuard->unset(DonorStateEnum::kDone);
+ notifyReshardingOutcomeDecided(opCtx.get(), *donor, doc, Status::OK());
+ ASSERT_OK(donor->getCompletionFuture().getNoThrow());
}
+}
+
+TEST_F(ReshardingDonorServiceTest, DropsSourceCollectionWhenDone) {
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+
+ createOriginalCollection(opCtx.get(), doc);
DonorStateMachine::insertStateDocument(opCtx.get(), doc);
auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
@@ -381,17 +467,50 @@ TEST_F(ReshardingDonorServiceTest, DropsSourceCollectionWhenDone) {
}
}
-TEST_F(ReshardingDonorServiceTest, RetainsSourceCollectionOnError) {
+TEST_F(ReshardingDonorServiceTest, CompletesWithStepdownAfterError) {
+ boost::optional<PauseDuringStateTransitions> stateTransitionsGuard =
+ PauseDuringStateTransitions{controller(), {DonorStateEnum::kDone}};
auto doc = makeStateDocument();
- auto opCtx = makeOperationContext();
+ {
+ auto opCtx = makeOperationContext();
+
+ createOriginalCollection(opCtx.get(), doc);
+
+ DonorStateMachine::insertStateDocument(opCtx.get(), doc);
+ auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
+
+ notifyRecipientsDoneCloning(opCtx.get(), *donor, doc);
+ notifyReshardingOutcomeDecided(opCtx.get(), *donor, doc, {ErrorCodes::InternalError, ""});
+ stateTransitionsGuard->wait(DonorStateEnum::kDone);
+ stepDown();
+
+ ASSERT_EQ(donor->getCompletionFuture().getNoThrow(),
+ ErrorCodes::InterruptedDueToReplStateChange);
+ }
+ stepUp();
{
- CollectionOptions options;
- options.uuid = doc.getSourceUUID();
- OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE unsafeCreateCollection(
- opCtx.get());
- resharding::data_copy::ensureCollectionExists(opCtx.get(), doc.getSourceNss(), options);
+ auto opCtx = makeOperationContext();
+ auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
+
+ stateTransitionsGuard->unset(DonorStateEnum::kDone);
+
+ notifyReshardingOutcomeDecided(opCtx.get(), *donor, doc, {ErrorCodes::InternalError, ""});
+ ASSERT_EQ(donor->getCompletionFuture().getNoThrow(), ErrorCodes::InternalError);
+ {
+ // Verify original collection still exists even with stepdown.
+ AutoGetCollection coll(opCtx.get(), doc.getSourceNss(), MODE_IS);
+ ASSERT_TRUE(bool(coll));
+ ASSERT_EQ(coll->uuid(), doc.getSourceUUID());
+ }
}
+}
+
+TEST_F(ReshardingDonorServiceTest, RetainsSourceCollectionOnError) {
+ auto doc = makeStateDocument();
+ auto opCtx = makeOperationContext();
+
+ createOriginalCollection(opCtx.get(), doc);
DonorStateMachine::insertStateDocument(opCtx.get(), doc);
auto donor = DonorStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl
index bfbcb63d3a3..ee3d88aab52 100644
--- a/src/mongo/s/resharding/common_types.idl
+++ b/src/mongo/s/resharding/common_types.idl
@@ -60,10 +60,9 @@ enums:
kDonatingInitialData: "donating-initial-data"
kDonatingOplogEntries: "donating-oplog-entries"
kPreparingToBlockWrites: "preparing-to-block-writes"
+ kError: "error"
kBlockingWrites: "blocking-writes"
- kDropping: "dropping"
kDone: "done"
- kError: "error"
RecipientState:
description: "The current state of a recipient shard for a resharding operation."
@@ -75,10 +74,10 @@ enums:
kCloning: "cloning"
kApplying: "applying"
kSteadyState: "steady-state"
+ kError: "error"
kStrictConsistency: "strict-consistency"
kRenaming: "renaming"
kDone: "done"
- kError: "error"
ReshardingOperationStatus:
description: "The status of the current or most recent resharding operation."