diff options
-rw-r--r-- | src/mongo/db/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_donor_service_test.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.cpp | 144 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.h | 34 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp | 165 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service_external_state.h | 146 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service_external_state_test.cpp (renamed from src/mongo/db/s/resharding/resharding_recipient_service_test.cpp) | 66 |
9 files changed, 391 insertions, 187 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 4d04c6efc51..bd03fc9e93c 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -82,6 +82,7 @@ env.Library( 'resharding/resharding_oplog_fetcher.cpp', 'resharding/resharding_oplog_session_application.cpp', 'resharding/resharding_recipient_service.cpp', + 'resharding/resharding_recipient_service_external_state.cpp', 'resharding/resharding_server_parameters.idl', 'resharding/resharding_txn_cloner.cpp', 'resharding/resharding_txn_cloner_progress.idl', @@ -482,7 +483,7 @@ env.CppUnitTest( 'resharding/resharding_oplog_crud_application_test.cpp', 'resharding/resharding_oplog_fetcher_test.cpp', 'resharding/resharding_oplog_session_application_test.cpp', - 'resharding/resharding_recipient_service_test.cpp', + 'resharding/resharding_recipient_service_external_state_test.cpp', 'resharding/resharding_donor_service_test.cpp', 'resharding/resharding_txn_cloner_test.cpp', 'session_catalog_migration_destination_test.cpp', diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 791128053b5..2ed925efb80 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -185,16 +185,12 @@ public: std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::constructInstance( BSONObj initialState) { - return std::make_shared<DonorStateMachine>(std::move(initialState), - std::make_unique<ExternalStateImpl>()); + return std::make_shared<DonorStateMachine>( + ReshardingDonorDocument::parse({"DonorStateMachine"}, initialState), + std::make_unique<ExternalStateImpl>()); } ReshardingDonorService::DonorStateMachine::DonorStateMachine( - const BSONObj& donorDoc, std::unique_ptr<DonorStateMachineExternalState> externalState) - : DonorStateMachine(ReshardingDonorDocument::parse({"DonorStateMachine"}, donorDoc), - std::move(externalState)) {} - -ReshardingDonorService::DonorStateMachine::DonorStateMachine( const ReshardingDonorDocument& donorDoc, std::unique_ptr<DonorStateMachineExternalState> externalState) : repl::PrimaryOnlyService::TypedInstance<DonorStateMachine>(), diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 11addfc69bc..ab74589d603 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -73,7 +73,7 @@ public: class ReshardingDonorService::DonorStateMachine final : public repl::PrimaryOnlyService::TypedInstance<DonorStateMachine> { public: - explicit DonorStateMachine(const BSONObj& donorDoc, + explicit DonorStateMachine(const ReshardingDonorDocument& donorDoc, std::unique_ptr<DonorStateMachineExternalState> externalState); ~DonorStateMachine(); @@ -104,9 +104,6 @@ public: const ReshardingDonorDocument& donorDoc); private: - DonorStateMachine(const ReshardingDonorDocument& donorDoc, - std::unique_ptr<DonorStateMachineExternalState> externalState); - /** * Runs up until the donor is either in state kBlockingWrites or encountered an error. */ diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp index 2636d00f0b2..fdac7743862 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp @@ -167,8 +167,9 @@ public: : ReshardingDonorService(serviceContext) {} std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override { - return std::make_shared<DonorStateMachine>(std::move(initialState), - std::make_unique<ExternalStateForTest>()); + return std::make_shared<DonorStateMachine>( + ReshardingDonorDocument::parse({"ReshardingDonorServiceForTest"}, initialState), + std::make_unique<ExternalStateForTest>()); } }; diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 29d50b16c3f..1167c7ed61a 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -47,6 +47,7 @@ #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_applier.h" +#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/s/shard_key_util.h" @@ -93,82 +94,19 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<T>& sp, T value) { } // namespace -namespace resharding { - -void createTemporaryReshardingCollectionLocally(OperationContext* opCtx, - const NamespaceString& originalNss, - const NamespaceString& reshardingNss, - const UUID& reshardingUUID, - const UUID& existingUUID, - Timestamp fetchTimestamp) { - LOGV2_DEBUG( - 5002300, 1, "Creating temporary resharding collection", "originalNss"_attr = originalNss); - - auto catalogCache = Grid::get(opCtx)->catalogCache(); - - // Load the original collection's options from the database's primary shard. - auto [collOptions, uuid] = shardVersionRetry( - opCtx, - catalogCache, - reshardingNss, - "loading collection options to create temporary resharding collection"_sd, - [&]() -> MigrationDestinationManager::CollectionOptionsAndUUID { - auto originalCm = uassertStatusOK( - catalogCache->getShardedCollectionRoutingInfoWithRefresh(opCtx, originalNss)); - return MigrationDestinationManager::getCollectionOptions( - opCtx, - NamespaceStringOrUUID(originalNss.db().toString(), existingUUID), - originalCm.dbPrimary(), - originalCm, - fetchTimestamp); - }); - - // Load the original collection's indexes from the shard that owns the global minimum chunk. - auto [indexes, idIndex] = - shardVersionRetry(opCtx, - catalogCache, - reshardingNss, - "loading indexes to create temporary resharding collection"_sd, - [&]() -> MigrationDestinationManager::IndexesAndIdIndex { - auto originalCm = - catalogCache->getShardedCollectionRoutingInfo(opCtx, originalNss); - auto indexShardId = originalCm.getMinKeyShardIdWithSimpleCollation(); - return MigrationDestinationManager::getCollectionIndexes( - opCtx, - NamespaceStringOrUUID(originalNss.db().toString(), existingUUID), - indexShardId, - originalCm, - fetchTimestamp); - }); - - // Set the temporary resharding collection's UUID to the resharding UUID. Note that - // BSONObj::addFields() replaces any fields that already exist. - collOptions = collOptions.addFields(BSON("uuid" << reshardingUUID)); - CollectionOptionsAndIndexes optionsAndIndexes = {reshardingUUID, indexes, idIndex, collOptions}; - MigrationDestinationManager::cloneCollectionIndexesAndOptions( - opCtx, reshardingNss, optionsAndIndexes); -} - -} // namespace resharding - std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance( BSONObj initialState) { return std::make_shared<RecipientStateMachine>( - this, std::move(initialState), ReshardingDataReplication::make); + this, + ReshardingRecipientDocument::parse({"RecipientStateMachine"}, initialState), + std::make_unique<RecipientStateMachineExternalStateImpl>(), + ReshardingDataReplication::make); } ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( const ReshardingRecipientService* recipientService, - const BSONObj& recipientDoc, - ReshardingDataReplicationFactory dataReplicationFactory) - : RecipientStateMachine( - recipientService, - ReshardingRecipientDocument::parse({"RecipientStateMachine"}, recipientDoc), - std::move(dataReplicationFactory)) {} - -ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( - const ReshardingRecipientService* recipientService, const ReshardingRecipientDocument& recipientDoc, + std::unique_ptr<RecipientStateMachineExternalState> externalState, ReshardingDataReplicationFactory dataReplicationFactory) : repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(), _recipientService{recipientService}, @@ -177,6 +115,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( _recipientCtx{recipientDoc.getMutableState()}, _donorShards{recipientDoc.getDonorShards()}, _cloneTimestamp{recipientDoc.getCloneTimestamp()}, + _externalState{std::move(externalState)}, _markKilledExecutor(std::make_shared<ThreadPool>([] { ThreadPool::Options options; options.poolName = "RecipientStateMachineCancelableOpCtxPool"; @@ -184,7 +123,9 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( options.maxThreads = 1; return options; }())), - _dataReplicationFactory{std::move(dataReplicationFactory)} {} + _dataReplicationFactory{std::move(dataReplicationFactory)} { + invariant(_externalState); +} ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() { stdx::lock_guard<Latch> lg(_mutex); @@ -388,29 +329,22 @@ void ReshardingRecipientService::RecipientStateMachine:: { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - resharding::createTemporaryReshardingCollectionLocally(opCtx.get(), - _metadata.getSourceNss(), - _metadata.getTempReshardingNss(), - _metadata.getReshardingUUID(), - _metadata.getSourceUUID(), - *_cloneTimestamp); - - ShardKeyPattern shardKeyPattern{_metadata.getReshardingKey()}; - - auto catalogCache = Grid::get(opCtx.get())->catalogCache(); - shardVersionRetry(opCtx.get(), - catalogCache, - _metadata.getTempReshardingNss(), - "validating shard key index for reshardCollection"_sd, - [&] { - shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible( - opCtx.get(), - _metadata.getTempReshardingNss(), - shardKeyPattern, - CollationSpec::kSimpleSpec, - false, - shardkeyutil::ValidationBehaviorsShardCollection(opCtx.get())); - }); + _externalState->ensureTempReshardingCollectionExistsWithIndexes( + opCtx.get(), _metadata, *_cloneTimestamp); + + _externalState->withShardVersionRetry( + opCtx.get(), + _metadata.getTempReshardingNss(), + "validating shard key index for reshardCollection"_sd, + [&] { + shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible( + opCtx.get(), + _metadata.getTempReshardingNss(), + ShardKeyPattern{_metadata.getReshardingKey()}, + CollationSpec::kSimpleSpec, + false /* unique */, + shardkeyutil::ValidationBehaviorsShardCollection(opCtx.get())); + }); } _transitionState(RecipientStateEnum::kCloning); @@ -421,10 +355,9 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio bool cloningDone) { invariant(_cloneTimestamp); - auto myShardId = ShardingState::get(opCtx->getServiceContext())->shardId(); - auto catalogCache = Grid::get(opCtx)->catalogCache(); + auto myShardId = _externalState->myShardId(opCtx->getServiceContext()); auto sourceChunkMgr = - catalogCache->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss()); + _externalState->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss()); return _dataReplicationFactory(opCtx, _metrics(), @@ -562,7 +495,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: _transitionState(RecipientStateEnum::kStrictConsistency); const bool isAlsoDonor = [&] { - auto myShardId = ShardingState::get(cc().getServiceContext())->shardId(); + auto myShardId = _externalState->myShardId(cc().getServiceContext()); return std::find_if(_donorShards.begin(), _donorShards.end(), [&](const DonorShardFetchTimestamp& donor) { @@ -596,7 +529,7 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi } const bool isAlsoDonor = [&] { - auto myShardId = ShardingState::get(cc().getServiceContext())->shardId(); + auto myShardId = _externalState->myShardId(cc().getServiceContext()); return std::find_if(_donorShards.begin(), _donorShards.end(), [&](const DonorShardFetchTimestamp& donor) { @@ -750,8 +683,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_updateC .thenRunOn(**executor) .then([this] { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - - auto shardId = ShardingState::get(opCtx.get())->shardId(); + auto shardId = _externalState->myShardId(opCtx->getServiceContext()); BSONObjBuilder updateBuilder; { @@ -763,16 +695,10 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_updateC } } - uassertStatusOK( - Grid::get(opCtx.get()) - ->catalogClient() - ->updateConfigDocument( - opCtx.get(), - NamespaceString::kConfigReshardingOperationsNamespace, - _makeQueryForCoordinatorUpdate(shardId, _recipientCtx.getState()), - updateBuilder.done(), - false /* upsert */, - ShardingCatalogClient::kMajorityWriteConcern)); + _externalState->updateCoordinatorDocument( + opCtx.get(), + _makeQueryForCoordinatorUpdate(shardId, _recipientCtx.getState()), + updateBuilder.done()); }); } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index e0c9fae5c97..c4a4d57df0d 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -39,23 +39,7 @@ namespace mongo { -namespace resharding { - -/** - * Creates the temporary resharding collection locally by loading the collection options and - * collection indexes from the original collection's primary and MinKey owning chunk shards, - * respectively. - */ -void createTemporaryReshardingCollectionLocally(OperationContext* opCtx, - const NamespaceString& originalNss, - const NamespaceString& reshardingNss, - const UUID& reshardingUUID, - const UUID& existingUUID, - Timestamp fetchTimestamp); - -} // namespace resharding - -class ReshardingRecipientService final : public repl::PrimaryOnlyService { +class ReshardingRecipientService : public repl::PrimaryOnlyService { public: static constexpr StringData kServiceName = "ReshardingRecipientService"_sd; @@ -65,6 +49,8 @@ public: class RecipientStateMachine; + class RecipientStateMachineExternalState; + StringData getServiceName() const override { return kServiceName; } @@ -107,9 +93,11 @@ public: } }; - explicit RecipientStateMachine(const ReshardingRecipientService* recipientService, - const BSONObj& recipientDoc, - ReshardingDataReplicationFactory dataReplicationFactory); + explicit RecipientStateMachine( + const ReshardingRecipientService* recipientService, + const ReshardingRecipientDocument& recipientDoc, + std::unique_ptr<RecipientStateMachineExternalState> externalState, + ReshardingDataReplicationFactory dataReplicationFactory); ~RecipientStateMachine(); @@ -137,10 +125,6 @@ public: const ReshardingRecipientDocument& recipientDoc); private: - RecipientStateMachine(const ReshardingRecipientService* recipientService, - const ReshardingRecipientDocument& recipientDoc, - ReshardingDataReplicationFactory dataReplicationFactory); - // The following functions correspond to the actions to take at a particular recipient state. ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection( const std::shared_ptr<executor::ScopedTaskExecutor>& executor); @@ -221,6 +205,8 @@ private: std::vector<DonorShardFetchTimestamp> _donorShards; boost::optional<Timestamp> _cloneTimestamp; + const std::unique_ptr<RecipientStateMachineExternalState> _externalState; + // ThreadPool used by CancelableOperationContext. // CancelableOperationContext must have a thread that is always available to it to mark its // opCtx as killed when the cancelToken has been cancelled. diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp new file mode 100644 index 00000000000..f160955a050 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp @@ -0,0 +1,165 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding + +#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h" + +#include "mongo/db/s/sharding_state.h" +#include "mongo/logv2/log.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/grid.h" +#include "mongo/s/resharding/common_types_gen.h" +#include "mongo/s/stale_shard_version_helpers.h" + +namespace mongo { + +void ReshardingRecipientService::RecipientStateMachineExternalState:: + ensureTempReshardingCollectionExistsWithIndexes(OperationContext* opCtx, + const CommonReshardingMetadata& metadata, + Timestamp cloneTimestamp) { + LOGV2_DEBUG(5002300, + 1, + "Creating temporary resharding collection", + "sourceNamespace"_attr = metadata.getSourceNss()); + + // The CatalogCache may have a collection entry from before the source collection was sharded. + // We force a refresh to have this shard realize the collection is now sharded. + refreshCatalogCache(opCtx, metadata.getSourceNss()); + + auto [collOptions, unusedUUID] = getCollectionOptions( + opCtx, + metadata.getSourceNss(), + metadata.getSourceUUID(), + cloneTimestamp, + "loading collection options to create temporary resharding collection"_sd); + + auto [indexes, idIndex] = + getCollectionIndexes(opCtx, + metadata.getSourceNss(), + metadata.getSourceUUID(), + cloneTimestamp, + "loading indexes to create temporary resharding collection"_sd); + + // Set the temporary resharding collection's UUID to the resharding UUID. Note that + // BSONObj::addFields() replaces any fields that already exist. + collOptions = collOptions.addFields(BSON("uuid" << metadata.getReshardingUUID())); + MigrationDestinationManager::cloneCollectionIndexesAndOptions( + opCtx, + metadata.getTempReshardingNss(), + CollectionOptionsAndIndexes{metadata.getReshardingUUID(), + std::move(indexes), + std::move(idIndex), + std::move(collOptions)}); +} + +template <typename Callable> +auto RecipientStateMachineExternalStateImpl::_withShardVersionRetry(OperationContext* opCtx, + const NamespaceString& nss, + StringData reason, + Callable&& callable) { + auto catalogCache = Grid::get(opCtx)->catalogCache(); + return shardVersionRetry(opCtx, catalogCache, nss, reason, std::move(callable)); +} + +ShardId RecipientStateMachineExternalStateImpl::myShardId(ServiceContext* serviceContext) const { + return ShardingState::get(serviceContext)->shardId(); +} + +void RecipientStateMachineExternalStateImpl::refreshCatalogCache(OperationContext* opCtx, + const NamespaceString& nss) { + auto catalogCache = Grid::get(opCtx)->catalogCache(); + uassertStatusOK(catalogCache->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss)); +} + +ChunkManager RecipientStateMachineExternalStateImpl::getShardedCollectionRoutingInfo( + OperationContext* opCtx, const NamespaceString& nss) { + auto catalogCache = Grid::get(opCtx)->catalogCache(); + return catalogCache->getShardedCollectionRoutingInfo(opCtx, nss); +} + +MigrationDestinationManager::CollectionOptionsAndUUID +RecipientStateMachineExternalStateImpl::getCollectionOptions(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionUUID& uuid, + Timestamp afterClusterTime, + StringData reason) { + // Load the collection options from the primary shard for the database. + return _withShardVersionRetry(opCtx, nss, reason, [&] { + auto cm = getShardedCollectionRoutingInfo(opCtx, nss); + return MigrationDestinationManager::getCollectionOptions( + opCtx, + NamespaceStringOrUUID{nss.db().toString(), uuid}, + cm.dbPrimary(), + cm, + afterClusterTime); + }); +} + +MigrationDestinationManager::IndexesAndIdIndex +RecipientStateMachineExternalStateImpl::getCollectionIndexes(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionUUID& uuid, + Timestamp afterClusterTime, + StringData reason) { + // Load the list of indexes from the shard which owns the global minimum chunk. + return _withShardVersionRetry(opCtx, nss, reason, [&] { + auto cm = getShardedCollectionRoutingInfo(opCtx, nss); + return MigrationDestinationManager::getCollectionIndexes( + opCtx, + NamespaceStringOrUUID{nss.db().toString(), uuid}, + cm.getMinKeyShardIdWithSimpleCollation(), + cm, + afterClusterTime); + }); +} + +void RecipientStateMachineExternalStateImpl::withShardVersionRetry( + OperationContext* opCtx, + const NamespaceString& nss, + StringData reason, + unique_function<void()> callback) { + _withShardVersionRetry(opCtx, nss, reason, std::move(callback)); +} + +void RecipientStateMachineExternalStateImpl::updateCoordinatorDocument(OperationContext* opCtx, + const BSONObj& query, + const BSONObj& update) { + auto catalogClient = Grid::get(opCtx)->catalogClient(); + uassertStatusOK( + catalogClient->updateConfigDocument(opCtx, + NamespaceString::kConfigReshardingOperationsNamespace, + query, + update, + false, /* upsert */ + ShardingCatalogClient::kMajorityWriteConcern)); +} + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_external_state.h b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.h new file mode 100644 index 00000000000..7a414229125 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.h @@ -0,0 +1,146 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/util/functional.h" + +#include "mongo/base/string_data.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/catalog/collection_options.h" +#include "mongo/db/s/migration_destination_manager.h" +#include "mongo/db/s/resharding/resharding_recipient_service.h" +#include "mongo/s/chunk_manager.h" +#include "mongo/s/shard_id.h" + +namespace mongo { + +class BSONObj; +class CommonReshardingMetadata; +class NamespaceString; +class OperationContext; +class ServiceContext; + +/** + * Represents the interface that RecipientStateMachine uses to interact with the rest of the + * sharding codebase. + * + * In particular, RecipientStateMachine must not directly use Grid, ShardingState, or + * ShardingCatalogClient. RecipientStateMachine must instead access those types through the + * RecipientStateMachineExternalState interface. Having it behind an interface makes it more + * straightforward to unit test RecipientStateMachine. + */ +class ReshardingRecipientService::RecipientStateMachineExternalState { +public: + virtual ~RecipientStateMachineExternalState() = default; + + virtual ShardId myShardId(ServiceContext* serviceContext) const = 0; + + virtual void refreshCatalogCache(OperationContext* opCtx, const NamespaceString& nss) = 0; + + virtual ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) = 0; + + virtual MigrationDestinationManager::CollectionOptionsAndUUID getCollectionOptions( + OperationContext* opCtx, + const NamespaceString& nss, + const CollectionUUID& uuid, + Timestamp afterClusterTime, + StringData reason) = 0; + + virtual MigrationDestinationManager::IndexesAndIdIndex getCollectionIndexes( + OperationContext* opCtx, + const NamespaceString& nss, + const CollectionUUID& uuid, + Timestamp afterClusterTime, + StringData reason) = 0; + + virtual void withShardVersionRetry(OperationContext* opCtx, + const NamespaceString& nss, + StringData reason, + unique_function<void()> callback) = 0; + + virtual void updateCoordinatorDocument(OperationContext* opCtx, + const BSONObj& query, + const BSONObj& update) = 0; + + /** + * Creates the temporary resharding collection locally. + * + * The collection options are taken from the primary shard for the source database and the + * collection indexes are taken from the shard which owns the global minimum chunk. + * + * This function won't automatically create an index on the new shard key pattern. + */ + void ensureTempReshardingCollectionExistsWithIndexes(OperationContext* opCtx, + const CommonReshardingMetadata& metadata, + Timestamp cloneTimestamp); +}; + +class RecipientStateMachineExternalStateImpl + : public ReshardingRecipientService::RecipientStateMachineExternalState { +public: + ShardId myShardId(ServiceContext* serviceContext) const override; + + void refreshCatalogCache(OperationContext* opCtx, const NamespaceString& nss) override; + + ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, + const NamespaceString& nss) override; + + MigrationDestinationManager::CollectionOptionsAndUUID getCollectionOptions( + OperationContext* opCtx, + const NamespaceString& nss, + const CollectionUUID& uuid, + Timestamp afterClusterTime, + StringData reason) override; + + MigrationDestinationManager::IndexesAndIdIndex getCollectionIndexes(OperationContext* opCtx, + const NamespaceString& nss, + const CollectionUUID& uuid, + Timestamp afterClusterTime, + StringData reason) override; + + void withShardVersionRetry(OperationContext* opCtx, + const NamespaceString& nss, + StringData reason, + unique_function<void()> callback) override; + + void updateCoordinatorDocument(OperationContext* opCtx, + const BSONObj& query, + const BSONObj& update) override; + +private: + template <typename Callable> + auto _withShardVersionRetry(OperationContext* opCtx, + const NamespaceString& nss, + StringData reason, + Callable&& callable); +}; + +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_external_state_test.cpp index 8d118974d5b..eada2183685 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_external_state_test.cpp @@ -39,9 +39,8 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" -#include "mongo/db/s/migration_destination_manager.h" #include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" -#include "mongo/db/s/resharding/resharding_recipient_service.h" +#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" @@ -53,8 +52,8 @@ namespace mongo { namespace { -class ReshardingRecipientServiceTest : public CatalogCacheTestFixture, - public ServiceContextMongoDTest { +class RecipientServiceExternalStateTest : public CatalogCacheTestFixture, + public ServiceContextMongoDTest { public: const ShardKeyPattern kShardKey = ShardKeyPattern(BSON("oldKey" << 1)); const OID kOrigEpoch = OID::gen(); @@ -66,6 +65,8 @@ public: const NamespaceString kReshardingNss = NamespaceString( str::stream() << "db." << NamespaceString::kTemporaryReshardingCollectionPrefix << kOrigUUID); + const CommonReshardingMetadata kMetadata{ + kReshardingUUID, kOrigNss, kOrigUUID, kReshardingNss, kReshardingKey.getKeyPattern()}; const Timestamp kDefaultFetchTimestamp = Timestamp(200, 1); void setUp() override { @@ -244,7 +245,7 @@ public: } }; -TEST_F(ReshardingRecipientServiceTest, CreateLocalReshardingCollectionBasic) { +TEST_F(RecipientServiceExternalStateTest, CreateLocalReshardingCollectionBasic) { auto shards = setupNShards(2); // Shard kOrigNss by _id with chunks [minKey, 0), [0, maxKey] on shards "0" and "1" @@ -282,19 +283,16 @@ TEST_F(ReshardingRecipientServiceTest, CreateLocalReshardingCollectionBasic) { expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); }); - resharding::createTemporaryReshardingCollectionLocally(operationContext(), - kOrigNss, - kReshardingNss, - kReshardingUUID, - kOrigUUID, - kDefaultFetchTimestamp); + RecipientStateMachineExternalStateImpl externalState; + externalState.ensureTempReshardingCollectionExistsWithIndexes( + operationContext(), kMetadata, kDefaultFetchTimestamp); future.default_timed_get(); verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); } -TEST_F(ReshardingRecipientServiceTest, +TEST_F(RecipientServiceExternalStateTest, CreatingLocalReshardingCollectionRetriesOnStaleVersionErrors) { auto shards = setupNShards(2); @@ -324,7 +322,6 @@ TEST_F(ReshardingRecipientServiceTest, expectRefreshReturnForOriginalColl(kOrigNss, kShardKey, kOrigUUID, kOrigEpoch); expectStaleDbVersionError(kOrigNss, "listCollections"); expectGetDatabase(kOrigNss, shards[1].getHost()); - expectRefreshReturnForOriginalColl(kOrigNss, kShardKey, kOrigUUID, kOrigEpoch); expectListCollections( kOrigNss, kOrigUUID, @@ -335,22 +332,20 @@ TEST_F(ReshardingRecipientServiceTest, HostAndPort(shards[1].getHost())); expectStaleEpochError(kOrigNss, "listIndexes"); + expectRefreshReturnForOriginalColl(kOrigNss, kShardKey, kOrigUUID, kOrigEpoch); expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); }); - resharding::createTemporaryReshardingCollectionLocally(operationContext(), - kOrigNss, - kReshardingNss, - kReshardingUUID, - kOrigUUID, - kDefaultFetchTimestamp); + RecipientStateMachineExternalStateImpl externalState; + externalState.ensureTempReshardingCollectionExistsWithIndexes( + operationContext(), kMetadata, kDefaultFetchTimestamp); future.default_timed_get(); verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); } -TEST_F(ReshardingRecipientServiceTest, +TEST_F(RecipientServiceExternalStateTest, CreateLocalReshardingCollectionCollectionAlreadyExistsWithNoIndexes) { auto shards = setupNShards(2); @@ -404,19 +399,16 @@ TEST_F(ReshardingRecipientServiceTest, expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); }); - resharding::createTemporaryReshardingCollectionLocally(operationContext(), - kOrigNss, - kReshardingNss, - kReshardingUUID, - kOrigUUID, - kDefaultFetchTimestamp); + RecipientStateMachineExternalStateImpl externalState; + externalState.ensureTempReshardingCollectionExistsWithIndexes( + operationContext(), kMetadata, kDefaultFetchTimestamp); future.default_timed_get(); verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); } -TEST_F(ReshardingRecipientServiceTest, +TEST_F(RecipientServiceExternalStateTest, CreateLocalReshardingCollectionCollectionAlreadyExistsWithSomeIndexes) { auto shards = setupNShards(2); @@ -472,19 +464,16 @@ TEST_F(ReshardingRecipientServiceTest, expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); }); - resharding::createTemporaryReshardingCollectionLocally(operationContext(), - kOrigNss, - kReshardingNss, - kReshardingUUID, - kOrigUUID, - kDefaultFetchTimestamp); + RecipientStateMachineExternalStateImpl externalState; + externalState.ensureTempReshardingCollectionExistsWithIndexes( + operationContext(), kMetadata, kDefaultFetchTimestamp); future.default_timed_get(); verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes); } -TEST_F(ReshardingRecipientServiceTest, +TEST_F(RecipientServiceExternalStateTest, CreateLocalReshardingCollectionCollectionAlreadyExistsWithAllIndexes) { auto shards = setupNShards(2); @@ -530,12 +519,9 @@ TEST_F(ReshardingRecipientServiceTest, expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost())); }); - resharding::createTemporaryReshardingCollectionLocally(operationContext(), - kOrigNss, - kReshardingNss, - kReshardingUUID, - kOrigUUID, - kDefaultFetchTimestamp); + RecipientStateMachineExternalStateImpl externalState; + externalState.ensureTempReshardingCollectionExistsWithIndexes( + operationContext(), kMetadata, kDefaultFetchTimestamp); future.default_timed_get(); |