diff options
author    | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-02-24 15:08:40 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-24 16:41:46 +0000
commit    | 6ca44ce30689b3fe93d3afe75f07c70a0fdc3d8d (patch)
tree      | a84afaf9fbebe581a7af6d103e58ea974cc366a2 /src/mongo
parent    | fdfc0daa5e3c46ac7a2b6e08d98528b161217467 (diff)
download  | mongo-6ca44ce30689b3fe93d3afe75f07c70a0fdc3d8d.tar.gz
SERVER-54513 Add kAwaitingFetchTimestamp to resharding recipient states.
Also changes the RecoverRefreshThread to insert the donor and recipient
state documents rather than the primary-only service Instance itself.
Diffstat (limited to 'src/mongo')
10 files changed, 134 insertions, 70 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index b31eebd66a0..097e97e6fff 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -60,6 +60,7 @@ namespace { using namespace fmt::literals; +MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCloning); MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorInSteadyState); MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeDecisionPersisted); MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCompletion) @@ -924,6 +925,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( .then([this, executor] { _insertCoordDocAndChangeOrigCollEntry(); }) .then([this, executor] { _calculateParticipantsAndChunksThenWriteToDisk(); }) .then([this, executor] { _tellAllDonorsToRefresh(executor); }) + .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) .then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); }) .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); }) @@ -1086,6 +1088,11 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate() .thenRunOn(**executor) .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { + { + auto opCtx = cc().makeOperationContext(); + reshardingPauseCoordinatorBeforeCloning.pauseWhileSet(opCtx.get()); + } + auto highestMinFetchTimestamp = getHighestMinFetchTimestamp(coordinatorDocChangedOnDisk.getDonorShards()); _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kCloning, diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 99872599e00..dfd42fe7bb9 
100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -32,6 +32,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/s/resharding/resharding_donor_recipient_common.h" +#include "mongo/db/storage/duplicate_key_error_info.h" #include <fmt/format.h> @@ -59,14 +60,43 @@ std::vector<DonorShardMirroringEntry> createDonorShardMirroringEntriesFromDonorS } /* - * Creates a ReshardingStateMachine with the assumption that the state machine does not already - * exist. + * Creates a ReshardingStateMachine if this node is primary and the ReshardingStateMachine doesn't + * already exist. + * + * It is safe to call this function when this node is actually a secondary. */ template <class Service, class StateMachine, class ReshardingDocument> void createReshardingStateMachine(OperationContext* opCtx, const ReshardingDocument& doc) { - auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); - auto service = registry->lookupServiceByName(Service::kServiceName); - StateMachine::getOrCreate(opCtx, service, doc.toBSON()); + try { + // Inserting the resharding state document must happen synchronously with the shard version + // refresh for the w:majority wait from the resharding coordinator to mean that this replica + // set shard cannot forget about being a participant. + StateMachine::insertStateDocument(opCtx, doc); + + auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); + auto service = registry->lookupServiceByName(Service::kServiceName); + StateMachine::getOrCreate(opCtx, service, doc.toBSON()); + } catch (const ExceptionForCat<ErrorCategory::NotPrimaryError>&) { + // resharding::processReshardingFieldsForCollection() is called on both primary and + // secondary nodes as part of the shard version being refreshed. 
Due to the RSTL lock not + // being held throughout the shard version refresh, it is also possible for the node to + // arbitrarily step down and step up during the shard version refresh. Rather than + // attempt to prevent replica set member state transitions during the shard version refresh, + // we instead swallow the NotPrimaryError exception. This is safe because there is no work a + // secondary (or primary which stepped down) must do for an active resharding operation upon + // refreshing its shard version. The primary is solely responsible for advancing the + // participant state as a result of the shard version refresh. + } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) { + // Similar to the ErrorCategory::NotPrimaryError clause above, it is theoretically possible + // for a series of stepdowns and step-ups to lead a scenario where a stale but now + // re-elected primary attempts to insert the state document when another node which was + // primary had already done so. Again, rather than attempt to prevent replica set member + // state transitions during the shard version refresh, we instead swallow the DuplicateKey + // exception. This is safe because PrimaryOnlyService::onStepUp() will have constructed a + // new instance of the resharding state machine. 
+ auto dupeKeyInfo = ex.extraInfo<DuplicateKeyErrorInfo>(); + invariant(dupeKeyInfo->getDuplicatedKeyValue().binaryEqual(BSON("_id" << doc.get_id()))); + } } /** @@ -207,17 +237,15 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx, return; } - // If a resharding operation is past state kCloning but does not currently have a recipient - // document in-memory, this means that the document will be recovered by the + // If a resharding operation is past state kPreparingToDonate but does not currently have a + // recipient document in-memory, this means that the document will be recovered by the // ReshardingRecipientService, and at that time the latest instance of 'reshardingFields' // will be read. Return no-op. // - // The RecipientStateMachine creates the temporary resharding collection immediately after being - // constructed. If a resharding operation has yet to reach state kCloning, then some donor - // shards may not be prepared for the recipient to start cloning. We avoid constructing the - // RecipientStateMachine until all donor shards are known to be prepared for the recipient to - // start cloning. - if (reshardingFields.getState() != CoordinatorStateEnum::kCloning) { + // We construct the RecipientStateMachine in the kPreparingToDonate state (which is the same + // state as when we would construct the DonorStateMachine) so the resharding coordinator can + // rely on all of the state machines being constructed as part of the same state transition. + if (reshardingFields.getState() != CoordinatorStateEnum::kPreparingToDonate) { return; } @@ -288,11 +316,15 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( OperationContext* opCtx, const CollectionMetadata& metadata, const ReshardingFields& reshardingFields) { + // The recipient state machines are created before the donor shards are prepared to donate but + // will remain idle until the donor shards are prepared to donate. 
+ invariant(!reshardingFields.getRecipientFields()->getFetchTimestamp()); + std::vector<DonorShardMirroringEntry> donorShards = createDonorShardMirroringEntriesFromDonorShardIds( reshardingFields.getRecipientFields()->getDonorShardIds()); - auto recipientDoc = ReshardingRecipientDocument(RecipientStateEnum::kCreatingCollection, + auto recipientDoc = ReshardingRecipientDocument(RecipientStateEnum::kAwaitingFetchTimestamp, std::move(donorShards)); auto commonMetadata = @@ -302,9 +334,6 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( metadata.getShardKeyPattern().toBSON()); recipientDoc.setCommonReshardingMetadata(std::move(commonMetadata)); - emplaceFetchTimestampIfExists(recipientDoc, - reshardingFields.getRecipientFields()->getFetchTimestamp()); - return recipientDoc; } diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp index a582302f029..0da9f7f56b1 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -66,10 +66,10 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard); auto reshardingFields = - createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning); + createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate); appendRecipientFieldsToReshardingFields( - reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp); + reshardingFields, kShardIds, kExistingUUID, kOriginalNss); auto recipientDoc = resharding::constructRecipientDocumentFromReshardingFields( opCtx, metadata, reshardingFields); @@ -103,9 +103,9 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) { auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard); auto 
reshardingFields = - createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning); + createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate); appendRecipientFieldsToReshardingFields( - reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp); + reshardingFields, kShardIds, kExistingUUID, kOriginalNss); resharding::processReshardingFieldsForCollection( opCtx, kTemporaryReshardingNss, metadata, reshardingFields); @@ -185,9 +185,9 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessRecipientFieldsWhenShardDoesnt auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kOtherShard); auto reshardingFields = - createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning); + createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate); appendRecipientFieldsToReshardingFields( - reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp); + reshardingFields, kShardIds, kExistingUUID, kOriginalNss); resharding::processReshardingFieldsForCollection( opCtx, kTemporaryReshardingNss, metadata, reshardingFields); @@ -206,7 +206,7 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessReshardingFieldsWithoutDonorOr auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard); auto reshardingFields = - createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning); + createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate); ASSERT_THROWS_CODE(resharding::processReshardingFieldsForCollection( opCtx, kTemporaryReshardingNss, metadata, reshardingFields), diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h index 1f6ec86230c..8b8c2390a06 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h +++ 
b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h @@ -187,9 +187,8 @@ protected: metadata.getShardKeyPattern().toBSON(), recipientDoc); - ASSERT(recipientDoc.getState() == RecipientStateEnum::kCreatingCollection); - ASSERT(recipientDoc.getFetchTimestamp() == - reshardingFields.getRecipientFields()->getFetchTimestamp()); + ASSERT(recipientDoc.getState() == RecipientStateEnum::kAwaitingFetchTimestamp); + ASSERT(!recipientDoc.getFetchTimestamp()); auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds(); auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end()); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 816f2084fb4..c7aacf2e4b9 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -58,6 +58,9 @@ MONGO_FAIL_POINT_DEFINE(reshardingDonorFailsBeforePreparingToMirror); using namespace fmt::literals; namespace { + +const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)}; + ChunkManager getShardedCollectionRoutingInfoWithRefreshAndFlush(const NamespaceString& nss) { auto opCtx = cc().makeOperationContext(); @@ -259,8 +262,6 @@ void ReshardingDonorService::DonorStateMachine:: return; } - _insertDonorDocument(_donorDoc); - ReshardingCloneSize cloneSizeEstimate; { auto opCtx = cc().makeOperationContext(); @@ -506,14 +507,11 @@ void ReshardingDonorService::DonorStateMachine::_transitionStateAndUpdateCoordin ShardingCatalogClient::kMajorityWriteConcern)); } -void ReshardingDonorService::DonorStateMachine::_insertDonorDocument( - const ReshardingDonorDocument& doc) { - auto opCtx = cc().makeOperationContext(); +void ReshardingDonorService::DonorStateMachine::insertStateDocument( + OperationContext* opCtx, const ReshardingDonorDocument& donorDoc) { PersistentTaskStore<ReshardingDonorDocument> store( 
NamespaceString::kDonorReshardingOperationsNamespace); - store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern); - - _donorDoc = doc; + store.add(opCtx, donorDoc, kNoWaitWriteConcern); } void ReshardingDonorService::DonorStateMachine::_updateDonorDocument( diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index d0d4b8fc626..d2fe055e11a 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -97,6 +97,9 @@ public: SharedSemiFuture<void> awaitFinalOplogEntriesWritten(); + static void insertStateDocument(OperationContext* opCtx, + const ReshardingDonorDocument& donorDoc); + private: // The following functions correspond to the actions to take at a particular donor state. void _transitionToPreparingToDonate(); @@ -129,9 +132,6 @@ private: boost::optional<Status> abortReason = boost::none, boost::optional<ReshardingCloneSize> cloneSizeEstimate = boost::none); - // Inserts 'doc' on-disk and sets '_donorDoc' in-memory. - void _insertDonorDocument(const ReshardingDonorDocument& doc); - // Updates the donor document on-disk and in-memory with the 'replacementDoc.' 
void _updateDonorDocument(ReshardingDonorDocument&& replacementDoc); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 8ebd4e0298b..1f718f4a031 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -66,6 +66,8 @@ MONGO_FAIL_POINT_DEFINE(removeRecipientDocFailpoint); namespace { +const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)}; + std::shared_ptr<executor::ThreadPoolTaskExecutor> makeTaskExecutor(StringData name, size_t maxThreads) { ThreadPool::Limits threadPoolLimits; @@ -92,6 +94,16 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) { } } +void ensureFulfilledPromise(WithLock lk, SharedPromise<Timestamp>& sp, Timestamp ts) { + auto future = sp.getFuture(); + if (!future.isReady()) { + sp.emplaceValue(ts); + } else { + // Ensure that we would only attempt to fulfill the promise with the same Timestamp value. 
+ invariant(future.get() == ts); + } +} + } // namespace namespace resharding { @@ -213,6 +225,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() { stdx::lock_guard<Latch> lg(_mutex); + invariant(_allDonorsPreparedToDonate.getFuture().isReady()); invariant(_coordinatorHasDecisionPersisted.getFuture().isReady()); invariant(_completionPromise.getFuture().isReady()); } @@ -221,9 +234,9 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancelationToken& cancelToken) noexcept { return ExecutorFuture<void>(**executor) - .then([this] { + .then([this, executor] { _metrics()->onStart(); - _transitionToCreatingTemporaryReshardingCollection(); + return _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(executor); }) .then([this] { _createTemporaryReshardingCollectionThenTransitionToCloning(); }) .then([this, executor, cancelToken] { @@ -316,18 +329,31 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange } auto coordinatorState = reshardingFields.getState(); + + if (coordinatorState >= CoordinatorStateEnum::kCloning) { + auto fetchTimestamp = reshardingFields.getRecipientFields()->getFetchTimestamp(); + invariant(fetchTimestamp); + ensureFulfilledPromise(lk, _allDonorsPreparedToDonate, *fetchTimestamp); + } + if (coordinatorState >= CoordinatorStateEnum::kDecisionPersisted) { ensureFulfilledPromise(lk, _coordinatorHasDecisionPersisted); } } -void ReshardingRecipientService::RecipientStateMachine:: - _transitionToCreatingTemporaryReshardingCollection() { - if (_recipientDoc.getState() > RecipientStateEnum::kCreatingCollection) { - return; +ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: + _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { 
+ if (_recipientDoc.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { + invariant(_recipientDoc.getFetchTimestamp()); + return ExecutorFuture(**executor); } - _transitionState(RecipientStateEnum::kCreatingCollection); + return _allDonorsPreparedToDonate.getFuture() + .thenRunOn(**executor) + .then([this](Timestamp fetchTimestamp) { + _transitionState(RecipientStateEnum::kCreatingCollection, fetchTimestamp); + }); } void ReshardingRecipientService::RecipientStateMachine:: @@ -632,15 +658,10 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( RecipientStateEnum endState, boost::optional<Timestamp> fetchTimestamp, boost::optional<Status> abortReason) { + invariant(endState != RecipientStateEnum::kAwaitingFetchTimestamp); ReshardingRecipientDocument replacementDoc(_recipientDoc); replacementDoc.setState(endState); - if (endState == RecipientStateEnum::kCreatingCollection) { - _insertRecipientDocument(replacementDoc); - _metrics()->setRecipientState(endState); - return; - } - emplaceFetchTimestampIfExists(replacementDoc, std::move(fetchTimestamp)); emplaceAbortReasonIfExists(replacementDoc, std::move(abortReason)); @@ -685,14 +706,11 @@ void ReshardingRecipientService::RecipientStateMachine::_updateCoordinator() { ShardingCatalogClient::kMajorityWriteConcern)); } -void ReshardingRecipientService::RecipientStateMachine::_insertRecipientDocument( - const ReshardingRecipientDocument& doc) { - auto opCtx = cc().makeOperationContext(); +void ReshardingRecipientService::RecipientStateMachine::insertStateDocument( + OperationContext* opCtx, const ReshardingRecipientDocument& recipientDoc) { PersistentTaskStore<ReshardingRecipientDocument> store( NamespaceString::kRecipientReshardingOperationsNamespace); - store.add(opCtx.get(), doc, WriteConcerns::kMajorityWriteConcern); - - _recipientDoc = doc; + store.add(opCtx, recipientDoc, kNoWaitWriteConcern); } void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument( @@ -770,6 
+788,10 @@ void ReshardingRecipientService::RecipientStateMachine::_onAbortOrStepdown(WithL threadPool->shutdown(); } + if (!_allDonorsPreparedToDonate.getFuture().isReady()) { + _allDonorsPreparedToDonate.setError(status); + } + if (!_coordinatorHasDecisionPersisted.getFuture().isReady()) { _coordinatorHasDecisionPersisted.setError(status); } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 43cf216f367..a8f8f22899d 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -130,9 +130,13 @@ public: void onReshardingFieldsChanges(OperationContext* opCtx, const TypeCollectionReshardingFields& reshardingFields); + static void insertStateDocument(OperationContext* opCtx, + const ReshardingRecipientDocument& recipientDoc); + private: // The following functions correspond to the actions to take at a particular recipient state. - void _transitionToCreatingTemporaryReshardingCollection(); + ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); void _createTemporaryReshardingCollectionThenTransitionToCloning(); @@ -157,9 +161,6 @@ private: void _updateCoordinator(); - // Inserts 'doc' on-disk and sets '_replacementDoc' in-memory. - void _insertRecipientDocument(const ReshardingRecipientDocument& doc); - // Updates the recipient document on-disk and in-memory with the 'replacementDoc.' void _updateRecipientDocument(ReshardingRecipientDocument&& replacementDoc); @@ -206,6 +207,8 @@ private: // Each promise below corresponds to a state on the recipient state machine. They are listed in // ascending order, such that the first promise below will be the first promise fulfilled. 
+ SharedPromise<Timestamp> _allDonorsPreparedToDonate; + SharedPromise<void> _coordinatorHasDecisionPersisted; SharedPromise<void> _completionPromise; diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 545b27cd2bb..55779d166bf 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -147,7 +147,7 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext auto opCtxHolder = tc->makeOperationContext(); auto const opCtx = opCtxHolder.get(); - boost::optional<CollectionMetadata> currentMetadata; + boost::optional<CollectionMetadata> currentMetadataToInstall; ON_BLOCK_EXIT([&] { UninterruptibleLockGuard noInterrupt(opCtx->lockState()); @@ -161,11 +161,12 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext auto* const csr = CollectionShardingRuntime::get(opCtx, nss); - if (currentMetadata) { - csr->setFilteringMetadata(opCtx, *currentMetadata); + if (currentMetadataToInstall) { + csr->setFilteringMetadata(opCtx, *currentMetadataToInstall); } else { - // If currentMetadata is uninitialized, an error occurred in the current spawned - // thread. Filtering metadata is cleared to force a new recover/refresh. + // If currentMetadataToInstall is uninitialized, an error occurred in the + // current spawned thread. Filtering metadata is cleared to force a new + // recover/refresh. csr->clearFilteringMetadata(opCtx); } @@ -180,17 +181,21 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext } } - currentMetadata = forceGetCurrentMetadata(opCtx, nss); + auto currentMetadata = forceGetCurrentMetadata(opCtx, nss); - if (currentMetadata && currentMetadata->isSharded()) { + if (currentMetadata.isSharded()) { // If the collection metadata after a refresh has 'reshardingFields', then pass it // to the resharding subsystem to process. 
- const auto& reshardingFields = currentMetadata->getReshardingFields(); + const auto& reshardingFields = currentMetadata.getReshardingFields(); if (reshardingFields) { resharding::processReshardingFieldsForCollection( - opCtx, nss, *currentMetadata, *reshardingFields); + opCtx, nss, currentMetadata, *reshardingFields); } } + + // Only if all actions taken as part of refreshing the shard version completed + // successfully do we want to install the current metadata. + currentMetadataToInstall = std::move(currentMetadata); }) .semi() .share(); diff --git a/src/mongo/s/resharding/common_types.idl b/src/mongo/s/resharding/common_types.idl index e3ef7b6c2e4..c50746e61c2 100644 --- a/src/mongo/s/resharding/common_types.idl +++ b/src/mongo/s/resharding/common_types.idl @@ -70,6 +70,7 @@ enums: type: string values: kUnused: "unused" + kAwaitingFetchTimestamp: "awaiting-fetch-timestamp" kCreatingCollection: "creating-collection" kCloning: "cloning" kApplying: "applying" |