summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_recipient_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp36
1 files changed, 29 insertions, 7 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index d846bf9290c..7c12ade6d38 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -169,6 +169,21 @@ std::vector<NamespaceString> ensureStashCollectionsExist(
return stashCollections;
}
+ReshardingDonorOplogId getIdToResumeFrom(OperationContext* opCtx,
+ NamespaceString oplogBufferNss,
+ Timestamp fetchTimestamp) {
+ AutoGetCollection collection(opCtx, oplogBufferNss, MODE_IS);
+ if (!collection) {
+ return ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp};
+ }
+
+ auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *collection);
+ return highestOplogBufferId.missing()
+ ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}
+ : ReshardingDonorOplogId::parse({"resharding::getIdToResumeFrom"},
+ highestOplogBufferId.getDocument().toBson());
+}
+
} // namespace resharding
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance(
@@ -351,6 +366,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
}
auto* serviceContext = Client::getCurrent()->getServiceContext();
+ auto fetchTimestamp = *_recipientDoc.getFetchTimestamp();
auto tempNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
_recipientDoc.getExistingUUID());
@@ -359,7 +375,7 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
_recipientDoc.getNss(),
_recipientDoc.getExistingUUID(),
ShardingState::get(serviceContext)->shardId(),
- *_recipientDoc.getFetchTimestamp(),
+ fetchTimestamp,
std::move(tempNss));
auto numDonors = _recipientDoc.getDonorShardsMirroring().size();
@@ -373,18 +389,24 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
const auto& recipientId = ShardingState::get(serviceContext)->shardId();
for (const auto& donor : _recipientDoc.getDonorShardsMirroring()) {
+ auto oplogBufferNss =
+ getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId());
+ auto opCtx = cc().makeOperationContext();
+ auto idToResumeFrom =
+ resharding::getIdToResumeFrom(opCtx.get(), oplogBufferNss, fetchTimestamp);
+ invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
+
stdx::lock_guard<Latch> lk(_mutex);
_oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>(
_recipientDoc.get_id(),
_recipientDoc.getExistingUUID(),
- // The recipient fetches oplog entries from the donor starting from the fetchTimestamp,
- // which corresponds to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume
- // token value.
- ReshardingDonorOplogId{*_recipientDoc.getFetchTimestamp(),
- *_recipientDoc.getFetchTimestamp()},
+ // The recipient fetches oplog entries from the donor starting from the largest _id
+ // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds
+ // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
+ idToResumeFrom,
donor.getId(),
recipientId,
- getLocalOplogBufferNamespace(_recipientDoc.getExistingUUID(), donor.getId())));
+ oplogBufferNss));
_oplogFetcherFutures.emplace_back(
_oplogFetchers.back()