diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-03-09 02:45:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-09 03:52:02 +0000 |
commit | 8f5b19480757929c98a5d7675c4c1762b32b09e9 (patch) | |
tree | 55f9ecf39e834f240148054db54d4a4061fcb3ac /src/mongo/db/s | |
parent | d4393f8baef001b8bd25ccd32055a156bb155777 (diff) | |
download | mongo-8f5b19480757929c98a5d7675c4c1762b32b09e9.tar.gz |
SERVER-54981 Add recipientShards field to resharding donor document.
Changes "reshardingFields.donorFields" to be filled in as part of the
coordinator's transition to kPreparingToDonate.
Diffstat (limited to 'src/mongo/db/s')
11 files changed, 91 insertions, 46 deletions
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp index d3b585f9d90..5bbf258a5ab 100644 --- a/src/mongo/db/s/collection_metadata_test.cpp +++ b/src/mongo/db/s/collection_metadata_test.cpp @@ -123,7 +123,8 @@ protected: } else if (state == CoordinatorStateEnum::kBlockingWrites) { TypeCollectionDonorFields donorFields{ constructTemporaryReshardingNss(kNss.db(), existingUuid), - KeyPattern{BSON("newKey" << 1)}}; + KeyPattern{BSON("newKey" << 1)}, + {kThisShard, kOtherShard}}; reshardingFields.setDonorFields(std::move(donorFields)); } diff --git a/src/mongo/db/s/resharding/donor_document.idl b/src/mongo/db/s/resharding/donor_document.idl index c282f93874c..c2847f01356 100644 --- a/src/mongo/db/s/resharding/donor_document.idl +++ b/src/mongo/db/s/resharding/donor_document.idl @@ -50,3 +50,6 @@ structs: # We intentionally have the mutable state nested in a subobject to make it easy to # overwrite with a single $set. mutableState: DonorShardContext + recipientShards: + type: array<shard_id> + description: "The list of recipient shards that are replicating from this donor." diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index e20b8f447ec..d1f046a4903 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -192,25 +192,12 @@ void writeToCoordinatorStateNss(OperationContext* opCtx, } /** - * Extracts the ShardId from each Donor/RecipientShardEntry in participantShardEntries. - */ -template <class T> -std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntries) { - std::vector<ShardId> shardIds(participantShardEntries.size()); - std::transform(participantShardEntries.begin(), - participantShardEntries.end(), - shardIds.begin(), - [](auto& shardEntry) { return shardEntry.getId(); }); - return shardIds; -} - -/** * Creates reshardingFields.recipientFields for the resharding operation. Note: these should not * change once the operation has begun. */ TypeCollectionRecipientFields constructRecipientFields( const ReshardingCoordinatorDocument& coordinatorDoc) { - auto donorShardIds = extractShardIds(coordinatorDoc.getDonorShards()); + auto donorShardIds = resharding::extractShardIds(coordinatorDoc.getDonorShards()); TypeCollectionRecipientFields recipientFields( std::move(donorShardIds), coordinatorDoc.getSourceUUID(), coordinatorDoc.getSourceNss()); emplaceFetchTimestampIfExists(recipientFields, coordinatorDoc.getFetchTimestamp()); @@ -229,9 +216,6 @@ BSONObj createReshardingFieldsUpdateForOriginalNss( TypeCollectionReshardingFields originalEntryReshardingFields( coordinatorDoc.getReshardingUUID()); originalEntryReshardingFields.setState(coordinatorDoc.getState()); - TypeCollectionDonorFields donorField(coordinatorDoc.getTempReshardingNss(), - coordinatorDoc.getReshardingKey()); - originalEntryReshardingFields.setDonorFields(donorField); return BSON("$set" << BSON(CollectionType::kReshardingFieldsFieldName << originalEntryReshardingFields.toBSON() @@ -239,6 +223,31 @@ BSONObj createReshardingFieldsUpdateForOriginalNss( << opCtx->getServiceContext()->getPreciseClockSource()->now() << CollectionType::kAllowMigrationsFieldName << false)); } + case CoordinatorStateEnum::kPreparingToDonate: { + TypeCollectionDonorFields donorFields( + coordinatorDoc.getTempReshardingNss(), + coordinatorDoc.getReshardingKey(), + resharding::extractShardIds(coordinatorDoc.getRecipientShards())); + + BSONObjBuilder updateBuilder; + { + BSONObjBuilder setBuilder(updateBuilder.subobjStart("$set")); + { + setBuilder.append(CollectionType::kReshardingFieldsFieldName + "." + + TypeCollectionReshardingFields::kStateFieldName, + CoordinatorState_serializer(nextState)); + + setBuilder.append(CollectionType::kReshardingFieldsFieldName + "." + + TypeCollectionReshardingFields::kDonorFieldsFieldName, + donorFields.toBSON()); + + setBuilder.append(CollectionType::kUpdatedAtFieldName, + opCtx->getServiceContext()->getPreciseClockSource()->now()); + } + } + + return updateBuilder.obj(); + } case CoordinatorStateEnum::kDecisionPersisted: { // Update the config.collections entry for the original nss to reflect // the new sharded collection. Set 'uuid' to the reshardingUUID, 'key' to the new shard @@ -588,7 +597,7 @@ void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( opCtx, updatedCoordinatorDoc.getSourceNss(), - extractShardIds(updatedCoordinatorDoc.getDonorShards()), + resharding::extractShardIds(updatedCoordinatorDoc.getDonorShards()), std::move(changeMetadataFunc)); } else if (participantsToNotify == ParticipantsToNotifyEnum::kRecipients) { // Bump the recipient shard versions for the temporary resharding namespace along with @@ -596,7 +605,7 @@ void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( opCtx, updatedCoordinatorDoc.getTempReshardingNss(), - extractShardIds(updatedCoordinatorDoc.getRecipientShards()), + resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()), std::move(changeMetadataFunc)); } else if (participantsToNotify == ParticipantsToNotifyEnum::kAllParticipantsPostDecisionPersisted) { @@ -607,7 +616,7 @@ void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( opCtx, updatedCoordinatorDoc.getSourceNss(), - extractShardIds(updatedCoordinatorDoc.getRecipientShards()), + resharding::extractShardIds(updatedCoordinatorDoc.getRecipientShards()), std::move(changeMetadataFunc)); } } @@ -1354,7 +1363,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { auto opCtx = cc().makeOperationContext(); - auto recipientIds = extractShardIds(_coordinatorDoc.getRecipientShards()); + auto recipientIds = resharding::extractShardIds(_coordinatorDoc.getRecipientShards()); NamespaceString nssToRefresh; // Refresh the temporary namespace if the coordinator is in state 'kError' just in case the @@ -1375,7 +1384,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { auto opCtx = cc().makeOperationContext(); - auto donorIds = extractShardIds(_coordinatorDoc.getDonorShards()); + auto donorIds = resharding::extractShardIds(_coordinatorDoc.getDonorShards()); sharding_util::tellShardsToRefreshCollection( opCtx.get(), donorIds, _coordinatorDoc.getSourceNss(), **executor); } @@ -1384,8 +1393,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsTo const BSONObj& refreshCmd, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { auto opCtx = cc().makeOperationContext(); - auto donorShardIds = extractShardIds(_coordinatorDoc.getDonorShards()); - auto recipientShardIds = extractShardIds(_coordinatorDoc.getRecipientShards()); + auto donorShardIds = resharding::extractShardIds(_coordinatorDoc.getDonorShards()); + auto recipientShardIds = resharding::extractShardIds(_coordinatorDoc.getRecipientShards()); std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()}; participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end()); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index be8447a68f2..1e689b2781b 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -78,6 +78,19 @@ void writeStateTransitionAndCatalogUpdatesThenBumpShardVersions( void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc); +/** + * Extracts the ShardId from each Donor/RecipientShardEntry in participantShardEntries. + */ +template <class T> +std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntries) { + std::vector<ShardId> shardIds(participantShardEntries.size()); + std::transform(participantShardEntries.begin(), + participantShardEntries.end(), + shardIds.begin(), + [](auto& shardEntry) { return shardEntry.getId(); }); + return shardIds; +} + } // namespace resharding class ServiceContext; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 99f8285d2b8..67450a78bed 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -198,7 +198,9 @@ protected: TypeCollectionReshardingFields reshardingFields(coordinatorDoc.getReshardingUUID()); reshardingFields.setState(coordinatorDoc.getState()); reshardingFields.setDonorFields(TypeCollectionDonorFields( - coordinatorDoc.getTempReshardingNss(), coordinatorDoc.getReshardingKey())); + coordinatorDoc.getTempReshardingNss(), + coordinatorDoc.getReshardingKey(), + resharding::extractShardIds(coordinatorDoc.getRecipientShards()))); auto originalNssCatalogEntry = makeOriginalCollectionCatalogEntry( coordinatorDoc, @@ -470,8 +472,10 @@ protected: TypeCollectionReshardingFields expectedReshardingFields( expectedCoordinatorDoc.getReshardingUUID()); expectedReshardingFields.setState(expectedCoordinatorDoc.getState()); - TypeCollectionDonorFields donorField(expectedCoordinatorDoc.getTempReshardingNss(), - expectedCoordinatorDoc.getReshardingKey()); + TypeCollectionDonorFields donorField( + expectedCoordinatorDoc.getTempReshardingNss(), + expectedCoordinatorDoc.getReshardingKey(), + resharding::extractShardIds(expectedCoordinatorDoc.getRecipientShards())); expectedReshardingFields.setDonorFields(donorField); if (auto abortReason = expectedCoordinatorDoc.getAbortReason()) { AbortReason abortReasonStruct; @@ -518,9 +522,10 @@ protected: TypeCollectionReshardingFields reshardingFields( expectedCoordinatorDoc.getReshardingUUID()); reshardingFields.setState(expectedCoordinatorDoc.getState()); - reshardingFields.setDonorFields( - TypeCollectionDonorFields(expectedCoordinatorDoc.getTempReshardingNss(), - expectedCoordinatorDoc.getReshardingKey())); + reshardingFields.setDonorFields(TypeCollectionDonorFields( + expectedCoordinatorDoc.getTempReshardingNss(), + expectedCoordinatorDoc.getReshardingKey(), + resharding::extractShardIds(expectedCoordinatorDoc.getRecipientShards()))); auto originalNssCatalogEntry = makeOriginalCollectionCatalogEntry( expectedCoordinatorDoc, diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp index 0ad26e7eae1..18d10d50547 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -261,18 +261,28 @@ void processReshardingFieldsForRecipientCollection(OperationContext* opCtx, void verifyValidReshardingFields(const ReshardingFields& reshardingFields) { auto coordinatorState = reshardingFields.getState(); - if (coordinatorState < CoordinatorStateEnum::kDecisionPersisted) { + if (coordinatorState < CoordinatorStateEnum::kPreparingToDonate) { + // Prior to the state CoordinatorStateEnum::kPreparingToDonate, the source collection's + // config.collections entry won't have "donorFields". Additionally, the temporary resharding + // collection's config.collections entry won't exist yet. + uassert(5498100, + fmt::format("reshardingFields must not contain donorFields or recipientFields when" + " the coordinator is in state {}. Got reshardingFields {}", + CoordinatorState_serializer(reshardingFields.getState()), + reshardingFields.toBSON().toString()), + !reshardingFields.getDonorFields() && !reshardingFields.getRecipientFields()); + } else if (coordinatorState < CoordinatorStateEnum::kDecisionPersisted) { // Prior to the state CoordinatorStateEnum::kDecisionPersisted, only the source // collection's config.collections entry should have donorFields, and only the // temporary resharding collection's entry should have recipientFields. uassert(5274201, - fmt::format("reshardingFields must contain either donorFields or recipientFields " - "(and not both) when the " - "coordinator is in state {}. Got reshardingFields {}", + fmt::format("reshardingFields must contain exactly one of donorFields and" + " recipientFields when the coordinator is in state {}. Got" + " reshardingFields {}", CoordinatorState_serializer(reshardingFields.getState()), reshardingFields.toBSON().toString()), - reshardingFields.getDonorFields().is_initialized() ^ - reshardingFields.getRecipientFields().is_initialized()); + bool(reshardingFields.getDonorFields()) != + bool(reshardingFields.getRecipientFields())); } else { // At and after state CoordinatorStateEnum::kDecisionPersisted, the temporary // resharding collection's config.collections entry has been removed, and so the @@ -295,7 +305,8 @@ ReshardingDonorDocument constructDonorDocumentFromReshardingFields( DonorShardContext donorCtx; donorCtx.setState(DonorStateEnum::kPreparingToDonate); - auto donorDoc = ReshardingDonorDocument{std::move(donorCtx)}; + auto donorDoc = ReshardingDonorDocument{ + std::move(donorCtx), reshardingFields.getDonorFields()->getRecipientShardIds()}; auto sourceUUID = getCollectionUUIDFromChunkManger(nss, *metadata.getChunkManager()); auto commonMetadata = diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h index 790f6eb5ddb..6deff9cafdd 100644 --- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h @@ -135,7 +135,8 @@ protected: void appendDonorFieldsToReshardingFields(ReshardingFields& fields, const BSONObj& reshardingKey) { - fields.setDonorFields(TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey)); + fields.setDonorFields( + TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, kShardIds)); } void appendRecipientFieldsToReshardingFields( diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 79c16d97ddf..05aaf0b4d05 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -142,6 +142,7 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine( const ReshardingDonorDocument& donorDoc) : repl::PrimaryOnlyService::TypedInstance<DonorStateMachine>(), _metadata{donorDoc.getCommonReshardingMetadata()}, + _recipientShardIds{donorDoc.getRecipientShards()}, _donorCtx{donorDoc.getMutableState()} {} ReshardingDonorService::DonorStateMachine::~DonorStateMachine() { @@ -373,10 +374,7 @@ void ReshardingDonorService::DonorStateMachine:: try { Timer latency; - const auto recipients = - getRecipientShards(rawOpCtx, _metadata.getSourceNss(), _metadata.getSourceUUID()); - - for (const auto& recipient : recipients) { + for (const auto& recipient : _recipientShardIds) { auto oplog = generateOplogEntry(recipient); writeConflictRetry( rawOpCtx, @@ -403,7 +401,7 @@ void ReshardingDonorService::DonorStateMachine:: "Committed oplog entries to temporarily block writes for resharding", "namespace"_attr = _metadata.getSourceNss(), "reshardingUUID"_attr = _metadata.getReshardingUUID(), - "numRecipients"_attr = recipients.size(), + "numRecipients"_attr = _recipientShardIds.size(), "duration"_attr = duration_cast<Milliseconds>(latency.elapsed())); ensureFulfilledPromise(lg, _finalOplogEntriesWritten); } diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 1f73febf005..da1296b562d 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -151,6 +151,7 @@ private: // The in-memory representation of the immutable portion of the document in // config.localReshardingOperations.donor. const CommonReshardingMetadata _metadata; + const std::vector<ShardId> _recipientShardIds; // The in-memory representation of the mutable portion of the document in // config.localReshardingOperations.donor. diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp index 6f686c59731..1c8e5842eac 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -193,7 +193,7 @@ TEST_F(ReshardingDonorServiceTest, DonorShardContext donorCtx; donorCtx.setState(DonorStateEnum::kPreparingToBlockWrites); - ReshardingDonorDocument doc(std::move(donorCtx)); + ReshardingDonorDocument doc(std::move(donorCtx), kRecipientShards); CommonReshardingMetadata metadata(kReshardingUUID, mongo::NamespaceString(kReshardNs), kExistingUUID, diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp index 7d932c4eb0e..ec16834362b 100644 --- a/src/mongo/db/s/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -201,7 +201,10 @@ protected: TypeCollectionReshardingFields reshardingFields; reshardingFields.setReshardingUUID(UUID::gen()); - reshardingFields.setDonorFields(TypeCollectionDonorFields{env.tempNss, BSON("y" << 1)}); + reshardingFields.setDonorFields(TypeCollectionDonorFields{ + env.tempNss, + BSON("y" << 1), + {ShardId{kShardList[0].getName()}, ShardId{kShardList[1].getName()}}}); reshardingFields.setState(CoordinatorStateEnum::kPreparingToDonate); CollectionType coll(kNss, env.version.epoch(), Date_t::now(), UUID::gen()); |