summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorPaolo Polato <paolo.polato@mongodb.com>2021-06-08 13:22:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-06-11 15:49:35 +0000
commit93c36da06fa36454a7f8ff77ce2c86a07ba97e4f (patch)
tree761e26cbfe5d4dbebc8bbed5705337a06790d7ec /src/mongo
parent7bd0cf6c595a330d307a319a394355ded89839af (diff)
downloadmongo-93c36da06fa36454a7f8ff77ce2c86a07ba97e4f.tar.gz
SERVER-56786 expand the bounds parameter of mergeChunk in the config server
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/collection_metadata.h4
-rw-r--r--src/mongo/db/s/config/configsvr_merge_chunks_command.cpp114
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h17
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp196
-rw-r--r--src/mongo/db/s/merge_chunks_command.cpp150
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/catalog/type_chunk.cpp6
-rw-r--r--src/mongo/s/catalog/type_chunk.h2
-rw-r--r--src/mongo/s/request_types/merge_chunks_request_test.cpp164
-rw-r--r--src/mongo/s/request_types/merge_chunks_request_type.cpp147
-rw-r--r--src/mongo/s/request_types/merge_chunks_request_type.h106
12 files changed, 857 insertions, 51 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 47557cca651..bd0ce3e8bdf 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -324,6 +324,7 @@ env.Library(
'config/configsvr_drop_database_command.cpp',
'config/configsvr_enable_sharding_command.cpp',
'config/configsvr_merge_chunk_command.cpp',
+ 'config/configsvr_merge_chunks_command.cpp',
'config/configsvr_move_chunk_command.cpp',
'config/configsvr_move_primary_command.cpp',
'config/configsvr_remove_shard_command.cpp',
diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h
index 824787506f4..a80d2350756 100644
--- a/src/mongo/db/s/collection_metadata.h
+++ b/src/mongo/db/s/collection_metadata.h
@@ -124,6 +124,10 @@ public:
return _cm->uuidMatches(uuid);
}
+ boost::optional<UUID> getUUID() const {
+ return _cm->getUUID();
+ }
+
/**
* Returns just the shard key fields, if the collection is sharded, and the _id field, from
* `doc`. Does not alter any field values (e.g. by hashing); values are copied verbatim.
diff --git a/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp
new file mode 100644
index 00000000000..73e4c127749
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/s/request_types/merge_chunks_request_type.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * Internal sharding command run on config servers to merge a set of chunks.
+ *
+ * Format:
+ * {
+ * _configsvrCommitChunksMerge: <string namespace>,
+ * collEpoch: <OID epoch>,
+ * lowerBound: <BSONObj minKey>,
+ * upperBound: <BSONObj maxKey>,
+ * shard: <string shard>,
+ * writeConcern: <BSONObj>
+ * }
+ */
+class ConfigSvrMergeChunksCommand : public BasicCommand {
+public:
+ ConfigSvrMergeChunksCommand() : BasicCommand("_configsvrCommitChunksMerge") {}
+
+ std::string help() const override {
+ return "Internal command, which is sent by a shard to the sharding config server. Do "
+ "not call directly. Receives, validates, and processes a MergeChunksRequest";
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return true;
+ }
+
+ Status checkAuthForCommand(Client* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) const override {
+ if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
+ ResourcePattern::forClusterResource(), ActionType::internal)) {
+ return Status(ErrorCodes::Unauthorized, "Unauthorized");
+ }
+ return Status::OK();
+ }
+
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
+ return CommandHelpers::parseNsFullyQualified(cmdObj);
+ }
+
+ bool run(OperationContext* opCtx,
+ const std::string& dbName,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
+ uassert(ErrorCodes::IllegalOperation,
+ "_configsvrCommitChunksMerge can only be run on config servers",
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+
+ // Set the operation context read concern level to local for reads into the config database.
+ repl::ReadConcernArgs::get(opCtx) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ auto parsedRequest = uassertStatusOK(MergeChunksRequest::parseFromConfigCommand(cmdObj));
+
+ const BSONObj shardAndCollVers = uassertStatusOK(
+ ShardingCatalogManager::get(opCtx)->commitChunksMerge(opCtx,
+ parsedRequest.getNamespace(),
+ parsedRequest.getCollectionUUID(),
+ parsedRequest.getChunkRange(),
+ parsedRequest.getShardId(),
+ parsedRequest.getValidAfter()));
+ result.appendElements(shardAndCollVers);
+
+ return true;
+ }
+
+} configsvrMergeChunksCmd;
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 90c868a98b2..11131fdfdaf 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -207,6 +207,23 @@ public:
const boost::optional<Timestamp>& validAfter);
/**
+ * Updates metadata in the config.chunks collection so the chunks within the specified key range
+ * are seen merged into a single larger chunk.
+ * If 'validAfter' is not set, this means the commit request came from an older server version,
+ * which is not history-aware.
+ *
+ * Returns a BSON object with the newly produced chunk versions after the migration:
+ * - shardVersion - The new shard version of the source shard
+ * - collectionVersion - The new collection version after the commit
+ */
+ StatusWith<BSONObj> commitChunksMerge(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& requestCollectionUUID,
+ const ChunkRange& chunkRange,
+ const ShardId& shardId,
+ const boost::optional<Timestamp>& validAfter);
+
+ /**
* Updates metadata in config.chunks collection to show the given chunk in its new shard.
* If 'validAfter' is not set, this means the commit request came from an older server version,
* which is not history-aware.
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 2ec76871066..eb24df90173 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -681,6 +681,202 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge(
return result.obj();
}
+StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& requestCollectionUUID,
+ const ChunkRange& chunkRange,
+ const ShardId& shardId,
+ const boost::optional<Timestamp>& validAfter) {
+ if (!validAfter) {
+ return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
+ }
+
+ const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations
+ // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/
+ // move chunks on different collections to proceed in parallel
+ Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+ // 1. Retrieve the initial collection and shard version values
+ // to validate the request parameters and build up the logging info.
+ auto swCollVersion = getMaxChunkVersionFromQueryResponse(
+ nss,
+ configShard->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()), // Query all chunks for this namespace.
+ BSON(ChunkType::lastmod << -1), // Sort by version.
+ 1)); // Limit 1.
+
+ if (!swCollVersion.isOK()) {
+ return swCollVersion.getStatus().withContext(str::stream()
+ << "mergeChunk cannot merge chunks.");
+ }
+
+ auto collVersion = swCollVersion.getValue();
+
+ auto swShardVersion = getMaxChunkVersionFromQueryResponse(
+ nss,
+ configShard->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns() << "shard"
+ << shardId.toString()), // Query all chunks for this namespace and shard.
+ BSON(ChunkType::lastmod << -1), // Sort by version.
+ 1)); // Limit 1.
+
+ if (!swShardVersion.isOK()) {
+ return swShardVersion.getStatus().withContext("mergeChunk cannot merge chunks.");
+ }
+
+ auto shardVersion = swShardVersion.getValue();
+
+ auto findCollResponse =
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ CollectionType::ConfigNS,
+ BSON("_id" << nss.ns()),
+ {},
+ 1);
+ if (!findCollResponse.isOK()) {
+ return findCollResponse.getStatus();
+ }
+ if (findCollResponse.getValue().docs.empty()) {
+ return {ErrorCodes::Error(5678601),
+ str::stream() << "Collection '" << nss.ns() << "' no longer either exists"};
+ }
+
+ auto coll = uassertStatusOK(CollectionType::fromBSON(findCollResponse.getValue().docs[0]));
+ invariant(coll.getUUID());
+ auto collUUID = *(coll.getUUID());
+ if (collUUID != requestCollectionUUID) {
+ return {
+ ErrorCodes::InvalidUUID,
+ str::stream() << "UUID of collection does not match UUID of request. Colletion UUID: "
+ << collUUID << ", request UUID: " << requestCollectionUUID};
+ }
+
+ // 2. Retrieve the list of chunks belonging to the requested shard + key range.
+ const auto shardChunksInRangeQuery = [&]() {
+ BSONObjBuilder queryBuilder;
+ queryBuilder << ChunkType::epoch(coll.getEpoch());
+ queryBuilder << ChunkType::shard(shardId.toString());
+ queryBuilder << ChunkType::min(BSON("$gte" << chunkRange.getMin()));
+ queryBuilder << ChunkType::max(BSON("$lte" << chunkRange.getMax()));
+ return queryBuilder.obj();
+ }();
+
+ const auto shardChunksInRangeResponse = uassertStatusOK(
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ shardChunksInRangeQuery,
+ BSON(ChunkType::min << 1),
+ boost::none));
+
+ // Check if the chunk(s) have already been merged. If so, return success.
+ if (shardChunksInRangeResponse.docs.size() == 1) {
+ auto chunk =
+ uassertStatusOK(ChunkType::fromConfigBSON(shardChunksInRangeResponse.docs.back()));
+ uassert(
+ ErrorCodes::IllegalOperation,
+ str::stream() << "could not merge chunks, shard " << shardId
+ << " does not contain a sequence of chunks that exactly fills the range "
+ << chunkRange.toString(),
+ chunk.getRange() == chunkRange);
+ BSONObjBuilder result;
+ collVersion.appendToCommand(&result);
+ return result.obj();
+ }
+
+ // 3. Prepare the data for the merge
+ // and ensure that the retrieved list of chunks covers the whole range.
+ std::vector<ChunkType> chunksToMerge;
+ for (const auto& chunkDoc : shardChunksInRangeResponse.docs) {
+ auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkDoc));
+ if (chunksToMerge.empty()) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream()
+ << "could not merge chunks, shard " << shardId
+ << " does not contain a sequence of chunks that exactly fills the range "
+ << chunkRange.toString(),
+ chunk.getMin().woCompare(chunkRange.getMin()) == 0);
+ } else {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream()
+ << "could not merge chunks, shard " << shardId
+ << " does not contain a sequence of chunks that exactly fills the range "
+ << chunkRange.toString(),
+ chunk.getMin().woCompare(chunksToMerge.back().getMax()) == 0);
+ }
+ chunksToMerge.push_back(std::move(chunk));
+ }
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "could not merge chunks, shard " << shardId
+ << " does not contain a sequence of chunks that exactly fills the range "
+ << chunkRange.toString(),
+ !chunksToMerge.empty() &&
+ chunksToMerge.back().getMax().woCompare(chunkRange.getMax()) == 0);
+
+ ChunkVersion mergeVersion = collVersion;
+ // If the incrementChunkMajorVersionOnChunkSplits flag is set, increment the major version only
+ // if the shard that owns the chunks being merged has shardVersion == collectionVersion. See
+ // SERVER-41480 for details.
+ //
+ // This flag is only useful if there are some 4.0 routers still in the cluster, so we only use
+ // it if FCV is not fully upgraded.
+ const auto currentFCV = serverGlobalParams.featureCompatibility.getVersion();
+ if (currentFCV != ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 &&
+ incrementChunkMajorVersionOnChunkSplits.load() && shardVersion == collVersion) {
+ mergeVersion.incMajor();
+ } else {
+ mergeVersion.incMinor();
+ }
+
+ auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter);
+ auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, collVersion);
+
+ // 4. apply the batch of updates to local metadata
+ uassertStatusOK(Grid::get(opCtx)->catalogClient()->applyChunkOpsDeprecated(
+ opCtx,
+ updates,
+ preCond,
+ nss,
+ mergeVersion,
+ WriteConcernOptions(),
+ repl::ReadConcernLevel::kLocalReadConcern));
+
+ // 5. log changes
+ BSONObjBuilder logDetail;
+ {
+ BSONArrayBuilder b(logDetail.subarrayStart("merged"));
+ for (const auto& chunkToMerge : chunksToMerge) {
+ b.append(chunkToMerge.toConfigBSON());
+ }
+ }
+ collVersion.appendLegacyWithField(&logDetail, "prevShardVersion");
+ mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion");
+ // logDetail.append("owningShard", shardId); TODO not needed in v4.4?
+
+ ShardingLogging::get(opCtx)->logChange(
+ opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions());
+
+ BSONObjBuilder result;
+ mergeVersion.appendToCommand(&result);
+
+ return result.obj();
+}
+
+
StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index a8ee1bc4fba..e5bfe7bda14 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -47,6 +47,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/merge_chunk_request_type.h"
+#include "mongo/s/request_types/merge_chunks_request_type.h"
#include "mongo/util/log.h"
#include "mongo/util/str.h"
@@ -75,56 +76,37 @@ bool checkMetadataForSuccess(OperationContext* opCtx,
chunk.getMax().woCompare(chunkRange.getMax()) == 0;
}
-void mergeChunks(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& minKey,
- const BSONObj& maxKey,
- const OID& epoch) {
- const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from "
- << redact(minKey) << " to " << redact(maxKey);
- auto scopedDistLock = uassertStatusOKWithContext(
- Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock(
- opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout),
- str::stream() << "could not acquire collection lock for " << nss.ns()
- << " to merge chunks in [" << redact(minKey) << ", " << redact(maxKey)
- << ")");
-
+Shard::CommandResponse commitUsingChunkRange(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkRange& chunkRange,
+ const ScopedCollectionMetadata& metadata) {
auto const shardingState = ShardingState::get(opCtx);
+ auto collUUID = metadata->getUUID();
+ invariant(collUUID);
+ MergeChunksRequest request{nss,
+ shardingState->shardId(),
+ *collUUID,
+ chunkRange,
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp()};
- // We now have the collection distributed lock, refresh metadata to latest version and sanity
- // check
- const bool isVersioned = OperationShardingState::isOperationVersioned(opCtx);
- if (!isVersioned) {
- forceShardFilteringMetadataRefresh(opCtx, nss, true /* forceRefreshFromThisThread */);
- }
-
- const auto metadata = [&] {
- AutoGetCollection autoColl(opCtx, nss, MODE_IS);
- auto css = CollectionShardingState::get(opCtx, nss);
- // If there is a version attached to the OperationContext, validate it
- if (isVersioned) {
- css->checkShardVersionOrThrow(opCtx);
- }
- return css->getCurrentMetadata();
- }();
-
- uassert(ErrorCodes::StaleEpoch,
- str::stream() << "Collection " << nss.ns() << " became unsharded",
- metadata->isSharded());
-
- const auto shardVersion = metadata->getShardVersion();
- uassert(ErrorCodes::StaleEpoch,
- str::stream() << "could not merge chunks, collection " << nss.ns()
- << " has changed since merge was sent (sent epoch: " << epoch.toString()
- << ", current epoch: " << shardVersion.epoch() << ")",
- shardVersion.epoch() == epoch);
+ auto configCmdObj =
+ request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ auto cmdResponse =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ configCmdObj,
+ Shard::RetryPolicy::kIdempotent));
+ return cmdResponse;
+}
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "could not merge chunks, the range "
- << redact(ChunkRange(minKey, maxKey).toString()) << " is not valid"
- << " for collection " << nss.ns() << " with key pattern "
- << metadata->getKeyPattern().toString(),
- metadata->isValidKey(minKey) && metadata->isValidKey(maxKey));
+Shard::CommandResponse commitUsingChunksList(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const ScopedCollectionMetadata& metadata) {
+ auto const shardingState = ShardingState::get(opCtx);
//
// Get merged chunk information
@@ -209,7 +191,7 @@ void mergeChunks(OperationContext* opCtx,
//
MergeChunkRequest request{nss,
shardingState->shardId().toString(),
- shardVersion.epoch(),
+ metadata->getShardVersion().epoch(),
chunkBoundaries,
LogicalClock::get(opCtx)->getClusterTime().asTimestamp()};
@@ -223,6 +205,73 @@ void mergeChunks(OperationContext* opCtx,
configCmdObj,
Shard::RetryPolicy::kIdempotent));
+ return cmdResponse;
+}
+
+Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ChunkRange& chunkRange,
+ const ScopedCollectionMetadata& metadata) {
+ auto commandResponse = commitUsingChunkRange(opCtx, nss, chunkRange, metadata);
+ return commandResponse.commandStatus == ErrorCodes::CommandNotFound
+ ? commitUsingChunksList(opCtx, nss, chunkRange.getMin(), chunkRange.getMax(), metadata)
+ : commandResponse;
+}
+
+void mergeChunks(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const OID& epoch) {
+
+ const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from "
+ << redact(minKey) << " to " << redact(maxKey);
+ ChunkRange chunkRange(minKey, maxKey);
+
+ auto scopedDistLock = uassertStatusOKWithContext(
+ Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock(
+ opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout),
+ str::stream() << "could not acquire collection lock for " << nss.ns()
+ << " to merge chunks in " << redact(chunkRange.toString()));
+
+ // We now have the collection distributed lock, refresh metadata to latest version and sanity
+ // check
+ const bool isVersioned = OperationShardingState::isOperationVersioned(opCtx);
+ if (!isVersioned) {
+ forceShardFilteringMetadataRefresh(opCtx, nss, true /* forceRefreshFromThisThread */);
+ }
+
+ const auto metadata = [&] {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ auto css = CollectionShardingState::get(opCtx, nss);
+ // If there is a version attached to the OperationContext, validate it
+ if (isVersioned) {
+ css->checkShardVersionOrThrow(opCtx);
+ }
+ return css->getCurrentMetadata();
+ }();
+
+ uassert(ErrorCodes::StaleEpoch,
+ str::stream() << "Collection " << nss.ns() << " became unsharded",
+ metadata->isSharded());
+
+ const auto shardEpoch = metadata->getShardVersion().epoch();
+ uassert(ErrorCodes::StaleEpoch,
+ str::stream() << "could not merge chunks, collection " << nss.ns()
+ << " has changed since merge was sent (sent epoch: " << epoch.toString()
+ << ", current epoch: " << shardEpoch << ")",
+ shardEpoch == epoch);
+
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "could not merge chunks, the range " << redact(chunkRange.toString())
+ << " is not valid"
+ << " for collection " << nss.ns() << " with key pattern "
+ << metadata->getKeyPattern().toString(),
+ metadata->isValidKey(minKey) && metadata->isValidKey(maxKey));
+
+ auto cmdResponse = commitMergeOnConfigServer(opCtx, nss, chunkRange, metadata);
+
+
// old versions might not have the shardVersion field
if (cmdResponse.response[ChunkVersion::kShardVersionField]) {
const auto cv = uassertStatusOK(
@@ -243,9 +292,8 @@ void mergeChunks(OperationContext* opCtx,
auto writeConcernStatus = std::move(cmdResponse.writeConcernStatus);
if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
- checkMetadataForSuccess(opCtx, nss, epoch, ChunkRange(minKey, maxKey))) {
- LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey)
- << ") has already been committed.";
+ checkMetadataForSuccess(opCtx, nss, epoch, chunkRange)) {
+ LOG(1) << "mergeChunk " << chunkRange.toString() << " has already been committed.";
return;
}
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 965ee8358f3..0376952c50e 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -161,6 +161,7 @@ env.Library(
'request_types/balance_chunk_request_type.cpp',
'request_types/commit_chunk_migration_request_type.cpp',
'request_types/merge_chunk_request_type.cpp',
+ 'request_types/merge_chunks_request_type.cpp',
'request_types/migration_secondary_throttle_options.cpp',
'request_types/move_chunk_request.cpp',
'request_types/remove_shard_from_zone_request_type.cpp',
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index e26d063a939..a70a2088f51 100644
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -115,6 +115,12 @@ StatusWith<ChunkRange> ChunkRange::fromBSON(const BSONObj& obj) {
return ChunkRange(minKey.Obj().getOwned(), maxKey.Obj().getOwned());
}
+BSONObj ChunkRange::toBSON() const {
+ BSONObjBuilder builder;
+ append(&builder);
+ return builder.obj();
+}
+
bool ChunkRange::containsKey(const BSONObj& key) const {
return _minKey.woCompare(key) <= 0 && key.woCompare(_maxKey) < 0;
}
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 29b695f710d..72450bad707 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -67,6 +67,8 @@ public:
return _maxKey;
}
+ BSONObj toBSON() const;
+
const Status extractKeyPattern(KeyPattern* shardKeyPatternOut) const;
/**
diff --git a/src/mongo/s/request_types/merge_chunks_request_test.cpp b/src/mongo/s/request_types/merge_chunks_request_test.cpp
new file mode 100644
index 00000000000..2b5e48ba9f2
--- /dev/null
+++ b/src/mongo/s/request_types/merge_chunks_request_test.cpp
@@ -0,0 +1,164 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/s/request_types/merge_chunks_request_type.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using unittest::assertGet;
+
+ChunkRange chunkRange(BSON("a" << 1), BSON("a" << 10));
+
+TEST(MergeChunksRequest, BasicValidConfigCommand) {
+ auto collUUID = UUID::gen();
+ auto request = assertGet(MergeChunksRequest::parseFromConfigCommand(
+ BSON("_configsvrCommitChunksMerge"
+ << "TestDB.TestColl"
+ << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard"
+ << "shard0000")));
+ ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace());
+ ASSERT_TRUE(collUUID == request.getCollectionUUID());
+ ASSERT_TRUE(chunkRange == request.getChunkRange());
+ ASSERT_EQ("shard0000", request.getShardId().toString());
+}
+
+TEST(MergeChunksRequest, ConfigCommandtoBSON) {
+ auto collUUID = UUID::gen();
+ BSONObj serializedRequest =
+ BSON("_configsvrCommitChunksMerge"
+ << "TestDB.TestColl"
+ << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard"
+ << "shard0000"
+ << "validAfter" << Timestamp{100});
+ BSONObj writeConcernObj = BSON("w"
+ << "majority");
+
+ BSONObjBuilder cmdBuilder;
+ {
+ cmdBuilder.appendElements(serializedRequest);
+ cmdBuilder.append("writeConcern", writeConcernObj);
+ }
+
+ auto request = assertGet(MergeChunksRequest::parseFromConfigCommand(serializedRequest));
+ auto requestToBSON = request.toConfigCommandBSON(writeConcernObj);
+
+ ASSERT_BSONOBJ_EQ(cmdBuilder.obj(), requestToBSON);
+}
+
+TEST(MergeChunksRequest, MissingNameSpaceErrors) {
+ auto collUUID = UUID::gen();
+ auto request = MergeChunksRequest::parseFromConfigCommand(
+ BSON("collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
+}
+
+TEST(MergeChunksRequest, MissingcollUUIDErrors) {
+ auto request = MergeChunksRequest::parseFromConfigCommand(BSON("_configsvrCommitChunksMerge"
+ << "TestDB.TestColl"
+ << "chunkRange"
+ << chunkRange.toBSON() << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
+}
+
+TEST(MergeChunksRequest, MissingChunkRangeErrors) {
+ auto collUUID = UUID::gen();
+ auto request = MergeChunksRequest::parseFromConfigCommand(BSON("_configsvrCommitChunksMerge"
+ << "TestDB.TestColl"
+ << "collUUID"
+ << collUUID.toBSON() << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
+}
+
+TEST(MergeChunksRequest, MissingShardIdErrors) {
+ auto collUUID = UUID::gen();
+ auto request = MergeChunksRequest::parseFromConfigCommand(
+ BSON("_configsvrCommitChunksMerge"
+ << "TestDB.TestColl"
+ << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON()));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
+}
+
+TEST(MergeChunksRequest, WrongNamespaceTypeErrors) {
+ auto collUUID = UUID::gen();
+ auto request = MergeChunksRequest::parseFromConfigCommand(
+ BSON("_configsvrCommitChunksMerge" << 1234 << "collUUID" << collUUID.toBSON()
+ << "chunkRange" << chunkRange.toBSON() << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
+}
+
+TEST(MergeChunksRequest, WrongcollUUIDTypeErrors) {
+ auto request = MergeChunksRequest::parseFromConfigCommand(
+ BSON("_configsvrCommitChunksMerge"
+ << "TestDB.TestColl"
+ << "collUUID" << 1234 << "chunkRange" << chunkRange.toBSON() << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
+}
+
+TEST(MergeChunksRequest, WrongChunkRangeTypeErrors) {
+ auto collUUID = UUID::gen();
+ auto request = MergeChunksRequest::parseFromConfigCommand(
+ BSON("_configsvrCommitChunksMerge"
+ << "TestDB.TestColl"
+ << "collUUID" << collUUID.toBSON() << "chunkRange" << 1234 << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
+}
+
+TEST(MergeChunksRequest, WrongShardIdTypeErrors) {
+ auto collUUID = UUID::gen();
+ auto request = MergeChunksRequest::parseFromConfigCommand(
+ BSON("_configsvrCommitChunksMerge"
+ << "TestDB.TestColl"
+ << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard"
+ << 1234));
+ ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
+}
+
+TEST(MergeChunksRequest, InvalidNamespaceErrors) {
+ auto collUUID = UUID::gen();
+ auto request = MergeChunksRequest::parseFromConfigCommand(
+ BSON("_configsvrCommitChunksMerge"
+ << ""
+ << "collUUID" << collUUID.toBSON() << "chunkRange" << chunkRange.toBSON() << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::InvalidNamespace, request.getStatus());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/request_types/merge_chunks_request_type.cpp b/src/mongo/s/request_types/merge_chunks_request_type.cpp
new file mode 100644
index 00000000000..1ea06a036fb
--- /dev/null
+++ b/src/mongo/s/request_types/merge_chunks_request_type.cpp
@@ -0,0 +1,147 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/s/request_types/merge_chunks_request_type.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/write_concern_options.h"
+#include "mongo/platform/basic.h"
+
+namespace mongo {
+namespace {
+
+const char kConfigsvrMergeChunks[] = "_configsvrCommitChunksMerge";
+const char kCollUUID[] = "collUUID";
+const char kChunkRange[] = "chunkRange";
+const char kShardId[] = "shard";
+const char kValidAfter[] = "validAfter";
+} // namespace
+
+MergeChunksRequest::MergeChunksRequest(NamespaceString nss,
+ ShardId shardId,
+ UUID collectionUUID,
+ ChunkRange chunkRange,
+ boost::optional<Timestamp> validAfter)
+ : _nss(std::move(nss)),
+ _collectionUUID(std::move(collectionUUID)),
+ _chunkRange(std::move(chunkRange)),
+ _shardId(std::move(shardId)),
+ _validAfter(validAfter) {}
+
+StatusWith<MergeChunksRequest> MergeChunksRequest::parseFromConfigCommand(const BSONObj& cmdObj) {
+ std::string ns;
+ {
+ auto parseNamespaceStatus = bsonExtractStringField(cmdObj, kConfigsvrMergeChunks, &ns);
+ if (!parseNamespaceStatus.isOK()) {
+ return parseNamespaceStatus;
+ }
+ }
+ NamespaceString nss(ns);
+ if (!nss.isValid()) {
+ return {ErrorCodes::InvalidNamespace,
+ str::stream() << "invalid namespace '" << nss.ns() << "' specified for request"};
+ }
+
+ BSONElement collUUIDElem;
+ {
+ auto parseCollUUIDStatus =
+ bsonExtractTypedField(cmdObj, kCollUUID, mongo::Object, &collUUIDElem);
+ if (!parseCollUUIDStatus.isOK()) {
+ return parseCollUUIDStatus;
+ }
+ }
+
+ auto collUUID = UUID::parse(collUUIDElem.Obj().getField("uuid"));
+ if (!collUUID.isOK()) {
+ return collUUID.getStatus();
+ }
+
+ BSONElement chunkRangeElem;
+ {
+ auto chunkRangeStatus =
+ bsonExtractTypedField(cmdObj, kChunkRange, mongo::Object, &chunkRangeElem);
+ if (!chunkRangeStatus.isOK()) {
+ return chunkRangeStatus;
+ }
+ }
+
+ auto chunkRange = ChunkRange::fromBSON(chunkRangeElem.Obj().getOwned());
+ if (!chunkRange.isOK()) {
+ return chunkRange.getStatus();
+ }
+
+ std::string shardIdString;
+ {
+ auto parseShardIdStatus = bsonExtractStringField(cmdObj, kShardId, &shardIdString);
+ if (!parseShardIdStatus.isOK()) {
+ return parseShardIdStatus;
+ }
+ }
+
+ boost::optional<Timestamp> validAfter = boost::none;
+ {
+ Timestamp ts{0};
+ auto status = bsonExtractTimestampField(cmdObj, kValidAfter, &ts);
+ if (!status.isOK() && status != ErrorCodes::NoSuchKey) {
+ return status;
+ }
+
+ if (status.isOK()) {
+ validAfter = ts;
+ }
+ }
+
+ return MergeChunksRequest(std::move(nss),
+ ShardId(shardIdString),
+ std::move(collUUID.getValue()),
+ std::move(chunkRange.getValue()),
+ validAfter);
+}
+
+BSONObj MergeChunksRequest::toConfigCommandBSON(const BSONObj& writeConcern) {
+ BSONObjBuilder cmdBuilder;
+ appendAsConfigCommand(&cmdBuilder);
+
+ // Tack on passed-in writeConcern
+ cmdBuilder.append(WriteConcernOptions::kWriteConcernField, writeConcern);
+
+ return cmdBuilder.obj();
+}
+
+void MergeChunksRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) {
+ cmdBuilder->append(kConfigsvrMergeChunks, _nss.ns());
+ cmdBuilder->append(kCollUUID, _collectionUUID.toBSON());
+ cmdBuilder->append(kChunkRange, _chunkRange.toBSON());
+ cmdBuilder->append(kShardId, _shardId);
+ invariant(_validAfter.is_initialized());
+ cmdBuilder->append(kValidAfter, _validAfter.get());
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/s/request_types/merge_chunks_request_type.h b/src/mongo/s/request_types/merge_chunks_request_type.h
new file mode 100644
index 00000000000..bed10c7f338
--- /dev/null
+++ b/src/mongo/s/request_types/merge_chunks_request_type.h
@@ -0,0 +1,106 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/type_chunk.h"
+
+namespace mongo {
+
+/**
+ * Provides support for parsing and serialization of arguments to the config server mergeChunks
+ * command.
+ */
+class MergeChunksRequest {
+public:
+ MergeChunksRequest(NamespaceString nss,
+ ShardId shardId,
+ UUID collUUID,
+ ChunkRange chunkRange,
+ boost::optional<Timestamp> validAfter);
+ /**
+ * Parses the provided BSON content as the internal _configsvrCommitChunksMerge command
+ * and if it contains the correct types, constructs a MergeChunksRequest object from it.
+ *
+ * {
+ * _configsvrCommitChunksMerge: <NamespaceString nss>,
+ * collUUID: <UUID>,
+ * chunkRage: <ChunkRange [minKey, maxKey)>,
+ * shard: <string shard>
+ * }
+ */
+ static StatusWith<MergeChunksRequest> parseFromConfigCommand(const BSONObj& cmdObj);
+
+ /**
+ * Creates a BSONObjBuilder and uses it to create and return a BSONObj from this
+ * MergeChunksRequest instance. Calls appendAsConfigCommand and tacks on the passed-in
+ * writeConcern.
+ */
+ BSONObj toConfigCommandBSON(const BSONObj& writeConcern);
+
+ /**
+ * Creates a serialized BSONObj of the internal _configsvCommitChunksMerge command
+ * from this MergeChunksRequest instance.
+ */
+ void appendAsConfigCommand(BSONObjBuilder* cmdBuilder);
+
+ const NamespaceString& getNamespace() const {
+ return _nss;
+ }
+
+ const UUID& getCollectionUUID() const {
+ return _collectionUUID;
+ }
+
+ const ChunkRange& getChunkRange() const {
+ return _chunkRange;
+ }
+
+ const ShardId& getShardId() const {
+ return _shardId;
+ }
+
+ const boost::optional<Timestamp>& getValidAfter() const {
+ return _validAfter;
+ }
+
+private:
+ NamespaceString _nss;
+
+ UUID _collectionUUID;
+
+ ChunkRange _chunkRange;
+
+ ShardId _shardId;
+
+ boost::optional<Timestamp> _validAfter;
+};
+
+} // namespace mongo