diff options
author | Randolph Tan <randolph@10gen.com> | 2021-03-22 20:51:11 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-08 19:16:47 +0000 |
commit | aa401671d769f20e98f40b864338c7bd1c14d292 (patch) | |
tree | 27edd2840e7cc3a939bc9178d5fbb680cede3329 /src/mongo/db/s/resharding/resharding_data_replication.cpp | |
parent | bcd58f9b973e2a6839b40ceedb69dd245e72ab05 (diff) | |
download | mongo-aa401671d769f20e98f40b864338c7bd1c14d292.tar.gz |
SERVER-55214 Make resharding recipient shards use fetchTimestamp from each donor shard when fetching config.transactions and the oplog
Also force the no-op oplog write that is being used as the minFetchTimestamp marker for resharding into its own batch when replicating.
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_data_replication.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_data_replication.cpp | 69 |
1 file changed, 36 insertions, 33 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index 4ab52148ae7..cb2956e2085 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -96,14 +96,14 @@ std::unique_ptr<ReshardingCollectionCloner> ReshardingDataReplication::_makeColl std::vector<std::unique_ptr<ReshardingTxnCloner>> ReshardingDataReplication::_makeTxnCloners( const CommonReshardingMetadata& metadata, - const std::vector<ShardId>& donorShardIds, - Timestamp fetchTimestamp) { + const std::vector<DonorShardFetchTimestamp>& donorShards) { std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners; - txnCloners.reserve(donorShardIds.size()); + txnCloners.reserve(donorShards.size()); - for (const auto& donor : donorShardIds) { + for (const auto& donor : donorShards) { txnCloners.emplace_back(std::make_unique<ReshardingTxnCloner>( - ReshardingSourceId(metadata.getReshardingUUID(), donor), fetchTimestamp)); + ReshardingSourceId(metadata.getReshardingUUID(), donor.getShardId()), + *donor.getMinFetchTimestamp())); } return txnCloners; @@ -113,14 +113,15 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication:: OperationContext* opCtx, ReshardingMetrics* metrics, const CommonReshardingMetadata& metadata, - const std::vector<ShardId>& donorShardIds, - Timestamp fetchTimestamp, + const std::vector<DonorShardFetchTimestamp>& donorShards, const ShardId& myShardId) { std::vector<std::unique_ptr<ReshardingOplogFetcher>> oplogFetchers; - oplogFetchers.reserve(donorShardIds.size()); + oplogFetchers.reserve(donorShards.size()); - for (const auto& donor : donorShardIds) { - auto oplogBufferNss = getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor); + for (const auto& donor : donorShards) { + auto oplogBufferNss = + getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId()); + auto fetchTimestamp = 
*donor.getMinFetchTimestamp(); auto idToResumeFrom = resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNss, fetchTimestamp); invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp})); @@ -133,7 +134,7 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication:: // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value. std::move(idToResumeFrom), - donor, + donor.getShardId(), myShardId, std::move(oplogBufferNss))); } @@ -178,23 +179,26 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication:: OperationContext* opCtx, ReshardingMetrics* metrics, CommonReshardingMetadata metadata, - const std::vector<ShardId>& donorShardIds, - Timestamp fetchTimestamp, + const std::vector<DonorShardFetchTimestamp>& donorShards, + Timestamp cloneTimestamp, ChunkManager sourceChunkMgr, std::shared_ptr<executor::TaskExecutor> executor, const std::vector<NamespaceString>& stashCollections, const std::vector<std::unique_ptr<ReshardingOplogFetcher>>& oplogFetchers, const std::vector<std::unique_ptr<ThreadPool>>& oplogApplierWorkers) { std::vector<std::unique_ptr<ReshardingOplogApplier>> oplogAppliers; - oplogAppliers.reserve(donorShardIds.size()); + oplogAppliers.reserve(donorShards.size()); - for (size_t i = 0; i < donorShardIds.size(); ++i) { - auto sourceId = ReshardingSourceId{metadata.getReshardingUUID(), donorShardIds[i]}; - auto idToResumeFrom = resharding::getApplierIdToResumeFrom(opCtx, sourceId, fetchTimestamp); - invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp})); + for (size_t i = 0; i < donorShards.size(); ++i) { + auto sourceId = + ReshardingSourceId{metadata.getReshardingUUID(), donorShards[i].getShardId()}; + auto minOplogTimestamp = *donorShards[i].getMinFetchTimestamp(); + auto idToResumeFrom = + resharding::getApplierIdToResumeFrom(opCtx, sourceId, 
minOplogTimestamp); + invariant((idToResumeFrom >= ReshardingDonorOplogId{minOplogTimestamp, minOplogTimestamp})); const auto& oplogBufferNss = - getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShardIds[i]); + getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShards[i].getShardId()); oplogAppliers.emplace_back(std::make_unique<ReshardingOplogApplier>( std::make_unique<ReshardingOplogApplier::Env>(opCtx->getServiceContext(), metrics), @@ -204,10 +208,10 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication:: metadata.getSourceUUID(), stashCollections, i, - fetchTimestamp, + cloneTimestamp, // The recipient applies oplog entries from the donor starting from the progress value - // in progress_applier. Otherwise, it starts at fetchTimestamp, which corresponds to - // {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value. + // in progress_applier. Otherwise, it starts at cloneTimestamp, which corresponds to + // {clusterTime: cloneTimestamp, ts: cloneTimestamp} as a resume token value. 
std::make_unique<ReshardingDonorOplogIterator>( oplogBufferNss, std::move(idToResumeFrom), oplogFetchers[i].get()), sourceChunkMgr, @@ -222,8 +226,8 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m OperationContext* opCtx, ReshardingMetrics* metrics, CommonReshardingMetadata metadata, - std::vector<ShardId> donorShardIds, - Timestamp fetchTimestamp, + const std::vector<DonorShardFetchTimestamp>& donorShards, + Timestamp cloneTimestamp, bool cloningDone, ShardId myShardId, ChunkManager sourceChunkMgr, @@ -232,24 +236,23 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m std::vector<std::unique_ptr<ReshardingTxnCloner>> txnCloners; if (!cloningDone) { - collectionCloner = _makeCollectionCloner(metrics, metadata, myShardId, fetchTimestamp); - txnCloners = _makeTxnCloners(metadata, donorShardIds, fetchTimestamp); + collectionCloner = _makeCollectionCloner(metrics, metadata, myShardId, cloneTimestamp); + txnCloners = _makeTxnCloners(metadata, donorShards); } - auto oplogFetchers = - _makeOplogFetchers(opCtx, metrics, metadata, donorShardIds, fetchTimestamp, myShardId); + auto oplogFetchers = _makeOplogFetchers(opCtx, metrics, metadata, donorShards, myShardId); - auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShardIds.size()); - auto oplogApplierWorkers = _makeOplogApplierWorkers(donorShardIds.size()); + auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size()); + auto oplogApplierWorkers = _makeOplogApplierWorkers(donorShards.size()); auto stashCollections = resharding::ensureStashCollectionsExist( - opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShardIds); + opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShards); auto oplogAppliers = _makeOplogAppliers(opCtx, metrics, metadata, - donorShardIds, - fetchTimestamp, + donorShards, + cloneTimestamp, std::move(sourceChunkMgr), std::move(executor), stashCollections, |