summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Polato <paolo.polato@mongodb.com>2023-02-06 14:46:27 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-02-06 15:59:26 +0000
commite8381aca1ecd8ee3ba30cbcf410f0ed05a41b4e5 (patch)
treefc7426106c60e55f0d14b2afa0b994b7234c70a2
parent836929386580b13a569a3d7d1e74125855c67584 (diff)
downloadmongo-e8381aca1ecd8ee3ba30cbcf410f0ed05a41b4e5.tar.gz
SERVER-73513 Add fallback logic to commit placement metadata for moveChunk and renameCollection
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp137
-rw-r--r--src/mongo/db/s/sharding_ddl_util.cpp39
2 files changed, 115 insertions, 61 deletions
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 004bf8dd9ab..50d216212e1 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -443,6 +443,26 @@ void mergeAllChunksOnShardInTransaction(OperationContext* opCtx,
txn.run(opCtx, updateChunksFn);
}
+/*
+ * Creates the request to persist a namespace descriptor object into config.placementHistory.
+ */
+write_ops::UpdateCommandRequest composePlacementUpsertRequest(
+ const NamespacePlacementType& placementInfo) {
+ write_ops::UpdateCommandRequest upsertRequest(
+ NamespaceString::kConfigsvrPlacementHistoryNamespace);
+ write_ops::UpdateOpEntry upsertEntry;
+ upsertEntry.setQ(BSON(NamespacePlacementType::kNssFieldName
+ << placementInfo.getNss().ns()
+ << NamespacePlacementType::kTimestampFieldName
+ << placementInfo.getTimestamp()));
+ upsertEntry.setU(write_ops::UpdateModification::parseFromClassicUpdate(placementInfo.toBSON()));
+ upsertEntry.setMulti(false);
+ // Upsert to account for concurrent migrations.
+ upsertEntry.setUpsert(true);
+ upsertRequest.setUpdates({std::move(upsertEntry)});
+ return upsertRequest;
+};
+
} // namespace
void ShardingCatalogManager::bumpMajorVersionOneChunkPerShard(
@@ -2263,6 +2283,7 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction(
}();
auto transactionChain = [nss,
+ collUuid = migratedChunk->getCollectionUUID(),
donorShardId,
recipientShardId = migratedChunk->getShard(),
migrationCommitTime =
@@ -2290,10 +2311,57 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction(
return std::move(updateConfigChunksFuture).semi();
}
- // Add the needed transaction statements to also upsert the new placement information.
+ // The main method to store placement info as part of the transaction, given a valid
+ // descriptor.
+ auto persistPlacementInfoSubchain = [txnExec,
+ &txnClient](NamespacePlacementType&& placementInfo) {
+ auto upsertRequest = composePlacementUpsertRequest(placementInfo);
+ return txnClient.runCRUDOp(upsertRequest, {})
+ .thenRunOn(txnExec)
+ .then([](const BatchedCommandResponse& insertPlacementEntryResponse) {
+ uassertStatusOK(insertPlacementEntryResponse.toStatus());
+ })
+ .semi();
+ };
+
+ // Obtain a valid placement descriptor from config.chunks and then store it as part of the
+ // transaction.
+ auto generateAndPersistPlacementInfoSubchain =
+ [txnExec, &txnClient](const NamespaceString& nss,
+ const UUID& collUuid,
+ const Timestamp& migrationCommitTime) {
+ // Compose the query - equivalent to
+ // 'configDb.chunks.distinct("shard", {uuid:collectionUuid})'
+ DistinctCommandRequest distinctRequest(ChunkType::ConfigNS);
+ distinctRequest.setKey(ChunkType::shard.name());
+ distinctRequest.setQuery(BSON(ChunkType::collectionUUID.name() << collUuid));
+ return txnClient.runCommand(NamespaceString::kConfigDb, distinctRequest.toBSON({}))
+ .thenRunOn(txnExec)
+ .then([=, &txnClient](BSONObj reply) {
+ uassertStatusOK(getStatusFromWriteCommandReply(reply));
+ std::vector<ShardId> shardIds;
+ for (const auto& valueElement : reply.getField("values").Array()) {
+ shardIds.emplace_back(valueElement.String());
+ }
+
+ NamespacePlacementType placementInfo(
+ nss, migrationCommitTime, std::move(shardIds));
+ placementInfo.setUuid(collUuid);
+ auto request = composePlacementUpsertRequest(placementInfo);
+ return txnClient.runCRUDOp(request, {});
+ })
+ .thenRunOn(txnExec)
+ .then([](const BatchedCommandResponse& insertPlacementEntryResponse) {
+ uassertStatusOK(insertPlacementEntryResponse.toStatus());
+ })
+ .semi();
+ };
+
+ // Extend the transaction to also upsert the placement information that matches the
+ // migration commit.
return std::move(updateConfigChunksFuture)
.thenRunOn(txnExec)
- .then([&txnClient, &nss, &migrationCommitTime] {
+ .then([&] {
// Retrieve the previous placement entry - it will be used as a base for the next
// update.
FindCommandRequest placementInfoQuery{
@@ -2307,36 +2375,21 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction(
return txnClient.exhaustiveFind(placementInfoQuery);
})
.thenRunOn(txnExec)
- .then([&nss,
- &recipientShardId,
- &migrationCommitTime,
- &donorShardId,
- &txnClient,
- &donorToBeRemoved,
- &recipientToBeAdded](const std::vector<BSONObj>& queryResponse) {
- /*
- * TODO SERVER-72870 replace the block below with a
- * tassert(queryResponse.size() == 1)
- */
- if (queryResponse.size() != 1) {
- // If the collection has been created under a legacy FCV, no previous placement
- // info will be available. Skip this statement.
- LOGV2_WARNING(6892801,
- "Unable to create a new placement entry for the namespace to "
- "match a chunk migration",
- "namespace"_attr = nss);
-
- BatchedCommandResponse noOpResponse;
- noOpResponse.setStatus(Status::OK());
- noOpResponse.setN(0);
-
- return SemiFuture<BatchedCommandResponse>(std::move(noOpResponse));
- }
-
+ .then([&,
+ persistPlacementInfo = std::move(persistPlacementInfoSubchain),
+ generateAndPersistPlacementInfo =
+ std::move(generateAndPersistPlacementInfoSubchain)](
+ const std::vector<BSONObj>& queryResponse) {
tassert(6892800,
- str::stream() << "Unable to retrieve historical placement data for "
- << nss.toString(),
- queryResponse.size() == 1);
+ str::stream()
+ << "Unexpected number of placement entries retrieved" << nss.toString(),
+ queryResponse.size() <= 1);
+
+ if (queryResponse.size() == 0) {
+ // Historical placement data may not be available due to an FCV transition -
+ // invoke the more expensive fallback method.
+ return generateAndPersistPlacementInfo(nss, collUuid, migrationCommitTime);
+ }
// Leverage the most recent placement info to build the new version.
auto placementInfo = NamespacePlacementType::parse(
@@ -2365,27 +2418,7 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction(
});
placementInfo.setShards(std::move(updatedShardList));
- // Persist the latest placement info (the update statement is needed to keep a
- // single history record for concurrent migrations that get committed at the same
- // "validAfter" time).
- write_ops::UpdateCommandRequest upsertRequest(
- NamespaceString::kConfigsvrPlacementHistoryNamespace);
- write_ops::UpdateOpEntry upsertEntry;
- upsertEntry.setQ(BSON(NamespacePlacementType::kNssFieldName
- << nss.toString()
- << NamespacePlacementType::kTimestampFieldName
- << migrationCommitTime));
- upsertEntry.setU(
- write_ops::UpdateModification::parseFromClassicUpdate(placementInfo.toBSON()));
- upsertEntry.setMulti(false);
- upsertEntry.setUpsert(true);
- upsertRequest.setUpdates({std::move(upsertEntry)});
-
- return txnClient.runCRUDOp(upsertRequest, {});
- })
- .thenRunOn(txnExec)
- .then([](const BatchedCommandResponse& insertPlacementEntryResponse) {
- uassertStatusOK(insertPlacementEntryResponse.toStatus());
+ return persistPlacementInfo(std::move(placementInfo));
})
.semi();
};
diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index 3eee51a20a2..f2b7480812a 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/query/distinct_command_gen.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/remove_tags_gen.h"
@@ -562,14 +563,15 @@ void shardedRenameMetadata(OperationContext* opCtx,
// Update "FROM" tags to "TO".
updateTags(opCtx, configShard, fromNss, toNss, writeConcern);
- // Retrieve the most recent placement information about "FROM", excluding the entry that
- // matches its deletion (an empty 'shards' field).
auto renamedCollPlacementInfo = [&]() -> boost::optional<NamespacePlacementType> {
+ // TODO SERVER-72870 replace feature flag check
if (!feature_flags::gHistoricalPlacementShardingCatalog.isEnabled(
serverGlobalParams.featureCompatibility)) {
return boost::none;
}
+ // Retrieve the latest placement document about "FROM" prior to its deletion (which will
+ // have left an entry with an empty set of shards).
auto query = BSON(NamespacePlacementType::kNssFieldName
<< fromNss.ns() << NamespacePlacementType::kShardsFieldName
<< BSON("$ne" << BSONArray()));
@@ -585,17 +587,36 @@ void shardedRenameMetadata(OperationContext* opCtx,
1 /*limit*/))
.docs;
- /*
- * TODO SERVER-72870 Replace the if block below with
- * uassert(RetriableErrorCode,queryResponse.size() == 1)
- */
if (queryResponse.empty()) {
- // If the "FROM" collection was created under a legacy FCV, no previous placement
- // info will be available. Skip also the insertion of the updated one.
+ // Persisted placement information may be unavailable as a consequence of FCV
+ // transitions. Use the content of config.chunks as a fallback.
LOGV2_WARNING(7068200,
"Unable to retrieve placement entry for the namespace being renamed",
"fromNss"_attr = fromNss);
- return boost::none;
+
+ DistinctCommandRequest distinctRequest(ChunkType::ConfigNS);
+ distinctRequest.setKey(ChunkType::shard.name());
+ distinctRequest.setQuery(BSON(ChunkType::collectionUUID.name() << fromUUID));
+
+ auto reply = uassertStatusOK(configShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet{}),
+ NamespaceString::kConfigDb.toString(),
+ distinctRequest.toBSON({}),
+ Shard::RetryPolicy::kIdempotent));
+
+ uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(reply));
+ std::vector<ShardId> shardIds;
+ for (const auto& valueElement : reply.response.getField("values").Array()) {
+ shardIds.emplace_back(valueElement.String());
+ }
+
+ // Compose a placement info object based on the retrieved information; the timestamp
+ // field may be disregarded, since it will be overwritten by the caller before being
+ // consumed.
+ NamespacePlacementType placementInfo(fromNss, Timestamp(), std::move(shardIds));
+ placementInfo.setUuid(fromUUID);
+ return placementInfo;
}
return NamespacePlacementType::parse(IDLParserContext("shardedRenameMetadata"),