Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/s/SConscript                                           |   2
-rw-r--r--  src/mongo/db/s/config/configsvr_remove_chunks_command.cpp           | 149
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp                    | 159
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h                      |   7
-rw-r--r--  src/mongo/db/s/remove_chunks.idl                                    |  46
-rw-r--r--  src/mongo/db/s/shardsvr_create_collection_participant_command.cpp  |   1
-rw-r--r--  src/mongo/db/transaction_validation.cpp                             |   2
7 files changed, 317 insertions, 49 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 84a14e690de..043f124e49a 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -315,6 +315,7 @@ env.Library(
         'config/configsvr_move_chunk_command.cpp',
         'config/configsvr_move_primary_command.cpp',
         'config/configsvr_refine_collection_shard_key_command.cpp',
+        'config/configsvr_remove_chunks_command.cpp',
         'config/configsvr_remove_shard_command.cpp',
         'config/configsvr_remove_shard_from_zone_command.cpp',
         'config/configsvr_remove_tags_command.cpp',
@@ -341,6 +342,7 @@ env.Library(
         'migration_destination_manager_legacy_commands.cpp',
         'move_chunk_command.cpp',
         'move_primary_coordinator.cpp',
+        'remove_chunks.idl',
         'rename_collection_coordinator.cpp',
         'sharded_rename_collection.idl',
         'rename_collection_participant_service.cpp',
diff --git a/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
new file mode 100644
index 00000000000..e0b40d08a64
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/remove_chunks_gen.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+namespace {
+
+class ConfigsvrRemoveChunksCommand final : public TypedCommand<ConfigsvrRemoveChunksCommand> {
+public:
+    using Request = ConfigsvrRemoveChunks;
+
+    class Invocation final : public InvocationBase {
+    public:
+        using InvocationBase::InvocationBase;
+
+        void typedRun(OperationContext* opCtx) {
+            const UUID& collectionUUID = request().getCollectionUUID();
+
+            opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+            uassert(ErrorCodes::IllegalOperation,
+                    "_configsvrRemoveChunks can only be run on config servers",
+                    serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+            uassert(ErrorCodes::InvalidOptions,
+                    "_configsvrRemoveChunks must be called with majority writeConcern",
+                    opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
+
+            // Set the operation context read concern level to local for reads into the config
+            // database.
+            repl::ReadConcernArgs::get(opCtx) =
+                repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+            auto txnParticipant = TransactionParticipant::get(opCtx);
+            uassert(
+                5665000, "_configsvrRemoveChunks must be run as a retryable write", txnParticipant);
+
+            {
+                // Use an ACR because we will perform a {multi: true} delete, which is otherwise not
+                // supported on a session.
+                auto newClient = opCtx->getServiceContext()->makeClient("RemoveChunksMetadata");
+                {
+                    stdx::lock_guard<Client> lk(*newClient.get());
+                    newClient->setSystemOperationKillableByStepdown(lk);
+                }
+
+                AlternativeClientRegion acr(newClient);
+                auto executor =
+                    Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+                auto newOpCtxPtr = CancelableOperationContext(
+                    cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
+
+                // Write with localWriteConcern because we cannot wait for replication with a
+                // session checked out. The command will wait for majority WC on the epilogue after
+                // the session has been checked in.
+                uassertStatusOK(
+                    Grid::get(newOpCtxPtr.get())
+                        ->catalogClient()
+                        ->removeConfigDocuments(newOpCtxPtr.get(),
+                                                ChunkType::ConfigNS,
+                                                BSON(ChunkType::collectionUUID << collectionUUID),
+                                                ShardingCatalogClient::kLocalWriteConcern));
+            }
+
+            // Since no write happened on this txnNumber, we need to make a dummy write so that
+            // secondaries can be aware of this txn.
+            DBDirectClient client(opCtx);
+            client.update(NamespaceString::kServerConfigurationNamespace.ns(),
+                          BSON("_id"
+                               << "RemoveChunksMetadataStats"),
+                          BSON("$inc" << BSON("count" << 1)),
+                          true /* upsert */,
+                          false /* multi */);
+        }
+
+    private:
+        NamespaceString ns() const override {
+            return NamespaceString(request().getDbName(), "");
+        }
+
+        bool supportsWriteConcern() const override {
+            return true;
+        }
+
+        void doCheckAuthorization(OperationContext* opCtx) const override {
+            uassert(ErrorCodes::Unauthorized,
+                    "Unauthorized",
+                    AuthorizationSession::get(opCtx->getClient())
+                        ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+                                                           ActionType::internal));
+        }
+    };
+
+    std::string help() const override {
+        return "Internal command, which is exported by the sharding config server. Do not call "
+               "directly. Removes the chunks for the specified collectionUUID.";
+    }
+
+    bool adminOnly() const override {
+        return true;
+    }
+
+    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return AllowedOnSecondary::kNever;
+    }
+} configsvrRemoveChunksCmd;
+
+}  // namespace
+}  // namespace mongo
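The notable pattern in this new command is the pairing of a side-session delete with a dummy upsert on the originating session. Retryable writes are deduplicated by (lsid, txnNumber), and that deduplication only works if some write was actually recorded under the session; since the real {multi: true} delete runs on a separate client, the command must still land one write on its own session. The standalone sketch below (hypothetical types, not the real server classes) models why that dummy write is required:

```cpp
// Minimal standalone sketch of retryable-write deduplication by (lsid, txnNumber).
// Hypothetical types only -- not the real SessionCatalog/TransactionParticipant.
#include <iostream>
#include <map>
#include <string>

struct SessionRecord {
    long long highestTxnNumber = -1;
    bool statementExecuted = false;  // was a write recorded on this session for that txnNumber?
};

std::map<std::string, SessionRecord> sessionCatalog;  // lsid -> record

// Returns true if the command body actually ran, false if the retry was deduplicated.
bool runRetryableWrite(const std::string& lsid, long long txnNumber, bool recordOnSession) {
    auto& rec = sessionCatalog[lsid];
    if (txnNumber == rec.highestTxnNumber && rec.statementExecuted) {
        return false;  // retry recognized: return the stored result, do not redo the work
    }
    rec.highestTxnNumber = txnNumber;
    // ... real work happens here (e.g. the chunk removal on a side session) ...
    rec.statementExecuted = recordOnSession;  // only a write on *this* session marks it done
    return true;
}

int main() {
    // With the dummy write recorded on the session, the retry is deduplicated; without it,
    // the body would run twice.
    std::cout << runRetryableWrite("lsid-1", 5, /*recordOnSession=*/true) << '\n';  // 1: executed
    std::cout << runRetryableWrite("lsid-1", 5, /*recordOnSession=*/true) << '\n';  // 0: deduplicated
}
```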
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 11a787cae82..95b7a127f10 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/s/collection_sharding_runtime.h"
 #include "mongo/db/s/create_collection_coordinator.h"
 #include "mongo/db/s/recoverable_critical_section_service.h"
+#include "mongo/db/s/remove_chunks_gen.h"
 #include "mongo/db/s/shard_key_util.h"
 #include "mongo/db/s/sharding_ddl_util.h"
 #include "mongo/db/s/sharding_logging.h"
@@ -210,33 +211,30 @@ BSONObj getCollation(OperationContext* opCtx,
     return actualCollatorBSON;
 }
 
-void removeChunks(OperationContext* opCtx, const UUID& uuid) {
-    BatchWriteExecStats stats;
-    BatchedCommandResponse response;
-
-    BatchedCommandRequest deleteRequest([&]() {
-        write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS);
-        deleteOp.setWriteCommandRequestBase([] {
-            write_ops::WriteCommandRequestBase writeCommandBase;
-            writeCommandBase.setOrdered(false);
-            return writeCommandBase;
-        }());
-        deleteOp.setDeletes(std::vector{write_ops::DeleteOpEntry(
-            BSON(ChunkType::collectionUUID << uuid), true /* multi: true */)});
-        return deleteOp;
-    }());
-
-    deleteRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx,
+                                             const UUID& uuid,
+                                             const OperationSessionInfo& osi) {
+    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
 
-    cluster::write(opCtx, deleteRequest, &stats, &response, boost::none);
+    // Remove the chunks matching uuid
+    ConfigsvrRemoveChunks configsvrRemoveChunksCmd(uuid);
+    configsvrRemoveChunksCmd.setDbName(NamespaceString::kAdminDb);
 
-    uassertStatusOK(response.toStatus());
+    const auto swRemoveChunksResult = configShard->runCommandWithFixedRetryAttempts(
+        opCtx,
+        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+        NamespaceString::kAdminDb.toString(),
+        CommandHelpers::appendMajorityWriteConcern(configsvrRemoveChunksCmd.toBSON(osi.toBSON())),
+        Shard::RetryPolicy::kIdempotent);
+
+    uassertStatusOKWithContext(
+        Shard::CommandResponse::getEffectiveStatus(std::move(swRemoveChunksResult)),
+        str::stream() << "Error removing chunks matching uuid " << uuid);
 }
 
-void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) {
-    BatchWriteExecStats stats;
-    BatchedCommandResponse response;
-
+void upsertChunks(OperationContext* opCtx,
+                  std::vector<ChunkType>& chunks,
+                  const OperationSessionInfo& osi) {
     BatchedCommandRequest updateRequest([&]() {
         write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
         std::vector<write_ops::UpdateOpEntry> entries;
@@ -257,14 +255,31 @@ void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) {
 
     updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
 
-    cluster::write(opCtx, updateRequest, &stats, &response, boost::none);
+    const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
+
+    BatchedCommandResponse batchResponse;
+    const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+    const auto response =
+        configShard->runCommand(opCtx,
+                                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                ChunkType::ConfigNS.db().toString(),
+                                cmdObj,
+                                Shard::kDefaultConfigCommandTimeout,
+                                Shard::RetryPolicy::kIdempotent);
 
-    uassertStatusOK(response.toStatus());
+    const auto writeStatus =
+        Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+    uassertStatusOK(batchResponse.toStatus());
+    uassertStatusOK(writeStatus);
 }
 
-void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, CollectionType& coll) {
-    BatchWriteExecStats stats;
-    BatchedCommandResponse response;
+void updateCatalogEntry(OperationContext* opCtx,
+                        const NamespaceString& nss,
+                        CollectionType& coll,
+                        const OperationSessionInfo& osi) {
+    const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
     BatchedCommandRequest updateRequest([&]() {
         write_ops::UpdateCommandRequest updateOp(CollectionType::ConfigNS);
         updateOp.setUpdates({[&] {
@@ -279,13 +294,26 @@ void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, Col
     }());
 
     updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
 
-    try {
-        cluster::write(opCtx, updateRequest, &stats, &response, boost::none);
+    const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
 
-        uassertStatusOK(response.toStatus());
+    try {
+        BatchedCommandResponse batchResponse;
+        const auto response =
+            configShard->runCommand(opCtx,
+                                    ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                    CollectionType::ConfigNS.db().toString(),
+                                    cmdObj,
+                                    Shard::kDefaultConfigCommandTimeout,
+                                    Shard::RetryPolicy::kIdempotent);
+
+        const auto writeStatus =
+            Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+        uassertStatusOK(batchResponse.toStatus());
+        uassertStatusOK(writeStatus);
     } catch (const DBException&) {
         // If an error happens when contacting the config server, we don't know if the update
-        // succeded or not, which might cause the local shard version to differ from the config
+        // succeeded or not, which might cause the local shard version to differ from the config
        // server, so we clear the metadata to allow another operation to refresh it.
         UninterruptibleLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, nss, MODE_IX);
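In the two hunks above, upsertChunks() and updateCatalogEntry() stop routing through cluster::write and instead serialize the batched write themselves, appending the session fields with addFields(osi.toBSON()) so the config server executes the batch as a retryable write. A rough standalone model of that attachment, with simplified stand-ins for BSONObj and OperationSessionInfo (not the real classes):

```cpp
// Hypothetical sketch of attaching OperationSessionInfo fields (lsid, txnNumber) to a
// serialized command document. Doc is a toy stand-in for BSONObj.
#include <iostream>
#include <map>
#include <string>

using Doc = std::map<std::string, std::string>;  // stand-in for BSONObj

struct OperationSessionInfo {
    std::string lsid;
    long long txnNumber;
    Doc toDoc() const {
        return {{"lsid", lsid}, {"txnNumber", std::to_string(txnNumber)}};
    }
};

// Stand-in for BSONObj::addFields: merge the session fields into the command body.
Doc addFields(Doc base, const Doc& extra) {
    base.insert(extra.begin(), extra.end());
    return base;
}

int main() {
    Doc updateRequest{{"update", "chunks"}, {"$db", "config"}, {"ordered", "true"}};
    OperationSessionInfo osi{"lsid-1", 12};
    Doc cmdObj = addFields(updateRequest, osi.toDoc());
    for (const auto& [k, v] : cmdObj) std::cout << k << ": " << v << '\n';
}
```

Note the double status check after the call: processBatchWriteResponse surfaces transport/command-level failures, while batchResponse.toStatus() surfaces per-document write errors; both must be OK.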
@@ -296,21 +324,18 @@ void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, Col
 
 void broadcastDropCollection(OperationContext* opCtx,
                              const NamespaceString& nss,
-                             const std::shared_ptr<executor::TaskExecutor>& executor) {
+                             const std::shared_ptr<executor::TaskExecutor>& executor,
+                             const OperationSessionInfo& osi) {
     const auto primaryShardId = ShardingState::get(opCtx)->shardId();
     const ShardsvrDropCollectionParticipant dropCollectionParticipant(nss);
 
     auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
-    // Remove prymary shard from participants
+    // Remove primary shard from participants
     participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId),
                        participants.end());
 
-    sharding_ddl_util::sendAuthenticatedCommandToShards(
-        opCtx,
-        nss.db(),
-        CommandHelpers::appendMajorityWriteConcern(dropCollectionParticipant.toBSON({})),
-        participants,
-        executor);
+    sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
+        opCtx, nss, participants, executor, osi);
 }
 
 }  // namespace
@@ -379,6 +404,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                 auto* opCtx = opCtxHolder.get();
                 getForwardableOpMetadata().setOn(opCtx);
 
+                if (!_firstExecution) {
+                    // Perform a noop write on the participants in order to advance the txnNumber
+                    // for this coordinator's lsid so that requests with older txnNumbers can no
+                    // longer execute.
+                    _performNoopRetryableWriteOnParticipants(opCtx, **executor);
+                }
+
                 if (_recoveredFromDisk) {
                     // If a stepdown happened it could've occurred while waiting for majority when
                     // writing config.collections. If the refresh happens before this write is
@@ -428,8 +460,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                                     1,
                                     "Removing partial changes from previous run",
                                     "namespace"_attr = nss());
-                        removeChunks(opCtx, *uuid);
-                        broadcastDropCollection(opCtx, nss(), **executor);
+
+                        _doc = _updateSession(opCtx, _doc);
+                        cleanupPartialChunksFromPreviousAttempt(
+                            opCtx, *uuid, getCurrentSession(_doc));
+
+                        _doc = _updateSession(opCtx, _doc);
+                        broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession(_doc));
                     }
                 }
@@ -455,7 +492,16 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                         nss(),
                         _critSecReason,
                         ShardingCatalogClient::kMajorityWriteConcern);
-                _createCollectionOnNonPrimaryShards(opCtx);
+
+                _doc = _updateSession(opCtx, _doc);
+                try {
+                    _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession(_doc));
+                } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) {
+                    // Older 5.0 binaries don't support running the
+                    // _shardsvrCreateCollectionParticipant command as a retryable write yet. In
+                    // that case, retry without attaching session info.
+                    _createCollectionOnNonPrimaryShards(opCtx, boost::none);
+                }
 
                 _commit(opCtx);
             }
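The try/catch just above is a mixed-version fallback: first attempt the participant command with session info attached; if a not-yet-upgraded 5.0 binary rejects it as a non-retryable write, resend the same request sessionless. A standalone sketch of the shape of that fallback, with hypothetical types standing in for the server's exception and request machinery:

```cpp
// Hypothetical sketch of the "try with session, fall back to sessionless" pattern.
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>

struct NotARetryableWriteCommand : std::runtime_error {
    using std::runtime_error::runtime_error;
};

// Simulates sending _shardsvrCreateCollectionParticipant to the shards.
void createCollectionOnShards(const std::optional<std::string>& sessionInfo,
                              bool peerSupportsRetryableDDL) {
    if (sessionInfo && !peerSupportsRetryableDDL) {
        // An old binary rejects the session-bearing request.
        throw NotARetryableWriteCommand("_shardsvrCreateCollectionParticipant");
    }
    std::cout << "sent " << (sessionInfo ? "with session " + *sessionInfo : "sessionless")
              << '\n';
}

int main() {
    const bool oldBinary = true;  // simulate a not-yet-upgraded 5.0 shard
    try {
        createCollectionOnShards("lsid-1/txn-3", !oldBinary);
    } catch (const NotARetryableWriteCommand&) {
        // Retry without attaching session info, exactly once.
        createCollectionOnShards(std::nullopt, !oldBinary);
    }
}
```

The tradeoff is explicit: against old binaries the participant command loses retryable-write deduplication, but the command itself is idempotent enough for the kIdempotent retry policy to remain safe.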
@@ -638,7 +684,8 @@ void CreateCollectionCoordinator::_createPolicyAndChunks(OperationContext* opCtx
     _numChunks = _initialChunks.chunks.size();
 }
 
-void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationContext* opCtx) {
+void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
+    OperationContext* opCtx, const boost::optional<OperationSessionInfo>& osi) {
     LOGV2_DEBUG(5277905,
                 2,
                 "Create collection _createCollectionOnNonPrimaryShards",
@@ -667,8 +714,8 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC
 
         requests.emplace_back(
             chunkShardId,
-            createCollectionParticipantRequest.toBSON(
-                BSON("writeConcern" << ShardingCatalogClient::kMajorityWriteConcern.toBSON())));
+            CommandHelpers::appendMajorityWriteConcern(
+                createCollectionParticipantRequest.toBSON(osi ? osi->toBSON() : BSONObj())));
 
         initializedShards.emplace(chunkShardId);
     }
@@ -704,7 +751,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
     LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss());
 
     // Upsert Chunks.
-    upsertChunks(opCtx, _initialChunks.chunks);
+    _doc = _updateSession(opCtx, _doc);
+    upsertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc));
 
     CollectionType coll(nss(),
                         _initialChunks.collVersion().epoch(),
@@ -722,7 +770,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
         coll.setUnique(*_doc.getUnique());
     }
 
-    updateCatalogEntry(opCtx, nss(), coll);
+    _doc = _updateSession(opCtx, _doc);
+    updateCatalogEntry(opCtx, nss(), coll, getCurrentSession(_doc));
 }
 
 void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) {
@@ -799,6 +848,20 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationCt
         opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
 }
 
+void CreateCollectionCoordinator::_performNoopRetryableWriteOnParticipants(
+    OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor) {
+    auto shardsAndConfigsvr = [&] {
+        const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+        auto participants = shardRegistry->getAllShardIds(opCtx);
+        participants.emplace_back(shardRegistry->getConfigShard()->getId());
+        return participants;
+    }();
+
+    _doc = _updateSession(opCtx, _doc);
+    sharding_ddl_util::performNoopRetryableWriteOnShards(
+        opCtx, shardsAndConfigsvr, getCurrentSession(_doc), executor);
+}
+
 // Phase change API.
 
 void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
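Throughout the coordinator, every remote write is now preceded by `_doc = _updateSession(opCtx, _doc);`, which, as used here, appears to advance the txnNumber recorded in the coordinator's state document so that each statement runs under a fresh txnNumber of the same lsid, while a crash-and-retry of the same step reuses the recorded number and stays deduplicable. A hypothetical, simplified model of that bookkeeping (the real helper also persists the document; elided here):

```cpp
// Hypothetical model of the _updateSession / getCurrentSession pairing.
#include <iostream>
#include <string>

struct CoordinatorDoc {
    std::string lsid = "coordinator-lsid";
    long long txnNumber = 0;
};

// Bump the txnNumber before each remote write. The real coordinator would also
// majority-persist the updated document so a restarted instance resumes from it.
CoordinatorDoc updateSession(CoordinatorDoc doc) {
    ++doc.txnNumber;
    return doc;
}

std::string currentSession(const CoordinatorDoc& doc) {
    return doc.lsid + "/txn-" + std::to_string(doc.txnNumber);
}

int main() {
    CoordinatorDoc doc;
    doc = updateSession(doc);
    std::cout << "upsertChunks runs as " << currentSession(doc) << '\n';        // txn-1
    doc = updateSession(doc);
    std::cout << "updateCatalogEntry runs as " << currentSession(doc) << '\n';  // txn-2
}
```

This is also what makes the `!_firstExecution` noop write effective: bumping the txnNumber on all participants fences off any still-in-flight requests from an earlier incarnation of the coordinator.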
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index bb1f303e3c3..8d57374d640 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -111,7 +111,8 @@ private:
      * If the optimized path can be taken, ensure the collection is already created in all the
      * participant shards.
      */
-    void _createCollectionOnNonPrimaryShards(OperationContext* opCtx);
+    void _createCollectionOnNonPrimaryShards(OperationContext* opCtx,
+                                             const boost::optional<OperationSessionInfo>& osi);
 
     /**
      * Does the following writes:
@@ -135,6 +136,10 @@ private:
      */
     void _logEndCreateCollection(OperationContext* opCtx);
 
+    void _performNoopRetryableWriteOnParticipants(
+        OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor);
+
+
     CreateCollectionCoordinatorDocument _doc;
 
     const BSONObj _critSecReason;
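The next diff adds remove_chunks.idl, from which the build (via the 'remove_chunks.idl' SConscript entry above) generates remove_chunks_gen.h containing the ConfigsvrRemoveChunks request type used by both the command and the coordinator. As a rough mental model only — the real generated API may differ — the generated class behaves roughly like this hypothetical simplification, matching the usage visible in this diff (constructor taking the UUID, getCollectionUUID(), setDbName(), and toBSON() with passthrough fields):

```cpp
// Hypothetical, simplified model of the IDL-generated request class.
#include <iostream>
#include <string>

using UUID = std::string;  // toy stand-in for mongo::UUID

class ConfigsvrRemoveChunks {
public:
    explicit ConfigsvrRemoveChunks(UUID collectionUUID)
        : _collectionUUID(std::move(collectionUUID)) {}

    const UUID& getCollectionUUID() const { return _collectionUUID; }
    void setDbName(std::string dbName) { _dbName = std::move(dbName); }

    // Stand-in for toBSON(passthroughFields): serialize the command document.
    std::string toString() const {
        return "{ _configsvrRemoveChunks: 1, collectionUUID: " + _collectionUUID +
            ", $db: \"" + _dbName + "\" }";
    }

private:
    UUID _collectionUUID;
    std::string _dbName;
};

int main() {
    ConfigsvrRemoveChunks cmd("c0ffee00-hypothetical-uuid");
    cmd.setDbName("admin");
    std::cout << cmd.toString() << '\n';
}
```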
diff --git a/src/mongo/db/s/remove_chunks.idl b/src/mongo/db/s/remove_chunks.idl
new file mode 100644
index 00000000000..a997b8c282e
--- /dev/null
+++ b/src/mongo/db/s/remove_chunks.idl
@@ -0,0 +1,46 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+    cpp_namespace: "mongo"
+
+imports:
+    - "mongo/idl/basic_types.idl"
+
+commands:
+    _configsvrRemoveChunks:
+        command_name: _configsvrRemoveChunks
+        cpp_name: ConfigsvrRemoveChunks
+        description: "internal _configsvrRemoveChunks command for config server"
+        namespace: ignored
+        api_version: ""
+        strict: false
+        fields:
+            collectionUUID:
+                type: uuid
+                description: "The collection uuid of the chunks to be removed"
diff --git a/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
index e4708a1ce94..e49c2d78aac 100644
--- a/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
+++ b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
@@ -70,6 +70,7 @@ public:
                         << request().toBSON(BSONObj()),
                     opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
 
+            opCtx->setAlwaysInterruptAtStepDownOrUp();
 
             MigrationDestinationManager::cloneCollectionIndexesAndOptions(
                 opCtx,
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
index 574af8ba82d..f30d64eb205 100644
--- a/src/mongo/db/transaction_validation.cpp
+++ b/src/mongo/db/transaction_validation.cpp
@@ -51,7 +51,9 @@ const StringMap<int> retryableWriteCommands = {{"delete", 1},
                                                {"insert", 1},
                                                {"update", 1},
                                                {"_recvChunkStart", 1},
+                                               {"_configsvrRemoveChunks", 1},
                                                {"_configsvrRemoveTags", 1},
+                                               {"_shardsvrCreateCollectionParticipant", 1},
                                                {"_shardsvrDropCollectionParticipant", 1},
                                                {"_shardsvrRenameCollectionParticipant", 1},
                                                {"_shardsvrRenameCollectionParticipantUnblock", 1},
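Finally, both new internal commands must be registered in retryableWriteCommands, or the server would reject requests carrying an lsid and txnNumber outside a multi-document transaction. A minimal standalone sketch of that whitelist check, with std::unordered_map standing in for the server's StringMap:

```cpp
// Standalone sketch of the retryableWriteCommands lookup added above.
#include <iostream>
#include <string>
#include <unordered_map>

const std::unordered_map<std::string, int> retryableWriteCommands = {
    {"delete", 1},
    {"insert", 1},
    {"update", 1},
    {"_configsvrRemoveChunks", 1},
    {"_shardsvrCreateCollectionParticipant", 1},
};

// A command name is accepted as a retryable write only if it is in the map.
bool isRetryableWriteCommand(const std::string& cmdName) {
    return retryableWriteCommands.find(cmdName) != retryableWriteCommands.end();
}

int main() {
    std::cout << isRetryableWriteCommand("_configsvrRemoveChunks") << '\n';  // 1
    std::cout << isRetryableWriteCommand("find") << '\n';                    // 0
}
```

Without these two entries, the osi-bearing requests built in create_collection_coordinator.cpp would fail validation before reaching the command implementations.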