author     Randolph Tan <randolph@10gen.com>  2021-03-22 20:51:11 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-04-08 19:16:47 +0000
commit     aa401671d769f20e98f40b864338c7bd1c14d292 (patch)
tree       27edd2840e7cc3a939bc9178d5fbb680cede3329 /src/mongo/db/s/resharding/resharding_data_replication.cpp
parent     bcd58f9b973e2a6839b40ceedb69dd245e72ab05 (diff)
download   mongo-aa401671d769f20e98f40b864338c7bd1c14d292.tar.gz
SERVER-55214 Make resharding recipient shards use fetchTimestamp from each donor shard when fetching config.transactions and the oplog
Also force the no-op oplog write used as the minFetchTimestamp marker for resharding into its own batch when replicating.
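As an illustrative sketch only (condensed from the hunks below, not the exact committed code), the shape of the change is that these helpers now receive the per-donor DonorShardFetchTimestamp entries instead of a bare ShardId list plus one shared fetchTimestamp, so each cloner and fetcher reads the timestamp its own donor reported:

    // Before: a single fetchTimestamp was applied to every donor shard.
    for (const auto& donor : donorShardIds) {
        txnCloners.emplace_back(std::make_unique<ReshardingTxnCloner>(
            ReshardingSourceId(metadata.getReshardingUUID(), donor), fetchTimestamp));
    }

    // After: each DonorShardFetchTimestamp carries that donor's minFetchTimestamp, so
    // config.transactions and oplog fetching start from the donor's own timestamp.
    for (const auto& donor : donorShards) {
        txnCloners.emplace_back(std::make_unique<ReshardingTxnCloner>(
            ReshardingSourceId(metadata.getReshardingUUID(), donor.getShardId()),
            *donor.getMinFetchTimestamp()));
    }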
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_data_replication.cpp')
-rw-r--r--  src/mongo/db/s/resharding/resharding_data_replication.cpp  69
1 file changed, 36 insertions, 33 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index 4ab52148ae7..cb2956e2085 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -96,14 +96,14 @@ std::unique_ptr<ReshardingCollectionCloner> ReshardingDataReplication::_makeColl
std::vector<std::unique_ptr<ReshardingTxnCloner>> ReshardingDataReplication::_makeTxnCloners(
const CommonReshardingMetadata& metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp) {
+ const std::vector<DonorShardFetchTimestamp>& donorShards) {
std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners;
- txnCloners.reserve(donorShardIds.size());
+ txnCloners.reserve(donorShards.size());
- for (const auto& donor : donorShardIds) {
+ for (const auto& donor : donorShards) {
txnCloners.emplace_back(std::make_unique<ReshardingTxnCloner>(
- ReshardingSourceId(metadata.getReshardingUUID(), donor), fetchTimestamp));
+ ReshardingSourceId(metadata.getReshardingUUID(), donor.getShardId()),
+ *donor.getMinFetchTimestamp()));
}
return txnCloners;
@@ -113,14 +113,15 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
OperationContext* opCtx,
ReshardingMetrics* metrics,
const CommonReshardingMetadata& metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
const ShardId& myShardId) {
std::vector<std::unique_ptr<ReshardingOplogFetcher>> oplogFetchers;
- oplogFetchers.reserve(donorShardIds.size());
+ oplogFetchers.reserve(donorShards.size());
- for (const auto& donor : donorShardIds) {
- auto oplogBufferNss = getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor);
+ for (const auto& donor : donorShards) {
+ auto oplogBufferNss =
+ getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId());
+ auto fetchTimestamp = *donor.getMinFetchTimestamp();
auto idToResumeFrom =
resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNss, fetchTimestamp);
invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
@@ -133,7 +134,7 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
// value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds
// to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
std::move(idToResumeFrom),
- donor,
+ donor.getShardId(),
myShardId,
std::move(oplogBufferNss)));
}
@@ -178,23 +179,26 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
OperationContext* opCtx,
ReshardingMetrics* metrics,
CommonReshardingMetadata metadata,
- const std::vector<ShardId>& donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
+ Timestamp cloneTimestamp,
ChunkManager sourceChunkMgr,
std::shared_ptr<executor::TaskExecutor> executor,
const std::vector<NamespaceString>& stashCollections,
const std::vector<std::unique_ptr<ReshardingOplogFetcher>>& oplogFetchers,
const std::vector<std::unique_ptr<ThreadPool>>& oplogApplierWorkers) {
std::vector<std::unique_ptr<ReshardingOplogApplier>> oplogAppliers;
- oplogAppliers.reserve(donorShardIds.size());
+ oplogAppliers.reserve(donorShards.size());
- for (size_t i = 0; i < donorShardIds.size(); ++i) {
- auto sourceId = ReshardingSourceId{metadata.getReshardingUUID(), donorShardIds[i]};
- auto idToResumeFrom = resharding::getApplierIdToResumeFrom(opCtx, sourceId, fetchTimestamp);
- invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}));
+ for (size_t i = 0; i < donorShards.size(); ++i) {
+ auto sourceId =
+ ReshardingSourceId{metadata.getReshardingUUID(), donorShards[i].getShardId()};
+ auto minOplogTimestamp = *donorShards[i].getMinFetchTimestamp();
+ auto idToResumeFrom =
+ resharding::getApplierIdToResumeFrom(opCtx, sourceId, minOplogTimestamp);
+ invariant((idToResumeFrom >= ReshardingDonorOplogId{minOplogTimestamp, minOplogTimestamp}));
const auto& oplogBufferNss =
- getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardIds[i]);
+ getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShards[i].getShardId());
oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>(
std::make_unique<ReshardingOplogApplier::Env>(opCtx->getServiceContext(), metrics),
@@ -204,10 +208,10 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication::
metadata.getSourceUUID(),
stashCollections,
i,
- fetchTimestamp,
+ cloneTimestamp,
// The recipient applies oplog entries from the donor starting from the progress value
- // in progress_applier. Otherwise, it starts at fetchTimestamp, which corresponds to
- // {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value.
+ // in progress_applier. Otherwise, it starts at cloneTimestamp, which corresponds to
+ // {clusterTime: cloneTimestamp, ts: cloneTimestamp} as a resume token value.
std::make_unique<ReshardingDonorOplogIterator>(
oplogBufferNss, std::move(idToResumeFrom), oplogFetchers[i].get()),
sourceChunkMgr,
@@ -222,8 +226,8 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
OperationContext* opCtx,
ReshardingMetrics* metrics,
CommonReshardingMetadata metadata,
- std::vector<ShardId> donorShardIds,
- Timestamp fetchTimestamp,
+ const std::vector<DonorShardFetchTimestamp>& donorShards,
+ Timestamp cloneTimestamp,
bool cloningDone,
ShardId myShardId,
ChunkManager sourceChunkMgr,
@@ -232,24 +236,23 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m
std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners;
if (!cloningDone) {
- collectionCloner = _makeCollectionCloner(metrics, metadata, myShardId, fetchTimestamp);
- txnCloners = _makeTxnCloners(metadata, donorShardIds, fetchTimestamp);
+ collectionCloner = _makeCollectionCloner(metrics, metadata, myShardId, cloneTimestamp);
+ txnCloners = _makeTxnCloners(metadata, donorShards);
}
- auto oplogFetchers =
- _makeOplogFetchers(opCtx, metrics, metadata, donorShardIds, fetchTimestamp, myShardId);
+ auto oplogFetchers = _makeOplogFetchers(opCtx, metrics, metadata, donorShards, myShardId);
- auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShardIds.size());
- auto oplogApplierWorkers = _makeOplogApplierWorkers(donorShardIds.size());
+ auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size());
+ auto oplogApplierWorkers = _makeOplogApplierWorkers(donorShards.size());
auto stashCollections = resharding::ensureStashCollectionsExist(
- opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShardIds);
+ opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShards);
auto oplogAppliers = _makeOplogAppliers(opCtx,
metrics,
metadata,
- donorShardIds,
- fetchTimestamp,
+ donorShards,
+ cloneTimestamp,
std::move(sourceChunkMgr),
std::move(executor),
stashCollections,