Diffstat (limited to 'src/mongo/db')
18 files changed, 279 insertions, 188 deletions
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 784e2327bed..3e4ebaca039 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -121,6 +121,9 @@ const NamespaceString NamespaceString::kReshardingTxnClonerProgressNamespace(
 const NamespaceString NamespaceString::kCollectionCriticalSectionsNamespace(
     NamespaceString::kConfigDb, "collection_critical_sections");
 
+const NamespaceString NamespaceString::kForceOplogBatchBoundaryNamespace(
+    NamespaceString::kConfigDb, "system.forceOplogBatchBoundary");
+
 bool NamespaceString::isListCollectionsCursorNS() const {
     return coll() == listCollectionsCursorCol;
 }
@@ -185,7 +188,8 @@ bool NamespaceString::isLegalClientSystemNS(
  * processing each operation matches the primary's when committing that operation.
  */
 bool NamespaceString::mustBeAppliedInOwnOplogBatch() const {
-    return isSystemDotViews() || isServerConfigurationCollection() || isPrivilegeCollection();
+    return isSystemDotViews() || isServerConfigurationCollection() || isPrivilegeCollection() ||
+        _ns == kForceOplogBatchBoundaryNamespace.ns();
 }
 
 NamespaceString NamespaceString::makeListCollectionsNSS(StringData dbName) {
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 4a32d0891f9..5545129a4ff 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -172,6 +172,9 @@ public:
     // Namespace for storing config.collectionCriticalSections documents
     static const NamespaceString kCollectionCriticalSectionsNamespace;
 
+    // Dummy namespace used to force secondaries to apply an oplog entry in its own batch.
+    static const NamespaceString kForceOplogBatchBoundaryNamespace;
+
     /**
     * Constructs an empty NamespaceString.
     */
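The new namespace exists purely so that mustBeAppliedInOwnOplogBatch() matches it: a no-op write against config.system.forceOplogBatchBoundary is guaranteed to sit alone in an oplog applier batch on secondaries, which makes its OpTime a clean cut point. A minimal sketch of such a write, assuming the same OpObserver hook used later in this diff (trailing optional arguments abbreviated; the real call site is generateMinFetchTimestamp() in resharding_donor_service.cpp below):

    // Sketch only: write a no-op oplog entry against the dummy namespace so
    // that secondaries isolate it into its own applier batch.
    writeConflictRetry(
        opCtx, "forceOplogBatchBoundary",
        NamespaceString::kForceOplogBatchBoundaryNamespace.ns(), [&] {
            WriteUnitOfWork wuow(opCtx);
            opCtx->getServiceContext()->getOpObserver()->onInternalOpMessage(
                opCtx,
                NamespaceString::kForceOplogBatchBoundaryNamespace,  // dummy namespace
                boost::none,                                         // no collection UUID
                BSON("msg" << "force oplog batch boundary"),         // informational payload only
                boost::none, boost::none, boost::none, boost::none, boost::none);
            wuow.commit();
        });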
diff --git a/src/mongo/db/s/resharding/coordinator_document.idl b/src/mongo/db/s/resharding/coordinator_document.idl
index 3cadd2a63ff..a742988cde9 100644
--- a/src/mongo/db/s/resharding/coordinator_document.idl
+++ b/src/mongo/db/s/resharding/coordinator_document.idl
@@ -69,7 +69,6 @@ structs:
         inline_chained_structs: true
         chained_structs:
             CommonReshardingMetadata: CommonReshardingMetadata
-            FetchTimestamp: FetchTimestampStruct
             ReshardingApproxCopySize: ReshardingApproxCopySizeStruct
             AbortReason: AbortReasonStruct
         generate_comparison_operators: false
@@ -79,6 +78,10 @@ structs:
         strict: false
        fields:
            state: CoordinatorState
+            cloneTimestamp:
+                type: timestamp
+                description: "The timestamp for the snapshot read while cloning from the donors."
+                optional: true
            donorShards:
                type: array<DonorShardEntry>
                description: "The list of all donor shards for this resharding operation."
diff --git a/src/mongo/db/s/resharding/recipient_document.idl b/src/mongo/db/s/resharding/recipient_document.idl
index 5efe3d36deb..5d2fe08ce8a 100644
--- a/src/mongo/db/s/resharding/recipient_document.idl
+++ b/src/mongo/db/s/resharding/recipient_document.idl
@@ -41,18 +41,21 @@ structs:
         inline_chained_structs: true
         chained_structs:
             CommonReshardingMetadata: CommonReshardingMetadata
-            FetchTimestamp: FetchTimestampStruct
         generate_comparison_operators: false
         # Use strict:false to avoid complications around upgrade/downgrade. This isn't technically
         # required for resharding because durable state from all resharding operations is cleaned up
         # before the upgrade or downgrade can complete.
         strict: false
         fields:
+            cloneTimestamp:
+                type: timestamp
+                optional: true
+                description: "The timestamp for the snapshot read while cloning from the donors."
             # We intentionally have the mutable state nested in a subobject to make it easy to
             # overwrite with a single $set.
             mutableState: RecipientShardContext
             donorShards:
-                type: array<shard_id>
+                type: array<DonorShardFetchTimestamp>
                 description: "The list of donor shards that report to this recipient."
             minimumOperationDurationMillis:
                 type: long
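The array element type changes from plain shard_id to DonorShardFetchTimestamp, which pairs each donor with the timestamp it reported. That struct is defined in a shared IDL file outside this diff; judging from the call sites below, the generated class behaves roughly like this hand-written approximation (names inferred from usage, not authoritative):

    // Hand-written approximation of the IDL-generated DonorShardFetchTimestamp,
    // inferred from how it is used in this diff.
    class DonorShardFetchTimestamp {
    public:
        explicit DonorShardFetchTimestamp(ShardId shardId) : _shardId(std::move(shardId)) {}

        const ShardId& getShardId() const { return _shardId; }

        const boost::optional<Timestamp>& getMinFetchTimestamp() const { return _minFetchTimestamp; }
        void setMinFetchTimestamp(boost::optional<Timestamp> ts) { _minFetchTimestamp = std::move(ts); }

        BSONObj toBSON() const;  // serializes {shardId: ..., minFetchTimestamp: ...}

    private:
        ShardId _shardId;
        boost::optional<Timestamp> _minFetchTimestamp;  // unset until the donor reports one
    };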
"lastmod" + << "reshardingFields.recipientFields.cloneTimestamp" + << coordinatorDoc.getCloneTimestamp().get() + << "reshardingFields.recipientFields.donorShards" + << donorShardsBuilder.arr() << "lastmod" << opCtx->getServiceContext()->getPreciseClockSource()->now())), false, // upsert false // multi ); + } case CoordinatorStateEnum::kDecisionPersisted: // Remove the entry for the temporary nss return BatchedCommandRequest::buildDeleteOp( @@ -1396,13 +1416,13 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: _updateCoordinatorDocStateAndCatalogEntries( CoordinatorStateEnum nextState, ReshardingCoordinatorDocument coordinatorDoc, - boost::optional<Timestamp> fetchTimestamp, + boost::optional<Timestamp> cloneTimestamp, boost::optional<ReshardingApproxCopySize> approxCopySize, boost::optional<Status> abortReason) { // Build new state doc for coordinator state update ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; updatedCoordinatorDoc.setState(nextState); - emplaceFetchTimestampIfExists(updatedCoordinatorDoc, std::move(fetchTimestamp)); + emplaceCloneTimestampIfExists(updatedCoordinatorDoc, std::move(cloneTimestamp)); emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize)); emplaceAbortReasonIfExists(updatedCoordinatorDoc, abortReason); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index 9f3df91fc42..18f09fc948f 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -329,7 +329,7 @@ private: void _updateCoordinatorDocStateAndCatalogEntries( CoordinatorStateEnum nextState, ReshardingCoordinatorDocument coordinatorDoc, - boost::optional<Timestamp> fetchTimestamp = boost::none, + boost::optional<Timestamp> cloneTimestamp = boost::none, boost::optional<ReshardingApproxCopySize> approxCopySize = boost::none, boost::optional<Status> abortReason = boost::none); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 45a063c20d2..75d8d961ad5 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -90,7 +90,7 @@ protected: {DonorShardEntry(ShardId("shard0000"), {})}, {RecipientShardEntry(ShardId("shard0001"), {})}); doc.setCommonReshardingMetadata(meta); - emplaceFetchTimestampIfExists(doc, std::move(fetchTimestamp)); + emplaceCloneTimestampIfExists(doc, std::move(fetchTimestamp)); return doc; } @@ -251,12 +251,12 @@ protected: 0); ASSERT(coordinatorDoc.getState() == expectedCoordinatorDoc.getState()); ASSERT(coordinatorDoc.getActive()); - if (expectedCoordinatorDoc.getFetchTimestamp()) { - ASSERT(coordinatorDoc.getFetchTimestamp()); - ASSERT_EQUALS(coordinatorDoc.getFetchTimestamp().get(), - expectedCoordinatorDoc.getFetchTimestamp().get()); + if (expectedCoordinatorDoc.getCloneTimestamp()) { + ASSERT(coordinatorDoc.getCloneTimestamp()); + ASSERT_EQUALS(coordinatorDoc.getCloneTimestamp().get(), + expectedCoordinatorDoc.getCloneTimestamp().get()); } else { - ASSERT(!coordinatorDoc.getFetchTimestamp()); + ASSERT(!coordinatorDoc.getCloneTimestamp()); } // Confirm the (non)existence of the CoordinatorDocument abortReason. 
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index 45a063c20d2..75d8d961ad5 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -90,7 +90,7 @@ protected:
             {DonorShardEntry(ShardId("shard0000"), {})},
             {RecipientShardEntry(ShardId("shard0001"), {})});
         doc.setCommonReshardingMetadata(meta);
-        emplaceFetchTimestampIfExists(doc, std::move(fetchTimestamp));
+        emplaceCloneTimestampIfExists(doc, std::move(fetchTimestamp));
         return doc;
     }
 
@@ -251,12 +251,12 @@ protected:
                       0);
         ASSERT(coordinatorDoc.getState() == expectedCoordinatorDoc.getState());
         ASSERT(coordinatorDoc.getActive());
-        if (expectedCoordinatorDoc.getFetchTimestamp()) {
-            ASSERT(coordinatorDoc.getFetchTimestamp());
-            ASSERT_EQUALS(coordinatorDoc.getFetchTimestamp().get(),
-                          expectedCoordinatorDoc.getFetchTimestamp().get());
+        if (expectedCoordinatorDoc.getCloneTimestamp()) {
+            ASSERT(coordinatorDoc.getCloneTimestamp());
+            ASSERT_EQUALS(coordinatorDoc.getCloneTimestamp().get(),
+                          expectedCoordinatorDoc.getCloneTimestamp().get());
         } else {
-            ASSERT(!coordinatorDoc.getFetchTimestamp());
+            ASSERT(!coordinatorDoc.getCloneTimestamp());
         }
 
         // Confirm the (non)existence of the CoordinatorDocument abortReason.
@@ -398,12 +398,12 @@ protected:
         ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getSourceNss(),
                       expectedReshardingFields.getRecipientFields()->getSourceNss());
 
-        if (expectedReshardingFields.getRecipientFields()->getFetchTimestamp()) {
-            ASSERT(onDiskReshardingFields.getRecipientFields()->getFetchTimestamp());
-            ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getFetchTimestamp().get(),
-                          expectedReshardingFields.getRecipientFields()->getFetchTimestamp().get());
+        if (expectedReshardingFields.getRecipientFields()->getCloneTimestamp()) {
+            ASSERT(onDiskReshardingFields.getRecipientFields()->getCloneTimestamp());
+            ASSERT_EQUALS(onDiskReshardingFields.getRecipientFields()->getCloneTimestamp().get(),
+                          expectedReshardingFields.getRecipientFields()->getCloneTimestamp().get());
         } else {
-            ASSERT(!onDiskReshardingFields.getRecipientFields()->getFetchTimestamp());
+            ASSERT(!onDiskReshardingFields.getRecipientFields()->getCloneTimestamp());
         }
 
         if (onDiskReshardingFields.getState() == CoordinatorStateEnum::kError) {
@@ -761,7 +761,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSu
     // Persist the updates on disk
     auto expectedCoordinatorDoc = coordinatorDoc;
     expectedCoordinatorDoc.setState(CoordinatorStateEnum::kCloning);
-    emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1));
+    emplaceCloneTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1));
 
     writeStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
     assertChunkVersionIncreasedAfterStateTransition(donorChunk, donorCollectionVersion);
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index 4ab52148ae7..cb2956e2085 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -96,14 +96,14 @@ std::unique_ptr<ReshardingCollectionCloner> ReshardingDataReplication::_makeColl
 std::vector<std::unique_ptr<ReshardingTxnCloner>> ReshardingDataReplication::_makeTxnCloners(
     const CommonReshardingMetadata& metadata,
-    const std::vector<ShardId>& donorShardIds,
-    Timestamp fetchTimestamp) {
+    const std::vector<DonorShardFetchTimestamp>& donorShards) {
     std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners;
-    txnCloners.reserve(donorShardIds.size());
+    txnCloners.reserve(donorShards.size());
 
-    for (const auto& donor : donorShardIds) {
+    for (const auto& donor : donorShards) {
         txnCloners.emplace_back(std::make_unique<ReshardingTxnCloner>(
-            ReshardingSourceId(metadata.getReshardingUUID(), donor), fetchTimestamp));
+            ReshardingSourceId(metadata.getReshardingUUID(), donor.getShardId()),
+            *donor.getMinFetchTimestamp()));
     }
 
     return txnCloners;
@@ -113,14 +113,15 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
     OperationContext* opCtx,
     ReshardingMetrics* metrics,
     const CommonReshardingMetadata& metadata,
-    const std::vector<ShardId>& donorShardIds,
-    Timestamp fetchTimestamp,
+    const std::vector<DonorShardFetchTimestamp>& donorShards,
     const ShardId& myShardId) {
     std::vector<std::unique_ptr<ReshardingOplogFetcher>> oplogFetchers;
-    oplogFetchers.reserve(donorShardIds.size());
+    oplogFetchers.reserve(donorShards.size());
 
-    for (const auto& donor : donorShardIds) {
-        auto oplogBufferNss = getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor);
+    for (const auto& donor : donorShards) {
+        auto oplogBufferNss =
+            getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId());
+        auto fetchTimestamp = *donor.getMinFetchTimestamp();
         auto idToResumeFrom =
             resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNss, fetchTimestamp);
         invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
@@ -133,7 +134,7 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
             // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds
             // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
             std::move(idToResumeFrom),
-            donor,
+            donor.getShardId(),
             myShardId,
             std::move(oplogBufferNss)));
     }
@@ -178,23 +179,26 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
     OperationContext* opCtx,
     ReshardingMetrics* metrics,
     CommonReshardingMetadata metadata,
-    const std::vector<ShardId>& donorShardIds,
-    Timestamp fetchTimestamp,
+    const std::vector<DonorShardFetchTimestamp>& donorShards,
+    Timestamp cloneTimestamp,
     ChunkManager sourceChunkMgr,
     std::shared_ptr<executor::TaskExecutor> executor,
     const std::vector<NamespaceString>& stashCollections,
     const std::vector<std::unique_ptr<ReshardingOplogFetcher>>& oplogFetchers,
     const std::vector<std::unique_ptr<ThreadPool>>& oplogApplierWorkers) {
     std::vector<std::unique_ptr<ReshardingOplogApplier>> oplogAppliers;
-    oplogAppliers.reserve(donorShardIds.size());
+    oplogAppliers.reserve(donorShards.size());
 
-    for (size_t i = 0; i < donorShardIds.size(); ++i) {
-        auto sourceId = ReshardingSourceId{metadata.getReshardingUUID(), donorShardIds[i]};
-        auto idToResumeFrom = resharding::getApplierIdToResumeFrom(opCtx, sourceId, fetchTimestamp);
-        invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
+    for (size_t i = 0; i < donorShards.size(); ++i) {
+        auto sourceId =
+            ReshardingSourceId{metadata.getReshardingUUID(), donorShards[i].getShardId()};
+        auto minOplogTimestamp = *donorShards[i].getMinFetchTimestamp();
+        auto idToResumeFrom =
+            resharding::getApplierIdToResumeFrom(opCtx, sourceId, minOplogTimestamp);
+        invariant((idToResumeFrom >= ReshardingDonorOplogId{minOplogTimestamp, minOplogTimestamp}));
 
         const auto& oplogBufferNss =
-            getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardIds[i]);
+            getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShards[i].getShardId());
 
         oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>(
             std::make_unique<ReshardingOplogApplier::Env>(opCtx->getServiceContext(), metrics),
@@ -204,10 +208,10 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
             metadata.getSourceUUID(),
             stashCollections,
             i,
-            fetchTimestamp,
+            cloneTimestamp,
             // The recipient applies oplog entries from the donor starting from the progress value
-            // in progress_applier. Otherwise, it starts at fetchTimestamp, which corresponds to
-            // {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
+            // in progress_applier. Otherwise, it starts at cloneTimestamp, which corresponds to
+            // {clusterTime: cloneTimestamp, ts: cloneTimestamp} as a resume token value.
             std::make_unique<ReshardingDonorOplogIterator>(
                 oplogBufferNss, std::move(idToResumeFrom), oplogFetchers[i].get()),
             sourceChunkMgr,
@@ -222,8 +226,8 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
     OperationContext* opCtx,
     ReshardingMetrics* metrics,
     CommonReshardingMetadata metadata,
-    std::vector<ShardId> donorShardIds,
-    Timestamp fetchTimestamp,
+    const std::vector<DonorShardFetchTimestamp>& donorShards,
+    Timestamp cloneTimestamp,
     bool cloningDone,
     ShardId myShardId,
     ChunkManager sourceChunkMgr,
@@ -232,24 +236,23 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
     std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners;
 
     if (!cloningDone) {
-        collectionCloner = _makeCollectionCloner(metrics, metadata, myShardId, fetchTimestamp);
-        txnCloners = _makeTxnCloners(metadata, donorShardIds, fetchTimestamp);
+        collectionCloner = _makeCollectionCloner(metrics, metadata, myShardId, cloneTimestamp);
+        txnCloners = _makeTxnCloners(metadata, donorShards);
     }
 
-    auto oplogFetchers =
-        _makeOplogFetchers(opCtx, metrics, metadata, donorShardIds, fetchTimestamp, myShardId);
+    auto oplogFetchers = _makeOplogFetchers(opCtx, metrics, metadata, donorShards, myShardId);
 
-    auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShardIds.size());
-    auto oplogApplierWorkers = _makeOplogApplierWorkers(donorShardIds.size());
+    auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size());
+    auto oplogApplierWorkers = _makeOplogApplierWorkers(donorShards.size());
 
     auto stashCollections = resharding::ensureStashCollectionsExist(
-        opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShardIds);
+        opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShards);
 
     auto oplogAppliers = _makeOplogAppliers(opCtx,
                                             metrics,
                                             metadata,
-                                            donorShardIds,
-                                            fetchTimestamp,
+                                            donorShards,
+                                            cloneTimestamp,
                                             std::move(sourceChunkMgr),
                                             std::move(executor),
                                             stashCollections,
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h
index e1b1d6437f6..48526e52c6b 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.h
+++ b/src/mongo/db/s/resharding/resharding_data_replication.h
@@ -144,8 +144,8 @@ public:
         OperationContext* opCtx,
         ReshardingMetrics* metrics,
         CommonReshardingMetadata metadata,
-        std::vector<ShardId> donorShardIds,
-        Timestamp fetchTimestamp,
+        const std::vector<DonorShardFetchTimestamp>& donorShards,
+        Timestamp cloneTimestamp,
         bool cloningDone,
         ShardId myShardId,
         ChunkManager sourceChunkMgr,
@@ -189,15 +189,13 @@ private:
     static std::vector<std::unique_ptr<ReshardingTxnCloner>> _makeTxnCloners(
         const CommonReshardingMetadata& metadata,
-        const std::vector<ShardId>& donorShardIds,
-        Timestamp fetchTimestamp);
+        const std::vector<DonorShardFetchTimestamp>& donorShards);
 
     static std::vector<std::unique_ptr<ReshardingOplogFetcher>> _makeOplogFetchers(
         OperationContext* opCtx,
         ReshardingMetrics* metrics,
         const CommonReshardingMetadata& metadata,
-        const std::vector<ShardId>& donorShardIds,
-        Timestamp fetchTimestamp,
+        const std::vector<DonorShardFetchTimestamp>& donorShards,
         const ShardId& myShardId);
 
     static std::shared_ptr<executor::TaskExecutor> _makeOplogFetcherExecutor(size_t numDonors);
@@ -208,8 +206,8 @@ private:
         OperationContext* opCtx,
         ReshardingMetrics* metrics,
         CommonReshardingMetadata metadata,
-        const std::vector<ShardId>& donorShardIds,
-        Timestamp fetchTimestamp,
+        const std::vector<DonorShardFetchTimestamp>& donorShards,
+        Timestamp cloneTimestamp,
         ChunkManager sourceChunkMgr,
         std::shared_ptr<executor::TaskExecutor> executor,
         const std::vector<NamespaceString>& stashCollections,
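Both the fetcher and applier invariants above assume ReshardingDonorOplogId orders pairwise on (clusterTime, ts), so a fresh participant that resumes exactly at the donor's minimum fetch timestamp satisfies the check with equality. A hand-written approximation of that ordering, for illustration only (the real type is IDL-generated with comparison operators):

    #include <tuple>

    // Sketch of the resume-token ordering the invariants above depend on,
    // assuming lexicographic comparison on (clusterTime, ts).
    struct DonorOplogIdSketch {
        Timestamp clusterTime;
        Timestamp ts;

        bool operator>=(const DonorOplogIdSketch& other) const {
            return std::tie(clusterTime, ts) >= std::tie(other.clusterTime, other.ts);
        }
    };

    // No recorded progress: resume token equals {minFetchTs, minFetchTs}, so
    //   DonorOplogIdSketch{minFetchTs, minFetchTs} >= DonorOplogIdSketch{minFetchTs, minFetchTs}
    // holds with equality.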
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
index 846f451ff52..e2320680222 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -334,14 +334,14 @@ ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
     const ReshardingFields& reshardingFields) {
     // The recipient state machines are created before the donor shards are prepared to donate but
     // will remain idle until the donor shards are prepared to donate.
-    invariant(!reshardingFields.getRecipientFields()->getFetchTimestamp());
+    invariant(!reshardingFields.getRecipientFields()->getCloneTimestamp());
 
     RecipientShardContext recipientCtx;
     recipientCtx.setState(RecipientStateEnum::kAwaitingFetchTimestamp);
 
     auto recipientDoc = ReshardingRecipientDocument{
         std::move(recipientCtx),
-        reshardingFields.getRecipientFields()->getDonorShardIds(),
+        reshardingFields.getRecipientFields()->getDonorShards(),
         reshardingFields.getRecipientFields()->getMinimumOperationDurationMillis()};
 
     auto sourceNss = reshardingFields.getRecipientFields()->getSourceNss();
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
index fd61fbd7e26..7aab3e61325 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -52,7 +52,7 @@ using namespace fmt::literals;
 TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromReshardingFields) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard);
+    auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
@@ -66,13 +66,13 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromRe
 TEST_F(ReshardingDonorRecipientCommonInternalsTest,
        ConstructRecipientDocumentFromReshardingFields) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
+    auto metadata =
+        makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
 
-    appendRecipientFieldsToReshardingFields(
-        reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
+    appendRecipientFieldsToReshardingFields(reshardingFields, kShards, kExistingUUID, kOriginalNss);
 
     auto recipientDoc = resharding::constructRecipientDocumentFromReshardingFields(
         opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
@@ -81,7 +81,7 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest,
 
 TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard);
+    auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
@@ -102,12 +102,12 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
 
 TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
+    auto metadata =
+        makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
-    appendRecipientFieldsToReshardingFields(
-        reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
+    appendRecipientFieldsToReshardingFields(reshardingFields, kShards, kExistingUUID, kOriginalNss);
 
     resharding::processReshardingFieldsForCollection(
         opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
@@ -126,7 +126,7 @@ TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
 TEST_F(ReshardingDonorRecipientCommonTest,
        CreateDonorServiceInstanceWithIncorrectCoordinatorState) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard);
+    auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kDecisionPersisted);
@@ -142,12 +142,13 @@ TEST_F(ReshardingDonorRecipientCommonTest,
 TEST_F(ReshardingDonorRecipientCommonTest,
        CreateRecipientServiceInstanceWithIncorrectCoordinatorState) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
+    auto metadata =
+        makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kDecisionPersisted);
     appendRecipientFieldsToReshardingFields(
-        reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+        reshardingFields, kShards, kExistingUUID, kOriginalNss, kCloneTimestamp);
 
     ASSERT_THROWS_CODE(resharding::processReshardingFieldsForCollection(
                            opCtx, kTemporaryReshardingNss, metadata, reshardingFields),
@@ -165,7 +166,7 @@ TEST_F(ReshardingDonorRecipientCommonTest,
 
 TEST_F(ReshardingDonorRecipientCommonTest, ProcessDonorFieldsWhenShardDoesntOwnAnyChunks) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kOtherShard);
+    auto metadata = makeShardedMetadataForOriginalCollection(opCtx, kOtherShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
@@ -184,12 +185,12 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessDonorFieldsWhenShardDoesntOwnA
 
 TEST_F(ReshardingDonorRecipientCommonTest, ProcessRecipientFieldsWhenShardDoesntOwnAnyChunks) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kOtherShard);
+    auto metadata =
+        makeShardedMetadataForTemporaryReshardingCollection(opCtx, kOtherShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
-    appendRecipientFieldsToReshardingFields(
-        reshardingFields, kShardIds, kExistingUUID, kOriginalNss);
+    appendRecipientFieldsToReshardingFields(reshardingFields, kShards, kExistingUUID, kOriginalNss);
 
     resharding::processReshardingFieldsForCollection(
         opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
@@ -205,7 +206,8 @@ TEST_F(ReshardingDonorRecipientCommonTest, ProcessRecipientFieldsWhenShardDoesnt
 
 TEST_F(ReshardingDonorRecipientCommonTest, ProcessReshardingFieldsWithoutDonorOrRecipientFields) {
     OperationContext* opCtx = operationContext();
-    auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard);
+    auto metadata =
+        makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard.getShardId());
 
     auto reshardingFields =
         createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
@@ -238,8 +240,8 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta
     {
         AutoGetCollection autoColl(opCtx, kOriginalNss, LockMode::MODE_IS);
         auto csr = CollectionShardingRuntime::get(opCtx, kOriginalNss);
-        csr->setFilteringMetadata(opCtx,
-                                  makeShardedMetadataForOriginalCollection(opCtx, kThisShard));
+        csr->setFilteringMetadata(
+            opCtx, makeShardedMetadataForOriginalCollection(opCtx, kThisShard.getShardId()));
         ASSERT(csr->getCurrentMetadataIfKnown());
     }
 
@@ -247,8 +249,9 @@ TEST_F(ReshardingDonorRecipientCommonInternalsTest, ClearReshardingFilteringMeta
     {
         AutoGetCollection autoColl(opCtx, kTemporaryReshardingNss, LockMode::MODE_IS);
         auto csr = CollectionShardingRuntime::get(opCtx, kTemporaryReshardingNss);
-        csr->setFilteringMetadata(
-            opCtx, makeShardedMetadataForTemporaryReshardingCollection(opCtx, kThisShard));
+        csr->setFilteringMetadata(opCtx,
+                                  makeShardedMetadataForTemporaryReshardingCollection(
+                                      opCtx, kThisShard.getShardId()));
         ASSERT(csr->getCurrentMetadataIfKnown());
     }
 
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
index 72a3307cd5e..b8f4356bd17 100644
--- a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.h
@@ -63,12 +63,14 @@ public:
     const OID kReshardingEpoch = OID::gen();
     const UUID kReshardingUUID = UUID::gen();
 
-    const ShardId kThisShard = ShardId("shardOne");
-    const ShardId kOtherShard = ShardId("shardTwo");
+    const DonorShardFetchTimestamp kThisShard =
+        makeDonorShardFetchTimestamp(ShardId("shardOne"), Timestamp(10, 0));
+    const DonorShardFetchTimestamp kOtherShard =
+        makeDonorShardFetchTimestamp(ShardId("shardTwo"), Timestamp(20, 0));
 
-    const std::vector<ShardId> kShardIds = {kThisShard, kOtherShard};
+    const std::vector<DonorShardFetchTimestamp> kShards = {kThisShard, kOtherShard};
 
-    const Timestamp kFetchTimestamp = Timestamp(1, 0);
+    const Timestamp kCloneTimestamp = Timestamp(20, 0);
 
 protected:
     CollectionMetadata makeShardedMetadataForOriginalCollection(
@@ -104,7 +106,7 @@ protected:
         auto chunk = ChunkType(
             nss, std::move(range), ChunkVersion(1, 0, epoch, boost::none), shardThatChunkExistsOn);
         ChunkManager cm(
-            kThisShard,
+            kThisShard.getShardId(),
             DatabaseVersion(uuid),
             makeStandaloneRoutingTableHistory(RoutingTableHistory::makeNew(nss,
                                                                            uuid,
@@ -119,7 +121,7 @@ protected:
             boost::none);
 
         if (!OperationShardingState::isOperationVersioned(opCtx)) {
-            const auto version = cm.getVersion(kThisShard);
+            const auto version = cm.getVersion(kThisShard.getShardId());
             BSONObjBuilder builder;
             version.appendToCommand(&builder);
 
@@ -127,14 +129,15 @@ protected:
             oss.initializeClientRoutingVersionsFromCommand(nss, builder.obj());
         }
 
-        return CollectionMetadata(std::move(cm), kThisShard);
+        return CollectionMetadata(std::move(cm), kThisShard.getShardId());
     }
 
     ReshardingDonorDocument makeDonorStateDoc() {
         DonorShardContext donorCtx;
         donorCtx.setState(DonorStateEnum::kPreparingToDonate);
 
-        ReshardingDonorDocument doc(std::move(donorCtx), {kThisShard, kOtherShard});
+        ReshardingDonorDocument doc(std::move(donorCtx),
+                                    {kThisShard.getShardId(), kOtherShard.getShardId()});
 
         NamespaceString sourceNss = kOriginalNss;
         auto sourceUUID = UUID::gen();
@@ -149,7 +152,8 @@ protected:
         RecipientShardContext recipCtx;
         recipCtx.setState(RecipientStateEnum::kCloning);
 
-        ReshardingRecipientDocument doc(std::move(recipCtx), {kThisShard, kOtherShard}, 1000);
+        ReshardingRecipientDocument doc(
+            std::move(recipCtx), {kThisShard.getShardId(), kOtherShard.getShardId()}, 1000);
 
         NamespaceString sourceNss = kOriginalNss;
         auto sourceUUID = UUID::gen();
@@ -158,10 +162,8 @@ protected:
 
         doc.setCommonReshardingMetadata(std::move(commonMetadata));
 
-        // A document in the cloning state requires a fetch timestamp.
-        FetchTimestamp ts;
-        ts.setFetchTimestamp(kFetchTimestamp);
-        doc.setFetchTimestampStruct(ts);
+        // A document in the cloning state requires a clone timestamp.
+        doc.setCloneTimestamp(kCloneTimestamp);
 
         return doc;
     }
@@ -174,19 +176,24 @@ protected:
 
     void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
                                              const BSONObj& reshardingKey) {
+        std::vector<ShardId> donorShardIds;
+        for (const auto& shard : kShards) {
+            donorShardIds.emplace_back(shard.getShardId());
+        }
+
         fields.setDonorFields(
-            TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, kShardIds));
+            TypeCollectionDonorFields(kTemporaryReshardingNss, reshardingKey, donorShardIds));
     }
 
     void appendRecipientFieldsToReshardingFields(
         ReshardingFields& fields,
-        const std::vector<ShardId> donorShardIds,
+        const std::vector<DonorShardFetchTimestamp> donorShards,
         const UUID& existingUUID,
         const NamespaceString& originalNss,
-        const boost::optional<Timestamp>& fetchTimestamp = boost::none) {
+        const boost::optional<Timestamp>& cloneTimestamp = boost::none) {
         auto recipientFields =
-            TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss, 5000);
-        emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp);
+            TypeCollectionRecipientFields(donorShards, existingUUID, originalNss, 5000);
+        emplaceCloneTimestampIfExists(recipientFields, cloneTimestamp);
         fields.setRecipientFields(std::move(recipientFields));
     }
 
@@ -229,18 +236,37 @@ protected:
         ASSERT(recipientDoc.getMutableState().getState() ==
                RecipientStateEnum::kAwaitingFetchTimestamp);
-        ASSERT(!recipientDoc.getFetchTimestamp());
+        ASSERT(!recipientDoc.getCloneTimestamp());
+
+        const auto donorShards = reshardingFields.getRecipientFields()->getDonorShards();
+        std::map<ShardId, DonorShardFetchTimestamp> donorShardMap;
+        for (const auto& donor : donorShards) {
+            donorShardMap.emplace(donor.getShardId(), donor);
+        }
 
-        auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds();
-        auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end());
+        for (const auto& donorShardFromRecipientDoc : recipientDoc.getDonorShards()) {
+            auto donorIter = donorShardMap.find(donorShardFromRecipientDoc.getShardId());
+            ASSERT(donorIter != donorShardMap.end());
+            ASSERT_EQ(donorIter->second.getMinFetchTimestamp().has_value(),
+                      donorShardFromRecipientDoc.getMinFetchTimestamp().has_value());
 
-        for (const auto& donorShardId : recipientDoc.getDonorShards()) {
-            auto reshardingFieldsDonorShardId = donorShardIdsSet.find(donorShardId);
-            ASSERT(reshardingFieldsDonorShardId != donorShardIdsSet.end());
-            donorShardIdsSet.erase(reshardingFieldsDonorShardId);
+            if (donorIter->second.getMinFetchTimestamp()) {
+                ASSERT_EQ(*donorIter->second.getMinFetchTimestamp(),
+                          *donorShardFromRecipientDoc.getMinFetchTimestamp());
+            }
+
+            donorShardMap.erase(donorShardFromRecipientDoc.getShardId());
         }
 
-        ASSERT(donorShardIdsSet.empty());
+        ASSERT(donorShardMap.empty());
+    }
+
+private:
+    DonorShardFetchTimestamp makeDonorShardFetchTimestamp(
+        ShardId shardId, boost::optional<Timestamp> fetchTimestamp) {
+        DonorShardFetchTimestamp donorFetchTimestamp(shardId);
+        donorFetchTimestamp.setMinFetchTimestamp(fetchTimestamp);
+        return donorFetchTimestamp;
     }
 };
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 62ebb7305d8..c02296ca462 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -65,8 +65,7 @@ namespace {
 const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)};
 
-Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss,
-                                    const CollectionUUID& sourceUUID) {
+Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss) {
     auto opCtx = cc().makeOperationContext();
 
     // Do a no-op write and use the OpTime as the minFetchTimestamp
@@ -86,8 +85,8 @@ Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss,
             WriteUnitOfWork wuow(opCtx.get());
             opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage(
                 opCtx.get(),
-                sourceNss,
-                sourceUUID,
+                NamespaceString::kForceOplogBatchBoundaryNamespace,
+                boost::none,
                 BSON("msg" << msg),
                 boost::none,
                 boost::none,
@@ -442,8 +441,7 @@ void ReshardingDonorService::DonorStateMachine::
         _externalState->waitForCollectionFlush(opCtx.get(), _metadata.getTempReshardingNss());
     }
 
-    Timestamp minFetchTimestamp =
-        generateMinFetchTimestamp(_metadata.getSourceNss(), _metadata.getSourceUUID());
+    Timestamp minFetchTimestamp = generateMinFetchTimestamp(_metadata.getSourceNss());
 
     LOGV2_DEBUG(5390702,
                 2,
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index ae24821b064..855c316fe08 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -307,8 +307,9 @@ TEST_F(ReshardingDonorServiceTest, WritesNoOpOplogEntryToGenerateMinFetchTimesta
                   ErrorCodes::InterruptedDueToReplStateChange);
 
     DBDirectClient client(opCtx.get());
-    auto cursor = client.query(NamespaceString(NamespaceString::kRsOplogNamespace.ns()),
-                               BSON("ns" << doc.getSourceNss().toString()));
+    auto cursor =
+        client.query(NamespaceString(NamespaceString::kRsOplogNamespace.ns()),
+                     BSON("ns" << NamespaceString::kForceOplogBatchBoundaryNamespace.ns()));
 
     ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection";
     repl::OplogEntry op(cursor->next());
@@ -317,7 +318,7 @@ TEST_F(ReshardingDonorServiceTest, WritesNoOpOplogEntryToGenerateMinFetchTimesta
 
     ASSERT_EQ(OpType_serializer(op.getOpType()), OpType_serializer(repl::OpTypeEnum::kNoop))
         << op.getEntry();
-    ASSERT_EQ(op.getUuid(), doc.getSourceUUID()) << op.getEntry();
+    ASSERT_FALSE(op.getUuid()) << op.getEntry();
     ASSERT_EQ(op.getObject()["msg"].type(), BSONType::String) << op.getEntry();
     ASSERT_FALSE(bool(op.getObject2())) << op.getEntry();
     ASSERT_FALSE(bool(op.getDestinedRecipient())) << op.getEntry();
@@ -347,9 +348,6 @@ TEST_F(ReshardingDonorServiceTest, WritesFinalReshardOpOplogEntriesWhileWritesBl
                                BSON("ns" << doc.getSourceNss().toString()));
 
     ASSERT_TRUE(cursor->more()) << "Found no oplog entries for source collection";
-    // Skip the first oplog entry returned because it is the no-op from generating the
-    // minFetchTimestamp value.
-    cursor->next();
 
     for (const auto& recipientShardId : doc.getRecipientShards()) {
         ASSERT_TRUE(cursor->more()) << "Didn't find finalReshardOp entry for source collection";
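Because the no-op is now written against the dummy namespace rather than the source collection, the oplog entry carries no collection UUID (hence ASSERT_FALSE(op.getUuid())) and no longer matches queries on the source namespace, which is why the finalReshardOp test can drop its skip-first-entry workaround. A condensed sketch of how the donor derives minFetchTimestamp from that write, consistent with the hunks above (retry and majority-wait plumbing elided; the getLastOp pattern is an assumption about the elided tail):

    // Sketch: after the no-op onInternalOpMessage() write shown above commits,
    // the donor reads back the OpTime it generated and uses its timestamp as
    // the minFetchTimestamp.
    auto generatedOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
    return generatedOpTime.getTimestamp();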
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 2daebd6656e..40a3729f091 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -81,13 +81,16 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) {
     }
 }
 
-void ensureFulfilledPromise(WithLock lk, SharedPromise<Timestamp>& sp, Timestamp ts) {
+void ensureFulfilledPromise(
+    WithLock lk,
+    SharedPromise<ReshardingRecipientService::RecipientStateMachine::CloneDetails>& sp,
+    ReshardingRecipientService::RecipientStateMachine::CloneDetails details) {
     auto future = sp.getFuture();
     if (!future.isReady()) {
-        sp.emplaceValue(ts);
+        sp.emplaceValue(details);
     } else {
         // Ensure that we would only attempt to fulfill the promise with the same Timestamp value.
-        invariant(future.get() == ts);
+        invariant(future.get().cloneTimestamp == details.cloneTimestamp);
     }
 }
 
@@ -149,10 +152,11 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
         opCtx, reshardingNss, optionsAndIndexes);
 }
 
-std::vector<NamespaceString> ensureStashCollectionsExist(OperationContext* opCtx,
-                                                         const ChunkManager& cm,
-                                                         const UUID& existingUUID,
-                                                         std::vector<ShardId> donorShards) {
+std::vector<NamespaceString> ensureStashCollectionsExist(
+    OperationContext* opCtx,
+    const ChunkManager& cm,
+    const UUID& existingUUID,
+    const std::vector<DonorShardFetchTimestamp>& donorShards) {
     // Use the same collation for the stash collections as the temporary resharding collection
     auto collator = cm.getDefaultCollator();
     BSONObj collationSpec = collator ? collator->getSpec().toBSON() : BSONObj();
@@ -165,7 +169,7 @@ std::vector<NamespaceString> ensureStashCollectionsExist(OperationContext* opCtx
         options.collation = std::move(collationSpec);
         for (const auto& donor : donorShards) {
             stashCollections.emplace_back(ReshardingOplogApplier::ensureStashCollectionExists(
-                opCtx, existingUUID, donor, options));
+                opCtx, existingUUID, donor.getShardId(), options));
         }
     }
 
@@ -219,10 +223,10 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
     : repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(),
       _recipientService{recipientService},
       _metadata{recipientDoc.getCommonReshardingMetadata()},
-      _donorShardIds{recipientDoc.getDonorShards()},
       _minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}},
       _recipientCtx{recipientDoc.getMutableState()},
-      _fetchTimestamp{recipientDoc.getFetchTimestamp()},
+      _donorShards{recipientDoc.getDonorShards()},
+      _cloneTimestamp{recipientDoc.getCloneTimestamp()},
       _markKilledExecutor(std::make_shared<ThreadPool>([] {
           ThreadPool::Options options;
          options.poolName = "RecipientStateMachineCancelableOpCtxPool";
@@ -391,9 +395,12 @@ void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChange
 
     auto coordinatorState = reshardingFields.getState();
     if (coordinatorState >= CoordinatorStateEnum::kCloning) {
-        auto fetchTimestamp = reshardingFields.getRecipientFields()->getFetchTimestamp();
-        invariant(fetchTimestamp);
-        ensureFulfilledPromise(lk, _allDonorsPreparedToDonate, *fetchTimestamp);
+        auto recipientFields = *reshardingFields.getRecipientFields();
+        invariant(recipientFields.getCloneTimestamp());
+        ensureFulfilledPromise(
+            lk,
+            _allDonorsPreparedToDonate,
+            {*recipientFields.getCloneTimestamp(), recipientFields.getDonorShards()});
     }
 
     if (coordinatorState >= CoordinatorStateEnum::kDecisionPersisted) {
@@ -405,14 +412,15 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
     _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
         const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
     if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
-        invariant(_fetchTimestamp);
+        invariant(_cloneTimestamp);
         return ExecutorFuture(**executor);
     }
 
     return _allDonorsPreparedToDonate.getFuture()
         .thenRunOn(**executor)
-        .then(
-            [this](Timestamp fetchTimestamp) { _transitionToCreatingCollection(fetchTimestamp); });
+        .then([this](ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) {
+            _transitionToCreatingCollection(std::move(cloneDetails));
+        });
 }
 
 void ReshardingRecipientService::RecipientStateMachine::
@@ -429,7 +437,7 @@ void ReshardingRecipientService::RecipientStateMachine::
         _metadata.getTempReshardingNss(),
         _metadata.getReshardingUUID(),
         _metadata.getSourceUUID(),
-        *_fetchTimestamp);
+        *_cloneTimestamp);
 
     ShardKeyPattern shardKeyPattern{_metadata.getReshardingKey()};
 
@@ -457,7 +465,7 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(
     OperationContext* opCtx,
     bool cloningDone,
     const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
-    invariant(_fetchTimestamp);
+    invariant(_cloneTimestamp);
 
     auto myShardId = ShardingState::get(opCtx->getServiceContext())->shardId();
     auto catalogCache = Grid::get(opCtx)->catalogCache();
@@ -467,8 +475,8 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(
     return _dataReplicationFactory(opCtx,
                                    _metrics(),
                                    _metadata,
-                                   _donorShardIds,
-                                   *_fetchTimestamp,
+                                   _donorShards,
+                                   *_cloneTimestamp,
                                    cloningDone,
                                    std::move(myShardId),
                                    std::move(sourceChunkMgr),
@@ -588,8 +596,9 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
         })
         .then([this] {
             auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-            for (const auto& donor : _donorShardIds) {
-                auto stashNss = getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor);
+            for (const auto& donor : _donorShards) {
+                auto stashNss =
+                    getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId());
                 AutoGetCollection stashColl(opCtx.get(), stashNss, MODE_IS);
                 uassert(5356800,
                         "Resharding completed with non-empty stash collections",
@@ -661,14 +670,17 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
 }
 
 void ReshardingRecipientService::RecipientStateMachine::_transitionState(
-    RecipientShardContext&& newRecipientCtx, boost::optional<Timestamp>&& fetchTimestamp) {
+    RecipientShardContext&& newRecipientCtx,
+    boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&&
+        cloneDetails) {
     invariant(newRecipientCtx.getState() != RecipientStateEnum::kAwaitingFetchTimestamp);
 
     // For logging purposes.
     auto oldState = _recipientCtx.getState();
     auto newState = newRecipientCtx.getState();
 
-    _updateRecipientDocument(std::move(newRecipientCtx), std::move(fetchTimestamp));
+    _updateRecipientDocument(std::move(newRecipientCtx), std::move(cloneDetails));
+
     _metrics()->setRecipientState(newState);
 
     LOGV2_INFO(5279506,
@@ -681,10 +693,10 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
 }
 
 void ReshardingRecipientService::RecipientStateMachine::_transitionToCreatingCollection(
-    Timestamp fetchTimestamp) {
+    ReshardingRecipientService::RecipientStateMachine::CloneDetails cloneDetails) {
     auto newRecipientCtx = _recipientCtx;
     newRecipientCtx.setState(RecipientStateEnum::kCreatingCollection);
-    _transitionState(std::move(newRecipientCtx), fetchTimestamp);
+    _transitionState(std::move(newRecipientCtx), std::move(cloneDetails));
 }
 
 void ReshardingRecipientService::RecipientStateMachine::_transitionToError(Status abortReason) {
@@ -808,7 +820,9 @@ void ReshardingRecipientService::RecipientStateMachine::insertStateDocument(
 }
 
 void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument(
-    RecipientShardContext&& newRecipientCtx, boost::optional<Timestamp>&& fetchTimestamp) {
+    RecipientShardContext&& newRecipientCtx,
+    boost::optional<ReshardingRecipientService::RecipientStateMachine::CloneDetails>&&
+        cloneDetails) {
     auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
     PersistentTaskStore<ReshardingRecipientDocument> store(
         NamespaceString::kRecipientReshardingOperationsNamespace);
@@ -819,10 +833,20 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
         setBuilder.append(ReshardingRecipientDocument::kMutableStateFieldName,
                           newRecipientCtx.toBSON());
 
-        if (fetchTimestamp) {
-            setBuilder.append(ReshardingRecipientDocument::kFetchTimestampFieldName,
-                              *fetchTimestamp);
+        if (cloneDetails) {
+            setBuilder.append(ReshardingRecipientDocument::kCloneTimestampFieldName,
+                              cloneDetails->cloneTimestamp);
+
+            BSONArrayBuilder donorShardsArrayBuilder;
+            for (const auto& donor : cloneDetails->donorShards) {
+                donorShardsArrayBuilder.append(donor.toBSON());
+            }
+
+            setBuilder.append(ReshardingRecipientDocument::kDonorShardsFieldName,
+                              donorShardsArrayBuilder.arr());
         }
+
+        setBuilder.doneFast();
     }
 
     store.update(opCtx.get(),
@@ -833,8 +857,9 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
 
     _recipientCtx = newRecipientCtx;
 
-    if (fetchTimestamp) {
-        _fetchTimestamp = fetchTimestamp;
+    if (cloneDetails) {
+        _cloneTimestamp = cloneDetails->cloneTimestamp;
+        _donorShards = std::move(cloneDetails->donorShards);
     }
 }
 
@@ -850,8 +875,9 @@ void ReshardingRecipientService::RecipientStateMachine::_removeRecipientDocument
 
 void ReshardingRecipientService::RecipientStateMachine::_dropOplogCollections(
     OperationContext* opCtx) {
-    for (const auto& donor : _donorShardIds) {
-        auto reshardingSourceId = ReshardingSourceId{_metadata.getReshardingUUID(), donor};
+    for (const auto& donor : _donorShards) {
+        auto reshardingSourceId =
+            ReshardingSourceId{_metadata.getReshardingUUID(), donor.getShardId()};
 
         // Remove the oplog applier progress doc for this donor.
         PersistentTaskStore<ReshardingOplogApplierProgress> oplogApplierProgressStore(
@@ -871,11 +897,13 @@ void ReshardingRecipientService::RecipientStateMachine::_dropOplogCollections(
                 WriteConcernOptions());
 
         // Drop the conflict stash collection for this donor.
-        auto stashNss = getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor);
+        auto stashNss =
+            getLocalConflictStashNamespace(_metadata.getSourceUUID(), donor.getShardId());
         resharding::data_copy::ensureCollectionDropped(opCtx, stashNss);
 
         // Drop the oplog buffer collection for this donor.
-        auto oplogBufferNss = getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor);
+        auto oplogBufferNss =
+            getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId());
         resharding::data_copy::ensureCollectionDropped(opCtx, oplogBufferNss);
     }
 }
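With CloneDetails in place, a single promise fulfillment now hands the recipient both the snapshot timestamp and the per-donor fetch timestamps. A minimal usage sketch, assuming the promise/future types behave as in the code above:

    // Sketch: fulfilling _allDonorsPreparedToDonate with CloneDetails. Callers
    // that may race must check isReady() first, as ensureFulfilledPromise()
    // does above.
    using CloneDetails = ReshardingRecipientService::RecipientStateMachine::CloneDetails;

    void fulfillExample(SharedPromise<CloneDetails>& promise,
                        Timestamp cloneTimestamp,
                        std::vector<DonorShardFetchTimestamp> donorShards) {
        promise.emplaceValue(CloneDetails{cloneTimestamp, std::move(donorShards)});
    }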
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index c777bf8cc34..b83bfce32e1 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -53,10 +53,11 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
                                                 const UUID& existingUUID,
                                                 Timestamp fetchTimestamp);
 
-std::vector<NamespaceString> ensureStashCollectionsExist(OperationContext* opCtx,
-                                                         const ChunkManager& cm,
-                                                         const UUID& existingUUID,
-                                                         std::vector<ShardId> donorShards);
+std::vector<NamespaceString> ensureStashCollectionsExist(
+    OperationContext* opCtx,
+    const ChunkManager& cm,
+    const UUID& existingUUID,
+    const std::vector<DonorShardFetchTimestamp>& donorShards);
 
 ReshardingDonorOplogId getFetcherIdToResumeFrom(OperationContext* opCtx,
                                                 NamespaceString oplogBufferNss,
@@ -101,6 +102,11 @@ public:
 class ReshardingRecipientService::RecipientStateMachine final
     : public repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine> {
 public:
+    struct CloneDetails {
+        Timestamp cloneTimestamp;
+        std::vector<DonorShardFetchTimestamp> donorShards;
+    };
+
     explicit RecipientStateMachine(const ReshardingRecipientService* recipientService,
                                    const BSONObj& recipientDoc,
                                    ReshardingDataReplicationFactory dataReplicationFactory);
@@ -162,10 +168,10 @@ private:
     void _transitionState(RecipientStateEnum newState);
 
     void _transitionState(RecipientShardContext&& newRecipientCtx,
-                          boost::optional<Timestamp>&& fetchTimestamp);
+                          boost::optional<CloneDetails>&& cloneDetails);
 
     // Transitions the on-disk and in-memory state to RecipientStateEnum::kCreatingCollection.
-    void _transitionToCreatingCollection(Timestamp fetchTimestamp);
+    void _transitionToCreatingCollection(CloneDetails cloneDetails);
 
     // Transitions the on-disk and in-memory state to RecipientStateEnum::kError.
     void _transitionToError(Status abortReason);
@@ -176,9 +182,9 @@ private:
         OperationContext* opCtx, const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
 
     // Updates the mutable portion of the on-disk and in-memory recipient document with
-    // 'newRecipientCtx' and 'fetchTimestamp'.
+    // 'newRecipientCtx', 'cloneTimestamp', and 'donorShards'.
     void _updateRecipientDocument(RecipientShardContext&& newRecipientCtx,
-                                  boost::optional<Timestamp>&& fetchTimestamp);
+                                  boost::optional<CloneDetails>&& cloneDetails);
 
     // Removes the local recipient document from disk.
     void _removeRecipientDocument();
@@ -214,13 +220,13 @@ private:
     // The in-memory representation of the immutable portion of the document in
     // config.localReshardingOperations.recipient.
     const CommonReshardingMetadata _metadata;
-    const std::vector<ShardId> _donorShardIds;
     const Milliseconds _minimumOperationDuration;
 
     // The in-memory representation of the mutable portion of the document in
     // config.localReshardingOperations.recipient.
     RecipientShardContext _recipientCtx;
-    boost::optional<Timestamp> _fetchTimestamp;
+    std::vector<DonorShardFetchTimestamp> _donorShards;
+    boost::optional<Timestamp> _cloneTimestamp;
 
     // ThreadPool used by CancelableOperationContext.
     // CancelableOperationContext must have a thread that is always available to it to mark its
@@ -246,7 +252,7 @@ private:
 
     // Each promise below corresponds to a state on the recipient state machine. They are listed in
     // ascending order, such that the first promise below will be the first promise fulfilled.
-    SharedPromise<Timestamp> _allDonorsPreparedToDonate;
+    SharedPromise<CloneDetails> _allDonorsPreparedToDonate;
 
     SharedPromise<void> _coordinatorHasDecisionPersisted;
 
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 5020b0369b6..c4a5b594045 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -154,7 +154,7 @@ public:
         recipientFields.setSourceUUID(uuid);
         // Populating the set of donor shard ids isn't necessary to test the functionality of
         // creating the temporary resharding collection.
-        recipientFields.setDonorShardIds({});
+        recipientFields.setDonorShards({});
         recipientFields.setMinimumOperationDurationMillis(5000);
 
         reshardingFields.setRecipientFields(recipientFields);
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 8b5639d7b4e..5f8647e69df 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -56,22 +56,20 @@ constexpr auto kReshardFinalOpLogType = "reshardFinalOp"_sd;
 
 /**
  * Emplaces the 'fetchTimestamp' onto the ClassWithFetchTimestamp if the timestamp has been
  * emplaced inside the boost::optional.
  */
-template <class ClassWithFetchTimestamp>
-void emplaceFetchTimestampIfExists(ClassWithFetchTimestamp& c,
-                                   boost::optional<Timestamp> fetchTimestamp) {
-    if (!fetchTimestamp) {
+template <typename ClassWithCloneTimestamp>
+void emplaceCloneTimestampIfExists(ClassWithCloneTimestamp& c,
+                                   boost::optional<Timestamp> cloneTimestamp) {
+    if (!cloneTimestamp) {
         return;
     }
-    invariant(!fetchTimestamp->isNull());
+    invariant(!cloneTimestamp->isNull());
 
-    if (auto alreadyExistingFetchTimestamp = c.getFetchTimestamp()) {
-        invariant(fetchTimestamp == alreadyExistingFetchTimestamp);
+    if (auto alreadyExistingCloneTimestamp = c.getCloneTimestamp()) {
+        invariant(cloneTimestamp == alreadyExistingCloneTimestamp);
     }
 
-    FetchTimestamp fetchTimestampStruct;
-    fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp));
-    c.setFetchTimestampStruct(std::move(fetchTimestampStruct));
+    c.setCloneTimestamp(*cloneTimestamp);
 }
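A usage sketch of the renamed helper: any type exposing getCloneTimestamp()/setCloneTimestamp() satisfies the implicit template contract, and the setter now takes the raw Timestamp instead of the old FetchTimestamp wrapper struct. Assumes the document type is default-constructible, as IDL-generated types typically are:

    // Sketch: emplaceCloneTimestampIfExists() against the coordinator document.
    ReshardingCoordinatorDocument doc;  // assume loaded from durable state
    emplaceCloneTimestampIfExists(doc, boost::none);          // no-op: nothing to emplace
    emplaceCloneTimestampIfExists(doc, Timestamp(30, 0));     // stores the timestamp
    emplaceCloneTimestampIfExists(doc, Timestamp(30, 0));     // idempotent: same value is fine
    // emplaceCloneTimestampIfExists(doc, Timestamp(40, 0));  // would trip the invariant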