diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/collection_metadata.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_merge_chunks_command.cpp | 114 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager.h | 17 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 151 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 62 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/request_types/merge_chunks_request_test.cpp | 164 | ||||
-rw-r--r-- | src/mongo/s/request_types/merge_chunks_request_type.cpp | 147 | ||||
-rw-r--r-- | src/mongo/s/request_types/merge_chunks_request_type.h | 106 |
10 files changed, 756 insertions, 12 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 54fccaa8444..42283ace154 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -309,6 +309,7 @@ env.Library( 'config/configsvr_enable_sharding_command.cpp', 'config/configsvr_ensure_chunk_version_is_greater_than_command.cpp', 'config/configsvr_merge_chunk_command.cpp', + 'config/configsvr_merge_chunks_command.cpp', 'config/configsvr_move_chunk_command.cpp', 'config/configsvr_move_primary_command.cpp', 'config/configsvr_refine_collection_shard_key_command.cpp', diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h index 79f02080b19..74a71cd639a 100644 --- a/src/mongo/db/s/collection_metadata.h +++ b/src/mongo/db/s/collection_metadata.h @@ -159,6 +159,10 @@ public: return _cm->uuidMatches(uuid); } + boost::optional<UUID> getUUID() const { + return _cm->getUUID(); + } + /** * Returns just the shard key fields, if the collection is sharded, and the _id field, from * `doc`. Does not alter any field values (e.g. by hashing); values are copied verbatim. diff --git a/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp new file mode 100644 index 00000000000..73e4c127749 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/s/request_types/merge_chunks_request_type.h" + +namespace mongo { +namespace { + +/** + * Internal sharding command run on config servers to merge a set of chunks. + * + * Format: + * { + * _configsvrCommitChunksMerge: <string namespace>, + * collEpoch: <OID epoch>, + * lowerBound: <BSONObj minKey>, + * upperBound: <BSONObj maxKey>, + * shard: <string shard>, + * writeConcern: <BSONObj> + * } + */ +class ConfigSvrMergeChunksCommand : public BasicCommand { +public: + ConfigSvrMergeChunksCommand() : BasicCommand("_configsvrCommitChunksMerge") {} + + std::string help() const override { + return "Internal command, which is sent by a shard to the sharding config server. Do " + "not call directly. Receives, validates, and processes a MergeChunksRequest"; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool adminOnly() const override { + return true; + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return true; + } + + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) const override { + if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( + ResourcePattern::forClusterResource(), ActionType::internal)) { + return Status(ErrorCodes::Unauthorized, "Unauthorized"); + } + return Status::OK(); + } + + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + return CommandHelpers::parseNsFullyQualified(cmdObj); + } + + bool run(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + uassert(ErrorCodes::IllegalOperation, + "_configsvrCommitChunksMerge can only be run on config servers", + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + + // Set the operation context read concern level to local for reads into the config database. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + auto parsedRequest = uassertStatusOK(MergeChunksRequest::parseFromConfigCommand(cmdObj)); + + const BSONObj shardAndCollVers = uassertStatusOK( + ShardingCatalogManager::get(opCtx)->commitChunksMerge(opCtx, + parsedRequest.getNamespace(), + parsedRequest.getCollectionUUID(), + parsedRequest.getChunkRange(), + parsedRequest.getShardId(), + parsedRequest.getValidAfter())); + result.appendElements(shardAndCollVers); + + return true; + } + +} configsvrMergeChunksCmd; +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 47e01831332..afe44f4cfd1 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -247,6 +247,23 @@ public: const boost::optional<Timestamp>& validAfter); /** + * Updates metadata in the config.chunks collection so the chunks within the specified key range + * are seen merged into a single larger chunk. + * If 'validAfter' is not set, this means the commit request came from an older server version, + * which is not history-aware. + * + * Returns a BSON object with the newly produced chunk versions after the migration: + * - shardVersion - The new shard version of the source shard + * - collectionVersion - The new collection version after the commit + */ + StatusWith<BSONObj> commitChunksMerge(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& requestCollectionUUID, + const ChunkRange& chunkRange, + const ShardId& shardId, + const boost::optional<Timestamp>& validAfter); + + /** * Updates metadata in config.chunks collection to show the given chunk in its new shard. * If 'validAfter' is not set, this means the commit request came from an older server version, * which is not history-aware. diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 97aaf4cb686..9d8cf39e48e 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -866,6 +866,157 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge( return getShardAndCollectionVersion(opCtx, coll, ShardId(shardName)); } +StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( + OperationContext* opCtx, + const NamespaceString& nss, + const UUID& requestCollectionUUID, + const ChunkRange& chunkRange, + const ShardId& shardId, + const boost::optional<Timestamp>& validAfter) { + if (!validAfter) { + return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; + } + + const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel + Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + + // 1. Retrieve the initial collection version info to build up the logging info. + auto swCollVersion = getCollectionVersion(opCtx, nss); + if (!swCollVersion.isOK()) { + return swCollVersion.getStatus().withContext(str::stream() + << "mergeChunk cannot merge chunks."); + } + + // 2. Retrieve the list of chunks belonging to the requested shard + key range; + // The query over config.collections is guaranteed to succeed, + // since it has been already issued & checked by getCollectionVersion() + auto findCollResponse = uassertStatusOK( + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + CollectionType::ConfigNS, + BSON(CollectionType::kNssFieldName << nss.ns()), + {}, + 1)); + const CollectionType coll(findCollResponse.docs[0]); + if (coll.getUuid() != requestCollectionUUID) { + return { + ErrorCodes::InvalidUUID, + str::stream() << "UUID of collection does not match UUID of request. Colletion UUID: " + << coll.getUuid() << ", request UUID: " << requestCollectionUUID}; + } + const auto shardChunksInRangeQuery = [&]() { + BSONObjBuilder queryBuilder; + if (coll.getTimestamp()) { + queryBuilder << ChunkType::collectionUUID << coll.getUuid(); + } else { + queryBuilder << ChunkType::ns(coll.getNss().ns()); + } + queryBuilder << ChunkType::shard(shardId.toString()); + queryBuilder << ChunkType::min(BSON("$gte" << chunkRange.getMin())); + queryBuilder << ChunkType::max(BSON("$lte" << chunkRange.getMax())); + return queryBuilder.obj(); + }(); + + const auto shardChunksInRangeResponse = uassertStatusOK( + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + shardChunksInRangeQuery, + BSON(ChunkType::min << 1), + boost::none)); + + // Check if the chunk(s) have already been merged. If so, return success. + if (shardChunksInRangeResponse.docs.size() == 1) { + auto chunk = + uassertStatusOK(ChunkType::fromConfigBSON(shardChunksInRangeResponse.docs.back())); + uassert( + ErrorCodes::IllegalOperation, + str::stream() << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getRange() == chunkRange); + auto replyWithVersions = getShardAndCollectionVersion(opCtx, coll, shardId); + // Makes sure that the last thing we read in getCurrentChunk and + // getShardAndCollectionVersion gets majority written before to return from this command, + // otherwise next RoutingInfo cache refresh from the shard may not see those newest + // information. + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + return replyWithVersions; + } + + // 3. Prepare the data for the merge + // and ensure that the retrieved list of chunks covers the whole range. + std::vector<ChunkType> chunksToMerge; + for (const auto& chunkDoc : shardChunksInRangeResponse.docs) { + auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkDoc)); + if (chunksToMerge.empty()) { + uassert(ErrorCodes::IllegalOperation, + str::stream() + << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getMin().woCompare(chunkRange.getMin()) == 0); + } else { + uassert(ErrorCodes::IllegalOperation, + str::stream() + << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getMin().woCompare(chunksToMerge.back().getMax()) == 0); + } + chunksToMerge.push_back(std::move(chunk)); + } + uassert(ErrorCodes::IllegalOperation, + str::stream() << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + !chunksToMerge.empty() && + chunksToMerge.back().getMax().woCompare(chunkRange.getMax()) == 0); + + ChunkVersion initialVersion = swCollVersion.getValue(); + ChunkVersion mergeVersion = initialVersion; + mergeVersion.incMinor(); + + auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter); + auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, initialVersion); + + // 4. apply the batch of updates to local metadata + uassertStatusOK(Grid::get(opCtx)->catalogClient()->applyChunkOpsDeprecated( + opCtx, + updates, + preCond, + getNsOrUUIDForChunkTargeting(coll), + nss, + mergeVersion, + WriteConcernOptions(), + repl::ReadConcernLevel::kLocalReadConcern)); + + // 5. log changes + BSONObjBuilder logDetail; + { + BSONArrayBuilder b(logDetail.subarrayStart("merged")); + for (const auto& chunkToMerge : chunksToMerge) { + b.append(chunkToMerge.toConfigBSON()); + } + } + initialVersion.appendLegacyWithField(&logDetail, "prevShardVersion"); + mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion"); + logDetail.append("owningShard", shardId); + + ShardingLogging::get(opCtx)->logChange( + opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions()); + + return getShardAndCollectionVersion(opCtx, coll, shardId); +} + + StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index a48107135a5..815536ff78d 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -49,6 +49,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/merge_chunk_request_type.h" +#include "mongo/s/request_types/merge_chunks_request_type.h" #include "mongo/util/str.h" namespace mongo { @@ -74,12 +75,38 @@ bool checkMetadataForSuccess(OperationContext* opCtx, chunk.getMax().woCompare(chunkRange.getMax()) == 0; } -Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& minKey, - const BSONObj& maxKey, - const OID& epoch, - const CollectionMetadata& metadata) { +Shard::CommandResponse commitUsingChunkRange(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange, + const CollectionMetadata& metadata) { + auto const shardingState = ShardingState::get(opCtx); + const auto currentTime = VectorClock::get(opCtx)->getTime(); + auto collUUID = metadata.getUUID(); + invariant(collUUID); + MergeChunksRequest request{nss, + shardingState->shardId(), + *collUUID, + chunkRange, + currentTime.clusterTime().asTimestamp()}; + + auto configCmdObj = + request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + auto cmdResponse = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + configCmdObj, + Shard::RetryPolicy::kIdempotent)); + + return cmdResponse; +} + +Shard::CommandResponse commitUsingChunksList(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& minKey, + const BSONObj& maxKey, + const CollectionMetadata& metadata) { auto const shardingState = ShardingState::get(opCtx); // @@ -166,7 +193,7 @@ Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, const auto currentTime = VectorClock::get(opCtx)->getTime(); MergeChunkRequest request{nss, shardingState->shardId().toString(), - epoch, + metadata.getShardVersion().epoch(), chunkBoundaries, currentTime.clusterTime().asTimestamp()}; @@ -183,6 +210,16 @@ Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, return cmdResponse; } +Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange, + const CollectionMetadata& metadata) { + auto commandResponse = commitUsingChunkRange(opCtx, nss, chunkRange, metadata); + return commandResponse.commandStatus == ErrorCodes::CommandNotFound + ? commitUsingChunksList(opCtx, nss, chunkRange.getMin(), chunkRange.getMax(), metadata) + : commandResponse; +} + void mergeChunks(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& minKey, @@ -223,15 +260,16 @@ void mergeChunks(OperationContext* opCtx, << ", current epoch: " << epoch << ")", expectedEpoch == epoch); + ChunkRange chunkRange(minKey, maxKey); + uassert(ErrorCodes::IllegalOperation, - str::stream() << "could not merge chunks, the range " - << redact(ChunkRange(minKey, maxKey).toString()) << " is not valid" + str::stream() << "could not merge chunks, the range " << redact(chunkRange.toString()) + << " is not valid" << " for collection " << nss.ns() << " with key pattern " << metadataBeforeMerge->getKeyPattern().toString(), metadataBeforeMerge->isValidKey(minKey) && metadataBeforeMerge->isValidKey(maxKey)); - auto cmdResponse = - commitMergeOnConfigServer(opCtx, nss, minKey, maxKey, epoch, metadataBeforeMerge.get()); + auto cmdResponse = commitMergeOnConfigServer(opCtx, nss, chunkRange, metadataBeforeMerge.get()); boost::optional<ChunkVersion> shardVersionReceived = [&]() -> boost::optional<ChunkVersion> { // old versions might not have the shardVersion field @@ -254,7 +292,7 @@ void mergeChunks(OperationContext* opCtx, auto writeConcernStatus = std::move(cmdResponse.writeConcernStatus); if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) && - checkMetadataForSuccess(opCtx, nss, epoch, ChunkRange(minKey, maxKey))) { + checkMetadataForSuccess(opCtx, nss, epoch, chunkRange)) { LOGV2_DEBUG(21983, 1, "mergeChunk interval [{minKey},{maxKey}) has already been committed", diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index eaa47847456..d51c705a301 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -164,6 +164,7 @@ env.Library( 'request_types/flush_routing_table_cache_updates.idl', 'request_types/get_database_version.idl', 'request_types/merge_chunk_request_type.cpp', + 'request_types/merge_chunks_request_type.cpp', 'request_types/migration_secondary_throttle_options.cpp', 'request_types/move_chunk_request.cpp', 'request_types/move_primary.idl', @@ -611,6 +612,7 @@ env.CppUnitTest( 'request_types/balance_chunk_request_test.cpp', 'request_types/commit_chunk_migration_request_test.cpp', 'request_types/merge_chunk_request_test.cpp', + 'request_types/merge_chunks_request_test.cpp', 'request_types/migration_secondary_throttle_options_test.cpp', 'request_types/move_chunk_request_test.cpp', 'request_types/remove_shard_from_zone_request_test.cpp', diff --git a/src/mongo/s/request_types/merge_chunks_request_test.cpp b/src/mongo/s/request_types/merge_chunks_request_test.cpp new file mode 100644 index 00000000000..2b5e48ba9f2 --- /dev/null +++ b/src/mongo/s/request_types/merge_chunks_request_test.cpp @@ -0,0 +1,164 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/s/request_types/merge_chunks_request_type.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using unittest::assertGet; + +ChunkRange chunkRange(BSON("a" << 1), BSON("a" << 10)); + +TEST(MergeChunksRequest, BasicValidConfigCommand) { + auto collUUID = UUID::gen(); + auto request = assertGet(MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000"))); + ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace()); + ASSERT_TRUE(collUUID == request.getCollectionUUID()); + ASSERT_TRUE(chunkRange == request.getChunkRange()); + ASSERT_EQ("shard0000", request.getShardId().toString()); +} + +TEST(MergeChunksRequest, ConfigCommandtoBSON) { + auto collUUID = UUID::gen(); + BSONObj serializedRequest = + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000" + << "validAfter" << Timestamp{100}); + BSONObj writeConcernObj = BSON("w" + << "majority"); + + BSONObjBuilder cmdBuilder; + { + cmdBuilder.appendElements(serializedRequest); + cmdBuilder.append("writeConcern", writeConcernObj); + } + + auto request = assertGet(MergeChunksRequest::parseFromConfigCommand(serializedRequest)); + auto requestToBSON = request.toConfigCommandBSON(writeConcernObj); + + ASSERT_BSONOBJ_EQ(cmdBuilder.obj(), requestToBSON); +} + +TEST(MergeChunksRequest, MissingNameSpaceErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunksRequest, MissingcollUUIDErrors) { + auto request = MergeChunksRequest::parseFromConfigCommand(BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "chunkRange" + << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunksRequest, MissingChunkRangeErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand(BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" + << collUUID.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunksRequest, MissingShardIdErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON())); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunksRequest, WrongNamespaceTypeErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" << 1234 << "collUUID" << collUUID.toBSON() + << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunksRequest, WrongcollUUIDTypeErrors) { + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << 1234 << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunksRequest, WrongChunkRangeTypeErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << 1234 << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunksRequest, WrongShardIdTypeErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << 1234)); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunksRequest, InvalidNamespaceErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::InvalidNamespace, request.getStatus()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/request_types/merge_chunks_request_type.cpp b/src/mongo/s/request_types/merge_chunks_request_type.cpp new file mode 100644 index 00000000000..1ea06a036fb --- /dev/null +++ b/src/mongo/s/request_types/merge_chunks_request_type.cpp @@ -0,0 +1,147 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/request_types/merge_chunks_request_type.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/platform/basic.h" + +namespace mongo { +namespace { + +const char kConfigsvrMergeChunks[] = "_configsvrCommitChunksMerge"; +const char kCollUUID[] = "collUUID"; +const char kChunkRange[] = "chunkRange"; +const char kShardId[] = "shard"; +const char kValidAfter[] = "validAfter"; +} // namespace + +MergeChunksRequest::MergeChunksRequest(NamespaceString nss, + ShardId shardId, + UUID collectionUUID, + ChunkRange chunkRange, + boost::optional<Timestamp> validAfter) + : _nss(std::move(nss)), + _collectionUUID(std::move(collectionUUID)), + _chunkRange(std::move(chunkRange)), + _shardId(std::move(shardId)), + _validAfter(validAfter) {} + +StatusWith<MergeChunksRequest> MergeChunksRequest::parseFromConfigCommand(const BSONObj& cmdObj) { + std::string ns; + { + auto parseNamespaceStatus = bsonExtractStringField(cmdObj, kConfigsvrMergeChunks, &ns); + if (!parseNamespaceStatus.isOK()) { + return parseNamespaceStatus; + } + } + NamespaceString nss(ns); + if (!nss.isValid()) { + return {ErrorCodes::InvalidNamespace, + str::stream() << "invalid namespace '" << nss.ns() << "' specified for request"}; + } + + BSONElement collUUIDElem; + { + auto parseCollUUIDStatus = + bsonExtractTypedField(cmdObj, kCollUUID, mongo::Object, &collUUIDElem); + if (!parseCollUUIDStatus.isOK()) { + return parseCollUUIDStatus; + } + } + + auto collUUID = UUID::parse(collUUIDElem.Obj().getField("uuid")); + if (!collUUID.isOK()) { + return collUUID.getStatus(); + } + + BSONElement chunkRangeElem; + { + auto chunkRangeStatus = + bsonExtractTypedField(cmdObj, kChunkRange, mongo::Object, &chunkRangeElem); + if (!chunkRangeStatus.isOK()) { + return chunkRangeStatus; + } + } + + auto chunkRange = ChunkRange::fromBSON(chunkRangeElem.Obj().getOwned()); + if (!chunkRange.isOK()) { + return chunkRange.getStatus(); + } + + std::string shardIdString; + { + auto parseShardIdStatus = bsonExtractStringField(cmdObj, kShardId, &shardIdString); + if (!parseShardIdStatus.isOK()) { + return parseShardIdStatus; + } + } + + boost::optional<Timestamp> validAfter = boost::none; + { + Timestamp ts{0}; + auto status = bsonExtractTimestampField(cmdObj, kValidAfter, &ts); + if (!status.isOK() && status != ErrorCodes::NoSuchKey) { + return status; + } + + if (status.isOK()) { + validAfter = ts; + } + } + + return MergeChunksRequest(std::move(nss), + ShardId(shardIdString), + std::move(collUUID.getValue()), + std::move(chunkRange.getValue()), + validAfter); +} + +BSONObj MergeChunksRequest::toConfigCommandBSON(const BSONObj& writeConcern) { + BSONObjBuilder cmdBuilder; + appendAsConfigCommand(&cmdBuilder); + + // Tack on passed-in writeConcern + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, writeConcern); + + return cmdBuilder.obj(); +} + +void MergeChunksRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) { + cmdBuilder->append(kConfigsvrMergeChunks, _nss.ns()); + cmdBuilder->append(kCollUUID, _collectionUUID.toBSON()); + cmdBuilder->append(kChunkRange, _chunkRange.toBSON()); + cmdBuilder->append(kShardId, _shardId); + invariant(_validAfter.is_initialized()); + cmdBuilder->append(kValidAfter, _validAfter.get()); +} + + +} // namespace mongo diff --git a/src/mongo/s/request_types/merge_chunks_request_type.h b/src/mongo/s/request_types/merge_chunks_request_type.h new file mode 100644 index 00000000000..bed10c7f338 --- /dev/null +++ b/src/mongo/s/request_types/merge_chunks_request_type.h @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/namespace_string.h" +#include "mongo/s/catalog/type_chunk.h" + +namespace mongo { + +/** + * Provides support for parsing and serialization of arguments to the config server mergeChunks + * command. + */ +class MergeChunksRequest { +public: + MergeChunksRequest(NamespaceString nss, + ShardId shardId, + UUID collUUID, + ChunkRange chunkRange, + boost::optional<Timestamp> validAfter); + /** + * Parses the provided BSON content as the internal _configsvrCommitChunksMerge command + * and if it contains the correct types, constructs a MergeChunksRequest object from it. + * + * { + * _configsvrCommitChunksMerge: <NamespaceString nss>, + * collUUID: <UUID>, + * chunkRage: <ChunkRange [minKey, maxKey)>, + * shard: <string shard> + * } + */ + static StatusWith<MergeChunksRequest> parseFromConfigCommand(const BSONObj& cmdObj); + + /** + * Creates a BSONObjBuilder and uses it to create and return a BSONObj from this + * MergeChunksRequest instance. Calls appendAsConfigCommand and tacks on the passed-in + * writeConcern. + */ + BSONObj toConfigCommandBSON(const BSONObj& writeConcern); + + /** + * Creates a serialized BSONObj of the internal _configsvCommitChunksMerge command + * from this MergeChunksRequest instance. + */ + void appendAsConfigCommand(BSONObjBuilder* cmdBuilder); + + const NamespaceString& getNamespace() const { + return _nss; + } + + const UUID& getCollectionUUID() const { + return _collectionUUID; + } + + const ChunkRange& getChunkRange() const { + return _chunkRange; + } + + const ShardId& getShardId() const { + return _shardId; + } + + const boost::optional<Timestamp>& getValidAfter() const { + return _validAfter; + } + +private: + NamespaceString _nss; + + UUID _collectionUUID; + + ChunkRange _chunkRange; + + ShardId _shardId; + + boost::optional<Timestamp> _validAfter; +}; + +} // namespace mongo |