diff options
author | Paolo Polato <paolo.polato@mongodb.com> | 2021-06-08 13:22:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-11 15:49:35 +0000 |
commit | 93c36da06fa36454a7f8ff77ce2c86a07ba97e4f (patch) | |
tree | 761e26cbfe5d4dbebc8bbed5705337a06790d7ec /src/mongo | |
parent | 7bd0cf6c595a330d307a319a394355ded89839af (diff) | |
download | mongo-93c36da06fa36454a7f8ff77ce2c86a07ba97e4f.tar.gz |
SERVER-56786 expand the bounds parameter of mergeChunk in the config server
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/collection_metadata.h | 4 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_merge_chunks_command.cpp | 114 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager.h | 17 | ||||
-rw-r--r-- | src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 196 | ||||
-rw-r--r-- | src/mongo/db/s/merge_chunks_command.cpp | 150 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/catalog/type_chunk.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/catalog/type_chunk.h | 2 | ||||
-rw-r--r-- | src/mongo/s/request_types/merge_chunks_request_test.cpp | 164 | ||||
-rw-r--r-- | src/mongo/s/request_types/merge_chunks_request_type.cpp | 147 | ||||
-rw-r--r-- | src/mongo/s/request_types/merge_chunks_request_type.h | 106 |
12 files changed, 857 insertions, 51 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 47557cca651..bd0ce3e8bdf 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -324,6 +324,7 @@ env.Library( 'config/configsvr_drop_database_command.cpp', 'config/configsvr_enable_sharding_command.cpp', 'config/configsvr_merge_chunk_command.cpp', + 'config/configsvr_merge_chunks_command.cpp', 'config/configsvr_move_chunk_command.cpp', 'config/configsvr_move_primary_command.cpp', 'config/configsvr_remove_shard_command.cpp', diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h index 824787506f4..a80d2350756 100644 --- a/src/mongo/db/s/collection_metadata.h +++ b/src/mongo/db/s/collection_metadata.h @@ -124,6 +124,10 @@ public: return _cm->uuidMatches(uuid); } + boost::optional<UUID> getUUID() const { + return _cm->getUUID(); + } + /** * Returns just the shard key fields, if the collection is sharded, and the _id field, from * `doc`. Does not alter any field values (e.g. by hashing); values are copied verbatim. diff --git a/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp new file mode 100644 index 00000000000..73e4c127749 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp @@ -0,0 +1,114 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/s/request_types/merge_chunks_request_type.h" + +namespace mongo { +namespace { + +/** + * Internal sharding command run on config servers to merge a set of chunks. + * + * Format: + * { + * _configsvrCommitChunksMerge: <string namespace>, + * collEpoch: <OID epoch>, + * lowerBound: <BSONObj minKey>, + * upperBound: <BSONObj maxKey>, + * shard: <string shard>, + * writeConcern: <BSONObj> + * } + */ +class ConfigSvrMergeChunksCommand : public BasicCommand { +public: + ConfigSvrMergeChunksCommand() : BasicCommand("_configsvrCommitChunksMerge") {} + + std::string help() const override { + return "Internal command, which is sent by a shard to the sharding config server. Do " + "not call directly. Receives, validates, and processes a MergeChunksRequest"; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + bool adminOnly() const override { + return true; + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return true; + } + + Status checkAuthForCommand(Client* client, + const std::string& dbname, + const BSONObj& cmdObj) const override { + if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( + ResourcePattern::forClusterResource(), ActionType::internal)) { + return Status(ErrorCodes::Unauthorized, "Unauthorized"); + } + return Status::OK(); + } + + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + return CommandHelpers::parseNsFullyQualified(cmdObj); + } + + bool run(OperationContext* opCtx, + const std::string& dbName, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + uassert(ErrorCodes::IllegalOperation, + "_configsvrCommitChunksMerge can only be run on config servers", + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + + // Set the operation context read concern level to local for reads into the config database. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + auto parsedRequest = uassertStatusOK(MergeChunksRequest::parseFromConfigCommand(cmdObj)); + + const BSONObj shardAndCollVers = uassertStatusOK( + ShardingCatalogManager::get(opCtx)->commitChunksMerge(opCtx, + parsedRequest.getNamespace(), + parsedRequest.getCollectionUUID(), + parsedRequest.getChunkRange(), + parsedRequest.getShardId(), + parsedRequest.getValidAfter())); + result.appendElements(shardAndCollVers); + + return true; + } + +} configsvrMergeChunksCmd; +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 90c868a98b2..11131fdfdaf 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -207,6 +207,23 @@ public: const boost::optional<Timestamp>& validAfter); /** + * Updates metadata in the config.chunks collection so the chunks within the specified key range + * are seen merged into a single larger chunk. + * If 'validAfter' is not set, this means the commit request came from an older server version, + * which is not history-aware. + * + * Returns a BSON object with the newly produced chunk versions after the migration: + * - shardVersion - The new shard version of the source shard + * - collectionVersion - The new collection version after the commit + */ + StatusWith<BSONObj> commitChunksMerge(OperationContext* opCtx, + const NamespaceString& nss, + const UUID& requestCollectionUUID, + const ChunkRange& chunkRange, + const ShardId& shardId, + const boost::optional<Timestamp>& validAfter); + + /** * Updates metadata in config.chunks collection to show the given chunk in its new shard. * If 'validAfter' is not set, this means the commit request came from an older server version, * which is not history-aware. diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 2ec76871066..eb24df90173 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -681,6 +681,202 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge( return result.obj(); } +StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge( + OperationContext* opCtx, + const NamespaceString& nss, + const UUID& requestCollectionUUID, + const ChunkRange& chunkRange, + const ShardId& shardId, + const boost::optional<Timestamp>& validAfter) { + if (!validAfter) { + return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"}; + } + + const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel + Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); + + // 1. Retrieve the initial collection and shard version values + // to validate the request parameters and build up the logging info. + auto swCollVersion = getMaxChunkVersionFromQueryResponse( + nss, + configShard->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON("ns" << nss.ns()), // Query all chunks for this namespace. + BSON(ChunkType::lastmod << -1), // Sort by version. + 1)); // Limit 1. + + if (!swCollVersion.isOK()) { + return swCollVersion.getStatus().withContext(str::stream() + << "mergeChunk cannot merge chunks."); + } + + auto collVersion = swCollVersion.getValue(); + + auto swShardVersion = getMaxChunkVersionFromQueryResponse( + nss, + configShard->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON("ns" << nss.ns() << "shard" + << shardId.toString()), // Query all chunks for this namespace and shard. + BSON(ChunkType::lastmod << -1), // Sort by version. + 1)); // Limit 1. + + if (!swShardVersion.isOK()) { + return swShardVersion.getStatus().withContext("mergeChunk cannot merge chunks."); + } + + auto shardVersion = swShardVersion.getValue(); + + auto findCollResponse = + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + CollectionType::ConfigNS, + BSON("_id" << nss.ns()), + {}, + 1); + if (!findCollResponse.isOK()) { + return findCollResponse.getStatus(); + } + if (findCollResponse.getValue().docs.empty()) { + return {ErrorCodes::Error(5678601), + str::stream() << "Collection '" << nss.ns() << "' no longer either exists"}; + } + + auto coll = uassertStatusOK(CollectionType::fromBSON(findCollResponse.getValue().docs[0])); + invariant(coll.getUUID()); + auto collUUID = *(coll.getUUID()); + if (collUUID != requestCollectionUUID) { + return { + ErrorCodes::InvalidUUID, + str::stream() << "UUID of collection does not match UUID of request. Colletion UUID: " + << collUUID << ", request UUID: " << requestCollectionUUID}; + } + + // 2. Retrieve the list of chunks belonging to the requested shard + key range. + const auto shardChunksInRangeQuery = [&]() { + BSONObjBuilder queryBuilder; + queryBuilder << ChunkType::epoch(coll.getEpoch()); + queryBuilder << ChunkType::shard(shardId.toString()); + queryBuilder << ChunkType::min(BSON("$gte" << chunkRange.getMin())); + queryBuilder << ChunkType::max(BSON("$lte" << chunkRange.getMax())); + return queryBuilder.obj(); + }(); + + const auto shardChunksInRangeResponse = uassertStatusOK( + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + shardChunksInRangeQuery, + BSON(ChunkType::min << 1), + boost::none)); + + // Check if the chunk(s) have already been merged. If so, return success. + if (shardChunksInRangeResponse.docs.size() == 1) { + auto chunk = + uassertStatusOK(ChunkType::fromConfigBSON(shardChunksInRangeResponse.docs.back())); + uassert( + ErrorCodes::IllegalOperation, + str::stream() << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getRange() == chunkRange); + BSONObjBuilder result; + collVersion.appendToCommand(&result); + return result.obj(); + } + + // 3. Prepare the data for the merge + // and ensure that the retrieved list of chunks covers the whole range. + std::vector<ChunkType> chunksToMerge; + for (const auto& chunkDoc : shardChunksInRangeResponse.docs) { + auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkDoc)); + if (chunksToMerge.empty()) { + uassert(ErrorCodes::IllegalOperation, + str::stream() + << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getMin().woCompare(chunkRange.getMin()) == 0); + } else { + uassert(ErrorCodes::IllegalOperation, + str::stream() + << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + chunk.getMin().woCompare(chunksToMerge.back().getMax()) == 0); + } + chunksToMerge.push_back(std::move(chunk)); + } + uassert(ErrorCodes::IllegalOperation, + str::stream() << "could not merge chunks, shard " << shardId + << " does not contain a sequence of chunks that exactly fills the range " + << chunkRange.toString(), + !chunksToMerge.empty() && + chunksToMerge.back().getMax().woCompare(chunkRange.getMax()) == 0); + + ChunkVersion mergeVersion = collVersion; + // If the incrementChunkMajorVersionOnChunkSplits flag is set, increment the major version only + // if the shard that owns the chunks being merged has shardVersion == collectionVersion. See + // SERVER-41480 for details. + // + // This flag is only useful if there are some 4.0 routers still in the cluster, so we only use + // it if FCV is not fully upgraded. + const auto currentFCV = serverGlobalParams.featureCompatibility.getVersion(); + if (currentFCV != ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 && + incrementChunkMajorVersionOnChunkSplits.load() && shardVersion == collVersion) { + mergeVersion.incMajor(); + } else { + mergeVersion.incMinor(); + } + + auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter); + auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, collVersion); + + // 4. apply the batch of updates to local metadata + uassertStatusOK(Grid::get(opCtx)->catalogClient()->applyChunkOpsDeprecated( + opCtx, + updates, + preCond, + nss, + mergeVersion, + WriteConcernOptions(), + repl::ReadConcernLevel::kLocalReadConcern)); + + // 5. log changes + BSONObjBuilder logDetail; + { + BSONArrayBuilder b(logDetail.subarrayStart("merged")); + for (const auto& chunkToMerge : chunksToMerge) { + b.append(chunkToMerge.toConfigBSON()); + } + } + collVersion.appendLegacyWithField(&logDetail, "prevShardVersion"); + mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion"); + // logDetail.append("owningShard", shardId); TODO not needed in v4.4? + + ShardingLogging::get(opCtx)->logChange( + opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions()); + + BSONObjBuilder result; + mergeVersion.appendToCommand(&result); + + return result.obj(); +} + + StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration( OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp index a8ee1bc4fba..e5bfe7bda14 100644 --- a/src/mongo/db/s/merge_chunks_command.cpp +++ b/src/mongo/db/s/merge_chunks_command.cpp @@ -47,6 +47,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/merge_chunk_request_type.h" +#include "mongo/s/request_types/merge_chunks_request_type.h" #include "mongo/util/log.h" #include "mongo/util/str.h" @@ -75,56 +76,37 @@ bool checkMetadataForSuccess(OperationContext* opCtx, chunk.getMax().woCompare(chunkRange.getMax()) == 0; } -void mergeChunks(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& minKey, - const BSONObj& maxKey, - const OID& epoch) { - const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from " - << redact(minKey) << " to " << redact(maxKey); - auto scopedDistLock = uassertStatusOKWithContext( - Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock( - opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout), - str::stream() << "could not acquire collection lock for " << nss.ns() - << " to merge chunks in [" << redact(minKey) << ", " << redact(maxKey) - << ")"); - +Shard::CommandResponse commitUsingChunkRange(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange, + const ScopedCollectionMetadata& metadata) { auto const shardingState = ShardingState::get(opCtx); + auto collUUID = metadata->getUUID(); + invariant(collUUID); + MergeChunksRequest request{nss, + shardingState->shardId(), + *collUUID, + chunkRange, + LogicalClock::get(opCtx)->getClusterTime().asTimestamp()}; - // We now have the collection distributed lock, refresh metadata to latest version and sanity - // check - const bool isVersioned = OperationShardingState::isOperationVersioned(opCtx); - if (!isVersioned) { - forceShardFilteringMetadataRefresh(opCtx, nss, true /* forceRefreshFromThisThread */); - } - - const auto metadata = [&] { - AutoGetCollection autoColl(opCtx, nss, MODE_IS); - auto css = CollectionShardingState::get(opCtx, nss); - // If there is a version attached to the OperationContext, validate it - if (isVersioned) { - css->checkShardVersionOrThrow(opCtx); - } - return css->getCurrentMetadata(); - }(); - - uassert(ErrorCodes::StaleEpoch, - str::stream() << "Collection " << nss.ns() << " became unsharded", - metadata->isSharded()); - - const auto shardVersion = metadata->getShardVersion(); - uassert(ErrorCodes::StaleEpoch, - str::stream() << "could not merge chunks, collection " << nss.ns() - << " has changed since merge was sent (sent epoch: " << epoch.toString() - << ", current epoch: " << shardVersion.epoch() << ")", - shardVersion.epoch() == epoch); + auto configCmdObj = + request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + auto cmdResponse = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + configCmdObj, + Shard::RetryPolicy::kIdempotent)); + return cmdResponse; +} - uassert(ErrorCodes::IllegalOperation, - str::stream() << "could not merge chunks, the range " - << redact(ChunkRange(minKey, maxKey).toString()) << " is not valid" - << " for collection " << nss.ns() << " with key pattern " - << metadata->getKeyPattern().toString(), - metadata->isValidKey(minKey) && metadata->isValidKey(maxKey)); +Shard::CommandResponse commitUsingChunksList(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& minKey, + const BSONObj& maxKey, + const ScopedCollectionMetadata& metadata) { + auto const shardingState = ShardingState::get(opCtx); // // Get merged chunk information @@ -209,7 +191,7 @@ void mergeChunks(OperationContext* opCtx, // MergeChunkRequest request{nss, shardingState->shardId().toString(), - shardVersion.epoch(), + metadata->getShardVersion().epoch(), chunkBoundaries, LogicalClock::get(opCtx)->getClusterTime().asTimestamp()}; @@ -223,6 +205,73 @@ void mergeChunks(OperationContext* opCtx, configCmdObj, Shard::RetryPolicy::kIdempotent)); + return cmdResponse; +} + +Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx, + const NamespaceString& nss, + const ChunkRange& chunkRange, + const ScopedCollectionMetadata& metadata) { + auto commandResponse = commitUsingChunkRange(opCtx, nss, chunkRange, metadata); + return commandResponse.commandStatus == ErrorCodes::CommandNotFound + ? commitUsingChunksList(opCtx, nss, chunkRange.getMin(), chunkRange.getMax(), metadata) + : commandResponse; +} + +void mergeChunks(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& minKey, + const BSONObj& maxKey, + const OID& epoch) { + + const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from " + << redact(minKey) << " to " << redact(maxKey); + ChunkRange chunkRange(minKey, maxKey); + + auto scopedDistLock = uassertStatusOKWithContext( + Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock( + opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout), + str::stream() << "could not acquire collection lock for " << nss.ns() + << " to merge chunks in " << redact(chunkRange.toString())); + + // We now have the collection distributed lock, refresh metadata to latest version and sanity + // check + const bool isVersioned = OperationShardingState::isOperationVersioned(opCtx); + if (!isVersioned) { + forceShardFilteringMetadataRefresh(opCtx, nss, true /* forceRefreshFromThisThread */); + } + + const auto metadata = [&] { + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + auto css = CollectionShardingState::get(opCtx, nss); + // If there is a version attached to the OperationContext, validate it + if (isVersioned) { + css->checkShardVersionOrThrow(opCtx); + } + return css->getCurrentMetadata(); + }(); + + uassert(ErrorCodes::StaleEpoch, + str::stream() << "Collection " << nss.ns() << " became unsharded", + metadata->isSharded()); + + const auto shardEpoch = metadata->getShardVersion().epoch(); + uassert(ErrorCodes::StaleEpoch, + str::stream() << "could not merge chunks, collection " << nss.ns() + << " has changed since merge was sent (sent epoch: " << epoch.toString() + << ", current epoch: " << shardEpoch << ")", + shardEpoch == epoch); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "could not merge chunks, the range " << redact(chunkRange.toString()) + << " is not valid" + << " for collection " << nss.ns() << " with key pattern " + << metadata->getKeyPattern().toString(), + metadata->isValidKey(minKey) && metadata->isValidKey(maxKey)); + + auto cmdResponse = commitMergeOnConfigServer(opCtx, nss, chunkRange, metadata); + + // old versions might not have the shardVersion field if (cmdResponse.response[ChunkVersion::kShardVersionField]) { const auto cv = uassertStatusOK( @@ -243,9 +292,8 @@ void mergeChunks(OperationContext* opCtx, auto writeConcernStatus = std::move(cmdResponse.writeConcernStatus); if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) && - checkMetadataForSuccess(opCtx, nss, epoch, ChunkRange(minKey, maxKey))) { - LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey) - << ") has already been committed."; + checkMetadataForSuccess(opCtx, nss, epoch, chunkRange)) { + LOG(1) << "mergeChunk " << chunkRange.toString() << " has already been committed."; return; } diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 965ee8358f3..0376952c50e 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -161,6 +161,7 @@ env.Library( 'request_types/balance_chunk_request_type.cpp', 'request_types/commit_chunk_migration_request_type.cpp', 'request_types/merge_chunk_request_type.cpp', + 'request_types/merge_chunks_request_type.cpp', 'request_types/migration_secondary_throttle_options.cpp', 'request_types/move_chunk_request.cpp', 'request_types/remove_shard_from_zone_request_type.cpp', diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp index e26d063a939..a70a2088f51 100644 --- a/src/mongo/s/catalog/type_chunk.cpp +++ b/src/mongo/s/catalog/type_chunk.cpp @@ -115,6 +115,12 @@ StatusWith<ChunkRange> ChunkRange::fromBSON(const BSONObj& obj) { return ChunkRange(minKey.Obj().getOwned(), maxKey.Obj().getOwned()); } +BSONObj ChunkRange::toBSON() const { + BSONObjBuilder builder; + append(&builder); + return builder.obj(); +} + bool ChunkRange::containsKey(const BSONObj& key) const { return _minKey.woCompare(key) <= 0 && key.woCompare(_maxKey) < 0; } diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h index 29b695f710d..72450bad707 100644 --- a/src/mongo/s/catalog/type_chunk.h +++ b/src/mongo/s/catalog/type_chunk.h @@ -67,6 +67,8 @@ public: return _maxKey; } + BSONObj toBSON() const; + const Status extractKeyPattern(KeyPattern* shardKeyPatternOut) const; /** diff --git a/src/mongo/s/request_types/merge_chunks_request_test.cpp b/src/mongo/s/request_types/merge_chunks_request_test.cpp new file mode 100644 index 00000000000..2b5e48ba9f2 --- /dev/null +++ b/src/mongo/s/request_types/merge_chunks_request_test.cpp @@ -0,0 +1,164 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/s/request_types/merge_chunks_request_type.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using unittest::assertGet; + +ChunkRange chunkRange(BSON("a" << 1), BSON("a" << 10)); + +TEST(MergeChunksRequest, BasicValidConfigCommand) { + auto collUUID = UUID::gen(); + auto request = assertGet(MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000"))); + ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace()); + ASSERT_TRUE(collUUID == request.getCollectionUUID()); + ASSERT_TRUE(chunkRange == request.getChunkRange()); + ASSERT_EQ("shard0000", request.getShardId().toString()); +} + +TEST(MergeChunksRequest, ConfigCommandtoBSON) { + auto collUUID = UUID::gen(); + BSONObj serializedRequest = + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000" + << "validAfter" << Timestamp{100}); + BSONObj writeConcernObj = BSON("w" + << "majority"); + + BSONObjBuilder cmdBuilder; + { + cmdBuilder.appendElements(serializedRequest); + cmdBuilder.append("writeConcern", writeConcernObj); + } + + auto request = assertGet(MergeChunksRequest::parseFromConfigCommand(serializedRequest)); + auto requestToBSON = request.toConfigCommandBSON(writeConcernObj); + + ASSERT_BSONOBJ_EQ(cmdBuilder.obj(), requestToBSON); +} + +TEST(MergeChunksRequest, MissingNameSpaceErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunksRequest, MissingcollUUIDErrors) { + auto request = MergeChunksRequest::parseFromConfigCommand(BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "chunkRange" + << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunksRequest, MissingChunkRangeErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand(BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" + << collUUID.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunksRequest, MissingShardIdErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON())); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunksRequest, WrongNamespaceTypeErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" << 1234 << "collUUID" << collUUID.toBSON() + << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunksRequest, WrongcollUUIDTypeErrors) { + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << 1234 << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunksRequest, WrongChunkRangeTypeErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << 1234 << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunksRequest, WrongShardIdTypeErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "TestDB.TestColl" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << 1234)); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunksRequest, InvalidNamespaceErrors) { + auto collUUID = UUID::gen(); + auto request = MergeChunksRequest::parseFromConfigCommand( + BSON("_configsvrCommitChunksMerge" + << "" + << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::InvalidNamespace, request.getStatus()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/request_types/merge_chunks_request_type.cpp b/src/mongo/s/request_types/merge_chunks_request_type.cpp new file mode 100644 index 00000000000..1ea06a036fb --- /dev/null +++ b/src/mongo/s/request_types/merge_chunks_request_type.cpp @@ -0,0 +1,147 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/request_types/merge_chunks_request_type.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/platform/basic.h" + +namespace mongo { +namespace { + +const char kConfigsvrMergeChunks[] = "_configsvrCommitChunksMerge"; +const char kCollUUID[] = "collUUID"; +const char kChunkRange[] = "chunkRange"; +const char kShardId[] = "shard"; +const char kValidAfter[] = "validAfter"; +} // namespace + +MergeChunksRequest::MergeChunksRequest(NamespaceString nss, + ShardId shardId, + UUID collectionUUID, + ChunkRange chunkRange, + boost::optional<Timestamp> validAfter) + : _nss(std::move(nss)), + _collectionUUID(std::move(collectionUUID)), + _chunkRange(std::move(chunkRange)), + _shardId(std::move(shardId)), + _validAfter(validAfter) {} + +StatusWith<MergeChunksRequest> MergeChunksRequest::parseFromConfigCommand(const BSONObj& cmdObj) { + std::string ns; + { + auto parseNamespaceStatus = bsonExtractStringField(cmdObj, kConfigsvrMergeChunks, &ns); + if (!parseNamespaceStatus.isOK()) { + return parseNamespaceStatus; + } + } + NamespaceString nss(ns); + if (!nss.isValid()) { + return {ErrorCodes::InvalidNamespace, + str::stream() << "invalid namespace '" << nss.ns() << "' specified for request"}; + } + + BSONElement collUUIDElem; + { + auto parseCollUUIDStatus = + bsonExtractTypedField(cmdObj, kCollUUID, mongo::Object, &collUUIDElem); + if (!parseCollUUIDStatus.isOK()) { + return parseCollUUIDStatus; + } + } + + auto collUUID = UUID::parse(collUUIDElem.Obj().getField("uuid")); + if (!collUUID.isOK()) { + return collUUID.getStatus(); + } + + BSONElement chunkRangeElem; + { + auto chunkRangeStatus = + bsonExtractTypedField(cmdObj, kChunkRange, mongo::Object, &chunkRangeElem); + if (!chunkRangeStatus.isOK()) { + return chunkRangeStatus; + } + } + + auto chunkRange = ChunkRange::fromBSON(chunkRangeElem.Obj().getOwned()); + if (!chunkRange.isOK()) { + return chunkRange.getStatus(); + } + + std::string shardIdString; + { + auto parseShardIdStatus = bsonExtractStringField(cmdObj, kShardId, &shardIdString); + if (!parseShardIdStatus.isOK()) { + return parseShardIdStatus; + } + } + + boost::optional<Timestamp> validAfter = boost::none; + { + Timestamp ts{0}; + auto status = bsonExtractTimestampField(cmdObj, kValidAfter, &ts); + if (!status.isOK() && status != ErrorCodes::NoSuchKey) { + return status; + } + + if (status.isOK()) { + validAfter = ts; + } + } + + return MergeChunksRequest(std::move(nss), + ShardId(shardIdString), + std::move(collUUID.getValue()), + std::move(chunkRange.getValue()), + validAfter); +} + +BSONObj MergeChunksRequest::toConfigCommandBSON(const BSONObj& writeConcern) { + BSONObjBuilder cmdBuilder; + appendAsConfigCommand(&cmdBuilder); + + // Tack on passed-in writeConcern + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, writeConcern); + + return cmdBuilder.obj(); +} + +void MergeChunksRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) { + cmdBuilder->append(kConfigsvrMergeChunks, _nss.ns()); + cmdBuilder->append(kCollUUID, _collectionUUID.toBSON()); + cmdBuilder->append(kChunkRange, _chunkRange.toBSON()); + cmdBuilder->append(kShardId, _shardId); + invariant(_validAfter.is_initialized()); + cmdBuilder->append(kValidAfter, _validAfter.get()); +} + + +} // namespace mongo diff --git a/src/mongo/s/request_types/merge_chunks_request_type.h b/src/mongo/s/request_types/merge_chunks_request_type.h new file mode 100644 index 00000000000..bed10c7f338 --- /dev/null +++ b/src/mongo/s/request_types/merge_chunks_request_type.h @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/namespace_string.h" +#include "mongo/s/catalog/type_chunk.h" + +namespace mongo { + +/** + * Provides support for parsing and serialization of arguments to the config server mergeChunks + * command. + */ +class MergeChunksRequest { +public: + MergeChunksRequest(NamespaceString nss, + ShardId shardId, + UUID collUUID, + ChunkRange chunkRange, + boost::optional<Timestamp> validAfter); + /** + * Parses the provided BSON content as the internal _configsvrCommitChunksMerge command + * and if it contains the correct types, constructs a MergeChunksRequest object from it. + * + * { + * _configsvrCommitChunksMerge: <NamespaceString nss>, + * collUUID: <UUID>, + * chunkRage: <ChunkRange [minKey, maxKey)>, + * shard: <string shard> + * } + */ + static StatusWith<MergeChunksRequest> parseFromConfigCommand(const BSONObj& cmdObj); + + /** + * Creates a BSONObjBuilder and uses it to create and return a BSONObj from this + * MergeChunksRequest instance. Calls appendAsConfigCommand and tacks on the passed-in + * writeConcern. + */ + BSONObj toConfigCommandBSON(const BSONObj& writeConcern); + + /** + * Creates a serialized BSONObj of the internal _configsvCommitChunksMerge command + * from this MergeChunksRequest instance. + */ + void appendAsConfigCommand(BSONObjBuilder* cmdBuilder); + + const NamespaceString& getNamespace() const { + return _nss; + } + + const UUID& getCollectionUUID() const { + return _collectionUUID; + } + + const ChunkRange& getChunkRange() const { + return _chunkRange; + } + + const ShardId& getShardId() const { + return _shardId; + } + + const boost::optional<Timestamp>& getValidAfter() const { + return _validAfter; + } + +private: + NamespaceString _nss; + + UUID _collectionUUID; + + ChunkRange _chunkRange; + + ShardId _shardId; + + boost::optional<Timestamp> _validAfter; +}; + +} // namespace mongo |