diff options
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/collection_metadata.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_merge_chunks_command.cpp | 114 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager.h | 17 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 151 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 62 |
6 files changed, 337 insertions, 12 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 54fccaa8444..42283ace154 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -309,6 +309,7 @@ env.Library( 'config/configsvr_enable_sharding_command.cpp', 'config/configsvr_ensure_chunk_version_is_greater_than_command.cpp', 'config/configsvr_merge_chunk_command.cpp', + 'config/configsvr_merge_chunks_command.cpp', 'config/configsvr_move_chunk_command.cpp', 'config/configsvr_move_primary_command.cpp', 'config/configsvr_refine_collection_shard_key_command.cpp', diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h index 79f02080b19..74a71cd639a 100644 --- a/src/mongo/db/s/collection_metadata.h +++ b/src/mongo/db/s/collection_metadata.h @@ -159,6 +159,10 @@ public: return _cm->uuidMatches(uuid); } + boost::optional<UUID> getUUID() const { + return _cm->getUUID(); + } + /** * Returns just the shard key fields, if the collection is sharded, and the _id field, from * `doc`. Does not alter any field values (e.g. by hashing); values are copied verbatim. diff --git a/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp new file mode 100644 index 00000000000..73e4c127749 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/s/request_types/merge_chunks_request_type.h" + +namespace mongo { +namespace { + +/** + * Internal sharding command run on config servers to merge a set of chunks. + * + * Format: + * { + * _configsvrCommitChunksMerge: <string namespace>, + * collEpoch: <OID epoch>, + * lowerBound: <BSONObj minKey>, + * upperBound: <BSONObj maxKey>, + * shard: <string shard>, + * writeConcern: <BSONObj> + * } + */ +class ConfigSvrMergeChunksCommand : public BasicCommand { +public: + ConfigSvrMergeChunksCommand() : BasicCommand("_configsvrCommitChunksMerge") {} + + std::string help() const override { + return "Internal command, which is sent by a shard to the sharding config server. Do " + "not call directly. Receives, validates, and processes a MergeChunksRequest"; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool adminOnly() const override { + return true; + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return true; + } + + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) const override { + if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( + ResourcePattern::forClusterResource(), ActionType::internal)) { + return Status(ErrorCodes::Unauthorized, "Unauthorized"); + } + return Status::OK(); + } + + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + return CommandHelpers::parseNsFullyQualified(cmdObj); + } + + bool run(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + uassert(ErrorCodes::IllegalOperation, + "_configsvrCommitChunksMerge can only be run on config servers", + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + + // Set the operation context read concern level to local for reads into the config database. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + auto parsedRequest = uassertStatusOK(MergeChunksRequest::parseFromConfigCommand(cmdObj)); + + const BSONObj shardAndCollVers = uassertStatusOK( + ShardingCatalogManager::get(opCtx)->commitChunksMerge(opCtx, + parsedRequest.getNamespace(), + parsedRequest.getCollectionUUID(), + parsedRequest.getChunkRange(), + parsedRequest.getShardId(), + parsedRequest.getValidAfter())); + result.appendElements(shardAndCollVers); + + return true; + } + +} configsvrMergeChunksCmd; +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 47e01831332..afe44f4cfd1 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -247,6 +247,23 @@ public: const boost::optional<Timestamp>& validAfter); /** + * Updates metadata in the config.chunks collection so the chunks within the specified key range + * are seen merged into a single larger chunk. + * If 'validAfter' is not set, this means the commit request came from an older server version, + * which is not history-aware. + * + * Returns a BSON object with the newly produced chunk versions after the migration: + * - shardVersion - The new shard version of the source shard + * - collectionVersion - The new collection version after the commit + */ + StatusWith<BSONObj> commitChunksMerge(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& requestCollectionUUID, + const ChunkRange& chunkRange, + const ShardId& shardId, + const boost::optional<Timestamp>& validAfter); + + /** * Updates metadata in config.chunks collection to show the given chunk in its new shard. * If 'validAfter' is not set, this means the commit request came from an older server version, * which is not history-aware. diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 97aaf4cb686..9d8cf39e48e 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -866,6 +866,157 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge( return getShardAndCollectionVersion(opCtx, coll, ShardId(shardName)); } +StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( + OperationContext* opCtx, + const NamespaceString& nss, + const UUID& requestCollectionUUID, + const ChunkRange& chunkRange, + const ShardId& shardId, + const boost::optional<Timestamp>& validAfter) { + if (!validAfter) { + return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; + } + + const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel + Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + + // 1. Retrieve the initial collection version info to build up the logging info. + auto swCollVersion = getCollectionVersion(opCtx, nss); + if (!swCollVersion.isOK()) { + return swCollVersion.getStatus().withContext(str::stream() + << "mergeChunk cannot merge chunks."); + } + + // 2. Retrieve the list of chunks belonging to the requested shard + key range; + // The query over config.collections is guaranteed to succeed, + // since it has been already issued & checked by getCollectionVersion() + auto findCollResponse = uassertStatusOK( + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + CollectionType::ConfigNS, + BSON(CollectionType::kNssFieldName << nss.ns()), + {}, + 1)); + const CollectionType coll(findCollResponse.docs[0]); + if (coll.getUuid() != requestCollectionUUID) { + return { + ErrorCodes::InvalidUUID, + str::stream() << "UUID of collection does not match UUID of request. Colletion UUID: " + << coll.getUuid() << ", request UUID: " << requestCollectionUUID}; + } + const auto shardChunksInRangeQuery = [&]() { + BSONObjBuilder queryBuilder; + if (coll.getTimestamp()) { + queryBuilder << ChunkType::collectionUUID << coll.getUuid(); + } else { + queryBuilder << ChunkType::ns(coll.getNss().ns()); + } + queryBuilder << ChunkType::shard(shardId.toString()); + queryBuilder << ChunkType::min(BSON("$gte" << chunkRange.getMin())); + queryBuilder << ChunkType::max(BSON("$lte" << chunkRange.getMax())); + return queryBuilder.obj(); + }(); + + const auto shardChunksInRangeResponse = uassertStatusOK( + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + shardChunksInRangeQuery, + BSON(ChunkType::min << 1), + boost::none)); + + // Check if the chunk(s) have already been merged. If so, return success. + if (shardChunksInRangeResponse.docs.size() == 1) { + auto chunk = + uassertStatusOK(ChunkType::fromConfigBSON(shardChunksInRangeResponse.docs.back())); + uassert( + ErrorCodes::IllegalOperation, + str::stream() << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getRange() == chunkRange); + auto replyWithVersions = getShardAndCollectionVersion(opCtx, coll, shardId); + // Makes sure that the last thing we read in getCurrentChunk and + // getShardAndCollectionVersion gets majority written before to return from this command, + // otherwise next RoutingInfo cache refresh from the shard may not see those newest + // information. + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + return replyWithVersions; + } + + // 3. Prepare the data for the merge + // and ensure that the retrieved list of chunks covers the whole range. + std::vector<ChunkType> chunksToMerge; + for (const auto& chunkDoc : shardChunksInRangeResponse.docs) { + auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkDoc)); + if (chunksToMerge.empty()) { + uassert(ErrorCodes::IllegalOperation, + str::stream() + << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getMin().woCompare(chunkRange.getMin()) == 0); + } else { + uassert(ErrorCodes::IllegalOperation, + str::stream() + << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getMin().woCompare(chunksToMerge.back().getMax()) == 0); + } + chunksToMerge.push_back(std::move(chunk)); + } + uassert(ErrorCodes::IllegalOperation, + str::stream() << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + !chunksToMerge.empty() && + chunksToMerge.back().getMax().woCompare(chunkRange.getMax()) == 0); + + ChunkVersion initialVersion = swCollVersion.getValue(); + ChunkVersion mergeVersion = initialVersion; + mergeVersion.incMinor(); + + auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter); + auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, initialVersion); + + // 4. apply the batch of updates to local metadata + uassertStatusOK(Grid::get(opCtx)->catalogClient()->applyChunkOpsDeprecated( + opCtx, + updates, + preCond, + getNsOrUUIDForChunkTargeting(coll), + nss, + mergeVersion, + WriteConcernOptions(), + repl::ReadConcernLevel::kLocalReadConcern)); + + // 5. log changes + BSONObjBuilder logDetail; + { + BSONArrayBuilder b(logDetail.subarrayStart("merged")); + for (const auto& chunkToMerge : chunksToMerge) { + b.append(chunkToMerge.toConfigBSON()); + } + } + initialVersion.appendLegacyWithField(&logDetail, "prevShardVersion"); + mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion"); + logDetail.append("owningShard", shardId); + + ShardingLogging::get(opCtx)->logChange( + opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions()); + + return getShardAndCollectionVersion(opCtx, coll, shardId); +} + + StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index a48107135a5..815536ff78d 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -49,6 +49,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/merge_chunk_request_type.h" +#include "mongo/s/request_types/merge_chunks_request_type.h" #include "mongo/util/str.h" namespace mongo { @@ -74,12 +75,38 @@ bool checkMetadataForSuccess(OperationContext* opCtx, chunk.getMax().woCompare(chunkRange.getMax()) == 0; } -Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& minKey, - const BSONObj& maxKey, - const OID& epoch, - const CollectionMetadata& metadata) { +Shard::CommandResponse commitUsingChunkRange(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange, + const CollectionMetadata& metadata) { + auto const shardingState = ShardingState::get(opCtx); + const auto currentTime = VectorClock::get(opCtx)->getTime(); + auto collUUID = metadata.getUUID(); + invariant(collUUID); + MergeChunksRequest request{nss, + shardingState->shardId(), + *collUUID, + chunkRange, + currentTime.clusterTime().asTimestamp()}; + + auto configCmdObj = + request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + auto cmdResponse = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + configCmdObj, + Shard::RetryPolicy::kIdempotent)); + + return cmdResponse; +} + +Shard::CommandResponse commitUsingChunksList(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& minKey, + const BSONObj& maxKey, + const CollectionMetadata& metadata) { auto const shardingState = ShardingState::get(opCtx); // @@ -166,7 +193,7 @@ Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, const auto currentTime = VectorClock::get(opCtx)->getTime(); MergeChunkRequest request{nss, shardingState->shardId().toString(), - epoch, + metadata.getShardVersion().epoch(), chunkBoundaries, currentTime.clusterTime().asTimestamp()}; @@ -183,6 +210,16 @@ Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, return cmdResponse; } +Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange, + const CollectionMetadata& metadata) { + auto commandResponse = commitUsingChunkRange(opCtx, nss, chunkRange, metadata); + return commandResponse.commandStatus == ErrorCodes::CommandNotFound + ? commitUsingChunksList(opCtx, nss, chunkRange.getMin(), chunkRange.getMax(), metadata) + : commandResponse; +} + void mergeChunks(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey, @@ -223,15 +260,16 @@ void mergeChunks(OperationContext* opCtx, << ", current epoch: " << epoch << ")", expectedEpoch == epoch); + ChunkRange chunkRange(minKey, maxKey); + uassert(ErrorCodes::IllegalOperation, - str::stream() << "could not merge chunks, the range " - << redact(ChunkRange(minKey, maxKey).toString()) << " is not valid" + str::stream() << "could not merge chunks, the range " << redact(chunkRange.toString()) + << " is not valid" << " for collection " << nss.ns() << " with key pattern " << metadataBeforeMerge->getKeyPattern().toString(), metadataBeforeMerge->isValidKey(minKey) && metadataBeforeMerge->isValidKey(maxKey)); - auto cmdResponse = - commitMergeOnConfigServer(opCtx, nss, minKey, maxKey, epoch, metadataBeforeMerge.get()); + auto cmdResponse = commitMergeOnConfigServer(opCtx, nss, chunkRange, metadataBeforeMerge.get()); boost::optional<ChunkVersion> shardVersionReceived = [&]() -> boost::optional<ChunkVersion> { // old versions might not have the shardVersion field @@ -254,7 +292,7 @@ void mergeChunks(OperationContext* opCtx, auto writeConcernStatus = std::move(cmdResponse.writeConcernStatus); if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) && - checkMetadataForSuccess(opCtx, nss, epoch, ChunkRange(minKey, maxKey))) { + checkMetadataForSuccess(opCtx, nss, epoch, chunkRange)) { LOGV2_DEBUG(21983, 1, "mergeChunk interval [{minKey},{maxKey}) has already been committed", |