summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_coordinator_service.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp245
1 files changed, 72 insertions, 173 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 7308d9dca45..88aa8f87930 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -503,46 +503,8 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
opCtx, TagsType::ConfigNS, tagsRequest, txnNumber);
}
-//
-// Helper methods for ensuring donors/ recipients are able to notice when certain state transitions
-// occur.
-//
-// Donors/recipients learn when to transition states by noticing a change in shard versions for one
-// of the two collections involved in the resharding operations.
-//
-// Before the resharding operation persists the decision whether to succeed or fail:
-// * Donors are notified when the original resharding collection's shard versions are incremented.
-// * Recipients are notified when the temporary resharding collection's shard versions are
-// incremented.
-//
-// After the resharding operation persists its decision:
-// * Both donors and recipients are notified when the original resharding collection's shard
-// versions are incremented.
-//
-
-/**
- * Maps which participants are to be notified when the coordinator transitions into a given state.
- */
-enum class ParticipantsToNotifyEnum {
- kDonors,
- kRecipients,
- kAllParticipantsPostDecisionPersisted,
- kNone
-};
-stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNotifyEnum> notifyForStateTransition{
- {CoordinatorStateEnum::kUnused, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kInitializing, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNotifyEnum::kDonors},
- {CoordinatorStateEnum::kCloning, ParticipantsToNotifyEnum::kRecipients},
- {CoordinatorStateEnum::kApplying, ParticipantsToNotifyEnum::kDonors},
- {CoordinatorStateEnum::kBlockingWrites, ParticipantsToNotifyEnum::kDonors},
- {CoordinatorStateEnum::kDecisionPersisted, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kDone, ParticipantsToNotifyEnum::kNone},
- {CoordinatorStateEnum::kError, ParticipantsToNotifyEnum::kDonors},
-};
-
/**
- * Executes metadata changes in a transaction.
+ * Executes metadata changes in a transaction without bumping the collection version.
*/
void executeMetadataChangesInTxn(
OperationContext* opCtx,
@@ -553,73 +515,6 @@ void executeMetadataChangesInTxn(
changeMetadataFunc(opCtx, txnNumber);
});
}
-
-/**
- * Runs resharding metadata changes in a transaction.
- *
- * This function should only be called if donor and recipient shards DO NOT need to be informed of
- * the updatedCoordinatorDoc's state transition. If donor or recipient shards need to be informed,
- * instead call bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn().
- */
-void executeStateTransitionAndMetadataChangesInTxn(
- OperationContext* opCtx,
- const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
- unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
- const auto& state = updatedCoordinatorDoc.getState();
- invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end());
- invariant(notifyForStateTransition[state] == ParticipantsToNotifyEnum::kNone);
-
- // Neither donors nor recipients need to be informed of the transition to
- // updatedCoordinatorDoc's state.
- executeMetadataChangesInTxn(opCtx, std::move(changeMetadataFunc));
-}
-
-/**
- * In a single transaction, bumps the shard version for each shard spanning the corresponding
- * resharding collection and executes changeMetadataFunc.
- *
- * This function should only be called if donor or recipient shards need to be informed of the
- * updatedCoordinatorDoc's state transition. If donor or recipient shards do not need to be
- * informed, instead call executeStateTransitionAndMetadataChangesInTxn().
- */
-void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- OperationContext* opCtx,
- const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
- unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
- const auto& state = updatedCoordinatorDoc.getState();
- invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end());
- invariant(notifyForStateTransition[state] != ParticipantsToNotifyEnum::kNone);
-
- auto participantsToNotify = notifyForStateTransition[state];
- if (participantsToNotify == ParticipantsToNotifyEnum::kDonors) {
- // Bump the donor shard versions for the original namespace along with updating the
- // metadata.
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx,
- updatedCoordinatorDoc.getSourceNss(),
- resharding::extractShardIds(updatedCoordinatorDoc.getDonorShards()),
- std::move(changeMetadataFunc));
- } else if (participantsToNotify == ParticipantsToNotifyEnum::kRecipients) {
- // Bump the recipient shard versions for the temporary resharding namespace along with
- // updating the metadata.
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx,
- updatedCoordinatorDoc.getTempReshardingNss(),
- resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()),
- std::move(changeMetadataFunc));
- } else if (participantsToNotify ==
- ParticipantsToNotifyEnum::kAllParticipantsPostDecisionPersisted) {
- // Bump the recipient shard versions for the original resharding namespace along with
- // updating the metadata. Only the recipient shards will have chunks for the namespace after
- // the coordinator is in state kDecisionPersisted, bumping chunk versions on the donor
- // shards would not apply.
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx,
- updatedCoordinatorDoc.getSourceNss(),
- resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()),
- std::move(changeMetadataFunc));
- }
-}
} // namespace
namespace resharding {
@@ -653,29 +548,30 @@ void insertCoordDocAndChangeOrigCollEntry(OperationContext* opCtx,
opCtx, coordinatorDoc.getSourceNss(), repl::ReadConcernLevel::kMajorityReadConcern);
const auto collation = originalCollType.getDefaultCollation();
- executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Insert the coordinator document to config.reshardingOperations.
- invariant(coordinatorDoc.getActive());
- try {
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
- auto extraInfo = ex.extraInfo<DuplicateKeyErrorInfo>();
- if (extraInfo->getKeyPattern().woCompare(BSON("active" << 1)) == 0) {
- uasserted(ErrorCodes::ReshardCollectionInProgress,
- str::stream()
- << "Only one resharding operation is allowed to be active at a "
- "time, aborting resharding op for "
- << coordinatorDoc.getSourceNss());
- }
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx, coordinatorDoc.getSourceNss(), [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Insert the coordinator document to config.reshardingOperations.
+ invariant(coordinatorDoc.getActive());
+ try {
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
+ auto extraInfo = ex.extraInfo<DuplicateKeyErrorInfo>();
+ if (extraInfo->getKeyPattern().woCompare(BSON("active" << 1)) == 0) {
+ uasserted(ErrorCodes::ReshardCollectionInProgress,
+ str::stream()
+ << "Only one resharding operation is allowed to be active at a "
+ "time, aborting resharding op for "
+ << coordinatorDoc.getSourceNss());
+ }
- throw;
- }
+ throw;
+ }
- // Update the config.collections entry for the original collection to include
- // 'reshardingFields'
- updateConfigCollectionsForOriginalNss(
- opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
- });
+ // Update the config.collections entry for the original collection to include
+ // 'reshardingFields'
+ updateConfigCollectionsForOriginalNss(
+ opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ });
}
ChunkVersion calculateChunkVersionForInitialChunks(OperationContext* opCtx) {
@@ -787,8 +683,10 @@ void writeParticipantShardsAndTempCollInfo(
const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
std::vector<ChunkType> initialChunks,
std::vector<BSONObj> zones) {
- bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- opCtx, updatedCoordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx,
+ updatedCoordinatorDoc.getSourceNss(),
+ [&](OperationContext* opCtx, TxnNumber txnNumber) {
// Update on-disk state to reflect latest state transition.
writeToCoordinatorStateNss(opCtx, updatedCoordinatorDoc, txnNumber);
updateConfigCollectionsForOriginalNss(
@@ -808,61 +706,60 @@ void writeDecisionPersistedState(OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc,
OID newCollectionEpoch,
boost::optional<Timestamp> newCollectionTimestamp) {
- executeStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Update the config.reshardingOperations entry
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ // No need to bump originalNss version because its epoch will be changed.
+ executeMetadataChangesInTxn(opCtx, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Update the config.reshardingOperations entry
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Remove the config.collections entry for the temporary collection
- writeToConfigCollectionsForTempNss(
- opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ // Remove the config.collections entry for the temporary collection
+ writeToConfigCollectionsForTempNss(
+ opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
- // Update the config.collections entry for the original namespace to reflect the new
- // shard key, new epoch, and new UUID
- updateConfigCollectionsForOriginalNss(
- opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
-
- // Remove all chunk and tag documents associated with the original collection, then
- // update the chunk and tag docs currently associated with the temp nss to be associated
- // with the original nss
- removeChunkAndTagsDocsForOriginalNss(
- opCtx, coordinatorDoc, newCollectionTimestamp, txnNumber);
- updateChunkAndTagsDocsForTempNss(
- opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
- });
+ // Update the config.collections entry for the original namespace to reflect the new
+ // shard key, new epoch, and new UUID
+ updateConfigCollectionsForOriginalNss(
+ opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
+
+ // Remove all chunk and tag documents associated with the original collection, then
+ // update the chunk and tag docs currently associated with the temp nss to be associated
+ // with the original nss
+ removeChunkAndTagsDocsForOriginalNss(
+ opCtx, coordinatorDoc, newCollectionTimestamp, txnNumber);
+ updateChunkAndTagsDocsForTempNss(
+ opCtx, coordinatorDoc, newCollectionEpoch, newCollectionTimestamp, txnNumber);
+ });
}
void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(
OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) {
// Run updates to config.reshardingOperations and config.collections in a transaction
auto nextState = coordinatorDoc.getState();
- invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end());
- invariant(notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone,
- "failed to write state transition with nextState {}"_format(
- CoordinatorState_serializer(nextState)));
- // Resharding metadata changes to be executed.
- auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Update the config.reshardingOperations entry
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ std::vector<NamespaceString> collNames = {coordinatorDoc.getSourceNss()};
+ if (nextState < CoordinatorStateEnum::kDecisionPersisted ||
+ nextState == CoordinatorStateEnum::kError) {
+ collNames.emplace_back(coordinatorDoc.getTempReshardingNss());
+ }
- // Update the config.collections entry for the original collection
- updateConfigCollectionsForOriginalNss(
- opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ ShardingCatalogManager::get(opCtx)->bumpMultipleCollectionVersionsAndChangeMetadataInTxn(
+ opCtx, collNames, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Update the config.reshardingOperations entry
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Update the config.collections entry for the temporary resharding collection. If we've
- // already persisted the decision that the operation will succeed, we've removed the entry
- // for the temporary collection and updated the entry with original namespace to have the
- // new shard key, UUID, and epoch
- if (nextState < CoordinatorStateEnum::kDecisionPersisted ||
- nextState == CoordinatorStateEnum::kError) {
- writeToConfigCollectionsForTempNss(
+ // Update the config.collections entry for the original collection
+ updateConfigCollectionsForOriginalNss(
opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
- }
- };
- bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, std::move(changeMetadataFunc));
+ // Update the config.collections entry for the temporary resharding collection. If we've
+ // already persisted the decision that the operation will succeed, we've removed the
+ // entry for the temporary collection and updated the entry with original namespace to
+ // have the new shard key, UUID, and epoch
+ if (nextState < CoordinatorStateEnum::kDecisionPersisted ||
+ nextState == CoordinatorStateEnum::kError) {
+ writeToConfigCollectionsForTempNss(
+ opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ }
+ });
}
void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
@@ -872,8 +769,10 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDone);
- executeStateTransitionAndMetadataChangesInTxn(
- opCtx, updatedCoordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn(
+ opCtx,
+ updatedCoordinatorDoc.getSourceNss(),
+ [&](OperationContext* opCtx, TxnNumber txnNumber) {
// Remove entry for this resharding operation from config.reshardingOperations
writeToCoordinatorStateNss(opCtx, updatedCoordinatorDoc, txnNumber);