diff options
21 files changed, 1161 insertions, 127 deletions
diff --git a/etc/evergreen.yml b/etc/evergreen.yml index 1669104c6be..d6e81a727c4 100644 --- a/etc/evergreen.yml +++ b/etc/evergreen.yml @@ -6308,7 +6308,7 @@ tasks: vars: depends_on: jsCore use_large_distro: "true" - resmoke_args: --storageEngine=wiredTiger + resmoke_args: "--storageEngine=wiredTiger --mongodSetParameters='{featureFlagDisableIncompleteShardingDDLSupport: true}'" fallback_num_sub_suites: 37 - name: tenant_migration_jscore_passthrough_gen diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index b537ddc91f9..c4f4d1e0e13 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -119,6 +119,7 @@ let viewsCommandTests = { _shardsvrDropCollection: {skip: isAnInternalCommand}, _shardsvrDropCollectionParticipant: {skip: isAnInternalCommand}, _shardsvrCreateCollection: {skip: isAnInternalCommand}, + _shardsvrCreateCollectionParticipant: {skip: isAnInternalCommand}, _shardsvrDropDatabase: {skip: isAnInternalCommand}, _shardsvrDropDatabaseParticipant: {skip: isAnInternalCommand}, _shardsvrMovePrimary: {skip: isAnInternalCommand}, diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js index bfdcaa1eb47..77b461c2829 100644 --- a/jstests/replsets/db_reads_while_recovering_all_commands.js +++ b/jstests/replsets/db_reads_while_recovering_all_commands.js @@ -67,6 +67,7 @@ const allCommands = { _shardsvrDropCollection: {skip: isPrimaryOnly}, _shardsvrCreateCollection: {skip: isPrimaryOnly}, _shardsvrDropCollectionParticipant: {skip: isPrimaryOnly}, + _shardsvrCreateCollectionParticipant: {skip: isPrimaryOnly}, _shardsvrMovePrimary: {skip: isPrimaryOnly}, _shardsvrRenameCollection: {skip: isPrimaryOnly}, _shardsvrRenameCollectionParticipant: {skip: isAnInternalCommand}, diff --git a/jstests/replsets/tenant_migration_concurrent_writes.js b/jstests/replsets/tenant_migration_concurrent_writes.js index 
40a199a2c9d..603738080b4 100644 --- a/jstests/replsets/tenant_migration_concurrent_writes.js +++ b/jstests/replsets/tenant_migration_concurrent_writes.js @@ -456,6 +456,7 @@ const testCases = { _recvChunkStatus: {skip: isNotRunOnUserDatabase}, _shardsvrCloneCatalogData: {skip: isNotRunOnUserDatabase}, _shardsvrCreateCollection: {skip: isOnlySupportedOnShardedCluster}, + _shardsvrCreateCollectionParticipant: {skip: isOnlySupportedOnShardedCluster}, _shardsvrMovePrimary: {skip: isNotRunOnUserDatabase}, _shardsvrShardCollection: {skip: isNotRunOnUserDatabase}, _shardsvrRenameCollection: {skip: isOnlySupportedOnShardedCluster}, diff --git a/jstests/sharding/index_commands_during_initial_split.js b/jstests/sharding/index_commands_during_initial_split.js index 39c70773ee9..350149b9d9c 100644 --- a/jstests/sharding/index_commands_during_initial_split.js +++ b/jstests/sharding/index_commands_during_initial_split.js @@ -59,6 +59,17 @@ function runCommandDuringShardCollection(st, ns, shardKey, zones, failpointName, const numShards = 4; const st = new ShardingTest({shards: numShards}); + +const featureFlagParam = assert.commandWorked( + st.configRS.getPrimary().adminCommand({getParameter: 1, featureFlagShardingFullDDLSupport: 1})); + +if (featureFlagParam.featureFlagShardingFullDDLSupport.value) { + jsTest.log( + 'Skipping test because featureFlagShardingFullDDLSupport feature flag is enabled and this test expects the exacty legacy steps, so, it is incompatible with the new path.'); + st.stop(); + return; +} + const allShards = []; for (let i = 0; i < numShards; i++) { allShards.push(st["shard" + i]); diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js index 24d2b31bb94..5c1705ea960 100644 --- a/jstests/sharding/read_write_concern_defaults_application.js +++ b/jstests/sharding/read_write_concern_defaults_application.js @@ -115,6 +115,7 @@ let testCases = { _recvChunkStatus: {skip: "internal 
command"}, _shardsvrCloneCatalogData: {skip: "internal command"}, _shardsvrCreateCollection: {skip: "internal command"}, + _shardsvrCreateCollectionParticipant: {skip: "internal command"}, _shardsvrDropCollection: {skip: "internal command"}, _shardsvrDropCollectionParticipant: {skip: "internal command"}, _shardsvrDropDatabase: {skip: "internal command"}, diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 593cf71a7ff..e2dd0f71059 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -130,6 +130,7 @@ env.Library( '$BUILD_DIR/mongo/db/transaction', '$BUILD_DIR/mongo/db/vector_clock_mongod', '$BUILD_DIR/mongo/s/query/cluster_aggregate', + '$BUILD_DIR/mongo/s/sharding_api', '$BUILD_DIR/mongo/s/sharding_initialization', 'chunk_splitter', 'sharding_api_d', @@ -326,6 +327,7 @@ env.Library( 'config/configsvr_shard_collection_command.cpp', 'config/configsvr_split_chunk_command.cpp', 'config/configsvr_update_zone_key_range_command.cpp', + 'create_collection_coordinator.cpp', 'drop_collection_coordinator.cpp', 'drop_database_coordinator.cpp', 'flush_database_cache_updates_command.cpp', @@ -344,6 +346,7 @@ env.Library( 'sharding_server_status.cpp', 'sharding_state_command.cpp', 'shardsvr_create_collection_command.cpp', + 'shardsvr_create_collection_participant_command.cpp', 'shardsvr_drop_collection_command.cpp', 'shardsvr_drop_collection_participant_command.cpp', 'shardsvr_drop_database_command.cpp', diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp index 3ec7fa7f13f..ba9fb36ede2 100644 --- a/src/mongo/db/s/config/initial_split_policy.cpp +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -239,41 +239,41 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle std::unique_ptr<InitialSplitPolicy> InitialSplitPolicy::calculateOptimizationStrategy( OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern, - const 
ShardsvrShardCollectionRequest& request, + const std::int64_t numInitialChunks, + const bool presplitHashedZones, + const boost::optional<std::vector<BSONObj>>& initialSplitPoints, const std::vector<TagsType>& tags, size_t numShards, bool collectionIsEmpty) { uassert(ErrorCodes::InvalidOptions, str::stream() << "numInitialChunks is only supported when the collection is empty " "and has a hashed field in the shard key pattern", - !request.getNumInitialChunks() || - (shardKeyPattern.isHashedPattern() && collectionIsEmpty)); + !numInitialChunks || (shardKeyPattern.isHashedPattern() && collectionIsEmpty)); uassert(ErrorCodes::InvalidOptions, str::stream() << "When the prefix of the hashed shard key is a range field, " "'numInitialChunks' can only be used when the 'presplitHashedZones' is true", - !request.getNumInitialChunks() || shardKeyPattern.hasHashedPrefix() || - request.getPresplitHashedZones()); + !numInitialChunks || shardKeyPattern.hasHashedPrefix() || presplitHashedZones); uassert(ErrorCodes::InvalidOptions, str::stream() << "Cannot have both initial split points and tags set", - !request.getInitialSplitPoints() || tags.empty()); + !initialSplitPoints || tags.empty()); // If 'presplitHashedZones' flag is set, we always use 'PresplitHashedZonesSplitPolicy', to make // sure we throw the correct assertion if further validation fails. - if (request.getPresplitHashedZones()) { + if (presplitHashedZones) { return std::make_unique<PresplitHashedZonesSplitPolicy>( - opCtx, shardKeyPattern, tags, request.getNumInitialChunks(), collectionIsEmpty); + opCtx, shardKeyPattern, tags, numInitialChunks, collectionIsEmpty); } // The next preference is to use split points based strategy. This is only possible if // 'initialSplitPoints' is set, or if the collection is empty with shard key having a hashed // prefix. 
- if (request.getInitialSplitPoints()) { - return std::make_unique<SplitPointsBasedSplitPolicy>(*request.getInitialSplitPoints()); + if (initialSplitPoints) { + return std::make_unique<SplitPointsBasedSplitPolicy>(*initialSplitPoints); } if (tags.empty() && shardKeyPattern.hasHashedPrefix() && collectionIsEmpty) { return std::make_unique<SplitPointsBasedSplitPolicy>( - shardKeyPattern, numShards, request.getNumInitialChunks()); + shardKeyPattern, numShards, numInitialChunks); } if (!tags.empty()) { diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h index 6687c53c933..91f1c3c4054 100644 --- a/src/mongo/db/s/config/initial_split_policy.h +++ b/src/mongo/db/s/config/initial_split_policy.h @@ -36,7 +36,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_tags.h" -#include "mongo/s/request_types/shard_collection_gen.h" #include "mongo/s/shard_id.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/string_map.h" @@ -61,7 +60,9 @@ public: static std::unique_ptr<InitialSplitPolicy> calculateOptimizationStrategy( OperationContext* opCtx, const ShardKeyPattern& shardKeyPattern, - const ShardsvrShardCollectionRequest& request, + const std::int64_t numInitialChunks, + const bool presplitHashedZones, + const boost::optional<std::vector<BSONObj>>& initialSplitPoints, const std::vector<TagsType>& tags, size_t numShards, bool collectionIsEmpty); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp index 85067a04d22..c3871c2a109 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp @@ -46,6 +46,7 @@ #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" +#include 
"mongo/s/request_types/shard_collection_gen.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" @@ -118,7 +119,9 @@ TEST_F(CreateFirstChunksTest, Split_Disallowed_With_Both_SplitPoints_And_Zones) ASSERT_THROWS_CODE( InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), kShardKeyPattern, - request, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), tags, 2 /* numShards */, true /* collectionIsEmpty */), @@ -128,7 +131,9 @@ TEST_F(CreateFirstChunksTest, Split_Disallowed_With_Both_SplitPoints_And_Zones) ASSERT_THROWS_CODE( InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), kShardKeyPattern, - request, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), tags, 2 /* numShards */, false /* collectionIsEmpty */), @@ -160,7 +165,9 @@ TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromSplitVector_Man auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), kShardKeyPattern, - request, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), {}, /* tags */ 3 /* numShards */, false /* collectionIsEmpty */); @@ -203,12 +210,15 @@ TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromClient_ManyChun ShardsvrShardCollectionRequest request; request.setInitialSplitPoints(splitPoints); - auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), - kShardKeyPattern, - request, - zones, - 3 /* numShards */, - collectionIsEmpty); + auto optimization = + InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), + kShardKeyPattern, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), + zones, + 3 /* numShards */, + collectionIsEmpty); ASSERT(optimization->isOptimized()); @@ -236,8 +246,15 @@ 
TEST_F(CreateFirstChunksTest, NonEmptyCollection_WithZones_OneChunkToPrimary) { bool collectionIsEmpty = false; ShardsvrShardCollectionRequest request; - auto optimization = InitialSplitPolicy::calculateOptimizationStrategy( - operationContext(), kShardKeyPattern, request, zones, 3 /* numShards */, collectionIsEmpty); + auto optimization = + InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), + kShardKeyPattern, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), + zones, + 3 /* numShards */, + collectionIsEmpty); ASSERT(optimization->isOptimized()); @@ -275,12 +292,15 @@ TEST_F(CreateFirstChunksTest, EmptyCollection_SplitPoints_FromClient_ManyChunksD ShardsvrShardCollectionRequest request; request.setInitialSplitPoints(splitPoints); - auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), - kShardKeyPattern, - request, - zones, - 3 /* numShards */, - collectionIsEmpty); + auto optimization = + InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), + kShardKeyPattern, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), + zones, + 3 /* numShards */, + collectionIsEmpty); ASSERT(optimization->isOptimized()); @@ -322,12 +342,15 @@ TEST_F(CreateFirstChunksTest, EmptyCollection_NoSplitPoints_OneChunkToPrimary) { ShardsvrShardCollectionRequest request; request.setInitialSplitPoints(splitPoints); - auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), - kShardKeyPattern, - request, - zones, - 3 /* numShards */, - collectionIsEmpty); + auto optimization = + InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), + kShardKeyPattern, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), + zones, + 3 /* numShards */, + collectionIsEmpty); ASSERT(optimization->isOptimized()); @@ -355,8 +378,15 @@ 
TEST_F(CreateFirstChunksTest, EmptyCollection_WithZones_ManyChunksOnFirstZoneSha ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))}; bool collectionIsEmpty = true; ShardsvrShardCollectionRequest request; - auto optimization = InitialSplitPolicy::calculateOptimizationStrategy( - operationContext(), kShardKeyPattern, request, zones, 3 /* numShards */, collectionIsEmpty); + auto optimization = + InitialSplitPolicy::calculateOptimizationStrategy(operationContext(), + kShardKeyPattern, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), + zones, + 3 /* numShards */, + collectionIsEmpty); ASSERT(optimization->isOptimized()); diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp new file mode 100644 index 00000000000..12dbc593b49 --- /dev/null +++ b/src/mongo/db/s/create_collection_coordinator.cpp @@ -0,0 +1,659 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/s/collection_sharding_runtime.h" +#include "mongo/db/s/create_collection_coordinator.h" +#include "mongo/db/s/shard_key_util.h" +#include "mongo/db/s/sharding_ddl_util.h" +#include "mongo/db/s/sharding_logging.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/logv2/log.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/grid.h" +#include "mongo/s/request_types/shard_collection_gen.h" +#include "mongo/s/sharded_collections_ddl_parameters_gen.h" +#include "mongo/s/write_ops/cluster_write.h" + +namespace mongo { +namespace { + +/** + * Local catalog information gathered by getCollectionOptionsAndIndexes(): the collection options + * (with the collection UUID appended), the index specs reported by getIndexSpecs() and the _id + * index spec. + */ +struct OptionsAndIndexes { + BSONObj options; + std::vector<BSONObj> indexSpecs; + BSONObj idIndexSpec; +}; + +/** + * Looks up, through a DBDirectClient, the listCollections entry whose 'info.uuid' equals the UUID + * of 'nssOrUUID' and returns its options, index specs and _id index spec. 'nssOrUUID' must carry a + * UUID, since it is dereferenced unconditionally. + */ +OptionsAndIndexes getCollectionOptionsAndIndexes(OperationContext* opCtx, + const NamespaceStringOrUUID& nssOrUUID) { + DBDirectClient localClient(opCtx); + BSONObj idIndex; + BSONObjBuilder optionsBob; + + auto all = + localClient.getCollectionInfos(nssOrUUID.dbname(), BSON("info.uuid" << *nssOrUUID.uuid())); + + // There must be a collection at this time.
+ invariant(!all.empty()); + auto& entry = all.front(); + if (entry["options"].isABSONObj()) { + optionsBob.appendElements(entry["options"].Obj()); + } + // Carry the collection UUID alongside the options. + optionsBob.append(entry["info"]["uuid"]); + idIndex = entry["idIndex"].Obj().getOwned(); + + auto indexSpecsList = localClient.getIndexSpecs(nssOrUUID, false, 0); + + return {optionsBob.obj(), + std::vector<BSONObj>(std::begin(indexSpecsList), std::end(indexSpecsList)), + idIndex}; +} + +/** + * Compares the proposed shard key with the shard key of the collection's existing zones + * to ensure they are a legal combination. + * + * Throws InvalidOptions when a zone's min and max keys disagree on field names, when the proposed + * key does not have the same fields as a zone, or when a hashed field has zone boundaries of an + * unsupported type. + */ +void validateShardKeyAgainstExistingZones(OperationContext* opCtx, + const BSONObj& proposedKey, + const ShardKeyPattern& shardKeyPattern, + const std::vector<TagsType>& tags) { + for (const auto& tag : tags) { + BSONObjIterator tagMinFields(tag.getMinKey()); + BSONObjIterator tagMaxFields(tag.getMaxKey()); + BSONObjIterator proposedFields(proposedKey); + + while (tagMinFields.more() && proposedFields.more()) { + BSONElement tagMinKeyElement = tagMinFields.next(); + BSONElement tagMaxKeyElement = tagMaxFields.next(); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "the min and max of the existing zone " << tag.getMinKey() + << " -->> " << tag.getMaxKey() << " have non-matching keys", + tagMinKeyElement.fieldNameStringData() == + tagMaxKeyElement.fieldNameStringData()); + + BSONElement proposedKeyElement = proposedFields.next(); + // The field names must be equal and both keys must run out of fields together. + bool match = ((tagMinKeyElement.fieldNameStringData() == + proposedKeyElement.fieldNameStringData()) && + ((tagMinFields.more() && proposedFields.more()) || + (!tagMinFields.more() && !proposedFields.more()))); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "the proposed shard key " << proposedKey.toString() + << " does not match with the shard key of the existing zone " + << tag.getMinKey() << " -->> " << tag.getMaxKey(), + match); + + // If the field is hashed, make sure that the min and max values are of supported type.
+ uassert( + ErrorCodes::InvalidOptions, + str::stream() << "cannot do hash sharding with the proposed key " + << proposedKey.toString() << " because there exists a zone " + << tag.getMinKey() << " -->> " << tag.getMaxKey() + << " whose boundaries are not of type NumberLong, MinKey or MaxKey", + !ShardKeyPattern::isHashedPatternEl(proposedKeyElement) || + (ShardKeyPattern::isValidHashedValue(tagMinKeyElement) && + ShardKeyPattern::isValidHashedValue(tagMaxKeyElement))); + } + } +} + +/** + * Fetches the zones defined for 'nss' through the catalog client and, when any exist, validates + * them against the proposed shard key. Returns the zones; throws if the lookup or the validation + * fails. + */ +std::vector<TagsType> getTagsAndValidate(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& proposedKey, + const ShardKeyPattern& shardKeyPattern) { + // Read zone info + const auto catalogClient = Grid::get(opCtx)->catalogClient(); + auto tags = uassertStatusOK(catalogClient->getTagsForCollection(opCtx, nss)); + + if (!tags.empty()) { + validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tags); + } + + return tags; +} + +/** + * Returns the UUID of the local collection 'nss', or boost::none when no such collection exists. + * The collection is acquired in MODE_IS with views forbidden. + */ +boost::optional<UUID> getUUID(OperationContext* opCtx, const NamespaceString& nss) { + AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden); + const auto& coll = autoColl.getCollection(); + if (!coll) { + // The collection pointer can be null (getCollation() checks for it too); report "no UUID" + // instead of dereferencing it. + return boost::none; + } + return coll->uuid(); +} + +/** + * Returns the UUID that the local listCollections entry for 'nss' reports. Throws InternalError + * if the entry, its 'info' object or its 'uuid' field is missing. + */ +boost::optional<UUID> getUUIDFromPrimaryShard(OperationContext* opCtx, const NamespaceString& nss) { + // Obtain the collection's UUID from the primary shard's listCollections response.
+ DBDirectClient localClient(opCtx); + BSONObj res; + { + std::list<BSONObj> all = + localClient.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); + if (!all.empty()) { + res = all.front().getOwned(); + } + } + + uassert(ErrorCodes::InternalError, + str::stream() << "expected to have an entry for " << nss.toString() + << " in listCollections response, but did not", + !res.isEmpty()); + + BSONObj collectionInfo; + if (res["info"].type() == BSONType::Object) { + collectionInfo = res["info"].Obj(); + } + + // The message no longer blames a specific featureCompatibilityVersion: nothing in this function + // checks the FCV, so that part of the text was misleading. + uassert(ErrorCodes::InternalError, + str::stream() << "expected to return 'info' field as part of " + "listCollections for " + << nss.ns() << ", but got " << res, + !collectionInfo.isEmpty()); + + uassert(ErrorCodes::InternalError, + str::stream() << "expected to return a UUID for collection " << nss.ns() + << " as part of 'info' field but got " << res, + collectionInfo.hasField("uuid")); + + return uassertStatusOK(UUID::parse(collectionInfo["uuid"])); +} + +/** + * Returns true if a findOne() with an empty query on 'nss' yields no document. + */ +bool checkIfCollectionIsEmpty(OperationContext* opCtx, const NamespaceString& nss) { + // Use find with predicate instead of count in order to ensure that the count + // command doesn't just consult the cached metadata, which may not always be + // correct + DBDirectClient localClient(opCtx); + return localClient.findOne(nss.ns(), Query()).isEmpty(); +} + +/** + * Reloads the shard registry and returns the number of shards it then reports. + */ +int getNumShards(OperationContext* opCtx) { + const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); + shardRegistry->reload(opCtx); + + return shardRegistry->getNumShards(opCtx); +} + +/** + * Validates the requested collation and the default collation of the local collection 'nss', and + * returns the spec of the collection's default collator, or an empty object when there is neither + * a requested nor a default collator. Throws BadValue when the collator factory builds a collator + * for the requested collation, or for the default one while no collation was requested, and + * InvalidOptions when the collection is capped. + */ +BSONObj getCollation(OperationContext* opCtx, + const NamespaceString& nss, + const boost::optional<BSONObj>& collation) { + // Ensure the collation is valid. Currently we only allow the simple collation.
+ std::unique_ptr<CollatorInterface> requestedCollator = nullptr; + if (collation) { + requestedCollator = + uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) + ->makeFromBSON(collation.value())); + uassert(ErrorCodes::BadValue, + str::stream() << "The collation for shardCollection must be {locale: 'simple'}, " + << "but found: " << collation.value(), + !requestedCollator); + } + + AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden); + + const auto actualCollator = [&]() -> const CollatorInterface* { + const auto& coll = autoColl.getCollection(); + if (coll) { + uassert( + ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped()); + return coll->getDefaultCollator(); + } + + return nullptr; + }(); + + if (!requestedCollator && !actualCollator) + return BSONObj(); + + // 'requestedCollator' is always null past the uassert above, so getting here means that + // 'actualCollator' is non-null and can be dereferenced. + auto actualCollatorBSON = actualCollator->getSpec().toBSON(); + + if (!collation) { + auto actualCollatorFilter = + uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext()) + ->makeFromBSON(actualCollatorBSON)); + uassert(ErrorCodes::BadValue, + str::stream() << "If no collation was specified, the collection collation must be " + "{locale: 'simple'}, " + << "but found: " << actualCollatorBSON, + !actualCollatorFilter); + } + + return actualCollatorBSON; +} + +/** + * Upserts every chunk of 'chunks' into ChunkType::ConfigNS, matching on collection UUID, shard and + * min key, with majority write concern. Throws if the batch response carries an error. + */ +void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) { + BatchWriteExecStats stats; + BatchedCommandResponse response; + BatchedCommandRequest updateRequest([&]() { + write_ops::Update updateOp(ChunkType::ConfigNS); + std::vector<write_ops::UpdateOpEntry> entries; + entries.reserve(chunks.size()); + for (const auto& chunk : chunks) { + write_ops::UpdateOpEntry entry( + BSON(ChunkType::collectionUUID << chunk.getCollectionUUID() << ChunkType::shard + << chunk.getShard() << ChunkType::min + << chunk.getMin()), + write_ops::UpdateModification::parseFromClassicUpdate(chunk.toConfigBSON())); + entry.setUpsert(true); +
entry.setMulti(false); + // 'entry' is not used again in this iteration: move it instead of copying it. + entries.push_back(std::move(entry)); + } + // Hand the vector over rather than copying every entry a second time. + updateOp.setUpdates(std::move(entries)); + return updateOp; + }()); + + updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + + ClusterWriter::write(opCtx, updateRequest, &stats, &response); + uassertStatusOK(response.toStatus()); +} + +/** + * Upserts the entry of 'nss' in CollectionType::ConfigNS with the contents of 'coll', using + * majority write concern. On a DBException the local filtering metadata of 'nss' is cleared and + * the exception is rethrown. + */ +void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, CollectionType& coll) { + BatchWriteExecStats stats; + BatchedCommandResponse response; + BatchedCommandRequest updateRequest([&]() { + write_ops::Update updateOp(CollectionType::ConfigNS); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(CollectionType::kNssFieldName << nss.ns())); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(coll.toBSON())); + entry.setUpsert(true); + entry.setMulti(false); + return entry; + }()}); + return updateOp; + }()); + + updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + try { + ClusterWriter::write(opCtx, updateRequest, &stats, &response); + uassertStatusOK(response.toStatus()); + } catch (const DBException&) { + // If an error happens when contacting the config server, we don't know if the update + // succeeded or not, which might cause the local shard version to differ from the config + // server, so we clear the metadata to allow another operation to refresh it.
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + AutoGetCollection autoColl(opCtx, nss, MODE_IX); + CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx); + throw; + } +} + +} // namespace + +/** + * Captures the request and its namespace. The request must carry a shard key (enforced with an + * invariant), from which the coordinator's ShardKeyPattern is built. + */ +CreateCollectionCoordinator::CreateCollectionCoordinator( + OperationContext* opCtx, const ShardsvrCreateCollection& createCollParams) + : ShardingDDLCoordinator(opCtx, createCollParams.getNamespace()), + _serviceContext(opCtx->getServiceContext()), + _request(createCollParams), + _nss(_request.getNamespace()) { + invariant(createCollParams.getShardKey()); + _shardKeyPattern = ShardKeyPattern(createCollParams.getShardKey()->getOwned()); +} + +/** + * Schedules the create collection flow on 'executor': argument checks, collection and index + * creation, initial chunk calculation, creation on the other shards when the split policy is + * optimized, commit and cleanup. The flow stops early once '_result' is set. Errors are logged + * and propagated through the returned future. + */ +SemiFuture<void> CreateCollectionCoordinator::runImpl( + std::shared_ptr<executor::TaskExecutor> executor) { + return ExecutorFuture<void>(executor, Status::OK()) + .then([this, anchor = shared_from_this()]() { + ThreadClient tc("CreateCollectionCoordinator", _serviceContext); + auto opCtxHolder = tc->makeOperationContext(); + auto* opCtx = opCtxHolder.get(); + _forwardableOpMetadata.setOn(opCtx); + + _checkCommandArguments(opCtx); + if (_result) { + // Early return before holding the critical section, the collection was already + // created. + return; + } + { + // From this point on all writes are blocked on the collection. + ScopedShardVersionCriticalSection critSec(opCtx, _nss); + + _createCollectionAndIndexes(opCtx); + if (_result) { + // Early return, the collection was already created. + return; + } + + _createChunks(opCtx); + if (_splitPolicy->isOptimized()) { + // Block reads/writes from here on if we need to create the collection on other + // shards, this way we prevent reads/writes that should be redirected to another + // shard.
+ critSec.enterCommitPhase(); + _createCollectionOnNonPrimaryShards(opCtx); + + _commit(opCtx); + } + } + + // With a non-optimized split policy the commit happens only after the critical section + // above has gone out of scope. + if (!_splitPolicy->isOptimized()) { + _commit(opCtx); + } + + _cleanup(opCtx); + }) + .onError([this, anchor = shared_from_this()](const Status& status) { + LOGV2_ERROR(5277908, + "Error running create collection", + "namespace"_attr = _nss, + "error"_attr = redact(status)); + return status; + }) + .semi(); +} + + +/** + * Validates the request: sharding enabled on the database, allowed namespace, no hashed and unique + * combination, numInitialChunks bounds, and emptiness of collections in the config db. Sets + * '_result' when checkIfCollectionAlreadySharded() returns a response. + */ +void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) { + LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = _nss); + + const auto dbInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabaseWithRefresh(opCtx, _nss.db())); + + uassert(ErrorCodes::IllegalOperation, + str::stream() << "sharding not enabled for db " << _nss.db(), + dbInfo.shardingEnabled()); + + if (_nss.db() == NamespaceString::kConfigDb) { + // Only whitelisted collections in config may be sharded. + uassert(ErrorCodes::IllegalOperation, + "only special collections in the config db may be sharded", + _nss == NamespaceString::kLogicalSessionsNamespace); + } + + // Ensure that hashed and unique are not both set. + uassert(ErrorCodes::InvalidOptions, + "Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on " + "the hashed field by declaring an additional (non-hashed) unique index on the field.", + !_shardKeyPattern.value().isHashedPattern() || + !(_request.getUnique() && _request.getUnique().value())); + + // Ensure the namespace is valid. + uassert(ErrorCodes::IllegalOperation, + "can't shard system namespaces", + !_nss.isSystem() || _nss == NamespaceString::kLogicalSessionsNamespace || + _nss.isTemporaryReshardingCollection()); + + if (_request.getNumInitialChunks()) { + // Ensure numInitialChunks is within valid bounds. + // Cannot have more than 8192 initial chunks per shard.
Setting a maximum of 1,000,000 + // chunks in total to limit the amount of memory this command consumes so there is less + // danger of an OOM error. + + const int maxNumInitialChunksForShards = + Grid::get(opCtx)->shardRegistry()->getNumShardsNoReload() * 8192; + const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption + // Keep the request's own integer type: copying the value into an 'int' could truncate an + // out-of-range number into one that passes the bounds check below. + const auto numChunks = _request.getNumInitialChunks().value(); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "numInitialChunks cannot be more than either: " + << maxNumInitialChunksForShards << ", 8192 * number of shards; or " + << maxNumInitialChunksTotal, + numChunks >= 0 && numChunks <= maxNumInitialChunksForShards && + numChunks <= maxNumInitialChunksTotal); + } + + if (_nss.db() == NamespaceString::kConfigDb) { + auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + // A limit of one document is enough to tell whether the collection is empty. + auto findResponse = uassertStatusOK( + configShard->exhaustiveFindOnConfig(opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + _nss, + BSONObj(), + BSONObj(), + 1)); + + auto numDocs = findResponse.docs.size(); + + // If this is a collection on the config db, it must be empty to be sharded. + uassert(ErrorCodes::IllegalOperation, + "collections in the config db must be empty to be sharded", + numDocs == 0); + } + + auto unique = _request.getUnique() ? *_request.getUnique() : false; + if (auto createCollectionResponseOpt = + mongo::sharding_ddl_util::checkIfCollectionAlreadySharded( + opCtx, + _nss, + _shardKeyPattern->getKeyPattern().toBSON(), + getCollation(opCtx, _nss, _request.getCollation()), + unique)) { + _result = createCollectionResponseOpt; + } +} + +/** + * Resolves the collation and returns early with '_result' set when + * checkIfCollectionAlreadySharded() yields a response. Otherwise validates or creates the shard + * key index (creating the collection if needed) and records the collection UUID. + */ +void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) { + LOGV2_DEBUG( + 5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = _nss); + + auto unique = _request.getUnique() ?
*_request.getUnique() : false; + _collation = getCollation(opCtx, _nss, _request.getCollation()); + + if (auto createCollectionResponseOpt = + mongo::sharding_ddl_util::checkIfCollectionAlreadySharded( + opCtx, _nss, _shardKeyPattern->getKeyPattern().toBSON(), *_collation, unique)) { + _result = createCollectionResponseOpt; + return; + } + + // Internally creates the collection if it doesn't exist. + // 'unique' already holds the request's uniqueness flag (false when unset), so it is reused + // here instead of evaluating the same ternary again. + shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible( + opCtx, + _nss, + _shardKeyPattern->toBSON(), + *_shardKeyPattern, + _collation, + unique, + shardkeyutil::ValidationBehaviorsShardCollection(opCtx)); + + _collectionUUID = *getUUIDFromPrimaryShard(opCtx, _nss); +} + +/** + * Picks the initial split policy from the request, the current zones, the number of shards and + * whether the collection is empty, then computes the initial chunks with this shard as the primary + * one. At least one chunk must result (enforced with an invariant). + */ +void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) { + LOGV2_DEBUG(5277904, 2, "Create collection _createChunks", "namespace"_attr = _nss); + + _splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy( + opCtx, + *_shardKeyPattern, + _request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0, + _request.getPresplitHashedZones() ? *_request.getPresplitHashedZones() : false, + _request.getInitialSplitPoints(), + getTagsAndValidate(opCtx, _nss, _shardKeyPattern->toBSON(), *_shardKeyPattern), + getNumShards(opCtx), + checkIfCollectionIsEmpty(opCtx, _nss)); + + _initialChunks = _splitPolicy->createFirstChunks( + opCtx, *_shardKeyPattern, {_nss, *_collectionUUID, ShardingState::get(opCtx)->shardId()}); + + // There must be at least one chunk.
+ invariant(!_initialChunks.chunks.empty()); +} + +void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationContext* opCtx) { + LOGV2_DEBUG(5277905, + 2, + "Create collection _createCollectionOnNonPrimaryShards", + "namespace"_attr = _nss); + + std::vector<AsyncRequestsSender::Request> requests; + std::set<ShardId> initializedShards; + auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId(); + + NamespaceStringOrUUID nssOrUUID{_nss.db().toString(), *_collectionUUID}; + auto [collOptions, indexes, idIndex] = getCollectionOptionsAndIndexes(opCtx, nssOrUUID); + + for (const auto& chunk : _initialChunks.chunks) { + const auto& chunkShardId = chunk.getShard(); + if (chunkShardId == dbPrimaryShardId || + initializedShards.find(chunkShardId) != initializedShards.end()) { + continue; + } + + ShardsvrCreateCollectionParticipant createCollectionParticipantRequest(_nss); + createCollectionParticipantRequest.setCollectionUUID(*_collectionUUID); + + createCollectionParticipantRequest.setOptions(collOptions); + createCollectionParticipantRequest.setIdIndex(idIndex); + createCollectionParticipantRequest.setIndexes(indexes); + + requests.emplace_back( + chunkShardId, + createCollectionParticipantRequest.toBSON( + BSON("writeConcern" << ShardingCatalogClient::kMajorityWriteConcern.toBSON()))); + + initializedShards.emplace(chunkShardId); + } + + if (!requests.empty()) { + auto responses = gatherResponses(opCtx, + _nss.db(), + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent, + requests); + + // If any shards fail to create the collection, fail the entire shardCollection command + // (potentially leaving an incompletely created sharded collection) + for (const auto& response : responses) { + auto shardResponse = uassertStatusOKWithContext( + std::move(response.swResponse), + str::stream() << "Unable to create collection " << _nss.ns() << " on " + << response.shardId); + auto status = 
getStatusFromCommandResult(shardResponse.data); + uassertStatusOK(status.withContext(str::stream() + << "Unable to create collection " << _nss.ns() + << " on " << response.shardId)); + + auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data); + uassertStatusOK(wcStatus.withContext(str::stream() + << "Unable to create collection " << _nss.ns() + << " on " << response.shardId)); + } + } +} + +void CreateCollectionCoordinator::_commit(OperationContext* opCtx) { + LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = _nss); + + // Upsert Chunks. + upsertChunks(opCtx, _initialChunks.chunks); + + CollectionType coll(_nss, + _initialChunks.collVersion().epoch(), + _initialChunks.creationTime, + Date_t::now(), + *_collectionUUID); + + coll.setKeyPattern(_shardKeyPattern->toBSON()); + + if (_collation) { + coll.setDefaultCollation(_collation.value()); + } + + if (_request.getUnique()) { + coll.setUnique(*_request.getUnique()); + } + + updateCatalogEntry(opCtx, _nss, coll); +} + +void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) { + LOGV2_DEBUG(5277907, 2, "Create collection _cleanup", "namespace"_attr = _nss); + + try { + forceShardFilteringMetadataRefresh(opCtx, _nss); + } catch (const DBException&) { + // If the refresh fails, then set the shard version to UNKNOWN and let a future operation + // refresh the metadata. + UninterruptibleLockGuard noInterrupt(opCtx->lockState()); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX); + CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx); + } + + // Is it really necessary to refresh all shards? or can I assume that the shard version will be + // unknown and refreshed eventually? 
+ auto shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId(); + + std::set<ShardId> shardsRefreshed; + for (const auto& chunk : _initialChunks.chunks) { + const auto& chunkShardId = chunk.getShard(); + if (chunkShardId == dbPrimaryShardId || + shardsRefreshed.find(chunkShardId) != shardsRefreshed.end()) { + continue; + } + + auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, chunkShardId)); + try { + auto refreshCmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("_flushRoutingTableCacheUpdates" << _nss.ns()), + Seconds{30}, + Shard::RetryPolicy::kIdempotent)); + + uassertStatusOK(refreshCmdResponse.commandStatus); + } catch (const DBException& ex) { + LOGV2_WARNING(5277909, + "Could not refresh shard", + "shardId"_attr = shard->getId(), + "error"_attr = redact(ex.reason())); + } + shardsRefreshed.emplace(chunkShardId); + } + + LOGV2(5277901, + "Created initial chunk(s)", + "namespace"_attr = _nss, + "numInitialChunks"_attr = _initialChunks.chunks.size(), + "initialCollectionVersion"_attr = _initialChunks.collVersion()); + + + ShardingLogging::get(opCtx)->logChange( + opCtx, + "shardCollection.end", + _nss.ns(), + BSON("version" << _initialChunks.collVersion().toString() << "numChunks" + << static_cast<int>(_initialChunks.chunks.size())), + ShardingCatalogClient::kMajorityWriteConcern); + + auto result = CreateCollectionResponse( + _initialChunks.chunks[_initialChunks.chunks.size() - 1].getVersion()); + result.setCollectionUUID(_collectionUUID); + _result = result; +} + +} // namespace mongo diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h new file mode 100644 index 00000000000..60f107487e2 --- /dev/null +++ b/src/mongo/db/s/create_collection_coordinator.h @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/s/config/initial_split_policy.h" +#include "mongo/db/s/shard_filtering_metadata_refresh.h" +#include "mongo/db/s/sharding_ddl_coordinator.h" +#include "mongo/s/request_types/sharded_ddl_commands_gen.h" +#include "mongo/util/future.h" + +namespace mongo { + +class CreateCollectionCoordinator final + : public ShardingDDLCoordinator, + public std::enable_shared_from_this<CreateCollectionCoordinator> { +public: + CreateCollectionCoordinator(OperationContext* opCtx, const ShardsvrCreateCollection& request); + + /** + * Returns the information of the newly created collection, or the already existing one. It must + * be called after a successful execution of run. + */ + const CreateCollectionResponse& getResultOnSuccess() { + return *_result; + } + +private: + SemiFuture<void> runImpl(std::shared_ptr<executor::TaskExecutor> executor) override; + + /** + * Performs all required checks before holding the critical sections. + */ + void _checkCommandArguments(OperationContext* opCtx); + + /** + * Ensures the collection is created locally and has the appropriate shard index. + */ + void _createCollectionAndIndexes(OperationContext* opCtx); + + /** + * Given the appropriate split policy, create the initial chunks. + */ + void _createChunks(OperationContext* opCtx); + + /** + * If the optimized path can be taken, ensure the collection is already created in all the + * participant shards. + */ + void _createCollectionOnNonPrimaryShards(OperationContext* opCtx); + + /** + * Does the following writes: + * 1. Updates the config.collections entry for the new sharded collection + * 2. Updates config.chunks entries for the new sharded collection + */ + void _commit(OperationContext* opCtx); + + /** + * Refresh all participant shards and log creation. 
+ */ + void _cleanup(OperationContext* opCtx); + + ServiceContext* _serviceContext; + const ShardsvrCreateCollection _request; + const NamespaceString& _nss; + + boost::optional<ShardKeyPattern> _shardKeyPattern; + boost::optional<BSONObj> _collation; + boost::optional<UUID> _collectionUUID; + std::unique_ptr<InitialSplitPolicy> _splitPolicy; + InitialSplitPolicy::ShardCollectionConfig _initialChunks; + boost::optional<CreateCollectionResponse> _result; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/shard_collection_legacy.cpp b/src/mongo/db/s/shard_collection_legacy.cpp index 47e82cd0cc0..766f1bb39ef 100644 --- a/src/mongo/db/s/shard_collection_legacy.cpp +++ b/src/mongo/db/s/shard_collection_legacy.cpp @@ -51,6 +51,7 @@ #include "mongo/db/s/shard_collection_legacy.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/shard_key_util.h" +#include "mongo/db/s/sharding_ddl_util.h" #include "mongo/db/s/sharding_logging.h" #include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" @@ -101,38 +102,11 @@ void uassertStatusOKWithWarning(const Status& status) { } } -/** - * Throws an exception if the collection is already sharded with different options. - * - * If the collection is already sharded with the same options, returns the existing collection's - * full spec, else returns boost::none. - */ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadyShardedWithSameOptions( OperationContext* opCtx, const ShardsvrShardCollectionRequest& request) { const auto& nss = *request.get_shardsvrShardCollection(); - auto cm = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss)); - - if (!cm.isSharded()) { - return boost::none; - } - - auto defaultCollator = - cm.getDefaultCollator() ? 
cm.getDefaultCollator()->getSpec().toBSON() : BSONObj(); - - // If the collection is already sharded, fail if the deduced options in this request do not - // match the options the collection was originally sharded with. - uassert(ErrorCodes::AlreadyInitialized, - str::stream() << "sharding already enabled for collection " << nss, - SimpleBSONObjComparator::kInstance.evaluate(cm.getShardKeyPattern().toBSON() == - request.getKey()) && - SimpleBSONObjComparator::kInstance.evaluate(defaultCollator == - *request.getCollation()) && - cm.isUnique() == request.getUnique()); - - CreateCollectionResponse response(cm.getVersion()); - response.setCollectionUUID(cm.getUUID()); - return response; + return mongo::sharding_ddl_util::checkIfCollectionAlreadySharded( + opCtx, nss, request.getKey(), *request.getCollation(), request.getUnique()); } void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) { @@ -584,7 +558,7 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx, // path. boost::optional<DistLockManager::ScopedDistLock> dbDistLock; boost::optional<DistLockManager::ScopedDistLock> collDistLock; - if (!mustTakeDistLock) { + if (mustTakeDistLock) { dbDistLock.emplace(uassertStatusOK(DistLockManager::get(opCtx)->lock( opCtx, nss.db(), "shardCollection", DistLockManager::kDefaultLockTimeout))); collDistLock.emplace(uassertStatusOK(DistLockManager::get(opCtx)->lock( @@ -604,14 +578,20 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx, // If DistLock must not be taken, then the request came from the config server, there is no // need to check this here. 
- if (!mustTakeDistLock) { + if (mustTakeDistLock) { if (nss.db() == NamespaceString::kConfigDb) { - BSONObj countResult; - DBDirectClient client(opCtx); - if (!client.runCommand( - nss.db().toString(), BSON("count" << nss.coll()), countResult)) - uassertStatusOK(getStatusFromCommandResult(countResult)); - auto numDocs = countResult["n"].Int(); + auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + + auto findReponse = uassertStatusOK(configShard->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + nss, + BSONObj(), + BSONObj(), + 1)); + + auto numDocs = findReponse.docs.size(); // If this is a collection on the config db, it must be empty to be sharded. uassert(ErrorCodes::IllegalOperation, @@ -642,11 +622,12 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx, splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy(opCtx, targetState->shardKeyPattern, - request, + request.getNumInitialChunks(), + request.getPresplitHashedZones(), + request.getInitialSplitPoints(), targetState->tags, getNumShards(opCtx), targetState->collectionIsEmpty); - boost::optional<CollectionUUID> optCollectionUUID; if (shouldUseUUIDForChunkIndexing) { optCollectionUUID = targetState->uuid; @@ -709,7 +690,7 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx, CreateCollectionResponse shardCollectionLegacy(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& cmdObj, - bool legacyPath) { + bool requestFromCSRS) { auto request = ShardsvrShardCollectionRequest::parse( IDLParserErrorContext("_shardsvrShardCollection"), cmdObj); if (!request.getCollation()) @@ -732,8 +713,12 @@ CreateCollectionResponse shardCollectionLegacy(OperationContext* opCtx, response = scopedShardCollection.getResponse().get(); } else { try { - response = shardCollection( - opCtx, nss, cmdObj, request, ShardingState::get(opCtx)->shardId(), legacyPath); + response = 
shardCollection(opCtx, + nss, + cmdObj, + request, + ShardingState::get(opCtx)->shardId(), + !requestFromCSRS); } catch (const DBException& e) { scopedShardCollection.emplaceResponse(e.toStatus()); throw; diff --git a/src/mongo/db/s/shard_collection_legacy.h b/src/mongo/db/s/shard_collection_legacy.h index 2cf109feabf..f245c9265f1 100644 --- a/src/mongo/db/s/shard_collection_legacy.h +++ b/src/mongo/db/s/shard_collection_legacy.h @@ -34,9 +34,14 @@ namespace mongo { +/** + * Executes the legacy path of shard collection. If requestFromCSRS is false, then the checks and + * operations performed by the config server (like taking the distributed lock on the database and + * the collection) must be done on the shard instead. + */ CreateCollectionResponse shardCollectionLegacy(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& cmdObj, - bool mustTakeDistLock); + bool requestFromCSRS); } // namespace mongo diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp index ca117a3657b..5a28e117aea 100644 --- a/src/mongo/db/s/sharding_ddl_coordinator.cpp +++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp @@ -44,28 +44,29 @@ ShardingDDLCoordinator::ShardingDDLCoordinator(OperationContext* opCtx, const Na : _nss(ns), _forwardableOpMetadata(opCtx){}; SemiFuture<void> ShardingDDLCoordinator::run(OperationContext* opCtx) { - // Check that the operation context has a database version for this namespace - const auto clientDbVersion = OperationShardingState::get(opCtx).getDbVersion(_nss.db()); - uassert(ErrorCodes::IllegalOperation, - str::stream() << "Request sent without attaching database version", - clientDbVersion); + if (!_nss.isConfigDB()) { + // Check that the operation context has a database version for this namespace + const auto clientDbVersion = OperationShardingState::get(opCtx).getDbVersion(_nss.db()); + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Request sent without attaching database version", 
+ clientDbVersion); - // Checks that this is the primary shard for the namespace's db - const auto dbPrimaryShardId = [&]() { - Lock::DBLock dbWriteLock(opCtx, _nss.db(), MODE_IS); - auto dss = DatabaseShardingState::get(opCtx, _nss.db()); - auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss); - // The following call will also ensure that the database version matches - return dss->getDatabaseInfo(opCtx, dssLock).getPrimary(); - }(); + // Checks that this is the primary shard for the namespace's db + const auto dbPrimaryShardId = [&]() { + Lock::DBLock dbWriteLock(opCtx, _nss.db(), MODE_IS); + auto dss = DatabaseShardingState::get(opCtx, _nss.db()); + auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss); + // The following call will also ensure that the database version matches + return dss->getDatabaseInfo(opCtx, dssLock).getPrimary(); + }(); - const auto thisShardId = ShardingState::get(opCtx)->shardId(); - - uassert(ErrorCodes::IllegalOperation, - str::stream() << "This is not the primary shard for db " << _nss.db() - << " expected: " << dbPrimaryShardId << " shardId: " << thisShardId, - dbPrimaryShardId == thisShardId); + const auto thisShardId = ShardingState::get(opCtx)->shardId(); + uassert(ErrorCodes::IllegalOperation, + str::stream() << "This is not the primary shard for db " << _nss.db() + << " expected: " << dbPrimaryShardId << " shardId: " << thisShardId, + dbPrimaryShardId == thisShardId); + } return runImpl(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); } diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp index 3b29130e2ff..9e0891dfb13 100644 --- a/src/mongo/db/s/sharding_ddl_util.cpp +++ b/src/mongo/db/s/sharding_ddl_util.cpp @@ -206,5 +206,34 @@ void checkShardedRenamePreconditions(OperationContext* opCtx, tags.empty()); } +boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded( + OperationContext* opCtx, + const NamespaceString& nss, + const 
BSONObj& key, + const BSONObj& collation, + bool unique) { + auto cm = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss)); + + if (!cm.isSharded()) { + return boost::none; + } + + auto defaultCollator = + cm.getDefaultCollator() ? cm.getDefaultCollator()->getSpec().toBSON() : BSONObj(); + + // If the collection is already sharded, fail if the deduced options in this request do not + // match the options the collection was originally sharded with. + uassert(ErrorCodes::AlreadyInitialized, + str::stream() << "sharding already enabled for collection " << nss, + SimpleBSONObjComparator::kInstance.evaluate(cm.getShardKeyPattern().toBSON() == key) && + SimpleBSONObjComparator::kInstance.evaluate(defaultCollator == collation) && + cm.isUnique() == unique); + + CreateCollectionResponse response(cm.getVersion()); + response.setCollectionUUID(cm.getUUID()); + return response; +} + } // namespace sharding_ddl_util } // namespace mongo diff --git a/src/mongo/db/s/sharding_ddl_util.h b/src/mongo/db/s/sharding_ddl_util.h index c98c7a50a64..74b4fce3c57 100644 --- a/src/mongo/db/s/sharding_ddl_util.h +++ b/src/mongo/db/s/sharding_ddl_util.h @@ -29,6 +29,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/s/request_types/sharded_ddl_commands_gen.h" namespace mongo { namespace sharding_ddl_util { @@ -61,5 +62,18 @@ void checkShardedRenamePreconditions(OperationContext* opCtx, const NamespaceString& toNss, const bool dropTarget); +/** + * Throws an exception if the collection is already sharded with different options. + * + * If the collection is already sharded with the same options, returns the existing collection's + * full spec, else returns boost::none. 
+ */ +boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded( + OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& key, + const BSONObj& collation, + bool unique); + } // namespace sharding_ddl_util } // namespace mongo diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp index 4aa3d632b3c..23efc3558d3 100644 --- a/src/mongo/db/s/shardsvr_create_collection_command.cpp +++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp @@ -36,11 +36,15 @@ #include "mongo/db/commands.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/s/create_collection_coordinator.h" +#include "mongo/db/s/dist_lock_manager.h" #include "mongo/db/s/shard_collection_legacy.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/logv2/log.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/shard_collection_gen.h" #include "mongo/s/request_types/sharded_ddl_commands_gen.h" +#include "mongo/s/sharded_collections_ddl_parameters_gen.h" namespace mongo { namespace { @@ -158,6 +162,23 @@ CreateCollectionResponse createCollectionLegacy(OperationContext* opCtx, return shardCollectionLegacy(opCtx, nss, shardsvrShardCollectionRequest.toBSON(), false); } +CreateCollectionResponse createCollection(OperationContext* opCtx, + const NamespaceString& nss, + const ShardsvrCreateCollection& request) { + uassert( + ErrorCodes::NotImplemented, "create collection not implemented yet", request.getShardKey()); + + DistLockManager::ScopedDistLock dbDistLock(uassertStatusOK(DistLockManager::get(opCtx)->lock( + opCtx, nss.db(), "shardCollection", DistLockManager::kDefaultLockTimeout))); + DistLockManager::ScopedDistLock collDistLock(uassertStatusOK(DistLockManager::get(opCtx)->lock( + opCtx, nss.ns(), "shardCollection", DistLockManager::kDefaultLockTimeout))); + + auto createCollectionCoordinator = + 
std::make_shared<CreateCollectionCoordinator>(opCtx, request); + createCollectionCoordinator->run(opCtx).get(opCtx); + return createCollectionCoordinator->getResultOnSuccess(); +} + class ShardsvrCreateCollectionCommand final : public TypedCommand<ShardsvrCreateCollectionCommand> { public: using Request = ShardsvrCreateCollection; @@ -194,7 +215,18 @@ public: "Create Collection path has not been implemented", request().getShardKey()); - return createCollectionLegacy(opCtx, ns(), request()); + if (feature_flags::gShardingFullDDLSupport.isEnabled( + serverGlobalParams.featureCompatibility)) { + LOGV2_DEBUG( + 5277910, 1, "Running new create collection procedure", "namespace"_attr = ns()); + return createCollection(opCtx, ns(), request()); + } else { + LOGV2_DEBUG(5277911, + 1, + "Running legacy create collection procedure", + "namespace"_attr = ns()); + return createCollectionLegacy(opCtx, ns(), request()); + } } private: diff --git a/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp new file mode 100644 index 00000000000..243d0d84737 --- /dev/null +++ b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/s/migration_destination_manager.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/s/request_types/sharded_ddl_commands_gen.h" + +namespace mongo { +namespace { + +class ShardsvrCreateCollectionParticipantCommand final + : public TypedCommand<ShardsvrCreateCollectionParticipantCommand> { +public: + using Request = ShardsvrCreateCollectionParticipant; + + bool adminOnly() const override { + return false; + } + + std::string help() const override { + return "Internal command. Do not call directly. 
Creates a collection."; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kNever; + } + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + auto const shardingState = ShardingState::get(opCtx); + uassertStatusOK(shardingState->canAcceptShardedCommands()); + + uassert(ErrorCodes::InvalidOptions, + str::stream() << "_shardsvrCreateCollectionParticipant must be called with " + "majority writeConcern, got " + << request().toBSON(BSONObj()), + opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority); + + + MigrationDestinationManager::cloneCollectionIndexesAndOptions( + opCtx, + ns(), + {*request().getCollectionUUID(), + request().getIndexes(), + request().getIdIndex(), + request().getOptions()}); + } + + private: + NamespaceString ns() const override { + return request().getNamespace(); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + +} shardsvrCreateCollectionCommand; + +} // namespace +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index fb57d689248..41dfe3cdf33 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -53,6 +53,45 @@ namespace mongo { namespace { +std::vector<AsyncRequestsSender::Request> buildUnshardedRequestsForAllShards( + OperationContext* opCtx, std::vector<ShardId> shardIds, const BSONObj& cmdObj) { + auto cmdToSend = cmdObj; + appendShardVersion(cmdToSend, ChunkVersion::UNSHARDED()); + + std::vector<AsyncRequestsSender::Request> requests; + for (auto&& shardId : shardIds) + requests.emplace_back(std::move(shardId), cmdToSend); + + return requests; +} + +AsyncRequestsSender::Response executeCommandAgainstDatabasePrimaryOrFirstShard( + OperationContext* opCtx, + StringData dbName, + const CachedDatabaseInfo& dbInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + Shard::RetryPolicy retryPolicy) { + ShardId shardId; + if (dbName == NamespaceString::kConfigDb) { + auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); + uassert(ErrorCodes::IllegalOperation, "there are no shards to target", !shardIds.empty()); + std::sort(shardIds.begin(), shardIds.end()); + shardId = shardIds[0]; + } else { + shardId = dbInfo.primaryId(); + } + + auto responses = + gatherResponses(opCtx, + dbName, + readPref, + retryPolicy, + buildUnshardedRequestsForAllShards( + opCtx, {shardId}, appendDbVersionIfPresent(cmdObj, dbInfo))); + return std::move(responses.front()); +} + class ShardCollectionCmd : public BasicCommand { public: ShardCollectionCmd() : BasicCommand("shardCollection", "shardcollection") {} @@ -110,40 +149,31 @@ public: auto catalogCache = Grid::get(opCtx)->catalogCache(); const auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, nss.db())); - ShardId shardId; - if (nss.db() == NamespaceString::kConfigDb) { - 
auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); - uassert( - ErrorCodes::IllegalOperation, "there are no shards to target", !shardIds.empty()); - // Many tests assume the primary shard for configDb will be the shard - // with the first ID in ascending lexical order - std::sort(shardIds.begin(), shardIds.end()); - shardId = shardIds[0]; - } else { - shardId = dbInfo.primaryId(); - } - - auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); - - auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( + auto cmdResponse = executeCommandAgainstDatabasePrimaryOrFirstShard( opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - nss.db().toString(), + nss.db(), + dbInfo, CommandHelpers::appendMajorityWriteConcern( CommandHelpers::appendGenericCommandArgs(cmdObj, shardsvrCollRequest.toBSON({})), opCtx->getWriteConcern()), - Shard::RetryPolicy::kIdempotent)); - uassertStatusOK(cmdResponse.commandStatus); + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + Shard::RetryPolicy::kIdempotent); - CommandHelpers::filterCommandReplyForPassthrough(cmdResponse.response, &result); - result.append("collectionsharded", nss.toString()); + const auto remoteResponse = uassertStatusOK(cmdResponse.swResponse); + uassertStatusOK(getStatusFromCommandResult(remoteResponse.data)); auto createCollResp = CreateCollectionResponse::parse( - IDLParserErrorContext("createCollection"), cmdResponse.response); + IDLParserErrorContext("createCollection"), remoteResponse.data); catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( nss, createCollResp.getCollectionVersion(), dbInfo.primaryId()); + // Add only collectionsharded as a response parameter and remove the version to maintain the + // same format as before. 
+ result.append("collectionsharded", nss.toString()); + auto resultObj = + remoteResponse.data.removeField(CreateCollectionResponse::kCollectionVersionFieldName); + CommandHelpers::filterCommandReplyForPassthrough(resultObj, &result); return true; } diff --git a/src/mongo/s/request_types/sharded_ddl_commands.idl b/src/mongo/s/request_types/sharded_ddl_commands.idl index 61f190a70f2..005c713107d 100644 --- a/src/mongo/s/request_types/sharded_ddl_commands.idl +++ b/src/mongo/s/request_types/sharded_ddl_commands.idl @@ -32,10 +32,10 @@ global: imports: - "mongo/db/drop_database.idl" - "mongo/db/commands/rename_collection.idl" + - "mongo/db/keypattern.idl" - "mongo/idl/basic_types.idl" - - "mongo/s/database_version.idl" - "mongo/s/chunk_version.idl" - - "mongo/db/keypattern.idl" + - "mongo/s/database_version.idl" structs: RenameCollectionResponse: @@ -93,6 +93,29 @@ commands: type: object description: "The collation to use for the shard key index." optional: true + + _shardsvrCreateCollectionParticipant: + command_name: _shardsvrCreateCollectionParticipant + cpp_name: ShardsvrCreateCollectionParticipant + description: "Command to create a collection on participant shards, when called, assumes the primary shard is under the critical section for that namespace." + strict: false + namespace: concatenate_with_db + api_version: "" + fields: + indexes: + type: array<object> + description: "Collection indexes." + options: + type: object + description: "Collection options." + collectionUUID: + type: uuid + description: "Collection uuid." + optional: true + idIndex: + type: object + description: "Id index." + _shardsvrDropDatabase: description: "Internal command sent to the primary shard of a database to drop it." |