diff options
author | Jordi Serra Torrens <jordi.serra-torrens@mongodb.com> | 2021-02-17 12:54:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-02-25 08:49:19 +0000 |
commit | 1d3c1ca0ddb2c984ca66381cc39565cbec01c645 (patch) | |
tree | 9f3d69748bd8eaee8fa2af56792981f722699769 /src/mongo | |
parent | d1a3ee71e82eb35a7f1e1ea6fcee4fa920317346 (diff) | |
download | mongo-1d3c1ca0ddb2c984ca66381cc39565cbec01c645.tar.gz |
SERVER-53861: Implement stop migrations procedure for DDL operations
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp | 105 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager.h | 8 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/s/drop_collection_coordinator.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/s/drop_collection_coordinator.h | 1 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.cpp | 58 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_util.h | 15 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_util.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_util.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_util.cpp | 103 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_util.h | 58 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_util_refresh_test.cpp (renamed from src/mongo/db/s/resharding/resharding_util_refresh_test.cpp) | 28 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/request_types/set_allow_migrations.idl | 50 |
16 files changed, 439 insertions, 104 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 00f970bb141..a855f50c787 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -266,6 +266,7 @@ env.Library( 'drop_database_legacy.cpp', 'type_lockpings.cpp', 'type_locks.cpp', + 'sharding_util.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/catalog_raii', @@ -327,6 +328,7 @@ env.Library( 'config/configsvr_remove_shard_command.cpp', 'config/configsvr_remove_shard_from_zone_command.cpp', 'config/configsvr_reshard_collection_cmd.cpp', + 'config/configsvr_set_allow_migrations_command.cpp', 'config/configsvr_shard_collection_command.cpp', 'config/configsvr_split_chunk_command.cpp', 'config/configsvr_update_zone_key_range_command.cpp', @@ -561,9 +563,9 @@ env.CppUnitTest( 'config/sharding_catalog_manager_split_chunk_test.cpp', 'resharding/resharding_coordinator_observer_test.cpp', 'resharding/resharding_coordinator_test.cpp', - 'resharding/resharding_util_refresh_test.cpp', 'resharding/resharding_util_test.cpp', 'sharding_ddl_util_test.cpp', + 'sharding_util_refresh_test.cpp', 'type_lockpings_test.cpp', 'type_locks_test.cpp', 'vector_clock_config_server_test.cpp', diff --git a/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp b/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp new file mode 100644 index 00000000000..8ab105f72e6 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/s/request_types/set_allow_migrations_gen.h" + +namespace mongo { +namespace { + +class ConfigsvrSetAllowMigrationsCommand final + : public TypedCommand<ConfigsvrSetAllowMigrationsCommand> { +public: + using Request = ConfigsvrSetAllowMigrations; + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + const NamespaceString& nss = ns(); + + uassert(ErrorCodes::IllegalOperation, + "_configsvrSetAllowMigrations can only be run on config servers", + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + uassert(ErrorCodes::InvalidOptions, + "_configsvrSetAllowMigrations must be called with majority writeConcern", + 
opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority); + + // Set the operation context read concern level to local for reads into the config + // database. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + const auto allowMigrations = request().getAllowMigrations(); + + ShardingCatalogManager::get(opCtx)->setAllowMigrationsAndBumpOneChunk( + opCtx, nss, allowMigrations); + } + + private: + NamespaceString ns() const override { + return request().getCommandParameter(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + + std::string help() const override { + return "Internal command, which is exported by the sharding config server. Do not call " + "directly. Sets the allowMigrations flag on the specified collection."; + } + + bool adminOnly() const override { + return true; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } +} configsvrSetAllowMigrationsCmd; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 9de828a0b46..7413e595e0d 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -301,6 +301,14 @@ public: const NamespaceString& nss, const BSONObj& minKey); + /** + * In a transaction, sets the 'allowMigrations' to the requested state and bumps the collection + * version. 
+ */ + void setAllowMigrationsAndBumpOneChunk(OperationContext* opCtx, + const NamespaceString& nss, + bool allowMigrations); + // // Database Operations // diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 811d9e9cc66..f4a1d7c5349 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -45,6 +45,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/sharding_logging.h" +#include "mongo/db/s/sharding_util.h" #include "mongo/db/server_options.h" #include "mongo/db/snapshot_window_options_gen.h" #include "mongo/db/transaction_participant_gen.h" @@ -1453,4 +1454,55 @@ void ShardingCatalogManager::splitOrMarkJumbo(OperationContext* opCtx, } } +void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(OperationContext* opCtx, + const NamespaceString& nss, + bool allowMigrations) { + std::set<ShardId> shardsIds; + { + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + + const auto cm = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, + nss)); + cm.getAllShardIds(&shardsIds); + withTransaction( + opCtx, CollectionType::ConfigNS, [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Update the 'allowMigrations' field. An unset 'allowMigrations' field implies + // 'true'. To ease backwards compatibility we omit 'allowMigrations' instead of + // setting it explicitly to 'true'. + const auto update = allowMigrations + ? 
BSON("$unset" << BSON(CollectionType::kAllowMigrationsFieldName << "")) + : BSON("$set" << BSON(CollectionType::kAllowMigrationsFieldName << false)); + + writeToConfigDocumentInTxn( + opCtx, + CollectionType::ConfigNS, + BatchedCommandRequest::buildUpdateOp( + CollectionType::ConfigNS, + BSON(CollectionType::kNssFieldName << nss.ns()) /* query */, + update /* update */, + false /* upsert */, + false /* multi */), + txnNumber); + + // Bump the chunk version for one single chunk + invariant(!shardsIds.empty()); + bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, {*shardsIds.begin()}); + }); + + // When 'allowMigrations' is false, migrations are no longer allowed from this point on, so + // no new shards can come to own chunks for this collection. + } + + // Trigger a refresh on each shard containing chunks for this collection. + const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + sharding_util::tellShardsToRefreshCollection( + opCtx, + {std::make_move_iterator(shardsIds.begin()), std::make_move_iterator(shardsIds.end())}, + nss, + executor); +} + } // namespace mongo diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp index 49e9ef287b2..a824e2f916d 100644 --- a/src/mongo/db/s/drop_collection_coordinator.cpp +++ b/src/mongo/db/s/drop_collection_coordinator.cpp @@ -77,17 +77,6 @@ void DropCollectionCoordinator::_sendDropCollToParticipants(OperationContext* op } } -void DropCollectionCoordinator::_stopMigrations(OperationContext* opCtx) { - // TODO SERVER-53861 this will not stop current ongoing migrations - uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument( - opCtx, - CollectionType::ConfigNS, - BSON(CollectionType::kNssFieldName << _nss.ns()), - BSON("$set" << BSON(CollectionType::kAllowMigrationsFieldName << false)), - false /* upsert */, - ShardingCatalogClient::kMajorityWriteConcern)); -} - SemiFuture<void> DropCollectionCoordinator::runImpl( 
std::shared_ptr<executor::TaskExecutor> executor) { return ExecutorFuture<void>(executor, Status::OK()) @@ -103,7 +92,11 @@ SemiFuture<void> DropCollectionCoordinator::runImpl( const auto collDistLock = uassertStatusOK(distLockManager->lock( opCtx, _nss.ns(), "DropCollection", DistLockManager::kDefaultLockTimeout)); - _stopMigrations(opCtx); + try { + sharding_ddl_util::stopMigrations(opCtx, _nss); + } catch (ExceptionFor<ErrorCodes::NamespaceNotSharded>&) { + // The collection is not sharded or doesn't exist. + } const auto catalogClient = Grid::get(opCtx)->catalogClient(); diff --git a/src/mongo/db/s/drop_collection_coordinator.h b/src/mongo/db/s/drop_collection_coordinator.h index b462fd04e04..8ddb85dfcad 100644 --- a/src/mongo/db/s/drop_collection_coordinator.h +++ b/src/mongo/db/s/drop_collection_coordinator.h @@ -43,7 +43,6 @@ public: private: SemiFuture<void> runImpl(std::shared_ptr<executor::TaskExecutor> executor) override; - void _stopMigrations(OperationContext* opCtx); void _sendDropCollToParticipants(OperationContext* opCtx); ServiceContext* _serviceContext; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index a51bddbef22..0f451f28a9d 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -40,6 +40,7 @@ #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_util.h" #include "mongo/db/vector_clock.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -1287,14 +1288,16 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe nssToRefresh = _coordinatorDoc.getNss(); } - tellShardsToRefresh(opCtx.get(), recipientIds, nssToRefresh, **executor); + 
sharding_util::tellShardsToRefreshCollection( + opCtx.get(), recipientIds, nssToRefresh, **executor); } void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { auto opCtx = cc().makeOperationContext(); auto donorIds = extractShardIds(_coordinatorDoc.getDonorShards()); - tellShardsToRefresh(opCtx.get(), donorIds, _coordinatorDoc.getNss(), **executor); + sharding_util::tellShardsToRefreshCollection( + opCtx.get(), donorIds, _coordinatorDoc.getNss(), **executor); } void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh( @@ -1306,11 +1309,11 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsTo std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()}; participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end()); - sendCommandToShards(opCtx.get(), - refreshCmd, - {participantShardIds.begin(), participantShardIds.end()}, - _coordinatorDoc.getNss(), - **executor); + sharding_util::sendCommandToShards(opCtx.get(), + refreshCmd, + {participantShardIds.begin(), participantShardIds.end()}, + _coordinatorDoc.getNss(), + **executor); } } // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 61035669c3f..858c1fe2c90 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -140,64 +140,6 @@ std::set<ShardId> getRecipientShards(OperationContext* opCtx, return recipients; } -void tellShardsToRefresh(OperationContext* opCtx, - const std::vector<ShardId>& shardIds, - const NamespaceString& nss, - const std::shared_ptr<executor::TaskExecutor>& executor) { - auto cmd = _flushRoutingTableCacheUpdatesWithWriteConcern(nss); - cmd.setSyncFromConfig(true); - cmd.setDbName(nss.db()); - auto cmdObj = - cmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); - - 
sendCommandToShards(opCtx, cmdObj, shardIds, nss, executor); -} - -void sendCommandToShards(OperationContext* opCtx, - const BSONObj& command, - const std::vector<ShardId>& shardIds, - const NamespaceString& nss, - const std::shared_ptr<executor::TaskExecutor>& executor) { - std::vector<AsyncRequestsSender::Request> requests; - for (const auto& shardId : shardIds) { - requests.emplace_back(shardId, command); - } - - if (!requests.empty()) { - // The _flushRoutingTableCacheUpdatesWithWriteConcern command will fail with a - // QueryPlanKilled error response if the config.cache.chunks collection is dropped - // concurrently. The config.cache.chunks collection is dropped by the shard when it detects - // the sharded collection's epoch having changed. We use kIdempotentOrCursorInvalidated so - // the ARS automatically retries in that situation. - AsyncRequestsSender ars(opCtx, - executor, - "admin", - requests, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - Shard::RetryPolicy::kIdempotentOrCursorInvalidated); - - while (!ars.done()) { - // Retrieve the responses and throw at the first failure. 
- auto response = ars.next(); - - auto generateErrorContext = [&]() -> std::string { - return str::stream() - << "Unable to _flushRoutingTableCacheUpdatesWithWriteConcern for namespace " - << nss.ns() << " on " << response.shardId; - }; - - auto shardResponse = - uassertStatusOKWithContext(std::move(response.swResponse), generateErrorContext()); - - auto status = getStatusFromCommandResult(shardResponse.data); - uassertStatusOKWithContext(status, generateErrorContext()); - - auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data); - uassertStatusOKWithContext(wcStatus, generateErrorContext()); - } - } -} - void checkForHolesAndOverlapsInChunks(std::vector<ReshardedChunk>& chunks, const KeyPattern& keyPattern) { std::sort(chunks.begin(), chunks.end(), [](const ReshardedChunk& a, const ReshardedChunk& b) { diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index bbc36407ab1..007202ca4ab 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -202,21 +202,6 @@ std::set<ShardId> getRecipientShards(OperationContext* opCtx, const UUID& reshardingUUID); /** - * Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the - * shards fails to refresh. - */ -void tellShardsToRefresh(OperationContext* opCtx, - const std::vector<ShardId>& shardIds, - const NamespaceString& nss, - const std::shared_ptr<executor::TaskExecutor>& executor); - -void sendCommandToShards(OperationContext* opCtx, - const BSONObj& command, - const std::vector<ShardId>& shardIds, - const NamespaceString& nss, - const std::shared_ptr<executor::TaskExecutor>& executor); - -/** * Asserts that there is not a hole or overlap in the chunks. 
*/ void checkForHolesAndOverlapsInChunks(std::vector<ReshardedChunk>& chunks, diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp index 715d8cda56d..90a9cd70cad 100644 --- a/src/mongo/db/s/sharding_ddl_util.cpp +++ b/src/mongo/db/s/sharding_ddl_util.cpp @@ -41,6 +41,7 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_tags.h" #include "mongo/s/grid.h" +#include "mongo/s/request_types/set_allow_migrations_gen.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -258,5 +259,25 @@ void releaseCriticalSection(OperationContext* opCtx, const NamespaceString& nss) csr->clearFilteringMetadata(opCtx); } +void stopMigrations(OperationContext* opCtx, const NamespaceString& nss) { + const ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss, + false /* allowMigrations */); + const auto swSetAllowMigrationsResult = + Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + NamespaceString::kAdminDb.toString(), + CommandHelpers::appendMajorityWriteConcern(configsvrSetAllowMigrationsCmd.toBSON({})), + Shard::RetryPolicy::kIdempotent // Although ConfigsvrSetAllowMigrations is not really + // idempotent (because it will cause the collection + // version to be bumped), it is safe to be retried. 
+ ); + + uassertStatusOKWithContext( + Shard::CommandResponse::getEffectiveStatus(std::move(swSetAllowMigrationsResult)), + str::stream() << "Error setting allowMigrations to false for collection " + << nss.toString()); +} + } // namespace sharding_ddl_util } // namespace mongo diff --git a/src/mongo/db/s/sharding_ddl_util.h b/src/mongo/db/s/sharding_ddl_util.h index 0fb804781b6..9f0ae72e67c 100644 --- a/src/mongo/db/s/sharding_ddl_util.h +++ b/src/mongo/db/s/sharding_ddl_util.h @@ -91,5 +91,10 @@ void acquireCriticalSection(OperationContext* opCtx, const NamespaceString& nss) */ void releaseCriticalSection(OperationContext* opCtx, const NamespaceString& nss); +/** + * Stops ongoing migrations and prevents future ones from starting for the given nss. + */ +void stopMigrations(OperationContext* opCtx, const NamespaceString& nss); + } // namespace sharding_ddl_util } // namespace mongo diff --git a/src/mongo/db/s/sharding_util.cpp b/src/mongo/db/s/sharding_util.cpp new file mode 100644 index 00000000000..7f0d5080bdb --- /dev/null +++ b/src/mongo/db/s/sharding_util.cpp @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_util.h" + +#include "mongo/db/commands.h" +#include "mongo/logv2/log.h" +#include "mongo/s/async_requests_sender.h" +#include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h" + +namespace mongo { + +namespace sharding_util { + +void tellShardsToRefreshCollection(OperationContext* opCtx, + const std::vector<ShardId>& shardIds, + const NamespaceString& nss, + const std::shared_ptr<executor::TaskExecutor>& executor) { + auto cmd = _flushRoutingTableCacheUpdatesWithWriteConcern(nss); + cmd.setSyncFromConfig(true); + cmd.setDbName(nss.db()); + auto cmdObj = CommandHelpers::appendMajorityWriteConcern(cmd.toBSON({})); + + sendCommandToShards(opCtx, cmdObj, shardIds, nss, executor); +} + +void sendCommandToShards(OperationContext* opCtx, + const BSONObj& command, + const std::vector<ShardId>& shardIds, + const NamespaceString& nss, + const std::shared_ptr<executor::TaskExecutor>& executor) { + std::vector<AsyncRequestsSender::Request> requests; + for (const auto& shardId : shardIds) { + requests.emplace_back(shardId, 
command); + } + + if (!requests.empty()) { + // The _flushRoutingTableCacheUpdatesWithWriteConcern command will fail with a + // QueryPlanKilled error response if the config.cache.chunks collection is dropped + // concurrently. The config.cache.chunks collection is dropped by the shard when it detects + // the sharded collection's epoch having changed. We use kIdempotentOrCursorInvalidated so + // the ARS automatically retries in that situation. + AsyncRequestsSender ars(opCtx, + executor, + "admin", + requests, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotentOrCursorInvalidated); + + while (!ars.done()) { + // Retrieve the responses and throw at the first failure. + auto response = ars.next(); + + auto generateErrorContext = [&]() -> std::string { + return str::stream() + << "Unable to _flushRoutingTableCacheUpdatesWithWriteConcern for namespace " + << nss.ns() << " on " << response.shardId; + }; + + auto shardResponse = + uassertStatusOKWithContext(std::move(response.swResponse), generateErrorContext()); + + auto status = getStatusFromCommandResult(shardResponse.data); + uassertStatusOKWithContext(status, generateErrorContext()); + + auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data); + uassertStatusOKWithContext(wcStatus, generateErrorContext()); + } + } +} + +} // namespace sharding_util +} // namespace mongo diff --git a/src/mongo/db/s/sharding_util.h b/src/mongo/db/s/sharding_util.h new file mode 100644 index 00000000000..85946fd5cea --- /dev/null +++ b/src/mongo/db/s/sharding_util.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/shard_id.h" + +namespace mongo { +namespace sharding_util { + +/** + * Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the + * shards fails to refresh. 
+ */ +void tellShardsToRefreshCollection(OperationContext* opCtx, + const std::vector<ShardId>& shardIds, + const NamespaceString& nss, + const std::shared_ptr<executor::TaskExecutor>& executor); + +void sendCommandToShards(OperationContext* opCtx, + const BSONObj& command, + const std::vector<ShardId>& shardIds, + const NamespaceString& nss, + const std::shared_ptr<executor::TaskExecutor>& executor); + +} // namespace sharding_util +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_util_refresh_test.cpp b/src/mongo/db/s/sharding_util_refresh_test.cpp index f13a484f857..90850ecc206 100644 --- a/src/mongo/db/s/resharding/resharding_util_refresh_test.cpp +++ b/src/mongo/db/s/sharding_util_refresh_test.cpp @@ -33,7 +33,7 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/s/config/config_server_test_fixture.h" -#include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_util.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_shard.h" @@ -55,7 +55,7 @@ const BSONObj kMockResWithWriteConcernError = const Status kRetryableError{ErrorCodes::HostUnreachable, "RetryableError for test"}; -class ReshardingRefresherTest : public ConfigServerTestFixture { +class ShardingRefresherTest : public ConfigServerTestFixture { protected: void setUp() { ConfigServerTestFixture::setUp(); @@ -74,10 +74,12 @@ protected: } }; -TEST_F(ReshardingRefresherTest, refresherTwoShardsSucceed) { +TEST_F(ShardingRefresherTest, refresherTwoShardsSucceed) { auto opCtx = operationContext(); auto nss = NamespaceString("mydb", "mycoll"); - auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); }); + auto future = launchAsync([&] { + sharding_util::tellShardsToRefreshCollection(opCtx, kShardIdList, nss, executor()); + }); onCommand([&](const executor::RemoteCommandRequest& request) { return BSON("ok" << 1); }); onCommand([&](const 
executor::RemoteCommandRequest& request) { return BSON("ok" << 1); }); @@ -85,20 +87,24 @@ TEST_F(ReshardingRefresherTest, refresherTwoShardsSucceed) { future.default_timed_get(); } -TEST_F(ReshardingRefresherTest, refresherTwoShardsFirstErrors) { +TEST_F(ShardingRefresherTest, refresherTwoShardsFirstErrors) { auto opCtx = operationContext(); auto nss = NamespaceString("mydb", "mycoll"); - auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); }); + auto future = launchAsync([&] { + sharding_util::tellShardsToRefreshCollection(opCtx, kShardIdList, nss, executor()); + }); onCommand([&](const executor::RemoteCommandRequest& request) { return kMockErrorRes; }); ASSERT_THROWS_CODE(future.default_timed_get(), DBException, kMockStatus.code()); } -TEST_F(ReshardingRefresherTest, refresherTwoShardsSecondErrors) { +TEST_F(ShardingRefresherTest, refresherTwoShardsSecondErrors) { auto opCtx = operationContext(); auto nss = NamespaceString("mydb", "mycoll"); - auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); }); + auto future = launchAsync([&] { + sharding_util::tellShardsToRefreshCollection(opCtx, kShardIdList, nss, executor()); + }); onCommand([&](const executor::RemoteCommandRequest& request) { return BSON("ok" << 1); }); onCommand([&](const executor::RemoteCommandRequest& request) { return kMockErrorRes; }); @@ -106,10 +112,12 @@ TEST_F(ReshardingRefresherTest, refresherTwoShardsSecondErrors) { ASSERT_THROWS_CODE(future.default_timed_get(), DBException, kMockStatus.code()); } -TEST_F(ReshardingRefresherTest, refresherTwoShardsWriteConcernFailed) { +TEST_F(ShardingRefresherTest, refresherTwoShardsWriteConcernFailed) { auto opCtx = operationContext(); auto nss = NamespaceString("mydb", "mycoll"); - auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); }); + auto future = launchAsync([&] { + sharding_util::tellShardsToRefreshCollection(opCtx, kShardIdList, nss, 
executor()); + }); onCommand([&](const executor::RemoteCommandRequest& request) { return kMockResWithWriteConcernError; diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index c1b13f8a89e..f78c1273d24 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -168,6 +168,7 @@ env.Library( 'request_types/refine_collection_shard_key.idl', 'request_types/remove_shard_from_zone_request_type.cpp', 'request_types/reshard_collection.idl', + 'request_types/set_allow_migrations.idl', 'request_types/set_shard_version_request.cpp', 'request_types/shard_collection.idl', 'request_types/sharded_ddl_commands.idl', diff --git a/src/mongo/s/request_types/set_allow_migrations.idl b/src/mongo/s/request_types/set_allow_migrations.idl new file mode 100644 index 00000000000..5c1829218de --- /dev/null +++ b/src/mongo/s/request_types/set_allow_migrations.idl @@ -0,0 +1,50 @@ +# Copyright (C) 2021-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. 
If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# configsvrSetAllowMigrations IDL File + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +commands: + _configsvrSetAllowMigrations: + command_name: _configsvrSetAllowMigrations + cpp_name: ConfigsvrSetAllowMigrations + description: "internal setAllowMigrations command for config server" + namespace: type + api_version: "" + type: namespacestring + strict: false + fields: + allowMigrations: + type: bool + description: "The new allowMigrations flag state to be set." + optional: false |