diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.cpp | 36 |
1 files changed, 29 insertions, 7 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index d846bf9290c..7c12ade6d38 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -169,6 +169,21 @@ std::vector<NamespaceString> ensureStashCollectionsExist( return stashCollections; } +ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx, + NamespaceString oplogBufferNss, + Timestamp fetchTimestamp) { + AutoGetCollection collection(opCtx, oplogBufferNss, MODE_IS); + if (!collection) { + return ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}; + } + + auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *collection); + return highestOplogBufferId.missing() + ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp} + : ReshardingDonorOplogId::parse({"resharding::getIdToResumeFrom"}, + highestOplogBufferId.getDocument().toBson()); +} + } // namespace resharding std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance( @@ -351,6 +366,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin } auto* serviceContext = Client::getCurrent()->getServiceContext(); + auto fetchTimestamp = *_recipientDoc.getFetchTimestamp(); auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), _recipientDoc.getExistingUUID()); @@ -359,7 +375,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin _recipientDoc.getNss(), _recipientDoc.getExistingUUID(), ShardingState::get(serviceContext)->shardId(), - *_recipientDoc.getFetchTimestamp(), + fetchTimestamp, std::move(tempNss)); auto numDonors = _recipientDoc.getDonorShardsMirroring().size(); @@ -373,18 +389,24 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin const auto& recipientId = ShardingState::get(serviceContext)->shardId(); for (const auto& donor : _recipientDoc.getDonorShardsMirroring()) { + auto oplogBufferNss = + getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId()); + auto opCtx = cc().makeOperationContext(); + auto idToResumeFrom = + resharding::getIdToResumeFrom(opCtx.get(), oplogBufferNss, fetchTimestamp); + invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp})); + stdx::lock_guard<Latch> lk(_mutex); _oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>( _recipientDoc.get_id(), _recipientDoc.getExistingUUID(), - // The recipient fetches oplog entries from the donor starting from the fetchTimestamp, - // which corresponds to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume - // token value. - ReshardingDonorOplogId{*_recipientDoc.getFetchTimestamp(), - *_recipientDoc.getFetchTimestamp()}, + // The recipient fetches oplog entries from the donor starting from the largest _id + // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds + // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value. + idToResumeFrom, donor.getId(), recipientId, - getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId()))); + oplogBufferNss)); _oplogFetcherFutures.emplace_back( _oplogFetchers.back() |