diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-04-09 17:03:05 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-09 17:26:13 +0000 |
commit | d7c268102917688e9763063d009ad04fdf1d3532 (patch) | |
tree | 251fd43f16e034245642e17a751cbd882c2df416 | |
parent | a59ed06e5f2984cf7b7a1d9dac138297ace1f0a0 (diff) | |
download | mongo-d7c268102917688e9763063d009ad04fdf1d3532.tar.gz |
SERVER-55288 Move recipient helpers into resharding_data_replication.
8 files changed, 311 insertions, 208 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 2f8fd92ab26..ec1dac6b245 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -494,6 +494,7 @@ env.CppUnitTest( 'resharding_destined_recipient_test.cpp', 'resharding/resharding_agg_test.cpp', 'resharding/resharding_collection_cloner_test.cpp', + 'resharding/resharding_data_replication_test.cpp', 'resharding/resharding_donor_oplog_iterator_test.cpp', 'resharding/resharding_donor_recipient_common_test.cpp', 'resharding/resharding_metrics_test.cpp', diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp index 3864357c299..3bbdf20e590 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.cpp +++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp @@ -35,11 +35,11 @@ #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/s/resharding/resharding_collection_cloner.h" +#include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_future_util.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_applier.h" #include "mongo/db/s/resharding/resharding_oplog_fetcher.h" -#include "mongo/db/s/resharding/resharding_recipient_service.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding/resharding_txn_cloner.h" #include "mongo/db/s/resharding_util.h" @@ -83,14 +83,14 @@ std::unique_ptr<ReshardingCollectionCloner> ReshardingDataReplication::_makeColl ReshardingMetrics* metrics, const CommonReshardingMetadata& metadata, const ShardId& myShardId, - Timestamp fetchTimestamp) { + Timestamp cloneTimestamp) { return std::make_unique<ReshardingCollectionCloner>( std::make_unique<ReshardingCollectionCloner::Env>(metrics), ShardKeyPattern{metadata.getReshardingKey()}, metadata.getSourceNss(), metadata.getSourceUUID(), myShardId, - fetchTimestamp, + cloneTimestamp, metadata.getTempReshardingNss()); } @@ -121,18 +121,18 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication:: for (const auto& donor : donorShards) { auto oplogBufferNss = getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId()); - auto fetchTimestamp = *donor.getMinFetchTimestamp(); - auto idToResumeFrom = - resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNss, fetchTimestamp); - invariant((idToResumeFrom >= ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp})); + auto minFetchTimestamp = *donor.getMinFetchTimestamp(); + auto idToResumeFrom = getOplogFetcherResumeId(opCtx, oplogBufferNss, minFetchTimestamp); + invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp})); oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>( std::make_unique<ReshardingOplogFetcher::Env>(opCtx->getServiceContext(), metrics), metadata.getReshardingUUID(), metadata.getSourceUUID(), // The recipient fetches oplog entries from the donor starting from the largest _id - // value in the oplog buffer. Otherwise, it starts at fetchTimestamp, which corresponds - // to {clusterTime: fetchTimestamp, ts: fetchTimestamp} as a resume token value. + // value in the oplog buffer. Otherwise, it starts at minFetchTimestamp, which + // corresponds to {clusterTime: minFetchTimestamp, ts: minFetchTimestamp} as a resume + // token value. std::move(idToResumeFrom), donor.getShardId(), myShardId, @@ -175,10 +175,9 @@ std::vector<std::unique_ptr<ReshardingOplogApplier>> ReshardingDataReplication:: for (size_t i = 0; i < donorShards.size(); ++i) { auto sourceId = ReshardingSourceId{metadata.getReshardingUUID(), donorShards[i].getShardId()}; - auto minOplogTimestamp = *donorShards[i].getMinFetchTimestamp(); - auto idToResumeFrom = - resharding::getApplierIdToResumeFrom(opCtx, sourceId, minOplogTimestamp); - invariant((idToResumeFrom >= ReshardingDonorOplogId{minOplogTimestamp, minOplogTimestamp})); + auto minFetchTimestamp = *donorShards[i].getMinFetchTimestamp(); + auto idToResumeFrom = getOplogApplierResumeId(opCtx, sourceId, minFetchTimestamp); + invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp})); const auto& oplogBufferNss = getLocalOplogBufferNamespace(metadata.getSourceUUID(), donorShards[i].getShardId()); @@ -224,9 +223,7 @@ std::unique_ptr<ReshardingDataReplicationInterface> ReshardingDataReplication::m auto oplogFetcherExecutor = _makeOplogFetcherExecutor(donorShards.size()); - auto stashCollections = resharding::ensureStashCollectionsExist( - opCtx, sourceChunkMgr, metadata.getSourceUUID(), donorShards); - + auto stashCollections = ensureStashCollectionsExist(opCtx, sourceChunkMgr, donorShards); auto oplogAppliers = _makeOplogAppliers(opCtx, metrics, metadata, @@ -386,7 +383,7 @@ std::vector<SharedSemiFuture<void>> ReshardingDataReplication::_runTxnCloners( } // ReshardingTxnCloners must complete before the recipient transitions to kApplying to avoid - // errors caused by donor shards unpinning the fetchTimestamp. + // errors caused by donor shards unpinning their minFetchTimestamp. return txnClonerFutures; } @@ -455,4 +452,49 @@ void ReshardingDataReplication::shutdown() { _oplogFetcherExecutor->shutdown(); } +std::vector<NamespaceString> ReshardingDataReplication::ensureStashCollectionsExist( + OperationContext* opCtx, + const ChunkManager& sourceChunkMgr, + const std::vector<DonorShardFetchTimestamp>& donorShards) { + // Use the same collation for the stash collections as the temporary resharding collection + // (which is also the same as the collation for the collection being resharded). + CollectionOptions options; + if (auto collator = sourceChunkMgr.getDefaultCollator()) { + options.collation = collator->getSpec().toBSON(); + } + + std::vector<NamespaceString> stashCollections; + stashCollections.reserve(donorShards.size()); + + for (const auto& donor : donorShards) { + stashCollections.emplace_back(ReshardingOplogApplier::ensureStashCollectionExists( + opCtx, *sourceChunkMgr.getUUID(), donor.getShardId(), options)); + } + + return stashCollections; +} + +ReshardingDonorOplogId ReshardingDataReplication::getOplogFetcherResumeId( + OperationContext* opCtx, const NamespaceString& oplogBufferNss, Timestamp minFetchTimestamp) { + invariant(!opCtx->lockState()->isLocked()); + + AutoGetCollection coll(opCtx, oplogBufferNss, MODE_IS); + if (coll) { + auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *coll); + if (!highestOplogBufferId.missing()) { + return ReshardingDonorOplogId::parse({"getOplogFetcherResumeId"}, + highestOplogBufferId.getDocument().toBson()); + } + } + + return ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}; +} + +ReshardingDonorOplogId ReshardingDataReplication::getOplogApplierResumeId( + OperationContext* opCtx, const ReshardingSourceId& sourceId, Timestamp minFetchTimestamp) { + auto applierProgress = ReshardingOplogApplier::checkStoredProgress(opCtx, sourceId); + return applierProgress ? applierProgress->getProgress() + : ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}; +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h index 9653c7f02ae..73bb12ba856 100644 --- a/src/mongo/db/s/resharding/resharding_data_replication.h +++ b/src/mongo/db/s/resharding/resharding_data_replication.h @@ -35,6 +35,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/cancelable_operation_context.h" +#include "mongo/db/s/resharding/donor_oplog_id_gen.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/resharding/common_types_gen.h" #include "mongo/s/shard_id.h" @@ -120,7 +121,7 @@ public: /** * Returns a future that becomes ready when either * (a) the recipient with respect to each donor shard has applied through the timestamp it has - * finished cloning at (the fetchTimestamp), or + * finished cloning at (the cloneTimestamp), or * (b) the recipient has encountered an operation-fatal error. */ virtual SharedSemiFuture<void> awaitConsistentButStale() = 0; @@ -178,12 +179,28 @@ public: void shutdown() override; + // The following methods are called by ReshardingDataReplication::make() and only exposed + // publicly for unit-testing purposes. + + static std::vector<NamespaceString> ensureStashCollectionsExist( + OperationContext* opCtx, + const ChunkManager& sourceChunkMgr, + const std::vector<DonorShardFetchTimestamp>& donorShards); + + static ReshardingDonorOplogId getOplogFetcherResumeId(OperationContext* opCtx, + const NamespaceString& oplogBufferNss, + Timestamp minFetchTimestamp); + + static ReshardingDonorOplogId getOplogApplierResumeId(OperationContext* opCtx, + const ReshardingSourceId& sourceId, + Timestamp minFetchTimestamp); + private: static std::unique_ptr<ReshardingCollectionCloner> _makeCollectionCloner( ReshardingMetrics* metrics, const CommonReshardingMetadata& metadata, const ShardId& myShardId, - Timestamp fetchTimestamp); + Timestamp cloneTimestamp); static std::vector<std::unique_ptr<ReshardingTxnCloner>> _makeTxnCloners( const CommonReshardingMetadata& metadata, diff --git a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp new file mode 100644 index 00000000000..10091b5f3df --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp @@ -0,0 +1,231 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include <memory> +#include <vector> + +#include "mongo/bson/bsonmisc.h" +#include "mongo/db/persistent_task_store.h" +#include "mongo/db/query/collation/collator_factory_mock.h" +#include "mongo/db/query/collation/collator_interface_mock.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/s/resharding/resharding_data_copy_util.h" +#include "mongo/db/s/resharding/resharding_data_replication.h" +#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/chunk_manager.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +class ReshardingDataReplicationTest : public ServiceContextMongoDTest { +public: + void setUp() override { + ServiceContextMongoDTest::setUp(); + + auto serviceContext = getServiceContext(); + { + auto opCtx = makeOperationContext(); + auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); + + repl::createOplog(opCtx.get()); + + CollatorFactoryInterface::set(serviceContext, std::make_unique<CollatorFactoryMock>()); + } + } + + ChunkManager makeChunkManagerForSourceCollection( + std::unique_ptr<CollatorInterface> defaultCollator) { + const OID epoch = OID::gen(); + std::vector<ChunkType> chunks = {ChunkType{ + _sourceNss, + ChunkRange{BSON(_currentShardKey << MINKEY), BSON(_currentShardKey << MAXKEY)}, + ChunkVersion(100, 0, epoch, boost::none /* timestamp */), + _myDonorId}}; + + auto rt = RoutingTableHistory::makeNew(_sourceNss, + _sourceUUID, + BSON(_currentShardKey << 1), + std::move(defaultCollator), + false /* unique */, + std::move(epoch), + boost::none /* timestamp */, + boost::none /* reshardingFields */, + true /* allowMigrations */, + chunks); + + return ChunkManager(_myDonorId, + DatabaseVersion(UUID::gen()), + makeStandaloneRoutingTableHistory(std::move(rt)), + boost::none /* clusterTime */); + } + +private: + RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) { + const auto version = rt.getVersion(); + return RoutingTableHistoryValueHandle( + std::move(rt), ComparableChunkVersion::makeComparableChunkVersion(version)); + } + + const StringData _currentShardKey = "sk"; + + const NamespaceString _sourceNss{"test_crud", "collection_being_resharded"}; + const CollectionUUID _sourceUUID = UUID::gen(); + + const ShardId _myDonorId{"myDonorId"}; +}; + +TEST_F(ReshardingDataReplicationTest, StashCollectionsHaveSameCollationAsReshardingCollection) { + CollatorInterfaceMock collator(CollatorInterfaceMock::MockType::kReverseString); + auto sourceChunkMgr = makeChunkManagerForSourceCollection(collator.clone()); + + auto stashCollections = [&] { + auto opCtx = makeOperationContext(); + return ReshardingDataReplication::ensureStashCollectionsExist( + opCtx.get(), + sourceChunkMgr, + {DonorShardFetchTimestamp{{"shard0"}}, DonorShardFetchTimestamp{{"shard1"}}}); + }(); + + for (const auto& nss : stashCollections) { + auto opCtx = makeOperationContext(); + AutoGetCollection stashColl(opCtx.get(), nss, MODE_IS); + ASSERT_TRUE(bool(stashColl->getDefaultCollator())) + << "Stash collection was created with 'simple' collation"; + ASSERT_BSONOBJ_BINARY_EQ(stashColl->getDefaultCollator()->getSpec().toBSON(), + collator.getSpec().toBSON()); + } +} + +TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) { + auto opCtx = makeOperationContext(); + + const auto reshardingUUID = UUID::gen(); + auto oplogBufferNss = getLocalOplogBufferNamespace(reshardingUUID, {"shard0"}); + + const auto minFetchTimestamp = Timestamp{10, 0}; + const auto oplogId1 = ReshardingDonorOplogId{{20, 0}, {18, 0}}; + const auto oplogId2 = ReshardingDonorOplogId{{20, 0}, {19, 0}}; + const auto oplogId3 = ReshardingDonorOplogId{{20, 0}, {20, 0}}; + + // The minFetchTimestamp value is used when the oplog buffer collection doesn't exist. + ASSERT_BSONOBJ_BINARY_EQ( + ReshardingDataReplication::getOplogFetcherResumeId( + opCtx.get(), oplogBufferNss, minFetchTimestamp) + .toBSON(), + (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON())); + + // The minFetchTimestamp value is used when the oplog buffer collection is empty. + resharding::data_copy::ensureCollectionExists(opCtx.get(), oplogBufferNss, CollectionOptions{}); + ASSERT_BSONOBJ_BINARY_EQ( + ReshardingDataReplication::getOplogFetcherResumeId( + opCtx.get(), oplogBufferNss, minFetchTimestamp) + .toBSON(), + (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON())); + + auto insertFn = [&](const ReshardingDonorOplogId& oplogId) { + AutoGetCollection oplogBufferColl(opCtx.get(), oplogBufferNss, MODE_IX); + WriteUnitOfWork wuow(opCtx.get()); + ASSERT_OK(oplogBufferColl->insertDocument( + opCtx.get(), InsertStatement{BSON("_id" << oplogId.toBSON())}, nullptr)); + wuow.commit(); + }; + + insertFn(oplogId2); + ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId( + opCtx.get(), oplogBufferNss, minFetchTimestamp) + .toBSON(), + oplogId2.toBSON()); + + // The highest oplog ID is used regardless of the original insertion order. + insertFn(oplogId3); + insertFn(oplogId1); + ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId( + opCtx.get(), oplogBufferNss, minFetchTimestamp) + .toBSON(), + oplogId3.toBSON()); +} + +TEST_F(ReshardingDataReplicationTest, GetOplogApplierResumeId) { + auto opCtx = makeOperationContext(); + + const auto reshardingUUID = UUID::gen(); + const auto minFetchTimestamp = Timestamp{10, 0}; + const ReshardingSourceId sourceId{reshardingUUID, {"shard0"}}; + + // The minFetchTimestamp value is used when the applier progress collection doesn't exist. + ASSERT_BSONOBJ_BINARY_EQ( + ReshardingDataReplication::getOplogApplierResumeId(opCtx.get(), sourceId, minFetchTimestamp) + .toBSON(), + (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON())); + + // The minFetchTimestamp value is used when the applier progress collection is empty. + resharding::data_copy::ensureCollectionExists( + opCtx.get(), NamespaceString::kReshardingApplierProgressNamespace, CollectionOptions{}); + ASSERT_BSONOBJ_BINARY_EQ( + ReshardingDataReplication::getOplogApplierResumeId(opCtx.get(), sourceId, minFetchTimestamp) + .toBSON(), + (ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON())); + + PersistentTaskStore<ReshardingOplogApplierProgress> store( + NamespaceString::kReshardingApplierProgressNamespace); + + const auto expectedOplogId = ReshardingDonorOplogId{{20, 0}, {18, 0}}; + store.add(opCtx.get(), ReshardingOplogApplierProgress{sourceId, expectedOplogId, 5}); + ASSERT_BSONOBJ_BINARY_EQ( + ReshardingDataReplication::getOplogApplierResumeId(opCtx.get(), sourceId, minFetchTimestamp) + .toBSON(), + expectedOplogId.toBSON()); + + // The progress for the specific ReshardingSourceId is returned. + store.add(opCtx.get(), + ReshardingOplogApplierProgress{ReshardingSourceId{reshardingUUID, {"shard1"}}, + ReshardingDonorOplogId{{20, 0}, {19, 0}}, + 6}); + store.add(opCtx.get(), + ReshardingOplogApplierProgress{ReshardingSourceId{UUID::gen(), {"shard0"}}, + ReshardingDonorOplogId{{20, 0}, {20, 0}}, + 7}); + ASSERT_BSONOBJ_BINARY_EQ( + ReshardingDataReplication::getOplogApplierResumeId(opCtx.get(), sourceId, minFetchTimestamp) + .toBSON(), + expectedOplogId.toBSON()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp index ec292da2930..a8616102cdc 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp @@ -259,7 +259,7 @@ private: std::move(epoch), boost::none /* timestamp */, boost::none /* reshardingFields */, - false /* allowMigrations */, + true /* allowMigrations */, chunks); return ChunkManager(_myDonorId, diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index a6c67ebe2b7..6da05212806 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -152,53 +152,6 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx, opCtx, reshardingNss, optionsAndIndexes); } -std::vector<NamespaceString> ensureStashCollectionsExist( - OperationContext* opCtx, - const ChunkManager& cm, - const UUID& existingUUID, - const std::vector<DonorShardFetchTimestamp>& donorShards) { - // Use the same collation for the stash collections as the temporary resharding collection - auto collator = cm.getDefaultCollator(); - BSONObj collationSpec = collator ? collator->getSpec().toBSON() : BSONObj(); - - std::vector<NamespaceString> stashCollections; - stashCollections.reserve(donorShards.size()); - - { - CollectionOptions options; - options.collation = std::move(collationSpec); - for (const auto& donor : donorShards) { - stashCollections.emplace_back(ReshardingOplogApplier::ensureStashCollectionExists( - opCtx, existingUUID, donor.getShardId(), options)); - } - } - - return stashCollections; -} - -ReshardingDonorOplogId getFetcherIdToResumeFrom(OperationContext* opCtx, - NamespaceString oplogBufferNss, - Timestamp fetchTimestamp) { - AutoGetCollection collection(opCtx, oplogBufferNss, MODE_IS); - if (!collection) { - return ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp}; - } - - auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *collection); - return highestOplogBufferId.missing() - ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp} - : ReshardingDonorOplogId::parse({"resharding::getFetcherIdToResumeFrom"}, - highestOplogBufferId.getDocument().toBson()); -} - -ReshardingDonorOplogId getApplierIdToResumeFrom(OperationContext* opCtx, - ReshardingSourceId sourceId, - Timestamp fetchTimestamp) { - auto applierProgress = ReshardingOplogApplier::checkStoredProgress(opCtx, sourceId); - return !applierProgress ? ReshardingDonorOplogId{fetchTimestamp, fetchTimestamp} - : applierProgress->getProgress(); -} - } // namespace resharding std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance( diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index b4b63efb071..b6fa21316c6 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -53,19 +53,6 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx, const UUID& existingUUID, Timestamp fetchTimestamp); -std::vector<NamespaceString> ensureStashCollectionsExist( - OperationContext* opCtx, - const ChunkManager& cm, - const UUID& existingUUID, - const std::vector<DonorShardFetchTimestamp>& donorShards); - -ReshardingDonorOplogId getFetcherIdToResumeFrom(OperationContext* opCtx, - NamespaceString oplogBufferNss, - Timestamp fetchTimestamp); - -ReshardingDonorOplogId getApplierIdToResumeFrom(OperationContext* opCtx, - ReshardingSourceId sourceId, - Timestamp fetchTimestamp); } // namespace resharding class ReshardingRecipientService final : public repl::PrimaryOnlyService { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index c4a5b594045..8d118974d5b 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -542,133 +542,5 @@ TEST_F(ReshardingRecipientServiceTest, verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); } -TEST_F(ReshardingRecipientServiceTest, StashCollectionsHaveSameCollationAsReshardingCollection) { - auto shards = setupNShards(2); - - std::unique_ptr<CollatorInterfaceMock> collator = - std::make_unique<CollatorInterfaceMock>(CollatorInterfaceMock::MockType::kReverseString); - auto collationSpec = collator->getSpec().toBSON(); - auto srcChunkMgr = makeChunkManager(kOrigNss, - ShardKeyPattern(BSON("_id" << 1)), - std::move(collator), - false /* unique */, - {} /* splitPoints */); - - // Create stash collections for both donor shards. - auto stashCollections = resharding::ensureStashCollectionsExist( - operationContext(), srcChunkMgr, kOrigUUID, {ShardId("shard0"), ShardId("shard1")}); - - // Verify that each stash collation has the collation we passed in above. - { - auto opCtx = operationContext(); - - DBDirectClient client(opCtx); - auto collInfos = client.getCollectionInfos("config"); - StringMap<BSONObj> nsToOptions; - for (const auto& coll : collInfos) { - nsToOptions[coll["name"].str()] = coll["options"].Obj(); - } - - for (const auto& coll : stashCollections) { - auto it = nsToOptions.find(coll.coll()); - ASSERT(it != nsToOptions.end()); - auto options = it->second; - - ASSERT(options.hasField("collation")); - auto collation = options["collation"].Obj(); - ASSERT_BSONOBJ_EQ(collationSpec, collation); - } - } -} - -TEST_F(ReshardingRecipientServiceTest, FindFetcherIdToResumeFrom) { - auto opCtx = operationContext(); - NamespaceString oplogBufferNs = - getLocalOplogBufferNamespace(kReshardingUUID, ShardId("shard0")); - auto timestamp0 = Timestamp(1, 0); - auto timestamp1 = Timestamp(1, 1); - auto timestamp2 = Timestamp(1, 2); - auto timestamp3 = Timestamp(1, 3); - - // Start from FetchTimestamp since localOplogBuffer doesn't exist. - ASSERT((resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == - ReshardingDonorOplogId{timestamp0, timestamp0})); - - - DBDirectClient client(opCtx); - client.insert(oplogBufferNs.toString(), - BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp1))); - - // Make sure to use the entry in localOplogBuffer. - ASSERT((resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp1})); - - - client.insert(oplogBufferNs.toString(), - BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp3))); - client.insert(oplogBufferNs.toString(), - BSON("_id" << BSON("clusterTime" << timestamp3 << "ts" << timestamp2))); - - // Make sure to choose the largest timestamp. - ASSERT((resharding::getFetcherIdToResumeFrom(opCtx, oplogBufferNs, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp3})); -} - -TEST_F(ReshardingRecipientServiceTest, FindApplierIdToResumeFrom) { - auto opCtx = operationContext(); - const ReshardingSourceId sourceId0{UUID::gen(), ShardId("shard0")}; - const ReshardingSourceId sourceId1{UUID::gen(), ShardId("shard1")}; - - auto timestamp0 = Timestamp(1, 0); - auto timestamp1 = Timestamp(1, 1); - auto timestamp2 = Timestamp(1, 2); - auto timestamp3 = Timestamp(1, 3); - - // Start from FetchTimestamp since reshardingApplierProgress doesn't exist. - ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId0, timestamp0) == - ReshardingDonorOplogId{timestamp0, timestamp0})); - ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId1, timestamp0) == - ReshardingDonorOplogId{timestamp0, timestamp0})); - - - DBDirectClient client(opCtx); - auto updateApplierProgress = [&client](auto sourceId, auto clusterTime, auto ts) { - client.update( - NamespaceString::kReshardingApplierProgressNamespace.ns(), - QUERY(ReshardingOplogApplierProgress::kOplogSourceIdFieldName << sourceId.toBSON()), - BSON("$set" << BSON(ReshardingOplogApplierProgress::kProgressFieldName - << BSON("clusterTime" << clusterTime << "ts" << ts) - << ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName - << 1LL)), /* not used for this test */ - true /* upsert */, - false /* multi */); - }; - - updateApplierProgress(sourceId0, timestamp3, timestamp1); - - // SourceId0 resumes from the progress field but sourceId1 still uses FetchTimestamp. - ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId0, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp1})); - ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId1, timestamp0) == - ReshardingDonorOplogId{timestamp0, timestamp0})); - - updateApplierProgress(sourceId1, timestamp3, timestamp1); - - // Both resume from the progress field. - ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId0, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp1})); - ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId1, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp1})); - - updateApplierProgress(sourceId0, timestamp3, timestamp3); - updateApplierProgress(sourceId1, timestamp3, timestamp2); - - // Resume from the updated progress value. - ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId0, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp3})); - ASSERT((resharding::getApplierIdToResumeFrom(opCtx, sourceId1, timestamp0) == - ReshardingDonorOplogId{timestamp3, timestamp2})); -} - } // namespace } // namespace mongo |