summaryrefslogtreecommitdiff
path: root/src/mongo/db/s
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/collection_metadata.h4
-rw-r--r--src/mongo/db/s/config/configsvr_merge_chunks_command.cpp114
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h17
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp151
-rw-r--r--src/mongo/db/s/merge_chunks_command.cpp62
6 files changed, 337 insertions, 12 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 54fccaa8444..42283ace154 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -309,6 +309,7 @@ env.Library(
'config/configsvr_enable_sharding_command.cpp',
'config/configsvr_ensure_chunk_version_is_greater_than_command.cpp',
'config/configsvr_merge_chunk_command.cpp',
+ 'config/configsvr_merge_chunks_command.cpp',
'config/configsvr_move_chunk_command.cpp',
'config/configsvr_move_primary_command.cpp',
'config/configsvr_refine_collection_shard_key_command.cpp',
diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h
index 79f02080b19..74a71cd639a 100644
--- a/src/mongo/db/s/collection_metadata.h
+++ b/src/mongo/db/s/collection_metadata.h
@@ -159,6 +159,10 @@ public:
return _cm->uuidMatches(uuid);
}
+ /**
+ * Returns whatever UUID the underlying '_cm' object reports, forwarded unchanged.
+ * NOTE(review): presumably boost::none when no UUID is known for the collection - confirm
+ * against the type of '_cm'.
+ */
+ boost::optional<UUID> getUUID() const {
+ return _cm->getUUID();
+ }
+
/**
* Returns just the shard key fields, if the collection is sharded, and the _id field, from
* `doc`. Does not alter any field values (e.g. by hashing); values are copied verbatim.
diff --git a/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp
new file mode 100644
index 00000000000..73e4c127749
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_merge_chunks_command.cpp
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/s/request_types/merge_chunks_request_type.h"
+
+namespace mongo {
+namespace {
+
+/**
+ * Internal sharding command, run on config servers, that commits the merge of the chunks of one
+ * shard lying within a key range.
+ *
+ * The command document is parsed as a MergeChunksRequest. From the parsed request, run() passes
+ * the following to ShardingCatalogManager::commitChunksMerge():
+ *   - the fully qualified namespace (value of the '_configsvrCommitChunksMerge' field)
+ *   - the collection UUID
+ *   - the chunk range to merge
+ *   - the id of the shard the request refers to
+ *   - the 'validAfter' timestamp
+ * The command also accepts a 'writeConcern' (see supportsWriteConcern()).
+ *
+ * NOTE(review): the wire names of the fields other than the command name are defined by
+ * MergeChunksRequest and are not visible from this file - confirm them there.
+ */
+class ConfigSvrMergeChunksCommand : public BasicCommand {
+public:
+    ConfigSvrMergeChunksCommand() : BasicCommand("_configsvrCommitChunksMerge") {}
+
+    std::string help() const override {
+        return "Internal command, which is sent by a shard to the sharding config server. "
+               "Do not call directly. Receives, validates, and processes a MergeChunksRequest";
+    }
+
+    // The command is admin-only and is never allowed on secondaries.
+    bool adminOnly() const override {
+        return true;
+    }
+
+    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return AllowedOnSecondary::kNever;
+    }
+
+    bool supportsWriteConcern(const BSONObj& cmd) const override {
+        return true;
+    }
+
+    // The namespace is taken from the command document itself, fully qualified.
+    std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
+        return CommandHelpers::parseNsFullyQualified(cmdObj);
+    }
+
+    // Only callers holding the 'internal' action on the cluster resource may run the command.
+    Status checkAuthForCommand(Client* client,
+                               const std::string& dbname,
+                               const BSONObj& cmdObj) const override {
+        const bool mayRunInternal =
+            AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
+                ResourcePattern::forClusterResource(), ActionType::internal);
+        if (mayRunInternal) {
+            return Status::OK();
+        }
+        return Status(ErrorCodes::Unauthorized, "Unauthorized");
+    }
+
+    /**
+     * Parses the request, delegates the commit to the ShardingCatalogManager and appends the
+     * BSON object it returns to 'result'. Any failure is thrown (uassert / uassertStatusOK).
+     */
+    bool run(OperationContext* opCtx,
+             const std::string& dbName,
+             const BSONObj& cmdObj,
+             BSONObjBuilder& result) override {
+        uassert(ErrorCodes::IllegalOperation,
+                "_configsvrCommitChunksMerge can only be run on config servers",
+                serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+
+        // Reads into the config database performed by this operation use local read concern.
+        repl::ReadConcernArgs::get(opCtx) =
+            repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+        auto request = uassertStatusOK(MergeChunksRequest::parseFromConfigCommand(cmdObj));
+
+        const auto catalogManager = ShardingCatalogManager::get(opCtx);
+        const BSONObj versionsAfterMerge =
+            uassertStatusOK(catalogManager->commitChunksMerge(opCtx,
+                                                              request.getNamespace(),
+                                                              request.getCollectionUUID(),
+                                                              request.getChunkRange(),
+                                                              request.getShardId(),
+                                                              request.getValidAfter()));
+        result.appendElements(versionsAfterMerge);
+
+        return true;
+    }
+
+} configsvrMergeChunksCmd;
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 47e01831332..afe44f4cfd1 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -247,6 +247,23 @@ public:
const boost::optional<Timestamp>& validAfter);
/**
+ * Updates metadata in the config.chunks collection so the chunks within the specified key range
+ * are seen merged into a single larger chunk.
+ * If 'validAfter' is not set, this means the commit request came from an older server version,
+ * which is not history-aware; the implementation rejects such a request with IllegalOperation.
+ *
+ * Returns a BSON object with the newly produced chunk versions after the merge:
+ * - shardVersion - The new shard version of the shard identified by 'shardId'
+ * - collectionVersion - The new collection version after the commit
+ */
+ StatusWith<BSONObj> commitChunksMerge(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& requestCollectionUUID,
+ const ChunkRange& chunkRange,
+ const ShardId& shardId,
+ const boost::optional<Timestamp>& validAfter);
+
+ /**
* Updates metadata in config.chunks collection to show the given chunk in its new shard.
* If 'validAfter' is not set, this means the commit request came from an older server version,
* which is not history-aware.
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 97aaf4cb686..9d8cf39e48e 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -866,6 +866,157 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge(
return getShardAndCollectionVersion(opCtx, coll, ShardId(shardName));
}
+/**
+ * Merges, in the config server metadata, the chunks owned by 'shardId' that lie within
+ * 'chunkRange' into one single chunk.
+ *
+ * Returned (non-thrown) errors:
+ *   - IllegalOperation if 'validAfter' is not set
+ *   - the getCollectionVersion() error, with added context, if the collection version cannot
+ *     be retrieved
+ *   - NamespaceNotFound if the config.collections entry for 'nss' cannot be read back
+ *   - InvalidUUID if 'requestCollectionUUID' differs from the UUID in config.collections
+ * Thrown errors (uassert / uassertStatusOK): failures of the config queries or of the metadata
+ * update, and IllegalOperation when the chunks found do not exactly fill 'chunkRange'.
+ *
+ * When the range is already covered by exactly one chunk, the merge is treated as already
+ * committed: nothing is written and the current versions are returned.
+ */
+StatusWith<BSONObj> ShardingCatalogManager::commitChunksMerge(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    const UUID& requestCollectionUUID,
+    const ChunkRange& chunkRange,
+    const ShardId& shardId,
+    const boost::optional<Timestamp>& validAfter) {
+    if (!validAfter) {
+        return {ErrorCodes::IllegalOperation, "chunk operation requires validAfter timestamp"};
+    }
+
+    const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+    // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+    // migrations
+    // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/
+    // move chunks on different collections to proceed in parallel
+    Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+    // 1. Retrieve the initial collection version info to build up the logging info.
+    auto swCollVersion = getCollectionVersion(opCtx, nss);
+    if (!swCollVersion.isOK()) {
+        return swCollVersion.getStatus().withContext(str::stream()
+                                                     << "mergeChunk cannot merge chunks.");
+    }
+
+    // 2. Retrieve the list of chunks belonging to the requested shard + key range.
+    // The same config.collections entry has just been read by getCollectionVersion(), so it is
+    // expected to be found; the emptiness check below only guards against indexing an empty
+    // result set.
+    auto findCollResponse = uassertStatusOK(
+        configShard->exhaustiveFindOnConfig(opCtx,
+                                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                            repl::ReadConcernLevel::kLocalReadConcern,
+                                            CollectionType::ConfigNS,
+                                            BSON(CollectionType::kNssFieldName << nss.ns()),
+                                            {},
+                                            1));
+    if (findCollResponse.docs.empty()) {
+        return {ErrorCodes::NamespaceNotFound,
+                str::stream() << "collection " << nss.ns() << " not found in config.collections"};
+    }
+    const CollectionType coll(findCollResponse.docs[0]);
+    if (coll.getUuid() != requestCollectionUUID) {
+        return {
+            ErrorCodes::InvalidUUID,
+            str::stream() << "UUID of collection does not match UUID of request. Collection UUID: "
+                          << coll.getUuid() << ", request UUID: " << requestCollectionUUID};
+    }
+
+    // Chunks are matched by collection UUID when the collection entry carries a timestamp and by
+    // namespace otherwise; in both cases only chunks of 'shardId' whose bounds lie inside
+    // 'chunkRange' are selected.
+    const auto shardChunksInRangeQuery = [&]() {
+        BSONObjBuilder queryBuilder;
+        if (coll.getTimestamp()) {
+            queryBuilder << ChunkType::collectionUUID << coll.getUuid();
+        } else {
+            queryBuilder << ChunkType::ns(coll.getNss().ns());
+        }
+        queryBuilder << ChunkType::shard(shardId.toString());
+        queryBuilder << ChunkType::min(BSON("$gte" << chunkRange.getMin()));
+        queryBuilder << ChunkType::max(BSON("$lte" << chunkRange.getMax()));
+        return queryBuilder.obj();
+    }();
+
+    // The result is sorted by min key, which the contiguity checks in step 3 rely on.
+    const auto shardChunksInRangeResponse = uassertStatusOK(
+        configShard->exhaustiveFindOnConfig(opCtx,
+                                            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                            repl::ReadConcernLevel::kLocalReadConcern,
+                                            ChunkType::ConfigNS,
+                                            shardChunksInRangeQuery,
+                                            BSON(ChunkType::min << 1),
+                                            boost::none));
+
+    // Check if the chunk(s) have already been merged. If so, return success.
+    if (shardChunksInRangeResponse.docs.size() == 1) {
+        auto chunk =
+            uassertStatusOK(ChunkType::fromConfigBSON(shardChunksInRangeResponse.docs.back()));
+        uassert(
+            ErrorCodes::IllegalOperation,
+            str::stream() << "could not merge chunks, shard " << shardId
+                          << " does not contain a sequence of chunks that exactly fills the range "
+                          << chunkRange.toString(),
+            chunk.getRange() == chunkRange);
+        auto replyWithVersions = getShardAndCollectionVersion(opCtx, coll, shardId);
+        // Make sure that the last thing read above (the chunk and the versions returned by
+        // getShardAndCollectionVersion) gets majority written before returning from this
+        // command, otherwise the next RoutingInfo cache refresh from the shard may not see the
+        // newest information.
+        repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
+        return replyWithVersions;
+    }
+
+    // 3. Prepare the data for the merge
+    // and ensure that the retrieved list of chunks covers the whole range: the first chunk must
+    // start at the range's min, each following chunk must start where the previous one ends, and
+    // the last chunk must end at the range's max.
+    std::vector<ChunkType> chunksToMerge;
+    for (const auto& chunkDoc : shardChunksInRangeResponse.docs) {
+        auto chunk = uassertStatusOK(ChunkType::fromConfigBSON(chunkDoc));
+        if (chunksToMerge.empty()) {
+            uassert(ErrorCodes::IllegalOperation,
+                    str::stream()
+                        << "could not merge chunks, shard " << shardId
+                        << " does not contain a sequence of chunks that exactly fills the range "
+                        << chunkRange.toString(),
+                    chunk.getMin().woCompare(chunkRange.getMin()) == 0);
+        } else {
+            uassert(ErrorCodes::IllegalOperation,
+                    str::stream()
+                        << "could not merge chunks, shard " << shardId
+                        << " does not contain a sequence of chunks that exactly fills the range "
+                        << chunkRange.toString(),
+                    chunk.getMin().woCompare(chunksToMerge.back().getMax()) == 0);
+        }
+        chunksToMerge.push_back(std::move(chunk));
+    }
+    uassert(ErrorCodes::IllegalOperation,
+            str::stream() << "could not merge chunks, shard " << shardId
+                          << " does not contain a sequence of chunks that exactly fills the range "
+                          << chunkRange.toString(),
+            !chunksToMerge.empty() &&
+                chunksToMerge.back().getMax().woCompare(chunkRange.getMax()) == 0);
+
+    // The merged chunk gets the current collection version with its minor component bumped.
+    ChunkVersion initialVersion = swCollVersion.getValue();
+    ChunkVersion mergeVersion = initialVersion;
+    mergeVersion.incMinor();
+
+    auto updates = buildMergeChunksTransactionUpdates(chunksToMerge, mergeVersion, validAfter);
+    auto preCond = buildMergeChunksTransactionPrecond(chunksToMerge, initialVersion);
+
+    // 4. apply the batch of updates to local metadata
+    uassertStatusOK(Grid::get(opCtx)->catalogClient()->applyChunkOpsDeprecated(
+        opCtx,
+        updates,
+        preCond,
+        getNsOrUUIDForChunkTargeting(coll),
+        nss,
+        mergeVersion,
+        WriteConcernOptions(),
+        repl::ReadConcernLevel::kLocalReadConcern));
+
+    // 5. log changes
+    BSONObjBuilder logDetail;
+    {
+        BSONArrayBuilder b(logDetail.subarrayStart("merged"));
+        for (const auto& chunkToMerge : chunksToMerge) {
+            b.append(chunkToMerge.toConfigBSON());
+        }
+    }
+    initialVersion.appendLegacyWithField(&logDetail, "prevShardVersion");
+    mergeVersion.appendLegacyWithField(&logDetail, "mergedVersion");
+    logDetail.append("owningShard", shardId);
+
+    ShardingLogging::get(opCtx)->logChange(
+        opCtx, "merge", nss.ns(), logDetail.obj(), WriteConcernOptions());
+
+    return getShardAndCollectionVersion(opCtx, coll, shardId);
+}
+
+
StatusWith<BSONObj> ShardingCatalogManager::commitChunkMigration(
OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index a48107135a5..815536ff78d 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -49,6 +49,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/merge_chunk_request_type.h"
+#include "mongo/s/request_types/merge_chunks_request_type.h"
#include "mongo/util/str.h"
namespace mongo {
@@ -74,12 +75,38 @@ bool checkMetadataForSuccess(OperationContext* opCtx,
chunk.getMax().woCompare(chunkRange.getMax()) == 0;
}
-Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& minKey,
- const BSONObj& maxKey,
- const OID& epoch,
- const CollectionMetadata& metadata) {
+/**
+ * Builds a MergeChunksRequest for 'chunkRange' (this shard's id, the collection UUID taken from
+ * 'metadata' and the current cluster time as timestamp) and sends it to the config server
+ * primary on the "admin" database with majority write concern, using the idempotent retry
+ * policy.
+ *
+ * Returns the command response; a failure to run the command at all is thrown by
+ * uassertStatusOK. 'metadata' must carry a collection UUID (enforced with an invariant).
+ */
+Shard::CommandResponse commitUsingChunkRange(OperationContext* opCtx,
+                                             const NamespaceString& nss,
+                                             const ChunkRange& chunkRange,
+                                             const CollectionMetadata& metadata) {
+    auto const localShardingState = ShardingState::get(opCtx);
+    const auto timeNow = VectorClock::get(opCtx)->getTime();
+
+    const auto uuid = metadata.getUUID();
+    invariant(uuid);
+
+    MergeChunksRequest mergeRequest{nss,
+                                    localShardingState->shardId(),
+                                    *uuid,
+                                    chunkRange,
+                                    timeNow.clusterTime().asTimestamp()};
+    const auto commitCmd =
+        mergeRequest.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+
+    const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+    return uassertStatusOK(
+        configShard->runCommand(opCtx,
+                                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                "admin",
+                                commitCmd,
+                                Shard::RetryPolicy::kIdempotent));
+}
+
+Shard::CommandResponse commitUsingChunksList(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& minKey,
+ const BSONObj& maxKey,
+ const CollectionMetadata& metadata) {
auto const shardingState = ShardingState::get(opCtx);
//
@@ -166,7 +193,7 @@ Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx,
const auto currentTime = VectorClock::get(opCtx)->getTime();
MergeChunkRequest request{nss,
shardingState->shardId().toString(),
- epoch,
+ metadata.getShardVersion().epoch(),
chunkBoundaries,
currentTime.clusterTime().asTimestamp()};
@@ -183,6 +210,16 @@ Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx,
return cmdResponse;
}
+/**
+ * Commits the merge of 'chunkRange' on the config server.
+ *
+ * The range-based commit is attempted first. Only when its command status is CommandNotFound
+ * (presumably a config server that does not know the range-based command - TODO confirm) is the
+ * commit retried through the chunks-list based request; any other response, successful or not,
+ * is returned as is.
+ */
+Shard::CommandResponse commitMergeOnConfigServer(OperationContext* opCtx,
+                                                 const NamespaceString& nss,
+                                                 const ChunkRange& chunkRange,
+                                                 const CollectionMetadata& metadata) {
+    auto rangeBasedResponse = commitUsingChunkRange(opCtx, nss, chunkRange, metadata);
+    if (rangeBasedResponse.commandStatus == ErrorCodes::CommandNotFound) {
+        return commitUsingChunksList(
+            opCtx, nss, chunkRange.getMin(), chunkRange.getMax(), metadata);
+    }
+    return rangeBasedResponse;
+}
+
void mergeChunks(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& minKey,
@@ -223,15 +260,16 @@ void mergeChunks(OperationContext* opCtx,
<< ", current epoch: " << epoch << ")",
expectedEpoch == epoch);
+ ChunkRange chunkRange(minKey, maxKey);
+
uassert(ErrorCodes::IllegalOperation,
- str::stream() << "could not merge chunks, the range "
- << redact(ChunkRange(minKey, maxKey).toString()) << " is not valid"
+ str::stream() << "could not merge chunks, the range " << redact(chunkRange.toString())
+ << " is not valid"
<< " for collection " << nss.ns() << " with key pattern "
<< metadataBeforeMerge->getKeyPattern().toString(),
metadataBeforeMerge->isValidKey(minKey) && metadataBeforeMerge->isValidKey(maxKey));
- auto cmdResponse =
- commitMergeOnConfigServer(opCtx, nss, minKey, maxKey, epoch, metadataBeforeMerge.get());
+ auto cmdResponse = commitMergeOnConfigServer(opCtx, nss, chunkRange, metadataBeforeMerge.get());
boost::optional<ChunkVersion> shardVersionReceived = [&]() -> boost::optional<ChunkVersion> {
// old versions might not have the shardVersion field
@@ -254,7 +292,7 @@ void mergeChunks(OperationContext* opCtx,
auto writeConcernStatus = std::move(cmdResponse.writeConcernStatus);
if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) &&
- checkMetadataForSuccess(opCtx, nss, epoch, ChunkRange(minKey, maxKey))) {
+ checkMetadataForSuccess(opCtx, nss, epoch, chunkRange)) {
LOGV2_DEBUG(21983,
1,
"mergeChunk interval [{minKey},{maxKey}) has already been committed",