summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2020-10-06 18:56:47 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-14 18:47:33 +0000
commitc0517dc19f8fac5f9d7472098a345ab7ce9ce661 (patch)
treed27c9702798c0b14ba9675139e473780220e3b3c
parent5a9dd0cbec1f4293e966321885cccb549c77e59c (diff)
downloadmongo-c0517dc19f8fac5f9d7472098a345ab7ce9ce661.tar.gz
SERVER-51222 Calculate fetchTimestamp before creating temporary resharding collection
-rw-r--r--src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp6
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_observer.cpp16
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_observer.h8
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp51
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp27
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h7
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_test.cpp8
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp44
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h7
-rw-r--r--src/mongo/s/resharding/common_types.idl4
10 files changed, 59 insertions, 119 deletions
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index ab881abd19d..d024f4f1e4e 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -207,6 +207,12 @@ public:
opCtx, service, coordinatorDoc.toBSON());
instance->setInitialChunksAndZones(initialChunks, newZones);
+
+ // This promise will currently be falsely fulfilled by a call to interrupt() inside
+ // the ReshardingCoordinatorService. This is to enable jsTests to pass while code
+ // is still being committed.
+ // TODO SERVER-51212 Change this comment and assess the current call to .wait().
+ instance->getObserver()->awaitAllDonorsReadyToDonate().wait();
}
private:
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
index 2d70149edc7..961e24d93ac 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
@@ -144,7 +144,6 @@ ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default;
ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() {
stdx::lock_guard<Latch> lg(_mutex);
- invariant(_allRecipientsCreatedCollection.getFuture().isReady());
invariant(_allDonorsReportedMinFetchTimestamp.getFuture().isReady());
invariant(_allRecipientsFinishedCloning.getFuture().isReady());
invariant(_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady());
@@ -157,13 +156,6 @@ void ReshardingCoordinatorObserver::onReshardingParticipantTransition(
stdx::lock_guard<Latch> lk(_mutex);
- if (stateTransitionIncomplete(lk,
- _allRecipientsCreatedCollection,
- RecipientStateEnum::kInitialized,
- updatedStateDoc)) {
- return;
- }
-
if (stateTransitionIncomplete(
lk, _allDonorsReportedMinFetchTimestamp, DonorStateEnum::kDonating, updatedStateDoc)) {
return;
@@ -193,11 +185,6 @@ void ReshardingCoordinatorObserver::onReshardingParticipantTransition(
}
SharedSemiFuture<ReshardingCoordinatorDocument>
-ReshardingCoordinatorObserver::awaitAllRecipientsCreatedCollection() {
- return _allRecipientsCreatedCollection.getFuture();
-}
-
-SharedSemiFuture<ReshardingCoordinatorDocument>
ReshardingCoordinatorObserver::awaitAllDonorsReadyToDonate() {
return _allDonorsReportedMinFetchTimestamp.getFuture();
}
@@ -224,9 +211,6 @@ ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() {
void ReshardingCoordinatorObserver::interrupt(Status status) {
stdx::lock_guard<Latch> lg(_mutex);
- if (!_allRecipientsCreatedCollection.getFuture().isReady()) {
- _allRecipientsCreatedCollection.setError(status);
- }
if (!_allDonorsReportedMinFetchTimestamp.getFuture().isReady()) {
_allDonorsReportedMinFetchTimestamp.setError(status);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
index f721e04d8fc..4199c000f43 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
@@ -65,12 +65,6 @@ public:
void onReshardingParticipantTransition(const ReshardingCoordinatorDocument& updatedStateDoc);
/**
- * Fulfills the '_allRecipientsCreatedCollection' promise when the last recipient writes that it
- * is in 'initializing' state.
- */
- SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsCreatedCollection();
-
- /**
* When the last donor reports its 'minFetchTimestamp', selects the highest 'minFetchTimestamp'
* of all donors to be the 'fetchTimestamp'. Fulfills the '_allDonorsReportedMinFetchTimestamp'
* promise with this 'fetchTimestamp'.
@@ -119,14 +113,12 @@ private:
* Below are the relationships between promise and expected state in
* format: {promiseToFulfill, expectedState}
*
- * {_allRecipientsCreatedCollection, RecipientStateEnum::kInitialized}
* {_allDonorsReportedMinFetchTimestamp, DonorStateEnum::kDonating}
* {_allRecipientsFinishedCloning, RecipientStateEnum::kSteadyState}
* {_allRecipientsReportedStrictConsistencyTimestamp, RecipientStateEnum::kStrictConsistency}
* {_allRecipientsRenamedCollection, RecipientStateEnum::kDone}
* {_allDonorsDroppedOriginalCollection, DonorStateEnum::kDone}
*/
- SharedPromise<ReshardingCoordinatorDocument> _allRecipientsCreatedCollection;
SharedPromise<ReshardingCoordinatorDocument> _allDonorsReportedMinFetchTimestamp;
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
index 37c245bde0b..d2db3ab71f2 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
@@ -69,20 +69,20 @@ protected:
TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionSucceeds) {
auto reshardingObserver = std::make_shared<ReshardingCoordinatorObserver>();
- auto fut = reshardingObserver->awaitAllRecipientsCreatedCollection();
+ auto fut = reshardingObserver->awaitAllRecipientsFinishedCloning();
ASSERT_FALSE(fut.isReady());
- auto donorShards = makeMockDonorsInState(DonorStateEnum::kUnused);
+ auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonating, Timestamp());
std::vector<RecipientShardEntry> recipientShards0{
- makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kInitializing),
- makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kInitialized)};
+ makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning),
+ makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kSteadyState)};
auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
ASSERT_FALSE(fut.isReady());
std::vector<RecipientShardEntry> recipientShards1{
- makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kInitialized),
- makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kInitialized)};
+ makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kSteadyState),
+ makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kSteadyState)};
auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_TRUE(fut.isReady());
@@ -92,32 +92,32 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionSucce
TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionTwoOutOfOrder) {
auto reshardingObserver = std::make_shared<ReshardingCoordinatorObserver>();
- auto fut = reshardingObserver->awaitAllRecipientsCreatedCollection();
+ auto fut = reshardingObserver->awaitAllRecipientsFinishedCloning();
ASSERT_FALSE(fut.isReady());
- // By default, all donors should be kUnused at this stage.
- auto donorShards = makeMockDonorsInState(DonorStateEnum::kUnused);
+ // By default, all donors should be kDonating at this stage.
+ auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonating, Timestamp());
std::vector<RecipientShardEntry> recipientShards0{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kInitializing)},
- {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kInitialized)},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kInitialized)}};
+ {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
+ {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kSteadyState)},
+ {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kSteadyState)}};
auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
ASSERT_FALSE(fut.isReady());
std::vector<RecipientShardEntry> recipientShards1{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kInitializing)},
- {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kInitialized)},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kInitializing)}};
+ {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
+ {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kSteadyState)},
+ {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kCloning)}};
auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_FALSE(fut.isReady());
std::vector<RecipientShardEntry> recipientShards2{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kInitialized)},
- {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kInitialized)},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kInitialized)}};
+ {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kSteadyState)},
+ {makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kSteadyState)},
+ {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kSteadyState)}};
auto coordinatorDoc2 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards2, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc2);
ASSERT_TRUE(fut.isReady());
@@ -127,16 +127,16 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionTwoOu
TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) {
auto reshardingObserver = std::make_shared<ReshardingCoordinatorObserver>();
- auto fut = reshardingObserver->awaitAllRecipientsCreatedCollection();
+ auto fut = reshardingObserver->awaitAllRecipientsFinishedCloning();
ASSERT_FALSE(fut.isReady());
- // By default, all donors should be kUnused at this stage.
- auto donorShards = makeMockDonorsInState(DonorStateEnum::kUnused);
+ // By default, all donors should be kDonating at this stage.
+ auto donorShards = makeMockDonorsInState(DonorStateEnum::kDonating, Timestamp());
std::vector<RecipientShardEntry> recipientShards0{
- {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kInitializing)},
+ {makeRecipientShard(ShardId{"s1"}, RecipientStateEnum::kCloning)},
{makeRecipientShard(ShardId{"s2"}, RecipientStateEnum::kError)},
- {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kInitialized)}};
+ {makeRecipientShard(ShardId{"s3"}, RecipientStateEnum::kSteadyState)}};
auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards0, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
auto resp = fut.getNoThrow();
@@ -153,8 +153,8 @@ TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) {
auto fut = reshardingObserver->awaitAllDonorsReadyToDonate();
ASSERT_FALSE(fut.isReady());
- // By default, all recipients should be kInitialized at this stage.
- auto recipientShards = makeMockRecipientsInState(RecipientStateEnum::kInitialized);
+ // By default, all recipients should be kUnused at this stage.
+ auto recipientShards = makeMockRecipientsInState(RecipientStateEnum::kUnused);
std::vector<DonorShardEntry> donorShards0{
{makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDonating, Timestamp())},
@@ -162,7 +162,6 @@ TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) {
auto coordinatorDoc0 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards0);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc0);
ASSERT_FALSE(fut.isReady());
- ASSERT_TRUE(reshardingObserver->awaitAllRecipientsCreatedCollection().isReady());
std::vector<DonorShardEntry> donorShards1{
{makeDonorShard(ShardId{"s1"}, DonorStateEnum::kDonating, Timestamp())},
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 1c7c864dfb4..e17e86635bd 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -92,7 +92,7 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
BatchedCommandRequest request([&] {
auto nextState = coordinatorDoc.getState();
switch (nextState) {
- case CoordinatorStateEnum::kInitialized:
+ case CoordinatorStateEnum::kPreparingToDonate:
// Insert the new coordinator state document
return buildInsertOp(NamespaceString::kConfigReshardingOperationsNamespace,
std::vector<BSONObj>{coordinatorDoc.toBSON()});
@@ -130,7 +130,7 @@ BSONObj createReshardingFieldsUpdateForOriginalNss(
boost::optional<OID> newCollectionEpoch) {
auto nextState = coordinatorDoc.getState();
switch (nextState) {
- case CoordinatorStateEnum::kInitialized: {
+ case CoordinatorStateEnum::kPreparingToDonate: {
// Append 'reshardingFields' to the config.collections entry for the original nss
TypeCollectionReshardingFields originalEntryReshardingFields(coordinatorDoc.get_id());
originalEntryReshardingFields.setState(coordinatorDoc.getState());
@@ -200,7 +200,7 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx,
BatchedCommandRequest request([&] {
auto nextState = coordinatorDoc.getState();
switch (nextState) {
- case CoordinatorStateEnum::kInitialized: {
+ case CoordinatorStateEnum::kPreparingToDonate: {
// Insert new entry for the temporary nss into config.collections
auto collType = resharding::createTempReshardingCollectionType(
opCtx, coordinatorDoc, chunkVersion.get(), collation.get());
@@ -528,8 +528,6 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
.then([this, executor] { return _init(executor); })
- .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
- .then([this, executor] { return _awaitAllRecipientsCreatedCollection(executor); })
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); })
.then([this, executor] { _tellAllRecipientsToRefresh(executor); })
@@ -634,7 +632,7 @@ ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::_init(
// Create state document that will be written to disk and afterward set to the in-memory
// _coordinatorDoc
ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc;
- updatedCoordinatorDoc.setState(CoordinatorStateEnum::kInitialized);
+ updatedCoordinatorDoc.setState(CoordinatorStateEnum::kPreparingToDonate);
auto opCtx = cc().makeOperationContext();
resharding::persistInitialStateAndCatalogUpdates(
@@ -646,26 +644,15 @@ ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::_init(
}
ExecutorFuture<void>
-ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsCreatedCollection(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- if (_coordinatorDoc.getState() > CoordinatorStateEnum::kInitialized) {
- return ExecutorFuture<void>(**executor, Status::OK());
- }
-
- return _reshardingCoordinatorObserver->awaitAllRecipientsCreatedCollection()
- .thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
- this->_runUpdates(CoordinatorStateEnum::kPreparingToDonate, updatedStateDoc);
- });
-}
-
-ExecutorFuture<void>
ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonate(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kPreparingToDonate) {
return ExecutorFuture<void>(**executor, Status::OK());
}
+ // TODO SERVER-51212 Remove this call.
+ interrupt({ErrorCodes::InternalError, "Early exit to support jsTesting"});
+
return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate()
.thenRunOn(**executor)
.then([this](ReshardingCoordinatorDocument updatedStateDoc) {
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index 0f1a6ba316d..7875068c763 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -146,13 +146,6 @@ private:
ExecutorFuture<void> _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
/**
- * Waits on _reshardingCoordinatorObserver to notify that all recipients have created the
- * temporary collection. Transitions to 'kPreparingToDonate'.
- */
- ExecutorFuture<void> _awaitAllRecipientsCreatedCollection(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
- /**
* Waits on _reshardingCoordinatorObserver to notify that all donors have picked a
* minFetchTimestamp and are ready to donate. Transitions to 'kCloning'.
*/
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index e7e15866b58..66256efcac6 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -541,7 +541,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) {
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
- expectedCoordinatorDoc.setState(CoordinatorStateEnum::kInitialized);
+ expectedCoordinatorDoc.setState(CoordinatorStateEnum::kPreparingToDonate);
persistInitialStateAndCatalogUpdatesExpectSuccess(
operationContext(), expectedCoordinatorDoc, initialChunks, newZones);
@@ -549,11 +549,11 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) {
TEST_F(ReshardingCoordinatorPersistenceTest, PersistBasicStateTransitionSucceeds) {
auto coordinatorDoc =
- insertStateAndCatalogEntries(CoordinatorStateEnum::kInitialized, _originalEpoch);
+ insertStateAndCatalogEntries(CoordinatorStateEnum::kCloning, _originalEpoch);
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
- expectedCoordinatorDoc.setState(CoordinatorStateEnum::kPreparingToDonate);
+ expectedCoordinatorDoc.setState(CoordinatorStateEnum::kMirroring);
persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
}
@@ -667,7 +667,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest,
auto newZones = makeZones(_tempNss, _newShardKey);
auto expectedCoordinatorDoc = coordinatorDoc;
- expectedCoordinatorDoc.setState(CoordinatorStateEnum::kInitialized);
+ expectedCoordinatorDoc.setState(CoordinatorStateEnum::kPreparingToDonate);
// Do not create the config.collections entry for the original collection
ASSERT_THROWS_CODE(resharding::persistInitialStateAndCatalogUpdates(
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 6243147a204..6fa3ca44f4d 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -134,7 +134,6 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
- invariant(_allDonorsPreparedToDonate.getFuture().isReady());
invariant(_allDonorsMirroring.getFuture().isReady());
invariant(_coordinatorHasCommitted.getFuture().isReady());
invariant(_completionPromise.getFuture().isReady());
@@ -143,10 +142,8 @@ ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
void ReshardingRecipientService::RecipientStateMachine::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
- .then([this] { _createTemporaryReshardingCollectionThenTransitionToInitialized(); })
- .then([this, executor] {
- return _awaitAllDonorsPreparedToDonateThenTransitionToCloning(executor);
- })
+ .then([this] { _transitionToCreatingTemporaryReshardingCollection(); })
+ .then([this] { _createTemporaryReshardingCollectionThenTransitionToCloning(); })
.then([this] { return _cloneThenTransitionToApplying(); })
.then([this] { return _applyThenTransitionToSteadyState(); })
.then([this, executor] {
@@ -186,9 +183,6 @@ void ReshardingRecipientService::RecipientStateMachine::run(
void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) {
// Resolve any unresolved promises to avoid hanging.
stdx::lock_guard<Latch> lg(_mutex);
- if (!_allDonorsPreparedToDonate.getFuture().isReady()) {
- _allDonorsPreparedToDonate.setError(status);
- }
if (!_allDonorsMirroring.getFuture().isReady()) {
_allDonorsMirroring.setError(status);
@@ -207,29 +201,24 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
void ReshardingRecipientService::RecipientStateMachine::
- _createTemporaryReshardingCollectionThenTransitionToInitialized() {
- if (_recipientDoc.getState() > RecipientStateEnum::kInitializing) {
+ _transitionToCreatingTemporaryReshardingCollection() {
+ if (_recipientDoc.getState() > RecipientStateEnum::kCreatingCollection) {
return;
}
- // TODO SERVER-51217: Call
- // resharding_recipient_service_util::createTemporaryReshardingCollectionLocally()
-
- _transitionState(RecipientStateEnum::kInitialized);
+ _transitionState(RecipientStateEnum::kCreatingCollection);
}
-ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
- _awaitAllDonorsPreparedToDonateThenTransitionToCloning(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- if (_recipientDoc.getState() > RecipientStateEnum::kInitialized) {
- return ExecutorFuture<void>(**executor, Status::OK());
+void ReshardingRecipientService::RecipientStateMachine::
+ _createTemporaryReshardingCollectionThenTransitionToCloning() {
+ if (_recipientDoc.getState() > RecipientStateEnum::kCreatingCollection) {
+ return;
}
- return _allDonorsPreparedToDonate.getFuture()
- .thenRunOn(**executor)
- .then([this](Timestamp fetchTimestamp) {
- _transitionState(RecipientStateEnum::kCloning, fetchTimestamp);
- });
+ // TODO SERVER-51217: Call
+ // resharding_recipient_service_util::createTemporaryReshardingCollectionLocally()
+
+ _transitionState(RecipientStateEnum::kCloning);
}
void ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying() {
@@ -281,16 +270,11 @@ void ReshardingRecipientService::RecipientStateMachine::
_transitionState(RecipientStateEnum::kDone);
}
-void ReshardingRecipientService::RecipientStateMachine::_fulfillAllDonorsPreparedToDonate(
- Timestamp fetchTimestamp) {
- _allDonorsPreparedToDonate.emplaceValue(fetchTimestamp);
-}
-
void ReshardingRecipientService::RecipientStateMachine::_transitionState(
RecipientStateEnum endState, boost::optional<Timestamp> fetchTimestamp) {
ReshardingRecipientDocument replacementDoc(_recipientDoc);
replacementDoc.setState(endState);
- if (endState == RecipientStateEnum::kInitialized) {
+ if (endState == RecipientStateEnum::kCreatingCollection) {
_insertRecipientDocument(replacementDoc);
return;
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 34ccea35ab8..6b13827e31b 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -111,10 +111,9 @@ public:
private:
// The following functions correspond to the actions to take at a particular recipient state.
- void _createTemporaryReshardingCollectionThenTransitionToInitialized();
+ void _transitionToCreatingTemporaryReshardingCollection();
- ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCloning(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+ void _createTemporaryReshardingCollectionThenTransitionToCloning();
void _cloneThenTransitionToApplying();
@@ -155,8 +154,6 @@ private:
// Each promise below corresponds to a state on the recipient state machine. They are listed in
// ascending order, such that the first promise below will be the first promise fulfilled.
- SharedPromise<Timestamp> _allDonorsPreparedToDonate;
-
SharedPromise<void> _allDonorsMirroring;
SharedPromise<void> _coordinatorHasCommitted;
diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl
index db35041619c..52e3886d415 100644
--- a/src/mongo/s/resharding/common_types.idl
+++ b/src/mongo/s/resharding/common_types.idl
@@ -42,7 +42,6 @@ enums:
values:
kUnused: "unused"
kInitializing: "initializing"
- kInitialized: "initialized"
kPreparingToDonate: "preparing-to-donate"
kCloning: "cloning"
kMirroring: "mirroring"
@@ -69,8 +68,7 @@ enums:
type: string
values:
kUnused: "unused"
- kInitializing: "initializing"
- kInitialized: "initialized"
+ kCreatingCollection: "creating-collection"
kCloning: "cloning"
kApplying: "applying"
kSteadyState: "steady-state"