diff options
author | Blake Oler <blake.oler@mongodb.com> | 2020-10-07 23:09:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-10-15 21:19:28 +0000 |
commit | 235371574161936f50f712de54bad7c806f4cbd6 (patch) | |
tree | e6e90ab204bd04a54c633f7bde5b611e44393525 | |
parent | b370bdde808ed5791fe37c3d760a9f356cef8dc5 (diff) | |
download | mongo-235371574161936f50f712de54bad7c806f4cbd6.tar.gz |
SERVER-51212 Handle resharding fields from shard version mismatch function
-rw-r--r-- | src/mongo/db/mongod_main.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_test.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp | 184 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_recipient_common.h | 80 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp | 374 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.h | 31 | ||||
-rw-r--r-- | src/mongo/db/s/shard_filtering_metadata_refresh.cpp | 11 |
14 files changed, 708 insertions, 67 deletions
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 88dde2eb787..ce8b7f477e7 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -137,7 +137,9 @@ #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/db/s/periodic_sharded_index_consistency_checker.h" #include "mongo/db/s/resharding/resharding_coordinator_service.h" +#include "mongo/db/s/resharding/resharding_donor_service.h" #include "mongo/db/s/resharding/resharding_op_observer.h" +#include "mongo/db/s/resharding/resharding_recipient_service.h" #include "mongo/db/s/shard_server_op_observer.h" #include "mongo/db/s/sharding_initialization_mongod.h" #include "mongo/db/s/sharding_state_recovery.h" @@ -309,7 +311,13 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) { std::vector<std::unique_ptr<repl::PrimaryOnlyService>> services; services.push_back(std::make_unique<TenantMigrationDonorService>(serviceContext)); services.push_back(std::make_unique<repl::TenantMigrationRecipientService>(serviceContext)); - services.push_back(std::make_unique<ReshardingCoordinatorService>(serviceContext)); + + if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { + services.push_back(std::make_unique<ReshardingCoordinatorService>(serviceContext)); + } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) { + services.push_back(std::make_unique<ReshardingDonorService>(serviceContext)); + services.push_back(std::make_unique<ReshardingRecipientService>(serviceContext)); + } for (auto& service : services) { registry->registerService(std::move(service)); diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index ddbc7ab1f0b..dc0100104da 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -67,6 +67,7 @@ env.Library( 'periodic_sharded_index_consistency_checker.cpp', 'range_deletion_util.cpp', 'read_only_catalog_cache_loader.cpp', + 'resharding/resharding_donor_recipient_common.cpp', 
'resharding/resharding_op_observer.cpp', 'resharding/resharding_coordinator_observer.cpp', 'resharding/resharding_coordinator_service.cpp', @@ -461,6 +462,7 @@ env.CppUnitTest( 'migration_session_id_test.cpp', 'migration_util_test.cpp', 'namespace_metadata_change_notifications_test.cpp', + 'resharding/resharding_donor_recipient_common_test.cpp', 'resharding/resharding_recipient_service_test.cpp', 'resharding_collection_test.cpp', 'resharding_destined_recipient_test.cpp', diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index d024f4f1e4e..d24eaba115d 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -127,7 +127,8 @@ public: std::set<ShardId> recipientShardIds; std::vector<ChunkType> initialChunks; ChunkVersion version(1, 0, OID::gen()); - auto tempReshardingNss = constructTemporaryReshardingNss(nss, cm); + auto tempReshardingNss = constructTemporaryReshardingNss( + nss.db(), getCollectionUUIDFromChunkManger(nss, cm)); if (presetReshardedChunksSpecified) { const auto chunks = request().get_presetReshardedChunks().get(); @@ -211,7 +212,7 @@ public: // This promise will currently be falsely fulfilled by a call to interrupt() inside // the ReshardingCoordinatorService. This is to enable jsTests to pass while code // is still being committed. - // TODO SERVER-51212 Change this comment and assess the current call to .wait(). + // TODO SERVER-51398 Change this comment and assess the current call to .wait(). 
instance->getObserver()->awaitAllDonorsReadyToDonate().wait(); } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index e17e86635bd..b0d30953211 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -391,9 +391,7 @@ CollectionType createTempReshardingCollectionType( TypeCollectionRecipientFields recipient( std::move(donorShardIds), coordinatorDoc.getExistingUUID(), coordinatorDoc.getNss()); - if (coordinatorDoc.getFetchTimestampStruct().getFetchTimestamp()) { - recipient.setFetchTimestampStruct(coordinatorDoc.getFetchTimestampStruct()); - } + emplaceFetchTimestampIfExists(recipient, coordinatorDoc.getFetchTimestamp()); tempEntryReshardingFields.setRecipientFields(recipient); collType.setReshardingFields(std::move(tempEntryReshardingFields)); @@ -650,7 +648,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat return ExecutorFuture<void>(**executor, Status::OK()); } - // TODO SERVER-51212 Remove this call. + // TODO SERVER-51398 Remove this call. 
interrupt({ErrorCodes::InternalError, "Early exit to support jsTesting"}); return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate() @@ -764,15 +762,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates( // Build new state doc for coordinator state update ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedStateDoc; updatedCoordinatorDoc.setState(nextState); - if (fetchTimestamp) { - auto& fetchTimestampStruct = updatedCoordinatorDoc.getFetchTimestampStruct(); - if (fetchTimestampStruct.getFetchTimestamp()) - invariant(fetchTimestampStruct.getFetchTimestamp().get() == fetchTimestamp.get()); - - invariant(!fetchTimestamp->isNull()); - - fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp)); - } + emplaceFetchTimestampIfExists(updatedCoordinatorDoc, std::move(fetchTimestamp)); auto opCtx = cc().makeOperationContext(); resharding::persistStateTransition(opCtx.get(), updatedCoordinatorDoc); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 66256efcac6..e6a7daeafb6 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -38,6 +38,7 @@ #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/s/config/config_server_test_fixture.h" #include "mongo/db/s/resharding/resharding_coordinator_service.h" +#include "mongo/db/s/resharding_util.h" #include "mongo/db/s/transaction_coordinator_service.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/s/catalog/type_collection.h" @@ -86,15 +87,7 @@ protected: {DonorShardEntry(ShardId("shard0000"))}, {RecipientShardEntry(ShardId("shard0001"))}); doc.setCommonReshardingMetadata(meta); - if (fetchTimestamp) { - auto fetchTimestampStruct = doc.getFetchTimestampStruct(); - if (fetchTimestampStruct.getFetchTimestamp()) - invariant(fetchTimestampStruct.getFetchTimestamp().get() == 
fetchTimestamp.get()); - - fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp)); - doc.setFetchTimestampStruct(fetchTimestampStruct); - } - + emplaceFetchTimestampIfExists(doc, std::move(fetchTimestamp)); return doc; } @@ -565,11 +558,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistFetchTimestampStateTransitio // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; expectedCoordinatorDoc.setState(CoordinatorStateEnum::kCloning); - Timestamp fetchTimestamp = Timestamp(1, 1); - auto fetchTimestampStruct = expectedCoordinatorDoc.getFetchTimestampStruct(); - fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp)); - expectedCoordinatorDoc.setFetchTimestampStruct(fetchTimestampStruct); - + emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1)); persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); } diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp new file mode 100644 index 00000000000..8b848a95271 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp @@ -0,0 +1,184 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/resharding/resharding_donor_recipient_common.h" + +namespace mongo { +namespace resharding { + +using DonorStateMachine = ReshardingDonorService::DonorStateMachine; +using RecipientStateMachine = ReshardingRecipientService::RecipientStateMachine; + +namespace { +std::vector<DonorShardMirroringEntry> createDonorShardMirroringEntriesFromDonorShardIds( + const std::vector<ShardId>& shardIds) { + std::vector<DonorShardMirroringEntry> donorShards(shardIds.size()); + for (size_t i = 0; i < shardIds.size(); ++i) { + donorShards[i] = {shardIds[i], false /* mirroring */}; + } + return donorShards; +} + +/* + * Creates a ReshardingStateMachine with the assumption that the state machine does not already + * exist. 
+ */ +template <class Service, class StateMachine, class ReshardingDocument> +void createReshardingStateMachine(OperationContext* opCtx, const ReshardingDocument& doc) { + auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); + auto service = registry->lookupServiceByName(Service::kServiceName); + StateMachine::getOrCreate(opCtx, service, doc.toBSON()); +} + +/* + * Either constructs a new ReshardingDonorStateMachine with 'reshardingFields' or passes + * 'reshardingFields' to an already-existing ReshardingDonorStateMachine. + */ +void processReshardingFieldsForDonorCollection(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields) { + if (auto donorStateMachine = tryGetReshardingStateMachine<ReshardingDonorService, + DonorStateMachine, + ReshardingDonorDocument>( + opCtx, reshardingFields.getUuid())) { + donorStateMachine->get()->onReshardingFieldsChanges(reshardingFields); + return; + } + + // If a resharding operation is past state kPreparingToDonate but does not currently have a + // donor document in-memory, this means that the document will be recovered by the + // ReshardingDonorService, and at that time the latest instance of 'reshardingFields' will be + // read. Return no-op. + if (reshardingFields.getState() > CoordinatorStateEnum::kPreparingToDonate) { + return; + } + + auto donorDoc = constructDonorDocumentFromReshardingFields(nss, metadata, reshardingFields); + createReshardingStateMachine<ReshardingDonorService, + DonorStateMachine, + ReshardingDonorDocument>(opCtx, donorDoc); +} + +/* + * Either constructs a new ReshardingRecipientStateMachine with 'reshardingFields' or passes + * 'reshardingFields' to an already-existing ReshardingRecipientStateMachine. 
+ */ +void processReshardingFieldsForRecipientCollection(OperationContext* opCtx, + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields) { + if (auto recipientStateMachine = tryGetReshardingStateMachine<ReshardingRecipientService, + RecipientStateMachine, + ReshardingRecipientDocument>( + opCtx, reshardingFields.getUuid())) { + recipientStateMachine->get()->onReshardingFieldsChanges(reshardingFields); + return; + } + + // If a resharding operation is past state kCloning but does not currently have a recipient + // document in-memory, this means that the document will be recovered by the + // ReshardingRecipientService, and at that time the latest instance of 'reshardingFields' + // will be read. Return no-op. + if (reshardingFields.getState() > CoordinatorStateEnum::kCloning) { + return; + } + + auto recipientDoc = + constructRecipientDocumentFromReshardingFields(opCtx, metadata, reshardingFields); + createReshardingStateMachine<ReshardingRecipientService, + RecipientStateMachine, + ReshardingRecipientDocument>(opCtx, recipientDoc); +} + +} // namespace + +ReshardingDonorDocument constructDonorDocumentFromReshardingFields( + const NamespaceString& nss, + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields) { + auto donorDoc = ReshardingDonorDocument(DonorStateEnum::kPreparingToDonate); + + auto commonMetadata = + CommonReshardingMetadata(reshardingFields.getUuid(), + nss, + getCollectionUUIDFromChunkManger(nss, *metadata.getChunkManager()), + reshardingFields.getDonorFields()->getReshardingKey().toBSON()); + donorDoc.setCommonReshardingMetadata(std::move(commonMetadata)); + + return donorDoc; +} + +ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( + OperationContext* opCtx, + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields) { + std::vector<DonorShardMirroringEntry> donorShards = + createDonorShardMirroringEntriesFromDonorShardIds( + 
reshardingFields.getRecipientFields()->getDonorShardIds()); + + auto recipientDoc = + ReshardingRecipientDocument(RecipientStateEnum::kCloning, std::move(donorShards)); + + auto commonMetadata = + CommonReshardingMetadata(reshardingFields.getUuid(), + reshardingFields.getRecipientFields()->getOriginalNamespace(), + reshardingFields.getRecipientFields()->getExistingUUID(), + metadata.getShardKeyPattern().toBSON()); + recipientDoc.setCommonReshardingMetadata(std::move(commonMetadata)); + + emplaceFetchTimestampIfExists(recipientDoc, + reshardingFields.getRecipientFields()->getFetchTimestamp()); + + return recipientDoc; +} + +void processReshardingFieldsForCollection(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields) { + if (reshardingFields.getDonorFields()) { + invariant(!reshardingFields.getRecipientFields()); + processReshardingFieldsForDonorCollection(opCtx, nss, metadata, reshardingFields); + return; + } + + if (reshardingFields.getRecipientFields()) { + invariant(!reshardingFields.getDonorFields()); + processReshardingFieldsForRecipientCollection(opCtx, metadata, reshardingFields); + return; + } + + MONGO_UNREACHABLE +} + +} // namespace resharding + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h new file mode 100644 index 00000000000..309be7d5230 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/resharding/resharding_donor_service.h" +#include "mongo/db/s/resharding/resharding_recipient_service.h" +#include "mongo/db/s/resharding_util.h" + +namespace mongo { +namespace resharding { + +using ReshardingFields = TypeCollectionReshardingFields; + +/** + * Looks up the StateMachine by the 'reshardingUUID'. If it does not exist, returns boost::none. 
+ */ +template <class Service, class StateMachine, class ReshardingDocument> +boost::optional<std::shared_ptr<StateMachine>> tryGetReshardingStateMachine( + OperationContext* opCtx, const UUID& reshardingUUID) { + auto instanceId = BSON(ReshardingDocument::k_idFieldName << reshardingUUID); + auto registry = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()); + auto service = registry->lookupServiceByName(Service::kServiceName); + return StateMachine::lookup(opCtx, service, instanceId); +} + +/** + * The following functions construct a ReshardingDocument from the given 'reshardingFields'. + */ +ReshardingDonorDocument constructDonorDocumentFromReshardingFields( + const NamespaceString& nss, + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields); + +ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields( + OperationContext* opCtx, + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields); + +/** + * Takes in the reshardingFields from a collection's config.collections entry and gives the + * corresponding ReshardingDonorStateMachine or ReshardingRecipientStateMachine the updated + * information. Will construct a ReshardingDonorStateMachine or ReshardingRecipientStateMachine if: + * 1. The reshardingFields state indicates that the resharding operation is new, and + * 2. A state machine does not exist on this node for the given namespace. + */ +void processReshardingFieldsForCollection(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionMetadata& metadata, + const ReshardingFields& reshardingFields); + +} // namespace resharding + +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp new file mode 100644 index 00000000000..f479d02942e --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp @@ -0,0 +1,374 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/util/fail_point.h"
+
+namespace mongo {
+namespace {
+
+using namespace fmt::literals;
+
+/**
+ * Fixture holding the constants and helpers shared by the tests in this file: builders for
+ * sharded collection metadata and resharding fields, plus assertions that compare a donor or
+ * recipient state document against the resharding fields it was constructed from.
+ */
+class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture {
+public:
+    const UUID kExistingUUID = UUID::gen();
+    const NamespaceString kOriginalNss = NamespaceString("db", "foo");
+
+    const NamespaceString kTemporaryReshardingNss =
+        constructTemporaryReshardingNss("db", kExistingUUID);
+    const std::string kOriginalShardKey = "oldKey";
+    const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1);
+    const std::string kReshardingKey = "newKey";
+    const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1);
+    const OID kOriginalEpoch = OID::gen();
+    const OID kReshardingEpoch = OID::gen();
+    const UUID kReshardingUUID = UUID::gen();
+
+    const ShardId kShardOne = ShardId("shardOne");
+    const ShardId kShardTwo = ShardId("shardTwo");
+
+    const std::vector<ShardId> kShardIds = {kShardOne, kShardTwo};
+
+    const Timestamp kFetchTimestamp = Timestamp(1, 0);
+
+protected:
+    /**
+     * Returns metadata for 'kOriginalNss' built from the original shard key, UUID and epoch.
+     */
+    CollectionMetadata makeShardedMetadataForOriginalCollection(OperationContext* opCtx) {
+        return makeShardedMetadata(opCtx,
+                                   kOriginalNss,
+                                   kOriginalShardKey,
+                                   kOriginalShardKeyPattern,
+                                   kExistingUUID,
+                                   kOriginalEpoch);
+    }
+
+    /**
+     * Returns metadata for 'kTemporaryReshardingNss' built from the resharding key, UUID and
+     * epoch.
+     */
+    CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection(
+        OperationContext* opCtx) {
+        return makeShardedMetadata(opCtx,
+                                   kTemporaryReshardingNss,
+                                   kReshardingKey,
+                                   kReshardingKeyPattern,
+                                   kReshardingUUID,
+                                   kReshardingEpoch);
+    }
+
+    /**
+     * Builds a CollectionMetadata for 'nss' whose routing table holds a single chunk spanning
+     * MINKEY..MAXKEY of 'shardKey'. The chunk is assigned to kShardTwo, while kShardOne is the
+     * shard id handed to both the ChunkManager and the returned CollectionMetadata.
+     *
+     * Side effect: when 'opCtx' is not already versioned, the version the chunk manager reports
+     * for kShardOne is installed on the OperationShardingState for 'nss'.
+     */
+    CollectionMetadata makeShardedMetadata(OperationContext* opCtx,
+                                           const NamespaceString& nss,
+                                           const std::string& shardKey,
+                                           const BSONObj& shardKeyPattern,
+                                           const UUID& uuid,
+                                           const OID& epoch) {
+        auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY));
+        auto chunk = ChunkType(nss, std::move(range), ChunkVersion(1, 0, epoch), kShardTwo);
+        ChunkManager cm(
+            kShardOne,
+            DatabaseVersion(uuid, 1),
+            makeStandaloneRoutingTableHistory(RoutingTableHistory::makeNew(nss,
+                                                                           uuid,
+                                                                           shardKeyPattern,
+                                                                           nullptr,
+                                                                           false,
+                                                                           epoch,
+                                                                           boost::none,
+                                                                           {std::move(chunk)})),
+            boost::none);
+
+        if (!OperationShardingState::isOperationVersioned(opCtx)) {
+            const auto version = cm.getVersion(kShardOne);
+            BSONObjBuilder builder;
+            version.appendToCommand(&builder);
+
+            auto& oss = OperationShardingState::get(opCtx);
+            oss.initializeClientRoutingVersionsFromCommand(nss, builder.obj());
+        }
+
+        return CollectionMetadata(std::move(cm), kShardOne);
+    }
+
+    /**
+     * Returns ReshardingFields for 'reshardingUUID' with only the coordinator 'state' set;
+     * neither donor nor recipient fields are populated.
+     */
+    ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID,
+                                                  CoordinatorStateEnum state) {
+        auto fields = ReshardingFields(reshardingUUID);
+        fields.setState(state);
+        return fields;
+    }
+
+    /**
+     * Sets donor fields carrying 'reshardingKey' on 'fields'.
+     */
+    void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
+                                             const BSONObj& reshardingKey) {
+        fields.setDonorFields(TypeCollectionDonorFields(reshardingKey));
+    }
+
+    /**
+     * Sets recipient fields on 'fields' from the donor shard ids, the existing collection UUID
+     * and the original namespace. 'fetchTimestamp' is forwarded to
+     * emplaceFetchTimestampIfExists(), which leaves the recipient fields untouched when it is
+     * boost::none.
+     */
+    void appendRecipientFieldsToReshardingFields(
+        ReshardingFields& fields,
+        const std::vector<ShardId>& donorShardIds,
+        const UUID& existingUUID,
+        const NamespaceString& originalNss,
+        const boost::optional<Timestamp>& fetchTimestamp = boost::none) {
+        auto recipientFields =
+            TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss);
+        emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp);
+        fields.setRecipientFields(std::move(recipientFields));
+    }
+
+    /**
+     * Asserts that the _id, namespace, existing UUID and resharding key of 'reshardingDoc'
+     * equal the supplied values. Works for any document type exposing those four getters.
+     */
+    template <class ReshardingDocument>
+    void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss,
+                                                    const UUID& reshardingUUID,
+                                                    const UUID& existingUUID,
+                                                    const BSONObj& reshardingKey,
+                                                    const ReshardingDocument& reshardingDoc) {
+        ASSERT_EQ(reshardingDoc.get_id(), reshardingUUID);
+        ASSERT_EQ(reshardingDoc.getNss(), nss);
+        ASSERT_EQ(reshardingDoc.getExistingUUID(), existingUUID);
+        ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey(), reshardingKey);
+    }
+
+    /**
+     * Asserts that 'donorDoc' carries the common fields taken from 'reshardingFields', is in
+     * state kPreparingToDonate and has no minFetchTimestamp.
+     */
+    void assertDonorDocMatchesReshardingFields(const NamespaceString& nss,
+                                               const UUID& existingUUID,
+                                               const ReshardingFields& reshardingFields,
+                                               const ReshardingDonorDocument& donorDoc) {
+        assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>(
+            nss,
+            reshardingFields.getUuid(),
+            existingUUID,
+            reshardingFields.getDonorFields()->getReshardingKey().toBSON(),
+            donorDoc);
+        ASSERT(donorDoc.getState() == DonorStateEnum::kPreparingToDonate);
+        ASSERT(donorDoc.getMinFetchTimestamp() == boost::none);
+    }
+
+    /**
+     * Asserts that 'recipientDoc' carries the common fields taken from 'reshardingFields' (with
+     * the resharding key read from 'metadata'), is in state kCloning, has the same fetch
+     * timestamp as the recipient fields, and that its donorShardsMirroring entries all have
+     * mirroring == false and cover exactly the donor shard ids listed in the recipient fields.
+     */
+    void assertRecipientDocMatchesReshardingFields(
+        const CollectionMetadata& metadata,
+        const ReshardingFields& reshardingFields,
+        const ReshardingRecipientDocument& recipientDoc) {
+        assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>(
+            reshardingFields.getRecipientFields()->getOriginalNamespace(),
+            reshardingFields.getUuid(),
+            reshardingFields.getRecipientFields()->getExistingUUID(),
+            metadata.getShardKeyPattern().toBSON(),
+            recipientDoc);
+
+        ASSERT(recipientDoc.getState() == RecipientStateEnum::kCloning);
+        ASSERT(recipientDoc.getFetchTimestamp() ==
+               reshardingFields.getRecipientFields()->getFetchTimestamp());
+
+        auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds();
+        auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end());
+
+        // Tick off every mirroring entry against the expected ids; whatever is left in the set
+        // afterwards is a donor shard the recipient document failed to record.
+        for (const auto& donorShardMirroringEntry : recipientDoc.getDonorShardsMirroring()) {
+            ASSERT_EQ(donorShardMirroringEntry.getMirroring(), false);
+            auto reshardingFieldsDonorShardId =
+                donorShardIdsSet.find(donorShardMirroringEntry.getId());
+            ASSERT(reshardingFieldsDonorShardId != donorShardIdsSet.end());
+            donorShardIdsSet.erase(reshardingFieldsDonorShardId);
+        }
+
+        ASSERT(donorShardIdsSet.empty());
+    }
+};
+
+TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromReshardingFields) {
+    OperationContext* opCtx = operationContext();
+    auto metadata = makeShardedMetadataForOriginalCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
+    appendDonorFieldsToReshardingFields(reshardingFields, kReshardingKeyPattern);
+
+    auto donorDoc = resharding::constructDonorDocumentFromReshardingFields(
+        kOriginalNss, metadata, reshardingFields);
+    assertDonorDocMatchesReshardingFields(kOriginalNss, kExistingUUID, reshardingFields, donorDoc);
+}
+
+TEST_F(ReshardingDonorRecipientCommonInternalsTest,
+       ConstructRecipientDocumentFromReshardingFields) {
+    OperationContext* opCtx = operationContext();
+    auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+
+    appendRecipientFieldsToReshardingFields(
+        reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+
+    auto recipientDoc = resharding::constructRecipientDocumentFromReshardingFields(
+        opCtx, metadata, reshardingFields);
+    assertRecipientDocMatchesReshardingFields(metadata, reshardingFields, recipientDoc);
+}
+
+/**
+ * Fixture that additionally registers the resharding donor and recipient services with the
+ * PrimaryOnlyServiceRegistry and steps the node up, so tests can create and look up state
+ * machine instances.
+ */
+class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest {
+public:
+    void setUp() override {
+        ShardServerTestFixture::setUp();
+
+        WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+        _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
+
+        std::unique_ptr<ReshardingDonorService> donorService =
+            std::make_unique<ReshardingDonorService>(getServiceContext());
+        _registry->registerService(std::move(donorService));
+
+        std::unique_ptr<ReshardingRecipientService> recipientService =
+            std::make_unique<ReshardingRecipientService>(getServiceContext());
+        _registry->registerService(std::move(recipientService));
+        _registry->onStartup(operationContext());
+
+        stepUp();
+    }
+
+    void tearDown() override {
+        WaitForMajorityService::get(getServiceContext()).shutDown();
+
+        _registry->onShutdown();
+
+        ShardServerTestFixture::tearDown();
+    }
+
+    /**
+     * Advances the term, moves the replication coordinator to RS_PRIMARY at that term, sets the
+     * last applied optime, and notifies the registry that step-up has completed.
+     */
+    void stepUp() {
+        auto replCoord = repl::ReplicationCoordinator::get(getServiceContext());
+
+        // Advance term
+        _term++;
+
+        ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+        ASSERT_OK(replCoord->updateTerm(operationContext(), _term));
+        replCoord->setMyLastAppliedOpTimeAndWallTime(
+            repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t()));
+
+        _registry->onStepUpComplete(operationContext(), _term);
+    }
+
+protected:
+    // Assigned in setUp(); initialised here so the pointer is never indeterminate.
+    repl::PrimaryOnlyServiceRegistry* _registry = nullptr;
+    long long _term = 0;
+};
+
+TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
+    OperationContext* opCtx = operationContext();
+    auto metadata = makeShardedMetadataForOriginalCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
+    appendDonorFieldsToReshardingFields(reshardingFields, kReshardingKeyPattern);
+
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kOriginalNss, metadata, reshardingFields);
+
+    auto donorStateMachine =
+        resharding::tryGetReshardingStateMachine<ReshardingDonorService,
+                                                 ReshardingDonorService::DonorStateMachine,
+                                                 ReshardingDonorDocument>(opCtx, kReshardingUUID);
+
+    ASSERT(donorStateMachine != boost::none);
+}
+
+TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
+    OperationContext* opCtx = operationContext();
+    auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+    appendRecipientFieldsToReshardingFields(
+        reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
+
+    auto recipientStateMachine =
+        resharding::tryGetReshardingStateMachine<ReshardingRecipientService,
+                                                 ReshardingRecipientService::RecipientStateMachine,
+                                                 ReshardingRecipientDocument>(opCtx,
+                                                                              kReshardingUUID);
+
+    ASSERT(recipientStateMachine != boost::none);
+}
+
+TEST_F(ReshardingDonorRecipientCommonTest,
+       CreateDonorServiceInstanceWithIncorrectCoordinatorState) {
+    OperationContext* opCtx = operationContext();
+    auto metadata = makeShardedMetadataForOriginalCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCommitted);
+    appendDonorFieldsToReshardingFields(reshardingFields, kReshardingKeyPattern);
+
+    // Hand the fields to the resharding subsystem, as the recipient variant of this test does.
+    // Without this call the lookup below is trivially empty and the test verifies nothing.
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kOriginalNss, metadata, reshardingFields);
+
+    auto donorStateMachine =
+        resharding::tryGetReshardingStateMachine<ReshardingDonorService,
+                                                 ReshardingDonorService::DonorStateMachine,
+                                                 ReshardingDonorDocument>(opCtx, kReshardingUUID);
+
+    ASSERT(donorStateMachine == boost::none);
+}
+
+TEST_F(ReshardingDonorRecipientCommonTest,
+       CreateRecipientServiceInstanceWithIncorrectCoordinatorState) {
+    OperationContext* opCtx = operationContext();
+    auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCommitted);
+    appendRecipientFieldsToReshardingFields(
+        reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
+
+    auto recipientStateMachine =
+        resharding::tryGetReshardingStateMachine<ReshardingRecipientService,
+                                                 ReshardingRecipientService::RecipientStateMachine,
+                                                 ReshardingRecipientDocument>(opCtx,
+                                                                              kReshardingUUID);
+
+    ASSERT(recipientStateMachine == boost::none);
+}
+
+// Resharding fields that carry neither donor nor recipient fields are handed to the resharding
+// subsystem; the process is expected to die with a message matching "invariant".
+DEATH_TEST_F(ReshardingDonorRecipientCommonTest,
+             ProcessReshardingFieldsWithoutDonorOrRecipientFields,
+             "invariant") {
+    OperationContext* const operationCtx = operationContext();
+    auto tempCollectionMetadata = makeShardedMetadataForTemporaryReshardingCollection(operationCtx);
+
+    // Only the common part (UUID + coordinator state) is populated.
+    auto fieldsWithoutRole =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+
+    resharding::processReshardingFieldsForCollection(
+        operationCtx, kTemporaryReshardingNss, tempCollectionMetadata, fieldsWithoutRole);
+}
+
+}  // namespace
+
+}  // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index e26ef0aa737..90e63a6e2c1 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -34,10 +34,11 @@ #include "mongo/s/resharding/type_collection_fields_gen.h" namespace mongo { -constexpr StringData kReshardingDonorServiceName = "ReshardingDonorService"_sd; class ReshardingDonorService final : public repl::PrimaryOnlyService { public: + static constexpr StringData kServiceName = "ReshardingDonorService"_sd; + explicit ReshardingDonorService(ServiceContext* serviceContext) : PrimaryOnlyService(serviceContext) {} ~ReshardingDonorService() = default; @@ -45,7 +46,7 @@ public: class DonorStateMachine; StringData getServiceName() const override { - return kReshardingDonorServiceName; + return kServiceName; } NamespaceString getStateDocumentsNS() const override { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 6fa3ca44f4d..07980cd9740 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -279,15 +279,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState( return; } - if (fetchTimestamp) { - auto& fetchTimestampStruct = replacementDoc.getFetchTimestampStruct(); - - // If the recipient is recovering and already knows the fetchTimestamp, it cannot change - if (fetchTimestampStruct.getFetchTimestamp()) - invariant(fetchTimestampStruct.getFetchTimestamp().get() == fetchTimestamp.get()); - - fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp)); - } + emplaceFetchTimestampIfExists(replacementDoc, std::move(fetchTimestamp)); _updateRecipientDocument(std::move(replacementDoc)); } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h 
b/src/mongo/db/s/resharding/resharding_recipient_service.h index 6b13827e31b..61a8e1e3a12 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -34,7 +34,6 @@ #include "mongo/s/resharding/type_collection_fields_gen.h" namespace mongo { -constexpr StringData kReshardingRecipientServiceName = "ReshardingRecipientService"_sd; namespace resharding { @@ -51,6 +50,8 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx, class ReshardingRecipientService final : public repl::PrimaryOnlyService { public: + static constexpr StringData kServiceName = "ReshardingRecipientService"_sd; + explicit ReshardingRecipientService(ServiceContext* serviceContext) : PrimaryOnlyService(serviceContext) {} ~ReshardingRecipientService() = default; @@ -58,7 +59,7 @@ public: class RecipientStateMachine; StringData getServiceName() const override { - return kReshardingRecipientServiceName; + return kServiceName; } NamespaceString getStateDocumentsNS() const override { diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 8cc41078863..6c3db032662 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -72,15 +72,6 @@ UUID getCollectionUuid(OperationContext* opCtx, const NamespaceString& nss) { return *uuid; } -// Assembles the namespace string for the temporary resharding collection based on the source -// namespace components. -NamespaceString getTempReshardingNss(StringData db, const UUID& sourceUuid) { - return NamespaceString(db, - fmt::format("{}{}", - NamespaceString::kTemporaryReshardingCollectionPrefix, - sourceUuid.toString())); -} - // Ensure that this shard owns the document. This must be called after verifying that we // are in a resharding operation so that we are guaranteed that migrations are suspended. 
bool documentBelongsToMe(OperationContext* opCtx, @@ -152,14 +143,12 @@ UUID getCollectionUUIDFromChunkManger(const NamespaceString& originalNss, const return collectionUUID.get(); } -NamespaceString constructTemporaryReshardingNss(const NamespaceString& originalNss, - const ChunkManager& cm) { - auto collectionUUID = getCollectionUUIDFromChunkManger(originalNss, cm); - NamespaceString tempReshardingNss( - originalNss.db(), - "{}{}"_format(NamespaceString::kTemporaryReshardingCollectionPrefix, - collectionUUID.toString())); - return tempReshardingNss; + +NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid) { + return NamespaceString(db, + fmt::format("{}{}", + NamespaceString::kTemporaryReshardingCollectionPrefix, + sourceUuid.toString())); } BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) { @@ -716,7 +705,7 @@ boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx, bool allowLocks = true; auto tempNssRoutingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( opCtx, - getTempReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)), + constructTemporaryReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)), allowLocks); uassert(ShardInvalidatedForTargetingInfo(sourceNss), diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 0ef4d258815..58f4fd5d0a6 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -50,6 +50,28 @@ namespace mongo { constexpr auto kReshardingOplogPrePostImageOps = "prePostImageOps"_sd; /** + * Emplaces the 'fetchTimestamp' onto the ClassWithFetchTimestamp if the timestamp has been + * emplaced inside the boost::optional. 
+ */ +template <class ClassWithFetchTimestamp> +void emplaceFetchTimestampIfExists(ClassWithFetchTimestamp& c, + boost::optional<Timestamp> fetchTimestamp) { + if (!fetchTimestamp) { + return; + } + + invariant(!fetchTimestamp->isNull()); + + if (auto alreadyExistingFetchTimestamp = c.getFetchTimestamp()) { + invariant(fetchTimestamp == alreadyExistingFetchTimestamp); + } + + FetchTimestamp fetchTimestampStruct; + fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp)); + c.setFetchTimestampStruct(std::move(fetchTimestampStruct)); +} + +/** * Helper method to construct a DonorShardEntry with the fields specified. */ DonorShardEntry makeDonorShard(ShardId shardId, @@ -72,15 +94,12 @@ RecipientShardEntry makeRecipientShard( UUID getCollectionUUIDFromChunkManger(const NamespaceString& nss, const ChunkManager& cm); /** - * Constructs the temporary resharding collection's namespace provided the original collection's - * namespace and chunk manager. + * Assembles the namespace string for the temporary resharding collection based on the source + * namespace components. * * <db>.system.resharding.<existing collection's UUID> - * - * Note: throws if the original collection does not have a UUID. */ -NamespaceString constructTemporaryReshardingNss(const NamespaceString& originalNss, - const ChunkManager& cm); +NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid); /** * Constructs a BatchedCommandRequest with batch type 'Insert'. 
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 614032749bb..05f6e2bb3f2 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -41,6 +41,7 @@ #include "mongo/db/s/database_sharding_state.h" #include "mongo/db/s/migration_util.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/resharding/resharding_donor_recipient_common.h" #include "mongo/db/s/sharding_runtime_d_params_gen.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_statistics.h" @@ -130,6 +131,16 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext } currentMetadata = forceGetCurrentMetadata(opCtx, nss); + + if (currentMetadata && currentMetadata->isSharded()) { + // If the collection metadata after a refresh has 'reshardingFields', then pass it + // to the resharding subsystem to process. + const auto& reshardingFields = currentMetadata->getReshardingFields(); + if (reshardingFields) { + resharding::processReshardingFieldsForCollection( + opCtx, nss, *currentMetadata, *reshardingFields); + } + } }) .semi() .share(); |