diff options
author | Paolo Polato <paolo.polato@mongodb.com> | 2023-02-06 14:46:27 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-06 15:59:26 +0000 |
commit | e8381aca1ecd8ee3ba30cbcf410f0ed05a41b4e5 (patch) | |
tree | fc7426106c60e55f0d14b2afa0b994b7234c70a2 | |
parent | 836929386580b13a569a3d7d1e74125855c67584 (diff) | |
download | mongo-e8381aca1ecd8ee3ba30cbcf410f0ed05a41b4e5.tar.gz |
SERVER-73513 Add fallback logic to commit placement metadata for moveChunk and renameCollection
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 137 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_ddl_util.cpp | 39 |
2 files changed, 115 insertions, 61 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 004bf8dd9ab..50d216212e1 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -443,6 +443,26 @@ void mergeAllChunksOnShardInTransaction(OperationContext* opCtx, txn.run(opCtx, updateChunksFn); } +/* + * Creates the request to persist a namespace descriptor object into config.placementHistory. + */ +write_ops::UpdateCommandRequest composePlacementUpsertRequest( + const NamespacePlacementType& placementInfo) { + write_ops::UpdateCommandRequest upsertRequest( + NamespaceString::kConfigsvrPlacementHistoryNamespace); + write_ops::UpdateOpEntry upsertEntry; + upsertEntry.setQ(BSON(NamespacePlacementType::kNssFieldName + << placementInfo.getNss().ns() + << NamespacePlacementType::kTimestampFieldName + << placementInfo.getTimestamp())); + upsertEntry.setU(write_ops::UpdateModification::parseFromClassicUpdate(placementInfo.toBSON())); + upsertEntry.setMulti(false); + // Upsert to account for concurrent migrations. + upsertEntry.setUpsert(true); + upsertRequest.setUpdates({std::move(upsertEntry)}); + return upsertRequest; +}; + } // namespace void ShardingCatalogManager::bumpMajorVersionOneChunkPerShard( @@ -2263,6 +2283,7 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction( }(); auto transactionChain = [nss, + collUuid = migratedChunk->getCollectionUUID(), donorShardId, recipientShardId = migratedChunk->getShard(), migrationCommitTime = @@ -2290,10 +2311,57 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction( return std::move(updateConfigChunksFuture).semi(); } - // Add the needed transaction statements to also upsert the new placement information. + // The main method to store placement info as part of the transaction, given a valid + // descriptor. + auto persistPlacementInfoSubchain = [txnExec, + &txnClient](NamespacePlacementType&& placementInfo) { + auto upsertRequest = composePlacementUpsertRequest(placementInfo); + return txnClient.runCRUDOp(upsertRequest, {}) + .thenRunOn(txnExec) + .then([](const BatchedCommandResponse& insertPlacementEntryResponse) { + uassertStatusOK(insertPlacementEntryResponse.toStatus()); + }) + .semi(); + }; + + // Obtain a valid placement descriptor from config.chunks and then store it as part of the + // transaction. + auto generateAndPersistPlacementInfoSubchain = + [txnExec, &txnClient](const NamespaceString& nss, + const UUID& collUuid, + const Timestamp& migrationCommitTime) { + // Compose the query - equivalent to + // 'configDb.chunks.distinct("shard", {uuid:collectionUuid})' + DistinctCommandRequest distinctRequest(ChunkType::ConfigNS); + distinctRequest.setKey(ChunkType::shard.name()); + distinctRequest.setQuery(BSON(ChunkType::collectionUUID.name() << collUuid)); + return txnClient.runCommand(NamespaceString::kConfigDb, distinctRequest.toBSON({})) + .thenRunOn(txnExec) + .then([=, &txnClient](BSONObj reply) { + uassertStatusOK(getStatusFromWriteCommandReply(reply)); + std::vector<ShardId> shardIds; + for (const auto& valueElement : reply.getField("values").Array()) { + shardIds.emplace_back(valueElement.String()); + } + + NamespacePlacementType placementInfo( + nss, migrationCommitTime, std::move(shardIds)); + placementInfo.setUuid(collUuid); + auto request = composePlacementUpsertRequest(placementInfo); + return txnClient.runCRUDOp(request, {}); + }) + .thenRunOn(txnExec) + .then([](const BatchedCommandResponse& insertPlacementEntryResponse) { + uassertStatusOK(insertPlacementEntryResponse.toStatus()); + }) + .semi(); + }; + + // Extend the transaction to also upsert the placement information that matches the + // migration commit. return std::move(updateConfigChunksFuture) .thenRunOn(txnExec) - .then([&txnClient, &nss, &migrationCommitTime] { + .then([&] { // Retrieve the previous placement entry - it will be used as a base for the next // update. FindCommandRequest placementInfoQuery{ @@ -2307,36 +2375,21 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction( return txnClient.exhaustiveFind(placementInfoQuery); }) .thenRunOn(txnExec) - .then([&nss, - &recipientShardId, - &migrationCommitTime, - &donorShardId, - &txnClient, - &donorToBeRemoved, - &recipientToBeAdded](const std::vector<BSONObj>& queryResponse) { - /* - * TODO SERVER-72870 replace the block below with a - * tassert(queryResponse.size() == 1) - */ - if (queryResponse.size() != 1) { - // If the collection has been created under a legacy FCV, no previous placement - // info will be available. Skip this statement. - LOGV2_WARNING(6892801, - "Unable to create a new placement entry for the namespace to " - "match a chunk migration", - "namespace"_attr = nss); - - BatchedCommandResponse noOpResponse; - noOpResponse.setStatus(Status::OK()); - noOpResponse.setN(0); - - return SemiFuture<BatchedCommandResponse>(std::move(noOpResponse)); - } - + .then([&, + persistPlacementInfo = std::move(persistPlacementInfoSubchain), + generateAndPersistPlacementInfo = + std::move(generateAndPersistPlacementInfoSubchain)]( + const std::vector<BSONObj>& queryResponse) { tassert(6892800, - str::stream() << "Unable to retrieve historical placement data for " - << nss.toString(), - queryResponse.size() == 1); + str::stream() + << "Unexpected number of placement entries retrieved" << nss.toString(), + queryResponse.size() <= 1); + + if (queryResponse.size() == 0) { + // Historical placement data may not be available due to an FCV transition - + // invoke the more expensive fallback method. + return generateAndPersistPlacementInfo(nss, collUuid, migrationCommitTime); + } // Leverage the most recent placement info to build the new version. auto placementInfo = NamespacePlacementType::parse( @@ -2365,27 +2418,7 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction( }); placementInfo.setShards(std::move(updatedShardList)); - // Persist the latest placement info (the update statement is needed to keep a - // single history record for concurrent migrations that get committed at the same - // "validAfter" time). - write_ops::UpdateCommandRequest upsertRequest( - NamespaceString::kConfigsvrPlacementHistoryNamespace); - write_ops::UpdateOpEntry upsertEntry; - upsertEntry.setQ(BSON(NamespacePlacementType::kNssFieldName - << nss.toString() - << NamespacePlacementType::kTimestampFieldName - << migrationCommitTime)); - upsertEntry.setU( - write_ops::UpdateModification::parseFromClassicUpdate(placementInfo.toBSON())); - upsertEntry.setMulti(false); - upsertEntry.setUpsert(true); - upsertRequest.setUpdates({std::move(upsertEntry)}); - - return txnClient.runCRUDOp(upsertRequest, {}); - }) - .thenRunOn(txnExec) - .then([](const BatchedCommandResponse& insertPlacementEntryResponse) { - uassertStatusOK(insertPlacementEntryResponse.toStatus()); + return persistPlacementInfo(std::move(placementInfo)); }) .semi(); }; diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp index 3eee51a20a2..f2b7480812a 100644 --- a/src/mongo/db/s/sharding_ddl_util.cpp +++ b/src/mongo/db/s/sharding_ddl_util.cpp @@ -37,6 +37,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/query/distinct_command_gen.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/remove_tags_gen.h" @@ -562,14 +563,15 @@ void shardedRenameMetadata(OperationContext* opCtx, // Update "FROM" tags to "TO". updateTags(opCtx, configShard, fromNss, toNss, writeConcern); - // Retrieve the most recent placement information about "FROM", excluding the entry that - // matches its deletion (an empty 'shards' field). auto renamedCollPlacementInfo = [&]() -> boost::optional<NamespacePlacementType> { + // TODO SERVER-72870 replace feature flag check if (!feature_flags::gHistoricalPlacementShardingCatalog.isEnabled( serverGlobalParams.featureCompatibility)) { return boost::none; } + // Retrieve the latest placement document about "FROM" prior to its deletion (which will + // have left an entry with an empty set of shards). auto query = BSON(NamespacePlacementType::kNssFieldName << fromNss.ns() << NamespacePlacementType::kShardsFieldName << BSON("$ne" << BSONArray())); @@ -585,17 +587,36 @@ void shardedRenameMetadata(OperationContext* opCtx, 1 /*limit*/)) .docs; - /* - * TODO SERVER-72870 Replace the if block below with - * uassert(RetriableErrorCode,queryResponse.size() == 1) - */ if (queryResponse.empty()) { - // If the "FROM" collection was created under a legacy FCV, no previous placement - // info will be available. Skip also the insertion of the updated one. + // Persisted placement information may be unavailable as a consequence of FCV + // transitions. Use the content of config.chunks as a fallback. LOGV2_WARNING(7068200, "Unable to retrieve placement entry for the namespace being renamed", "fromNss"_attr = fromNss); - return boost::none; + + DistinctCommandRequest distinctRequest(ChunkType::ConfigNS); + distinctRequest.setKey(ChunkType::shard.name()); + distinctRequest.setQuery(BSON(ChunkType::collectionUUID.name() << fromUUID)); + + auto reply = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet{}), + NamespaceString::kConfigDb.toString(), + distinctRequest.toBSON({}), + Shard::RetryPolicy::kIdempotent)); + + uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(reply)); + std::vector<ShardId> shardIds; + for (const auto& valueElement : reply.response.getField("values").Array()) { + shardIds.emplace_back(valueElement.String()); + } + + // Compose a placement info object based on the retrieved information; the timestamp + // field may be disregarded, since it will be overwritten by the caller before being + // consumed. + NamespacePlacementType placementInfo(fromNss, Timestamp(), std::move(shardIds)); + placementInfo.setUuid(fromUUID); + return placementInfo; } return NamespacePlacementType::parse(IDLParserContext("shardedRenameMetadata"), |