From 6c3c2cd287996ccc1821b7f04595edc7a4be3104 Mon Sep 17 00:00:00 2001 From: Jordi Serra Torrens Date: Tue, 16 Mar 2021 14:32:42 +0000 Subject: SERVER-55146: Bump collection version on any modification of config.collections reshardingFields or allowMigrations fields --- src/mongo/db/s/config/sharding_catalog_manager.h | 15 +- ...ump_shard_versions_and_change_metadata_test.cpp | 36 ++- .../sharding_catalog_manager_chunk_operations.cpp | 70 +++++- .../resharding/resharding_coordinator_service.cpp | 245 ++++++--------------- .../s/resharding/resharding_coordinator_test.cpp | 45 +++- src/mongo/s/chunk_manager.h | 2 +- 6 files changed, 200 insertions(+), 213 deletions(-) diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index f2b0d94b92c..10afc542b35 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -283,14 +283,19 @@ public: * In a single transaction, effectively bumps the shard version for each shard in the collection * to be the current collection version's major version + 1 inside an already-running * transaction. - * - * Note: it's the responsibility of the caller to ensure that the list of shards is stable, - * as any shards added after the shard ids have been passed in will be missed. */ - void bumpCollShardVersionsAndChangeMetadataInTxn( + void bumpCollectionVersionAndChangeMetadataInTxn( OperationContext* opCtx, const NamespaceString& nss, - const std::vector& shardIds, + unique_function changeMetadataFunc); + + /** + * Same as bumpCollectionVersionAndChangeMetadataInTxn, but bumps the version for several + * collections in a single transaction. + */ + void bumpMultipleCollectionVersionsAndChangeMetadataInTxn( + OperationContext* opCtx, + const std::vector& collNames, unique_function changeMetadataFunc); /** diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp index 9c36446fd15..9ff72e16ffe 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp @@ -161,9 +161,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, auto opCtx = operationContext(); - std::vector shardIds{kShard0.getName(), kShard1.getName()}; - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {}); + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, kNss, [&](OperationContext*, TxnNumber) {}); ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion)); @@ -203,9 +202,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard0Chunk1, shard1Chunk0}); auto opCtx = operationContext(); - std::vector shardIds{kShard0.getName(), kShard1.getName()}; - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {}); + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, kNss, [&](OperationContext*, TxnNumber) {}); assertOnlyOneChunkVersionBumped( operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion); @@ -251,9 +249,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard0Chunk1, shard1Chunk0, shard1Chunk1}); auto opCtx = operationContext(); - std::vector shardIds{kShard0.getName(), kShard1.getName()}; - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {}); + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, kNss, [&](OperationContext*, TxnNumber) {}); assertOnlyOneChunkVersionBumped( operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion); @@ -282,10 +279,9 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard1Chunk0}); size_t numCalls = 0; - const std::vector shardIds{kShard0.getName(), kShard1.getName()}; ShardingCatalogManager::get(operationContext()) - ->bumpCollShardVersionsAndChangeMetadataInTxn( - operationContext(), kNss, shardIds, [&](OperationContext*, TxnNumber) { + ->bumpCollectionVersionAndChangeMetadataInTxn( + operationContext(), kNss, [&](OperationContext*, TxnNumber) { ++numCalls; if (numCalls < 5) { throw WriteConflictException(); @@ -313,8 +309,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, numCalls = 0; ShardingCatalogManager::get(operationContext()) - ->bumpCollShardVersionsAndChangeMetadataInTxn( - operationContext(), kNss, shardIds, [&](OperationContext*, TxnNumber) { + ->bumpCollectionVersionAndChangeMetadataInTxn( + operationContext(), kNss, [&](OperationContext*, TxnNumber) { ++numCalls; if (numCalls >= 5) { fp.reset(); @@ -354,12 +350,10 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard1Chunk0}); size_t numCalls = 0; - const std::vector shardIds{kShard0.getName(), kShard1.getName()}; ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext()) - ->bumpCollShardVersionsAndChangeMetadataInTxn( + ->bumpCollectionVersionAndChangeMetadataInTxn( operationContext(), kNss, - shardIds, [&](OperationContext*, TxnNumber) { ++numCalls; uasserted(ErrorCodes::ShutdownInProgress, @@ -382,10 +376,9 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, numCalls = 0; ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext()) - ->bumpCollShardVersionsAndChangeMetadataInTxn( + ->bumpCollectionVersionAndChangeMetadataInTxn( operationContext(), kNss, - shardIds, [&](OperationContext*, TxnNumber) { ++numCalls; uasserted(ErrorCodes::NotWritablePrimary, @@ -408,16 +401,15 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, numCalls = 0; ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext()) - ->bumpCollShardVersionsAndChangeMetadataInTxn( + ->bumpCollectionVersionAndChangeMetadataInTxn( operationContext(), kNss, - shardIds, [&](OperationContext*, TxnNumber) { ++numCalls; cc().getOperationContext()->markKilled(ErrorCodes::Interrupted); // Throw a LockTimeout exception so - // bumpCollShardVersionsAndChangeMetadataInTxn() makes another + // bumpCollectionVersionAndChangeMetadataInTxn() makes another // retry attempt and discovers operation context has been killed. uasserted(ErrorCodes::LockTimeout, "simulating lock timeout error from test"); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index f4a1d7c5349..3544004dedb 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -43,6 +43,7 @@ #include "mongo/db/logical_session_cache.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/query/distinct_command_gen.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/sharding_logging.h" #include "mongo/db/s/sharding_util.h" @@ -477,6 +478,50 @@ NamespaceStringOrUUID getNsOrUUIDForChunkTargeting(const CollectionType& coll) { } } +std::vector getShardsOwningChunksForCollection(OperationContext* opCtx, + const NamespaceString& nss) { + auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + auto findCollResponse = uassertStatusOK( + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + CollectionType::ConfigNS, + BSON(CollectionType::kNssFieldName << nss.ns()), + {}, + 1)); + uassert( + ErrorCodes::Error(5514600), "Collection does not exist", !findCollResponse.docs.empty()); + const CollectionType coll(findCollResponse.docs[0]); + const auto nsOrUUID = getNsOrUUIDForChunkTargeting(coll); + + DistinctCommand distinctCmd(ChunkType::ConfigNS, ChunkType::shard.name()); + if (nsOrUUID.uuid()) { + distinctCmd.setQuery(BSON(ChunkType::collectionUUID << *(nsOrUUID.uuid()))); + } else { + distinctCmd.setQuery(BSON(ChunkType::ns(nsOrUUID.nss()->ns()))); + } + + const auto distinctResult = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + NamespaceString::kConfigDb.toString(), + distinctCmd.toBSON({}), + Shard::RetryPolicy::kIdempotent)); + uassertStatusOK(distinctResult.commandStatus); + + const auto valuesElem = distinctResult.response.getField("values"); + std::vector shardIds; + for (const auto shard : valuesElem.Array()) { + shardIds.emplace_back(shard.String()); + } + uassert(ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find shardIds owning chunks for collection '" << nss.ns() + << ", but found none", + !shardIds.empty()); + + return shardIds; +} + } // namespace StatusWith ShardingCatalogManager::commitChunkSplit( @@ -1353,19 +1398,38 @@ void ShardingCatalogManager::ensureChunkVersionIsGreaterThan(OperationContext* o } } -void ShardingCatalogManager::bumpCollShardVersionsAndChangeMetadataInTxn( +void ShardingCatalogManager::bumpCollectionVersionAndChangeMetadataInTxn( OperationContext* opCtx, const NamespaceString& nss, - const std::vector& shardIds, + unique_function changeMetadataFunc) { + + bumpMultipleCollectionVersionsAndChangeMetadataInTxn( + opCtx, {nss}, std::move(changeMetadataFunc)); +} + +void ShardingCatalogManager::bumpMultipleCollectionVersionsAndChangeMetadataInTxn( + OperationContext* opCtx, + const std::vector& collNames, unique_function changeMetadataFunc) { // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and // migrations Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + + using NssAndShardIds = std::pair>; + std::vector nssAndShardIds; + for (const auto& nss : collNames) { + auto shardIds = getShardsOwningChunksForCollection(opCtx, nss); + nssAndShardIds.emplace_back(nss, std::move(shardIds)); + } + withTransaction(opCtx, NamespaceString::kConfigReshardingOperationsNamespace, [&](OperationContext* opCtx, TxnNumber txnNumber) { - bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, shardIds); + for (const auto& nssAndShardId : nssAndShardIds) { + bumpMajorVersionOneChunkPerShard( + opCtx, nssAndShardId.first, txnNumber, nssAndShardId.second); + } changeMetadataFunc(opCtx, txnNumber); }); } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 7308d9dca45..88aa8f87930 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -503,46 +503,8 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, opCtx, TagsType::ConfigNS, tagsRequest, txnNumber); } -// -// Helper methods for ensuring donors/ recipients are able to notice when certain state transitions -// occur. -// -// Donors/recipients learn when to transition states by noticing a change in shard versions for one -// of the two collections involved in the resharding operations. -// -// Before the resharding operation persists the decision whether to succeed or fail: -// * Donors are notified when the original resharding collection's shard versions are incremented. -// * Recipients are notified when the temporary resharding collection's shard versions are -// incremented. -// -// After the resharding operation persists its decision: -// * Both donors and recipients are notified when the original resharding collection's shard -// versions are incremented. -// - -/** - * Maps which participants are to be notified when the coordinator transitions into a given state. - */ -enum class ParticipantsToNotifyEnum { - kDonors, - kRecipients, - kAllParticipantsPostDecisionPersisted, - kNone -}; -stdx::unordered_map notifyForStateTransition{ - {CoordinatorStateEnum::kUnused, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kInitializing, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNotifyEnum::kDonors}, - {CoordinatorStateEnum::kCloning, ParticipantsToNotifyEnum::kRecipients}, - {CoordinatorStateEnum::kApplying, ParticipantsToNotifyEnum::kDonors}, - {CoordinatorStateEnum::kBlockingWrites, ParticipantsToNotifyEnum::kDonors}, - {CoordinatorStateEnum::kDecisionPersisted, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kDone, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kError, ParticipantsToNotifyEnum::kDonors}, -}; - /** - * Executes metadata changes in a transaction. + * Executes metadata changes in a transaction without bumping the collection version. */ void executeMetadataChangesInTxn( OperationContext* opCtx, @@ -553,73 +515,6 @@ void executeMetadataChangesInTxn( changeMetadataFunc(opCtx, txnNumber); }); } - -/** - * Runs resharding metadata changes in a transaction. - * - * This function should only be called if donor and recipient shards DO NOT need to be informed of - * the updatedCoordinatorDoc's state transition. If donor or recipient shards need to be informed, - * instead call bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(). - */ -void executeStateTransitionAndMetadataChangesInTxn( - OperationContext* opCtx, - const ReshardingCoordinatorDocument& updatedCoordinatorDoc, - unique_function changeMetadataFunc) { - const auto& state = updatedCoordinatorDoc.getState(); - invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end()); - invariant(notifyForStateTransition[state] == ParticipantsToNotifyEnum::kNone); - - // Neither donors nor recipients need to be informed of the transition to - // updatedCoordinatorDoc's state. - executeMetadataChangesInTxn(opCtx, std::move(changeMetadataFunc)); -} - -/** - * In a single transaction, bumps the shard version for each shard spanning the corresponding - * resharding collection and executes changeMetadataFunc. - * - * This function should only be called if donor or recipient shards need to be informed of the - * updatedCoordinatorDoc's state transition. If donor or recipient shards do not need to be - * informed, instead call executeStateTransitionAndMetadataChangesInTxn(). - */ -void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - OperationContext* opCtx, - const ReshardingCoordinatorDocument& updatedCoordinatorDoc, - unique_function changeMetadataFunc) { - const auto& state = updatedCoordinatorDoc.getState(); - invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end()); - invariant(notifyForStateTransition[state] != ParticipantsToNotifyEnum::kNone); - - auto participantsToNotify = notifyForStateTransition[state]; - if (participantsToNotify == ParticipantsToNotifyEnum::kDonors) { - // Bump the donor shard versions for the original namespace along with updating the - // metadata. - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, - updatedCoordinatorDoc.getSourceNss(), - resharding::extractShardIds(updatedCoordinatorDoc.getDonorShards()), - std::move(changeMetadataFunc)); - } else if (participantsToNotify == ParticipantsToNotifyEnum::kRecipients) { - // Bump the recipient shard versions for the temporary resharding namespace along with - // updating the metadata. - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, - updatedCoordinatorDoc.getTempReshardingNss(), - resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()), - std::move(changeMetadataFunc)); - } else if (participantsToNotify == - ParticipantsToNotifyEnum::kAllParticipantsPostDecisionPersisted) { - // Bump the recipient shard versions for the original resharding namespace along with - // updating the metadata. Only the recipient shards will have chunks for the namespace after - // the coordinator is in state kDecisionPersisted, bumping chunk versions on the donor - // shards would not apply. - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, - updatedCoordinatorDoc.getSourceNss(), - resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()), - std::move(changeMetadataFunc)); - } -} } // namespace namespace resharding { @@ -653,29 +548,30 @@ void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx, opCtx, coordinatorDoc.getSourceNss(), repl::ReadConcernLevel::kMajorityReadConcern); const auto collation = originalCollType.getDefaultCollation(); - executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Insert the coordinator document to config.reshardingOperations. - invariant(coordinatorDoc.getActive()); - try { - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - } catch (const ExceptionFor& ex) { - auto extraInfo = ex.extraInfo(); - if (extraInfo->getKeyPattern().woCompare(BSON("active" << 1)) == 0) { - uasserted(ErrorCodes::ReshardCollectionInProgress, - str::stream() - << "Only one resharding operation is allowed to be active at a " - "time, aborting resharding op for " - << coordinatorDoc.getSourceNss()); - } + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, coordinatorDoc.getSourceNss(), [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Insert the coordinator document to config.reshardingOperations. + invariant(coordinatorDoc.getActive()); + try { + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + } catch (const ExceptionFor& ex) { + auto extraInfo = ex.extraInfo(); + if (extraInfo->getKeyPattern().woCompare(BSON("active" << 1)) == 0) { + uasserted(ErrorCodes::ReshardCollectionInProgress, + str::stream() + << "Only one resharding operation is allowed to be active at a " + "time, aborting resharding op for " + << coordinatorDoc.getSourceNss()); + } - throw; - } + throw; + } - // Update the config.collections entry for the original collection to include - // 'reshardingFields' - updateConfigCollectionsForOriginalNss( - opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); - }); + // Update the config.collections entry for the original collection to include + // 'reshardingFields' + updateConfigCollectionsForOriginalNss( + opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + }); } ChunkVersion calculateChunkVersionForInitialChunks(OperationContext* opCtx) { @@ -787,8 +683,10 @@ void writeParticipantShardsAndTempCollInfo( const ReshardingCoordinatorDocument& updatedCoordinatorDoc, std::vector initialChunks, std::vector zones) { - bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - opCtx, updatedCoordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, + updatedCoordinatorDoc.getSourceNss(), + [&](OperationContext* opCtx, TxnNumber txnNumber) { // Update on-disk state to reflect latest state transition. writeToCoordinatorStateNss(opCtx, updatedCoordinatorDoc, txnNumber); updateConfigCollectionsForOriginalNss( @@ -808,61 +706,60 @@ void writeDecisionPersistedState(OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc, OID newCollectionEpoch, boost::optional newCollectionTimestamp) { - executeStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Update the config.reshardingOperations entry - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + // No need to bump originalNss version because its epoch will be changed. + executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Update the config.reshardingOperations entry + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Remove the config.collections entry for the temporary collection - writeToConfigCollectionsForTempNss( - opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + // Remove the config.collections entry for the temporary collection + writeToConfigCollectionsForTempNss( + opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); - // Update the config.collections entry for the original namespace to reflect the new - // shard key, new epoch, and new UUID - updateConfigCollectionsForOriginalNss( - opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber); - - // Remove all chunk and tag documents associated with the original collection, then - // update the chunk and tag docs currently associated with the temp nss to be associated - // with the original nss - removeChunkAndTagsDocsForOriginalNss( - opCtx, coordinatorDoc, newCollectionTimestamp, txnNumber); - updateChunkAndTagsDocsForTempNss( - opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber); - }); + // Update the config.collections entry for the original namespace to reflect the new + // shard key, new epoch, and new UUID + updateConfigCollectionsForOriginalNss( + opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber); + + // Remove all chunk and tag documents associated with the original collection, then + // update the chunk and tag docs currently associated with the temp nss to be associated + // with the original nss + removeChunkAndTagsDocsForOriginalNss( + opCtx, coordinatorDoc, newCollectionTimestamp, txnNumber); + updateChunkAndTagsDocsForTempNss( + opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber); + }); } void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions( OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) { // Run updates to config.reshardingOperations and config.collections in a transaction auto nextState = coordinatorDoc.getState(); - invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end()); - invariant(notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone, - "failed to write state transition with nextState {}"_format( - CoordinatorState_serializer(nextState))); - // Resharding metadata changes to be executed. - auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Update the config.reshardingOperations entry - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + std::vector collNames = {coordinatorDoc.getSourceNss()}; + if (nextState < CoordinatorStateEnum::kDecisionPersisted || + nextState == CoordinatorStateEnum::kError) { + collNames.emplace_back(coordinatorDoc.getTempReshardingNss()); + } - // Update the config.collections entry for the original collection - updateConfigCollectionsForOriginalNss( - opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + ShardingCatalogManager::get(opCtx)->bumpMultipleCollectionVersionsAndChangeMetadataInTxn( + opCtx, collNames, [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Update the config.reshardingOperations entry + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Update the config.collections entry for the temporary resharding collection. If we've - // already persisted the decision that the operation will succeed, we've removed the entry - // for the temporary collection and updated the entry with original namespace to have the - // new shard key, UUID, and epoch - if (nextState < CoordinatorStateEnum::kDecisionPersisted || - nextState == CoordinatorStateEnum::kError) { - writeToConfigCollectionsForTempNss( + // Update the config.collections entry for the original collection + updateConfigCollectionsForOriginalNss( opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); - } - }; - bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, std::move(changeMetadataFunc)); + // Update the config.collections entry for the temporary resharding collection. If we've + // already persisted the decision that the operation will succeed, we've removed the + // entry for the temporary collection and updated the entry with original namespace to + // have the new shard key, UUID, and epoch + if (nextState < CoordinatorStateEnum::kDecisionPersisted || + nextState == CoordinatorStateEnum::kError) { + writeToConfigCollectionsForTempNss( + opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + } + }); } void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, @@ -872,8 +769,10 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDone); - executeStateTransitionAndMetadataChangesInTxn( - opCtx, updatedCoordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, + updatedCoordinatorDoc.getSourceNss(), + [&](OperationContext* opCtx, TxnNumber txnNumber) { // Remove entry for this resharding operation from config.reshardingOperations writeToCoordinatorStateNss(opCtx, updatedCoordinatorDoc, txnNumber); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 67450a78bed..577d8e65952 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -703,38 +703,52 @@ TEST_F(ReshardingCoordinatorPersistenceTest, WriteInitialInfoSucceeds) { writeInitialStateAndCatalogUpdatesExpectSuccess( operationContext(), expectedCoordinatorDoc, initialChunks, newZones); - // Confirm the shard version was increased for the donor shard. + // Confirm the shard version was increased for the donor shard. The collection version was + // bumped twice in 'writeInitialStateAndCatalogUpdatesExpectSuccess': once when reshardingFields + // is inserted to the collection doc, and once again when the state transitions to + // kPreparingToDonate. auto donorChunkPostTransition = getChunkDoc(operationContext(), donorChunk.getMin()); ASSERT_EQ(donorChunkPostTransition.getStatus(), Status::OK()); ASSERT_EQ(donorChunkPostTransition.getValue().getVersion().majorVersion(), - collectionVersion.majorVersion() + 1); + collectionVersion.majorVersion() + 2); } TEST_F(ReshardingCoordinatorPersistenceTest, BasicStateTransitionSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kCloning, _originalEpoch); - // Ensure the chunks for the original namespace exist since they will be bumped as a product of - // the state transition to kPreparingToDonate. + // Ensure the chunks for the original and temporary namespaces exist since they will be bumped + // as a product of the state transition to kBlockingWrites. auto donorChunk = makeAndInsertChunksForDonorShard( _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()}); - auto collectionVersion = donorChunk.getVersion(); + auto donorCollectionVersion = donorChunk.getVersion(); + + auto recipientChunk = makeAndInsertChunksForRecipientShard( + _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); + auto recipientCollectionVersion = donorChunk.getVersion(); // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; expectedCoordinatorDoc.setState(CoordinatorStateEnum::kBlockingWrites); writeStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); - assertChunkVersionIncreasedAfterStateTransition(donorChunk, collectionVersion); + assertChunkVersionIncreasedAfterStateTransition(donorChunk, donorCollectionVersion); + assertChunkVersionIncreasedAfterStateTransition(recipientChunk, recipientCollectionVersion); } TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch); + // Ensure the chunks for the original and temporary namespaces exist since they will be bumped + // as a product of the state transition to kCloning. + auto donorChunk = makeAndInsertChunksForDonorShard( + _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()}); + auto donorCollectionVersion = donorChunk.getVersion(); + auto recipientChunk = makeAndInsertChunksForRecipientShard( _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); - auto collectionVersion = recipientChunk.getVersion(); + auto recipientCollectionVersion = recipientChunk.getVersion(); // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; @@ -742,7 +756,8 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSu emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1)); writeStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); - assertChunkVersionIncreasedAfterStateTransition(recipientChunk, collectionVersion); + assertChunkVersionIncreasedAfterStateTransition(donorChunk, donorCollectionVersion); + assertChunkVersionIncreasedAfterStateTransition(recipientChunk, recipientCollectionVersion); } TEST_F(ReshardingCoordinatorPersistenceTest, StateTranstionToDecisionPersistedSucceeds) { @@ -779,6 +794,11 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTranstionToDecisionPersistedSu TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionToErrorSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch); + auto initialChunksIds = std::vector{OID::gen(), OID::gen()}; + + auto tempNssChunks = makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds); + auto recipientChunk = tempNssChunks[1]; + insertChunkAndZoneEntries(tempNssChunks, makeZones(_tempNss, _newShardKey)); insertChunkAndZoneEntries( makeChunks(_originalNss, OID::gen(), _oldShardKey, std::vector{OID::gen(), OID::gen()}), @@ -797,7 +817,14 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionToDoneSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kDecisionPersisted, _finalEpoch); + // Ensure the chunks for the original namespace exist since they will be bumped as a product of + // the state transition to kDone. + auto finalChunk = makeAndInsertChunksForRecipientShard( + _originalNss, _finalEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); + auto collectionVersion = finalChunk.getVersion(); + removeCoordinatorDocAndReshardingFieldsExpectSuccess(operationContext(), coordinatorDoc); + assertChunkVersionIncreasedAfterStateTransition(finalChunk, collectionVersion); } TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWhenCoordinatorDocDoesNotExistFails) { @@ -807,7 +834,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWhenCoordinatorDocDo ASSERT_THROWS_CODE(resharding::writeStateTransitionAndCatalogUpdatesThenBumpShardVersions( operationContext(), coordinatorDoc), AssertionException, - 5057701); + 5514600); } TEST_F(ReshardingCoordinatorPersistenceTest, diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index 11e34c57239..f9e32bf6329 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -291,7 +291,7 @@ public: bool sameReshardingFields(const RoutingTableHistory& other) const { if (_reshardingFields && other._reshardingFields) { - return _reshardingFields->toBSON().woCompare(other._reshardingFields->toBSON()); + return _reshardingFields->toBSON().woCompare(other._reshardingFields->toBSON()) == 0; } else { return !_reshardingFields && !other._reshardingFields; } -- cgit v1.2.1