diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_coordinator_service.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 245 |
1 files changed, 72 insertions, 173 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 7308d9dca45..88aa8f87930 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -503,46 +503,8 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, opCtx, TagsType::ConfigNS, tagsRequest, txnNumber); } -// -// Helper methods for ensuring donors/ recipients are able to notice when certain state transitions -// occur. -// -// Donors/recipients learn when to transition states by noticing a change in shard versions for one -// of the two collections involved in the resharding operations. -// -// Before the resharding operation persists the decision whether to succeed or fail: -// * Donors are notified when the original resharding collection's shard versions are incremented. -// * Recipients are notified when the temporary resharding collection's shard versions are -// incremented. -// -// After the resharding operation persists its decision: -// * Both donors and recipients are notified when the original resharding collection's shard -// versions are incremented. -// - -/** - * Maps which participants are to be notified when the coordinator transitions into a given state. - */ -enum class ParticipantsToNotifyEnum { - kDonors, - kRecipients, - kAllParticipantsPostDecisionPersisted, - kNone -}; -stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNotifyEnum> notifyForStateTransition{ - {CoordinatorStateEnum::kUnused, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kInitializing, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNotifyEnum::kDonors}, - {CoordinatorStateEnum::kCloning, ParticipantsToNotifyEnum::kRecipients}, - {CoordinatorStateEnum::kApplying, ParticipantsToNotifyEnum::kDonors}, - {CoordinatorStateEnum::kBlockingWrites, ParticipantsToNotifyEnum::kDonors}, - {CoordinatorStateEnum::kDecisionPersisted, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kDone, ParticipantsToNotifyEnum::kNone}, - {CoordinatorStateEnum::kError, ParticipantsToNotifyEnum::kDonors}, -}; - /** - * Executes metadata changes in a transaction. + * Executes metadata changes in a transaction without bumping the collection version. */ void executeMetadataChangesInTxn( OperationContext* opCtx, @@ -553,73 +515,6 @@ void executeMetadataChangesInTxn( changeMetadataFunc(opCtx, txnNumber); }); } - -/** - * Runs resharding metadata changes in a transaction. - * - * This function should only be called if donor and recipient shards DO NOT need to be informed of - * the updatedCoordinatorDoc's state transition. If donor or recipient shards need to be informed, - * instead call bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(). - */ -void executeStateTransitionAndMetadataChangesInTxn( - OperationContext* opCtx, - const ReshardingCoordinatorDocument& updatedCoordinatorDoc, - unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) { - const auto& state = updatedCoordinatorDoc.getState(); - invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end()); - invariant(notifyForStateTransition[state] == ParticipantsToNotifyEnum::kNone); - - // Neither donors nor recipients need to be informed of the transition to - // updatedCoordinatorDoc's state. - executeMetadataChangesInTxn(opCtx, std::move(changeMetadataFunc)); -} - -/** - * In a single transaction, bumps the shard version for each shard spanning the corresponding - * resharding collection and executes changeMetadataFunc. - * - * This function should only be called if donor or recipient shards need to be informed of the - * updatedCoordinatorDoc's state transition. If donor or recipient shards do not need to be - * informed, instead call executeStateTransitionAndMetadataChangesInTxn(). - */ -void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - OperationContext* opCtx, - const ReshardingCoordinatorDocument& updatedCoordinatorDoc, - unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) { - const auto& state = updatedCoordinatorDoc.getState(); - invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end()); - invariant(notifyForStateTransition[state] != ParticipantsToNotifyEnum::kNone); - - auto participantsToNotify = notifyForStateTransition[state]; - if (participantsToNotify == ParticipantsToNotifyEnum::kDonors) { - // Bump the donor shard versions for the original namespace along with updating the - // metadata. - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, - updatedCoordinatorDoc.getSourceNss(), - resharding::extractShardIds(updatedCoordinatorDoc.getDonorShards()), - std::move(changeMetadataFunc)); - } else if (participantsToNotify == ParticipantsToNotifyEnum::kRecipients) { - // Bump the recipient shard versions for the temporary resharding namespace along with - // updating the metadata. - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, - updatedCoordinatorDoc.getTempReshardingNss(), - resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()), - std::move(changeMetadataFunc)); - } else if (participantsToNotify == - ParticipantsToNotifyEnum::kAllParticipantsPostDecisionPersisted) { - // Bump the recipient shard versions for the original resharding namespace along with - // updating the metadata. Only the recipient shards will have chunks for the namespace after - // the coordinator is in state kDecisionPersisted, bumping chunk versions on the donor - // shards would not apply. - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, - updatedCoordinatorDoc.getSourceNss(), - resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()), - std::move(changeMetadataFunc)); - } -} } // namespace namespace resharding { @@ -653,29 +548,30 @@ void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx, opCtx, coordinatorDoc.getSourceNss(), repl::ReadConcernLevel::kMajorityReadConcern); const auto collation = originalCollType.getDefaultCollation(); - executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Insert the coordinator document to config.reshardingOperations. - invariant(coordinatorDoc.getActive()); - try { - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) { - auto extraInfo = ex.extraInfo<DuplicateKeyErrorInfo>(); - if (extraInfo->getKeyPattern().woCompare(BSON("active" << 1)) == 0) { - uasserted(ErrorCodes::ReshardCollectionInProgress, - str::stream() - << "Only one resharding operation is allowed to be active at a " - "time, aborting resharding op for " - << coordinatorDoc.getSourceNss()); - } + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, coordinatorDoc.getSourceNss(), [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Insert the coordinator document to config.reshardingOperations. + invariant(coordinatorDoc.getActive()); + try { + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) { + auto extraInfo = ex.extraInfo<DuplicateKeyErrorInfo>(); + if (extraInfo->getKeyPattern().woCompare(BSON("active" << 1)) == 0) { + uasserted(ErrorCodes::ReshardCollectionInProgress, + str::stream() + << "Only one resharding operation is allowed to be active at a " + "time, aborting resharding op for " + << coordinatorDoc.getSourceNss()); + } - throw; - } + throw; + } - // Update the config.collections entry for the original collection to include - // 'reshardingFields' - updateConfigCollectionsForOriginalNss( - opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); - }); + // Update the config.collections entry for the original collection to include + // 'reshardingFields' + updateConfigCollectionsForOriginalNss( + opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + }); } ChunkVersion calculateChunkVersionForInitialChunks(OperationContext* opCtx) { @@ -787,8 +683,10 @@ void writeParticipantShardsAndTempCollInfo( const ReshardingCoordinatorDocument& updatedCoordinatorDoc, std::vector<ChunkType> initialChunks, std::vector<BSONObj> zones) { - bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - opCtx, updatedCoordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, + updatedCoordinatorDoc.getSourceNss(), + [&](OperationContext* opCtx, TxnNumber txnNumber) { // Update on-disk state to reflect latest state transition. writeToCoordinatorStateNss(opCtx, updatedCoordinatorDoc, txnNumber); updateConfigCollectionsForOriginalNss( @@ -808,61 +706,60 @@ void writeDecisionPersistedState(OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc, OID newCollectionEpoch, boost::optional<Timestamp> newCollectionTimestamp) { - executeStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Update the config.reshardingOperations entry - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + // No need to bump originalNss version because its epoch will be changed. + executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Update the config.reshardingOperations entry + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Remove the config.collections entry for the temporary collection - writeToConfigCollectionsForTempNss( - opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + // Remove the config.collections entry for the temporary collection + writeToConfigCollectionsForTempNss( + opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); - // Update the config.collections entry for the original namespace to reflect the new - // shard key, new epoch, and new UUID - updateConfigCollectionsForOriginalNss( - opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber); - - // Remove all chunk and tag documents associated with the original collection, then - // update the chunk and tag docs currently associated with the temp nss to be associated - // with the original nss - removeChunkAndTagsDocsForOriginalNss( - opCtx, coordinatorDoc, newCollectionTimestamp, txnNumber); - updateChunkAndTagsDocsForTempNss( - opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber); - }); + // Update the config.collections entry for the original namespace to reflect the new + // shard key, new epoch, and new UUID + updateConfigCollectionsForOriginalNss( + opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber); + + // Remove all chunk and tag documents associated with the original collection, then + // update the chunk and tag docs currently associated with the temp nss to be associated + // with the original nss + removeChunkAndTagsDocsForOriginalNss( + opCtx, coordinatorDoc, newCollectionTimestamp, txnNumber); + updateChunkAndTagsDocsForTempNss( + opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber); + }); } void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions( OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) { // Run updates to config.reshardingOperations and config.collections in a transaction auto nextState = coordinatorDoc.getState(); - invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end()); - invariant(notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone, - "failed to write state transition with nextState {}"_format( - CoordinatorState_serializer(nextState))); - // Resharding metadata changes to be executed. - auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Update the config.reshardingOperations entry - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + std::vector<NamespaceString> collNames = {coordinatorDoc.getSourceNss()}; + if (nextState < CoordinatorStateEnum::kDecisionPersisted || + nextState == CoordinatorStateEnum::kError) { + collNames.emplace_back(coordinatorDoc.getTempReshardingNss()); + } - // Update the config.collections entry for the original collection - updateConfigCollectionsForOriginalNss( - opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + ShardingCatalogManager::get(opCtx)->bumpMultipleCollectionVersionsAndChangeMetadataInTxn( + opCtx, collNames, [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Update the config.reshardingOperations entry + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Update the config.collections entry for the temporary resharding collection. If we've - // already persisted the decision that the operation will succeed, we've removed the entry - // for the temporary collection and updated the entry with original namespace to have the - // new shard key, UUID, and epoch - if (nextState < CoordinatorStateEnum::kDecisionPersisted || - nextState == CoordinatorStateEnum::kError) { - writeToConfigCollectionsForTempNss( + // Update the config.collections entry for the original collection + updateConfigCollectionsForOriginalNss( opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); - } - }; - bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, std::move(changeMetadataFunc)); + // Update the config.collections entry for the temporary resharding collection. If we've + // already persisted the decision that the operation will succeed, we've removed the + // entry for the temporary collection and updated the entry with original namespace to + // have the new shard key, UUID, and epoch + if (nextState < CoordinatorStateEnum::kDecisionPersisted || + nextState == CoordinatorStateEnum::kError) { + writeToConfigCollectionsForTempNss( + opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + } + }); } void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, @@ -872,8 +769,10 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDone); - executeStateTransitionAndMetadataChangesInTxn( - opCtx, updatedCoordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, + updatedCoordinatorDoc.getSourceNss(), + [&](OperationContext* opCtx, TxnNumber txnNumber) { // Remove entry for this resharding operation from config.reshardingOperations writeToCoordinatorStateNss(opCtx, updatedCoordinatorDoc, txnNumber); |