author    | Enrico Golfieri <enrico.golfieri@mongodb.com>    | 2023-05-09 08:00:22 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-09 08:38:12 +0000
commit    | 53cdeb55770a686b3ff4f09ed95176d54b9064d7 (patch)
tree      | 74d9526e884dbbfa57fcb2ed2b0c189ee4b813e3 /src/mongo/db/s
parent    | d4e78b7d757ccf1225bcd87d44ade39d1ce38586 (diff)
download  | mongo-53cdeb55770a686b3ff4f09ed95176d54b9064d7.tar.gz
SERVER-74192 execute commit phase of renameCollection in a single transaction
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.cpp      | 387
-rw-r--r-- | src/mongo/db/s/rename_collection_coordinator.h        |   5
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator.idl           |   2
-rw-r--r-- | src/mongo/db/s/sharding_ddl_coordinator_service.cpp   |   2
-rw-r--r-- | src/mongo/db/s/shardsvr_rename_collection_command.cpp |   6
5 files changed, 349 insertions, 53 deletions
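
Before the diff itself, here is the idea of the change in miniature. The new `renameCollectionMetadataInTransaction` helper added below turns every catalog write of the rename commit (deleting the target's `config.collections` entry, deleting the source's entry, inserting the renamed entry, recording `config.placementHistory` documents, and rewriting zones) into one statement of a single transaction on the sharding catalog; a statement whose predecessor matched zero documents degrades into a no-op, which keeps a retried commit idempotent. The sketch below is a minimal, self-contained model of that chaining pattern only: `StatementResult`, `Statement`, and `runTransaction` are illustrative stand-ins, not the `txn_api::TransactionClient` machinery used in the real code.

```cpp
// Stand-alone model of the chained-statement commit introduced by SERVER-74192.
// Nothing here is a MongoDB internal type; it only illustrates two ideas from the diff:
//   1. every catalog mutation is one statement of a single transaction, so either all
//      of them commit or none of them do, and
//   2. a statement that matched zero documents (getN() == 0 in the real code) makes the
//      dependent placement-history insert a no-op, which keeps retries idempotent.
#include <functional>
#include <iostream>
#include <stdexcept>
#include <vector>

struct StatementResult {
    long long n = 0;  // documents affected, like BatchedCommandResponse::getN()
};

using Statement = std::function<StatementResult()>;

// Runs the chain; if any statement throws, the whole "transaction" aborts.
bool runTransaction(const std::vector<Statement>& chain) {
    try {
        for (const auto& stmt : chain) {
            stmt();
        }
        return true;  // commit: every statement took effect
    } catch (const std::exception& ex) {
        std::cerr << "transaction aborted: " << ex.what() << '\n';
        return false;  // abort: no statement takes effect
    }
}

int main() {
    StatementResult deleteTargetResult;

    const std::vector<Statement> commitPhase{
        // 1. Delete the previous incarnation of the target collection (may match nothing,
        //    e.g. when the commit is being retried after a step-down).
        [&] {
            deleteTargetResult = StatementResult{0};  // pretend the entry was already gone
            return deleteTargetResult;
        },
        // 2. Record the drop of the target in placement history, but only if statement 1
        //    removed something; otherwise an earlier attempt already recorded it.
        [&] {
            if (deleteTargetResult.n == 0) {
                std::cout << "placement-history insert skipped (no-op)\n";
                return StatementResult{0};
            }
            std::cout << "placement-history entry recorded for the dropped target\n";
            return StatementResult{1};
        },
        // 3. Insert the collection entry under its new namespace (always runs).
        [&] {
            std::cout << "renamed collection entry inserted\n";
            return StatementResult{1};
        },
    };

    runTransaction(commitPhase);
    return 0;
}
```

In the actual patch the same shape is obtained by chaining `runCRUDOp()` futures: `insertToPlacementHistoryStatement()` inspects the previous `BatchedCommandResponse` and returns `noOpStatement()` when `getN() == 0`.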
diff --git a/src/mongo/db/s/rename_collection_coordinator.cpp b/src/mongo/db/s/rename_collection_coordinator.cpp index 29ed595b6b4..fe580a20329 100644 --- a/src/mongo/db/s/rename_collection_coordinator.cpp +++ b/src/mongo/db/s/rename_collection_coordinator.cpp @@ -38,6 +38,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/ops/insert.h" #include "mongo/db/persistent_task_store.h" +#include "mongo/db/query/distinct_command_gen.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/sharded_index_catalog_commands_gen.h" #include "mongo/db/s/sharding_ddl_util.h" @@ -50,6 +51,7 @@ #include "mongo/logv2/log.h" #include "mongo/s/analyze_shard_key_documents_gen.h" #include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_tags.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/index_version.h" @@ -102,12 +104,11 @@ void renameIndexMetadataInShards(OperationContext* opCtx, return {vt.configTime(), vt.clusterTime().asTimestamp()}; }(); - // Bump the index version only if there are indexes in the source - // collection. + // Bump the index version only if there are indexes in the source collection. auto optShardedCollInfo = doc->getOptShardedCollInfo(); if (optShardedCollInfo && optShardedCollInfo->getIndexVersion()) { - // Bump sharding catalog's index version on the config server if the source - // collection is sharded. It will be updated later on. + // Bump sharding catalog's index version on the config server if the source collection is + // sharded. It will be updated later on. optShardedCollInfo->setIndexVersion({optShardedCollInfo->getUuid(), newIndexVersion}); doc->setOptShardedCollInfo(optShardedCollInfo); } @@ -127,6 +128,318 @@ void renameIndexMetadataInShards(OperationContext* opCtx, participants, executor); } + +std::vector<ShardId> getLatestCollectionPlacementInfoFor(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid) { + // Use the content of config.chunks to obtain the placement of the collection being renamed. + // The request is equivalent to 'configDb.chunks.distinct("shard", {uuid:collectionUuid})'. 
+ auto query = BSON(NamespacePlacementType::kNssFieldName << nss.ns()); + + auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + + DistinctCommandRequest distinctRequest(ChunkType::ConfigNS); + distinctRequest.setKey(ChunkType::shard.name()); + distinctRequest.setQuery(BSON(ChunkType::collectionUUID.name() << uuid)); + auto rc = BSON(repl::ReadConcernArgs::kReadConcernFieldName << repl::ReadConcernArgs::kLocal); + + auto reply = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet{}), + DatabaseName::kConfig.toString(), + distinctRequest.toBSON({rc}), + Shard::RetryPolicy::kIdempotent)); + + uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(reply)); + std::vector<ShardId> shardIds; + for (const auto& valueElement : reply.response.getField("values").Array()) { + shardIds.emplace_back(valueElement.String()); + } + + return shardIds; +} + +SemiFuture<BatchedCommandResponse> noOpStatement() { + BatchedCommandResponse noOpResponse; + noOpResponse.setStatus(Status::OK()); + noOpResponse.setN(0); + return SemiFuture<BatchedCommandResponse>(std::move(noOpResponse)); +} + +SemiFuture<BatchedCommandResponse> deleteShardedCollectionStatement( + const txn_api::TransactionClient& txnClient, + const NamespaceString& nss, + const boost::optional<UUID>& uuid, + int stmtId) { + + if (uuid) { + const auto deleteCollectionQuery = BSON( + CollectionType::kNssFieldName << nss.ns() << CollectionType::kUuidFieldName << *uuid); + + write_ops::DeleteCommandRequest deleteOp(CollectionType::ConfigNS); + deleteOp.setDeletes({[&]() { + write_ops::DeleteOpEntry entry; + entry.setMulti(false); + entry.setQ(deleteCollectionQuery); + return entry; + }()}); + + return txnClient.runCRUDOp(deleteOp, {stmtId}); + } else { + return noOpStatement(); + } +} + +SemiFuture<BatchedCommandResponse> renameShardedCollectionStatement( + const txn_api::TransactionClient& txnClient, + const CollectionType& oldCollection, + const NamespaceString& newNss, + const Timestamp& timeInsertion, + int stmtId) { + + auto newCollectionType = oldCollection; + newCollectionType.setNss(newNss); + newCollectionType.setTimestamp(timeInsertion); + newCollectionType.setEpoch(OID::gen()); + + auto query = BSON(CollectionType::kNssFieldName << newNss.ns()); + write_ops::InsertCommandRequest insertOp(CollectionType::ConfigNS, + {newCollectionType.toBSON()}); + + return txnClient.runCRUDOp(insertOp, {stmtId} /*stmtIds*/); +} + +SemiFuture<BatchedCommandResponse> insertToPlacementHistoryStatement( + const txn_api::TransactionClient& txnClient, + const NamespaceString& nss, + const boost::optional<UUID>& uuid, + const Timestamp& clusterTime, + const std::vector<ShardId>& shards, + int stmtId, + const BatchedCommandResponse& previousOperationResult) { + + // Skip the insertion of the placement entry if the previous statement didn't change any + // document - we can deduce that the whole transaction was already committed in a previous + // attempt. 
+ if (previousOperationResult.getN() == 0) { + return noOpStatement(); + } + + NamespacePlacementType placementInfo(NamespaceString(nss), clusterTime, shards); + if (uuid) + placementInfo.setUuid(*uuid); + write_ops::InsertCommandRequest insertPlacementEntry( + NamespaceString::kConfigsvrPlacementHistoryNamespace, {placementInfo.toBSON()}); + + return txnClient.runCRUDOp(insertPlacementEntry, {stmtId} /*stmtIds*/); +} + + +SemiFuture<BatchedCommandResponse> updateZonesStatement(const txn_api::TransactionClient& txnClient, + const NamespaceString& oldNss, + const NamespaceString& newNss) { + + const auto query = BSON(TagsType::ns(oldNss.ns().toString())); + const auto update = BSON("$set" << BSON(TagsType::ns(newNss.ns().toString()))); + + BatchedCommandRequest request([&] { + write_ops::UpdateCommandRequest updateOp(TagsType::ConfigNS); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(query); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update)); + entry.setUpsert(false); + entry.setMulti(true); + return entry; + }()}); + return updateOp; + }()); + return txnClient.runCRUDOp(request, {-1} /*stmtIds*/); +} + +SemiFuture<BatchedCommandResponse> deleteZonesStatement(const txn_api::TransactionClient& txnClient, + const NamespaceString& nss) { + + const auto query = BSON(TagsType::ns(nss.ns().toString())); + const auto hint = BSON(TagsType::ns() << 1 << TagsType::min() << 1); + + BatchedCommandRequest request([&] { + write_ops::DeleteCommandRequest deleteOp(TagsType::ConfigNS); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(query); + entry.setMulti(true); + entry.setHint(hint); + return entry; + }()}); + return deleteOp; + }()); + + return txnClient.runCRUDOp(request, {-1}); +} + +SemiFuture<BatchedCommandResponse> deleteShardingIndexCatalogMetadataStatement( + const txn_api::TransactionClient& txnClient, const boost::optional<UUID>& uuid) { + if (uuid) { + // delete index catalog metadata + BatchedCommandRequest request([&] { + write_ops::DeleteCommandRequest deleteOp( + NamespaceString::kConfigsvrIndexCatalogNamespace); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON(IndexCatalogType::kCollectionUUIDFieldName << *uuid)); + entry.setMulti(true); + return entry; + }()}); + return deleteOp; + }()); + + return txnClient.runCRUDOp(request, {-1}); + } else { + return noOpStatement(); + } +} + + +void renameCollectionMetadataInTransaction(OperationContext* opCtx, + const boost::optional<CollectionType>& optFromCollType, + const NamespaceString& toNss, + const boost::optional<UUID>& droppedTargetUUID, + const WriteConcernOptions& writeConcern, + const std::shared_ptr<executor::TaskExecutor>& executor, + const OperationSessionInfo& osi) { + if (optFromCollType) { + // Case sharded FROM collection + auto fromNss = optFromCollType->getNss(); + auto fromUUID = optFromCollType->getUuid(); + + // Every statement in the transaction runs under the same clusterTime. To ensure in the + // placementHistory the drop of the target will appear earlier then the insert of the target + // we forcely add a tick to have 2 valid timestamp that we can use to differentiate the 2 + // operations. + auto now = VectorClock::get(opCtx)->getTime(); + auto nowClusterTime = now.clusterTime(); + auto timeDrop = nowClusterTime.asTimestamp(); + + nowClusterTime.addTicks(1); + auto timeInsert = nowClusterTime.asTimestamp(); + + // Retrieve the latest placement information about "FROM". 
+ auto fromNssShards = getLatestCollectionPlacementInfoFor(opCtx, fromNss, fromUUID); + + auto transactionChain = [&](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + // Remove config.collection entry. Query by 'ns' AND 'uuid' so that the remove can be + // resolved with an IXSCAN (thanks to the index on '_id') and is idempotent (thanks to + // the 'uuid') delete TO collection if exists. + return deleteShardedCollectionStatement(txnClient, toNss, droppedTargetUUID, 1) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& deleteCollResponse) { + uassertStatusOK(deleteCollResponse.toStatus()); + + return insertToPlacementHistoryStatement( + txnClient, toNss, droppedTargetUUID, timeDrop, {}, 2, deleteCollResponse); + }) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& response) { + uassertStatusOK(response.toStatus()); + + return deleteShardingIndexCatalogMetadataStatement(txnClient, + droppedTargetUUID); + }) + // Delete "FROM" collection + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& response) { + uassertStatusOK(response.toStatus()); + return deleteShardedCollectionStatement(txnClient, fromNss, fromUUID, 3); + }) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& deleteCollResponse) { + uassertStatusOK(deleteCollResponse.toStatus()); + + return insertToPlacementHistoryStatement( + txnClient, fromNss, fromUUID, timeDrop, {}, 4, deleteCollResponse); + }) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& deleteCollResponse) { + uassertStatusOK(deleteCollResponse.toStatus()); + // Use the modified entries to insert collection and placement entries for "TO". + return renameShardedCollectionStatement( + txnClient, *optFromCollType, toNss, timeInsert, 5); + }) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& upsertCollResponse) { + uassertStatusOK(upsertCollResponse.toStatus()); + + return insertToPlacementHistoryStatement(txnClient, + toNss, + fromUUID, + timeInsert, + fromNssShards, + 6, + upsertCollResponse); + }) + // update tags and check it was sucessful + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& insertCollResponse) { + uassertStatusOK(insertCollResponse.toStatus()); + + return updateZonesStatement(txnClient, fromNss, toNss); + }) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& response) { + uassertStatusOK(response.toStatus()); + }) + .semi(); + }; + const bool useClusterTransaction = true; + sharding_ddl_util::runTransactionOnShardingCatalog( + opCtx, std::move(transactionChain), writeConcern, osi, useClusterTransaction, executor); + } else { + // Case unsharded FROM collection : just delete the target collection if sharded + auto now = VectorClock::get(opCtx)->getTime(); + auto newTimestamp = now.clusterTime().asTimestamp(); + + auto transactionChain = [&](const txn_api::TransactionClient& txnClient, + ExecutorPtr txnExec) { + return deleteShardedCollectionStatement(txnClient, toNss, droppedTargetUUID, 1) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& deleteCollResponse) { + uassertStatusOK(deleteCollResponse.toStatus()); + return insertToPlacementHistoryStatement(txnClient, + toNss, + droppedTargetUUID, + newTimestamp, + {}, + 2, + deleteCollResponse); + }) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& response) { + uassertStatusOK(response.toStatus()); + + return deleteShardingIndexCatalogMetadataStatement(txnClient, + droppedTargetUUID); + }) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& 
response) { + uassertStatusOK(response.toStatus()); + + return deleteZonesStatement(txnClient, toNss); + }) + .thenRunOn(txnExec) + .then([&](const BatchedCommandResponse& response) { + uassertStatusOK(response.toStatus()); + }) + .semi(); + }; + + const bool useClusterTransaction = true; + sharding_ddl_util::runTransactionOnShardingCatalog( + opCtx, std::move(transactionChain), writeConcern, osi, useClusterTransaction, executor); + } +} } // namespace RenameCollectionCoordinator::RenameCollectionCoordinator(ShardingDDLCoordinatorService* service, @@ -363,8 +676,8 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( const OperationSessionInfo osi = getCurrentSession(); // On participant shards: - // - Block CRUD on source and target collection in case at least one - // of such collections is currently sharded. + // - Block CRUD on source and target collection in case at least one of such + // collections is currently sharded // - Locally drop the target collection // - Locally rename source to target ShardsvrRenameCollectionParticipant renameCollParticipantRequest( @@ -376,14 +689,12 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( renameCollParticipantRequest.toBSON({})) .addFields(osi.toBSON()); - // We need to send the command to all the shards because both - // movePrimary and moveChunk leave garbage behind for sharded - // collections. - // At the same time, the primary shard needs to be last participant to perfom its - // local rename operation: this will ensure that the op entries generated by the - // collections being renamed/dropped will be generated at points in time where all - // shards have a consistent view of the metadata and no concurrent writes are being - // performed. + // We need to send the command to all the shards because both movePrimary and + // moveChunk leave garbage behind for sharded collections. At the same time, the + // primary shard needs to be last participant to perfom its local rename operation: + // this will ensure that the op entries generated by the collections being + // renamed/dropped will be generated at points in time where all shards have a + // consistent view of the metadata and no concurrent writes are being performed. 
const auto primaryShardId = ShardingState::get(opCtx)->shardId(); auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); participants.erase( @@ -420,35 +731,20 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( opCtx, getCurrentSession(), **executor); } - if (!_isPre63Compatible() && - (_doc.getTargetIsSharded() || _doc.getOptShardedCollInfo())) { + if ((_doc.getTargetIsSharded() || _doc.getOptShardedCollInfo())) { renameIndexMetadataInShards( opCtx, nss(), _request, getCurrentSession(), **executor, &_doc); } - ConfigsvrRenameCollectionMetadata req(nss(), _request.getTo()); - req.setOptFromCollection(_doc.getOptShardedCollInfo()); - const auto cmdObj = CommandHelpers::appendMajorityWriteConcern(req.toBSON({})); - const auto& configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - - uassertStatusOK(Shard::CommandResponse::getEffectiveStatus( - configShard->runCommand(opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - "admin", - cmdObj.addFields(getCurrentSession().toBSON()), - Shard::RetryPolicy::kIdempotent))); - - // (SERVER-67730) Delete potential orphaned chunk entries from CSRS since - // ConfigsvrRenameCollectionMetadata is not idempotent in case of a CSRS step-down - auto uuid = _doc.getTargetUUID(); - if (uuid) { - auto query = BSON("uuid" << *uuid); - uassertStatusOK(Grid::get(opCtx)->catalogClient()->removeConfigDocuments( - opCtx, - ChunkType::ConfigNS, - query, - ShardingCatalogClient::kMajorityWriteConcern)); - } + _updateSession(opCtx); + + renameCollectionMetadataInTransaction(opCtx, + _doc.getOptShardedCollInfo(), + _request.getTo(), + _doc.getTargetUUID(), + ShardingCatalogClient::kMajorityWriteConcern, + **executor, + getCurrentSession()); })) .then(_buildPhaseHandler( Phase::kUnblockCRUD, @@ -479,6 +775,19 @@ ExecutorFuture<void> RenameCollectionCoordinator::_runImpl( sharding_ddl_util::sendAuthenticatedCommandToShards( opCtx, fromNss.db(), cmdObj.addFields(osi.toBSON()), participants, **executor); + + // Delete chunks belonging to the previous incarnation of the target collection. + // This is performed after releasing the critical section in order to reduce stalls + // and performed outside of a transaction to prevent timeout. + auto targetUUID = _doc.getTargetUUID(); + if (targetUUID) { + auto query = BSON("uuid" << *targetUUID); + uassertStatusOK(Grid::get(opCtx)->catalogClient()->removeConfigDocuments( + opCtx, + ChunkType::ConfigNS, + query, + ShardingCatalogClient::kMajorityWriteConcern)); + } })) .then(_buildPhaseHandler(Phase::kSetResponse, [this, anchor = shared_from_this()] { auto opCtxHolder = cc().makeOperationContext(); diff --git a/src/mongo/db/s/rename_collection_coordinator.h b/src/mongo/db/s/rename_collection_coordinator.h index ee67700a987..32621bb6ea4 100644 --- a/src/mongo/db/s/rename_collection_coordinator.h +++ b/src/mongo/db/s/rename_collection_coordinator.h @@ -67,11 +67,6 @@ private: return _doc.getPhase() >= Phase::kFreezeMigrations; }; - // TODO SERVER-72796: Remove once gGlobalIndexesShardingCatalog is enabled. 
- bool _isPre63Compatible() const { - return operationType() == DDLCoordinatorTypeEnum::kRenameCollectionPre63Compatible; - } - ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept override; diff --git a/src/mongo/db/s/sharding_ddl_coordinator.idl b/src/mongo/db/s/sharding_ddl_coordinator.idl index a73be4a8de1..2eeace9f7ad 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.idl +++ b/src/mongo/db/s/sharding_ddl_coordinator.idl @@ -52,8 +52,6 @@ enums: # TODO SERVER-73627: Remove once 7.0 becomes last LTS. kDropCollectionPre70Compatible: "dropCollection" kRenameCollection: "renameCollection_V2" - # TODO SERVER-72796: Remove once gGlobalIndexesShardingCatalog is enabled. - kRenameCollectionPre63Compatible: "renameCollection" kCreateCollection: "createCollection_V3" # TODO SERVER-68008: Remove once 7.0 becomes last LTS kCreateCollectionPre61Compatible: "createCollection_V2" diff --git a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp index f60fcfa2140..df15f650727 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator_service.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator_service.cpp @@ -75,8 +75,6 @@ std::shared_ptr<ShardingDDLCoordinator> constructShardingDDLCoordinatorInstance( return std::make_shared<DropCollectionCoordinator>(service, std::move(initialState)); break; case DDLCoordinatorTypeEnum::kRenameCollection: - // TODO SERVER-72796: Remove once gGlobalIndexesShardingCatalog is enabled. - case DDLCoordinatorTypeEnum::kRenameCollectionPre63Compatible: return std::make_shared<RenameCollectionCoordinator>(service, std::move(initialState)); case DDLCoordinatorTypeEnum::kCreateCollection: // TODO SERVER-68008 Remove the Pre61Compatible case once 7.0 becomes last LTS diff --git a/src/mongo/db/s/shardsvr_rename_collection_command.cpp b/src/mongo/db/s/shardsvr_rename_collection_command.cpp index 8422f94167d..b00299336c0 100644 --- a/src/mongo/db/s/shardsvr_rename_collection_command.cpp +++ b/src/mongo/db/s/shardsvr_rename_collection_command.cpp @@ -96,12 +96,8 @@ public: FixedFCVRegion fixedFcvRegion{opCtx}; auto coordinatorDoc = RenameCollectionCoordinatorDocument(); coordinatorDoc.setRenameCollectionRequest(req.getRenameCollectionRequest()); - // TODO SERVER-72796: Remove once gGlobalIndexesShardingCatalog is enabled. coordinatorDoc.setShardingDDLCoordinatorMetadata( - {{fromNss, - feature_flags::gGlobalIndexesShardingCatalog.isEnabled(*fixedFcvRegion) - ? DDLCoordinatorTypeEnum::kRenameCollection - : DDLCoordinatorTypeEnum::kRenameCollectionPre63Compatible}}); + {{fromNss, DDLCoordinatorTypeEnum::kRenameCollection}}); coordinatorDoc.setAllowEncryptedCollectionRename( req.getAllowEncryptedCollectionRename().value_or(false)); auto service = ShardingDDLCoordinatorService::getService(opCtx); |
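
As a closing illustration of the zone handling: `updateZonesStatement()` above rewrites the `ns` field of every `config.tags` document from the source namespace to the target namespace with a single multi-update (upsert disabled), while `deleteZonesStatement()` removes the target's stale tags in the unsharded-source path. The snippet below shows a rough external equivalent using the mongocxx driver; this is an assumption for illustration only (the server performs the update through `write_ops` inside the sharding-catalog transaction, never through a driver), and the URI and namespaces are placeholders.

```cpp
// Rough driver-side illustration of what updateZonesStatement() does inside the
// transaction: rewrite the `ns` field of every config.tags (zone) document from the
// old namespace to the new one in one multi-update, with upsert disabled.
// This is NOT how the server executes it; the URI and namespaces are placeholders.
#include <iostream>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/uri.hpp>

int main() {
    using bsoncxx::builder::basic::kvp;
    using bsoncxx::builder::basic::make_document;

    mongocxx::instance instance{};
    mongocxx::client client{mongocxx::uri{"mongodb://localhost:27017"}};  // placeholder URI

    auto tags = client["config"]["tags"];

    // Equivalent of the { q: {ns: <from>}, u: {$set: {ns: <to>}}, multi: true,
    // upsert: false } update entry built in updateZonesStatement().
    auto result = tags.update_many(
        make_document(kvp("ns", "testDb.from")),                             // placeholder "from"
        make_document(kvp("$set", make_document(kvp("ns", "testDb.to")))));  // placeholder "to"

    if (result) {
        std::cout << "zone documents rewritten: " << result->modified_count() << '\n';
    }
    return 0;
}
```

Keeping the update multi-document and non-upserting mirrors the entry built in `updateZonesStatement()`: every zone that referenced the old namespace is moved, and no zone is invented if none existed.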