summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2021-02-24 15:08:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-24 16:41:46 +0000
commit6ca44ce30689b3fe93d3afe75f07c70a0fdc3d8d (patch)
treea84afaf9fbebe581a7af6d103e58ea974cc366a2 /src
parentfdfc0daa5e3c46ac7a2b6e08d98528b161217467 (diff)
downloadmongo-6ca44ce30689b3fe93d3afe75f07c70a0fdc3d8d.tar.gz
SERVER-54513 Add kAwaitingFetchTimestamp to resharding recipient states.
Also changes the RecoverRefreshThread to insert the donor and recipient state documents rather than the primary-only service Instance itself.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp7
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp63
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp14
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h5
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp14
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h6
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp60
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h11
-rw-r--r--src/mongo/db/s/shard_filtering_metadata_refresh.cpp23
-rw-r--r--src/mongo/s/resharding/common_types.idl1
10 files changed, 134 insertions, 70 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index b31eebd66a0..097e97e6fff 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -60,6 +60,7 @@ namespace {
using namespace fmt::literals;
+MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCloning);
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorInSteadyState);
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeDecisionPersisted);
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCompletion)
@@ -924,6 +925,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
.then([this, executor] { _insertCoordDocAndChangeOrigCollEntry(); })
.then([this, executor] { _calculateParticipantsAndChunksThenWriteToDisk(); })
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
+ .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
.then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); })
.then([this, executor] { _tellAllRecipientsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); })
@@ -1086,6 +1088,11 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate()
.thenRunOn(**executor)
.then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
+ {
+ auto opCtx = cc().makeOperationContext();
+ reshardingPauseCoordinatorBeforeCloning.pauseWhileSet(opCtx.get());
+ }
+
auto highestMinFetchTimestamp =
getHighestMinFetchTimestamp(coordinatorDocChangedOnDisk.getDonorShards());
_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kCloning,
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 99872599e00..dfd42fe7bb9 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -32,6 +32,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
+#include "mongo/db/storage/duplicate_key_error_info.h"
#include <fmt/format.h>
@@ -59,14 +60,43 @@ std::vector<DonorShardMirroringEntry> createDonorShardMirroringEntriesFromDonorS
}
/*
- * Creates a ReshardingStateMachine with the assumption that the state machine does not already
- * exist.
+ * Creates a ReshardingStateMachine if this node is primary and the ReshardingStateMachine doesn't
+ * already exist.
+ *
+ * It is safe to call this function when this node is actually a secondary.
*/
template <class Service, class StateMachine, class ReshardingDocument>
void createReshardingStateMachine(OperationContext* opCtx, const ReshardingDocument& doc) {
- auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
- auto service = registry->lookupServiceByName(Service::kServiceName);
- StateMachine::getOrCreate(opCtx, service, doc.toBSON());
+ try {
+ // Inserting the resharding state document must happen synchronously with the shard version
+ // refresh for the w:majority wait from the resharding coordinator to mean that this replica
+ // set shard cannot forget about being a participant.
+ StateMachine::insertStateDocument(opCtx, doc);
+
+ auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext());
+ auto service = registry->lookupServiceByName(Service::kServiceName);
+ StateMachine::getOrCreate(opCtx, service, doc.toBSON());
+ } catch (const ExceptionForCat<ErrorCategory::NotPrimaryError>&) {
+ // resharding::processReshardingFieldsForCollection() is called on both primary and
+ // secondary nodes as part of the shard version being refreshed. Due to the RSTL lock not
+ // being held throughout the shard version refresh, it is also possible for the node to
+ // arbitrarily step down and step up during the shard version refresh. Rather than
+ // attempt to prevent replica set member state transitions during the shard version refresh,
+ // we instead swallow the NotPrimaryError exception. This is safe because there is no work a
+ // secondary (or primary which stepped down) must do for an active resharding operation upon
+ // refreshing its shard version. The primary is solely responsible for advancing the
+ // participant state as a result of the shard version refresh.
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
+ // Similar to the ErrorCategory::NotPrimaryError clause above, it is theoretically possible
+ // for a series of stepdowns and step-ups to lead a scenario where a stale but now
+ // re-elected primary attempts to insert the state document when another node which was
+ // primary had already done so. Again, rather than attempt to prevent replica set member
+ // state transitions during the shard version refresh, we instead swallow the DuplicateKey
+ // exception. This is safe because PrimaryOnlyService::onStepUp() will have constructed a
+ // new instance of the resharding state machine.
+ auto dupeKeyInfo = ex.extraInfo<DuplicateKeyErrorInfo>();
+ invariant(dupeKeyInfo->getDuplicatedKeyValue().binaryEqual(BSON("_id" << doc.get_id())));
+ }
}
/**
@@ -207,17 +237,15 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx,
return;
}
- // If a resharding operation is past state kCloning but does not currently have a recipient
- // document in-memory, this means that the document will be recovered by the
+ // If a resharding operation is past state kPreparingToDonate but does not currently have a
+ // recipient document in-memory, this means that the document will be recovered by the
// ReshardingRecipientService, and at that time the latest instance of 'reshardingFields'
// will be read. Return no-op.
//
- // The RecipientStateMachine creates the temporary resharding collection immediately after being
- // constructed. If a resharding operation has yet to reach state kCloning, then some donor
- // shards may not be prepared for the recipient to start cloning. We avoid constructing the
- // RecipientStateMachine until all donor shards are known to be prepared for the recipient to
- // start cloning.
- if (reshardingFields.getState() != CoordinatorStateEnum::kCloning) {
+ // We construct the RecipientStateMachine in the kPreparingToDonate state (which is the same
+ // state as when we would construct the DonorStateMachine) so the resharding coordinator can
+ // rely on all of the state machines being constructed as part of the same state transition.
+ if (reshardingFields.getState() != CoordinatorStateEnum::kPreparingToDonate) {
return;
}
@@ -288,11 +316,15 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
OperationContext* opCtx,
const CollectionMetadata& metadata,
const ReshardingFields& reshardingFields) {
+ // The recipient state machines are created before the donor shards are prepared to donate but
+ // will remain idle until the donor shards are prepared to donate.
+ invariant(!reshardingFields.getRecipientFields()->getFetchTimestamp());
+
std::vector<DonorShardMirroringEntry> donorShards =
createDonorShardMirroringEntriesFromDonorShardIds(
reshardingFields.getRecipientFields()->getDonorShardIds());
- auto recipientDoc = ReshardingRecipientDocument(RecipientStateEnum::kCreatingCollection,
+ auto recipientDoc = ReshardingRecipientDocument(RecipientStateEnum::kAwaitingFetchTimestamp,
std::move(donorShards));
auto commonMetadata =
@@ -302,9 +334,6 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
metadata.getShardKeyPattern().toBSON());
recipientDoc.setCommonReshardingMetadata(std::move(commonMetadata));
- emplaceFetchTimestampIfExists(recipientDoc,
- reshardingFields.getRecipientFields()->getFetchTimestamp());
-
return recipientDoc;
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index a582302f029..0da9f7f56b1 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -66,10 +66,10 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest,
auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
auto reshardingFields =
- createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+ createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
appendRecipientFieldsToReshardingFields(
- reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+ reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
auto recipientDoc = resharding::constructRecipientDocumentFromReshardingFields(
opCtx, metadata, reshardingFields);
@@ -103,9 +103,9 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
auto reshardingFields =
- createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+ createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
appendRecipientFieldsToReshardingFields(
- reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+ reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
resharding::processReshardingFieldsForCollection(
opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
@@ -185,9 +185,9 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessRecipientFieldsWhenShardDoesnt
auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kOtherShard);
auto reshardingFields =
- createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+ createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
appendRecipientFieldsToReshardingFields(
- reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+ reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
resharding::processReshardingFieldsForCollection(
opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
@@ -206,7 +206,7 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessReshardingFieldsWithoutDonorOr
auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
auto reshardingFields =
- createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+ createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
ASSERT_THROWS_CODE(resharding::processReshardingFieldsForCollection(
opCtx, kTemporaryReshardingNss, metadata, reshardingFields),
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
index 1f6ec86230c..8b8c2390a06 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
@@ -187,9 +187,8 @@ protected:
metadata.getShardKeyPattern().toBSON(),
recipientDoc);
- ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection);
- ASSERT(recipientDoc.getFetchTimestamp() ==
- reshardingFields.getRecipientFields()->getFetchTimestamp());
+ ASSERT(recipientDoc.getState() == RecipientStateEnum::kAwaitingFetchTimestamp);
+ ASSERT(!recipientDoc.getFetchTimestamp());
auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds();
auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end());
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 816f2084fb4..c7aacf2e4b9 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -58,6 +58,9 @@ MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsBeforePreparingToMirror);
using namespace fmt::literals;
namespace {
+
+const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)};
+
ChunkManager getShardedCollectionRoutingInfoWithRefreshAndFlush(const NamespaceString& nss) {
auto opCtx = cc().makeOperationContext();
@@ -259,8 +262,6 @@ void ReshardingDonorService::DonorStateMachine::
return;
}
- _insertDonorDocument(_donorDoc);
-
ReshardingCloneSize cloneSizeEstimate;
{
auto opCtx = cc().makeOperationContext();
@@ -506,14 +507,11 @@ void ReshardingDonorService::DonorStateMachine::_transitionStateAndUpdateCoordin
ShardingCatalogClient::kMajorityWriteConcern));
}
-void ReshardingDonorService::DonorStateMachine::_insertDonorDocument(
- const ReshardingDonorDocument& doc) {
- auto opCtx = cc().makeOperationContext();
+void ReshardingDonorService::DonorStateMachine::insertStateDocument(
+ OperationContext* opCtx, const ReshardingDonorDocument& donorDoc) {
PersistentTaskStore<ReshardingDonorDocument> store(
NamespaceString::kDonorReshardingOperationsNamespace);
- store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
-
- _donorDoc = doc;
+ store.add(opCtx, donorDoc, kNoWaitWriteConcern);
}
void ReshardingDonorService::DonorStateMachine::_updateDonorDocument(
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index d0d4b8fc626..d2fe055e11a 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -97,6 +97,9 @@ public:
SharedSemiFuture<void> awaitFinalOplogEntriesWritten();
+ static void insertStateDocument(OperationContext* opCtx,
+ const ReshardingDonorDocument& donorDoc);
+
private:
// The following functions correspond to the actions to take at a particular donor state.
void _transitionToPreparingToDonate();
@@ -129,9 +132,6 @@ private:
boost::optional<Status> abortReason = boost::none,
boost::optional<ReshardingCloneSize> cloneSizeEstimate = boost::none);
- // Inserts 'doc' on-disk and sets '_donorDoc' in-memory.
- void _insertDonorDocument(const ReshardingDonorDocument& doc);
-
// Updates the donor document on-disk and in-memory with the 'replacementDoc.'
void _updateDonorDocument(ReshardingDonorDocument&& replacementDoc);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 8ebd4e0298b..1f718f4a031 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -66,6 +66,8 @@ MONGO_FAIL_POINT_DEFINE(removeRecipientDocFailpoint);
namespace {
+const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)};
+
std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutor(StringData name,
size_t maxThreads) {
ThreadPool::Limits threadPoolLimits;
@@ -92,6 +94,16 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) {
}
}
+void ensureFulfilledPromise(WithLock lk, SharedPromise<Timestamp>& sp, Timestamp ts) {
+ auto future = sp.getFuture();
+ if (!future.isReady()) {
+ sp.emplaceValue(ts);
+ } else {
+ // Ensure that we would only attempt to fulfill the promise with the same Timestamp value.
+ invariant(future.get() == ts);
+ }
+}
+
} // namespace
namespace resharding {
@@ -213,6 +225,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
+ invariant(_allDonorsPreparedToDonate.getFuture().isReady());
invariant(_coordinatorHasDecisionPersisted.getFuture().isReady());
invariant(_completionPromise.getFuture().isReady());
}
@@ -221,9 +234,9 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancelationToken& cancelToken) noexcept {
return ExecutorFuture<void>(**executor)
- .then([this] {
+ .then([this, executor] {
_metrics()->onStart();
- _transitionToCreatingTemporaryReshardingCollection();
+ return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(executor);
})
.then([this] { _createTemporaryReshardingCollectionThenTransitionToCloning(); })
.then([this, executor, cancelToken] {
@@ -316,18 +329,31 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
}
auto coordinatorState = reshardingFields.getState();
+
+ if (coordinatorState >= CoordinatorStateEnum::kCloning) {
+ auto fetchTimestamp = reshardingFields.getRecipientFields()->getFetchTimestamp();
+ invariant(fetchTimestamp);
+ ensureFulfilledPromise(lk, _allDonorsPreparedToDonate, *fetchTimestamp);
+ }
+
if (coordinatorState >= CoordinatorStateEnum::kDecisionPersisted) {
ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted);
}
}
-void ReshardingRecipientService::RecipientStateMachine::
- _transitionToCreatingTemporaryReshardingCollection() {
- if (_recipientDoc.getState() > RecipientStateEnum::kCreatingCollection) {
- return;
+ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
+ _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ if (_recipientDoc.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
+ invariant(_recipientDoc.getFetchTimestamp());
+ return ExecutorFuture(**executor);
}
- _transitionState(RecipientStateEnum::kCreatingCollection);
+ return _allDonorsPreparedToDonate.getFuture()
+ .thenRunOn(**executor)
+ .then([this](Timestamp fetchTimestamp) {
+ _transitionState(RecipientStateEnum::kCreatingCollection, fetchTimestamp);
+ });
}
void ReshardingRecipientService::RecipientStateMachine::
@@ -632,15 +658,10 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
RecipientStateEnum endState,
boost::optional<Timestamp> fetchTimestamp,
boost::optional<Status> abortReason) {
+ invariant(endState != RecipientStateEnum::kAwaitingFetchTimestamp);
ReshardingRecipientDocument replacementDoc(_recipientDoc);
replacementDoc.setState(endState);
- if (endState == RecipientStateEnum::kCreatingCollection) {
- _insertRecipientDocument(replacementDoc);
- _metrics()->setRecipientState(endState);
- return;
- }
-
emplaceFetchTimestampIfExists(replacementDoc, std::move(fetchTimestamp));
emplaceAbortReasonIfExists(replacementDoc, std::move(abortReason));
@@ -685,14 +706,11 @@ void ReshardingRecipientService::RecipientStateMachine::_updateCoordinator() {
ShardingCatalogClient::kMajorityWriteConcern));
}
-void ReshardingRecipientService::RecipientStateMachine::_insertRecipientDocument(
- const ReshardingRecipientDocument& doc) {
- auto opCtx = cc().makeOperationContext();
+void ReshardingRecipientService::RecipientStateMachine::insertStateDocument(
+ OperationContext* opCtx, const ReshardingRecipientDocument& recipientDoc) {
PersistentTaskStore<ReshardingRecipientDocument> store(
NamespaceString::kRecipientReshardingOperationsNamespace);
- store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern);
-
- _recipientDoc = doc;
+ store.add(opCtx, recipientDoc, kNoWaitWriteConcern);
}
void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument(
@@ -770,6 +788,10 @@ void ReshardingRecipientService::RecipientStateMachine::_onAbortOrStepdown(WithL
threadPool->shutdown();
}
+ if (!_allDonorsPreparedToDonate.getFuture().isReady()) {
+ _allDonorsPreparedToDonate.setError(status);
+ }
+
if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) {
_coordinatorHasDecisionPersisted.setError(status);
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 43cf216f367..a8f8f22899d 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -130,9 +130,13 @@ public:
void onReshardingFieldsChanges(OperationContext* opCtx,
const TypeCollectionReshardingFields& reshardingFields);
+ static void insertStateDocument(OperationContext* opCtx,
+ const ReshardingRecipientDocument& recipientDoc);
+
private:
// The following functions correspond to the actions to take at a particular recipient state.
- void _transitionToCreatingTemporaryReshardingCollection();
+ ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
void _createTemporaryReshardingCollectionThenTransitionToCloning();
@@ -157,9 +161,6 @@ private:
void _updateCoordinator();
- // Inserts 'doc' on-disk and sets '_replacementDoc' in-memory.
- void _insertRecipientDocument(const ReshardingRecipientDocument& doc);
-
// Updates the recipient document on-disk and in-memory with the 'replacementDoc.'
void _updateRecipientDocument(ReshardingRecipientDocument&& replacementDoc);
@@ -206,6 +207,8 @@ private:
// Each promise below corresponds to a state on the recipient state machine. They are listed in
// ascending order, such that the first promise below will be the first promise fulfilled.
+ SharedPromise<Timestamp> _allDonorsPreparedToDonate;
+
SharedPromise<void> _coordinatorHasDecisionPersisted;
SharedPromise<void> _completionPromise;
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 545b27cd2bb..55779d166bf 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -147,7 +147,7 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext
auto opCtxHolder = tc->makeOperationContext();
auto const opCtx = opCtxHolder.get();
- boost::optional<CollectionMetadata> currentMetadata;
+ boost::optional<CollectionMetadata> currentMetadataToInstall;
ON_BLOCK_EXIT([&] {
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
@@ -161,11 +161,12 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext
auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
- if (currentMetadata) {
- csr->setFilteringMetadata(opCtx, *currentMetadata);
+ if (currentMetadataToInstall) {
+ csr->setFilteringMetadata(opCtx, *currentMetadataToInstall);
} else {
- // If currentMetadata is uninitialized, an error occurred in the current spawned
- // thread. Filtering metadata is cleared to force a new recover/refresh.
+ // If currentMetadataToInstall is uninitialized, an error occurred in the
+ // current spawned thread. Filtering metadata is cleared to force a new
+ // recover/refresh.
csr->clearFilteringMetadata(opCtx);
}
@@ -180,17 +181,21 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext
}
}
- currentMetadata = forceGetCurrentMetadata(opCtx, nss);
+ auto currentMetadata = forceGetCurrentMetadata(opCtx, nss);
- if (currentMetadata && currentMetadata->isSharded()) {
+ if (currentMetadata.isSharded()) {
// If the collection metadata after a refresh has 'reshardingFields', then pass it
// to the resharding subsystem to process.
- const auto& reshardingFields = currentMetadata->getReshardingFields();
+ const auto& reshardingFields = currentMetadata.getReshardingFields();
if (reshardingFields) {
resharding::processReshardingFieldsForCollection(
- opCtx, nss, *currentMetadata, *reshardingFields);
+ opCtx, nss, currentMetadata, *reshardingFields);
}
}
+
+ // Only if all actions taken as part of refreshing the shard version completed
+ // successfully do we want to install the current metadata.
+ currentMetadataToInstall = std::move(currentMetadata);
})
.semi()
.share();
diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl
index e3ef7b6c2e4..c50746e61c2 100644
--- a/src/mongo/s/resharding/common_types.idl
+++ b/src/mongo/s/resharding/common_types.idl
@@ -70,6 +70,7 @@ enums:
type: string
values:
kUnused: "unused"
+ kAwaitingFetchTimestamp: "awaiting-fetch-timestamp"
kCreatingCollection: "creating-collection"
kCloning: "cloning"
kApplying: "applying"