-rw-r--r--  etc/evergreen.yml                                                            2
-rw-r--r--  jstests/core/views/views_all_commands.js                                     1
-rw-r--r--  jstests/replsets/db_reads_while_recovering_all_commands.js                   1
-rw-r--r--  jstests/replsets/tenant_migration_concurrent_writes.js                       1
-rw-r--r--  jstests/sharding/index_commands_during_initial_split.js                     11
-rw-r--r--  jstests/sharding/read_write_concern_defaults_application.js                  1
-rw-r--r--  src/mongo/db/s/SConscript                                                     3
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.cpp                               22
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.h                                  5
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp    80
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp                            659
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h                              103
-rw-r--r--  src/mongo/db/s/shard_collection_legacy.cpp                                   69
-rw-r--r--  src/mongo/db/s/shard_collection_legacy.h                                      7
-rw-r--r--  src/mongo/db/s/sharding_ddl_coordinator.cpp                                  39
-rw-r--r--  src/mongo/db/s/sharding_ddl_util.cpp                                         29
-rw-r--r--  src/mongo/db/s/sharding_ddl_util.h                                           14
-rw-r--r--  src/mongo/db/s/shardsvr_create_collection_command.cpp                        34
-rw-r--r--  src/mongo/db/s/shardsvr_create_collection_participant_command.cpp           104
-rw-r--r--  src/mongo/s/commands/cluster_shard_collection_cmd.cpp                        76
-rw-r--r--  src/mongo/s/request_types/sharded_ddl_commands.idl                           27
21 files changed, 1161 insertions, 127 deletions
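
Note: the new create collection path introduced below is gated behind the featureFlagShardingFullDDLSupport server parameter: _shardsvrCreateCollection dispatches to the new CreateCollectionCoordinator only when feature_flags::gShardingFullDDLSupport is enabled, and the jstests in this patch probe the flag with getParameter. As a minimal, non-authoritative sketch of exercising the new path from a local jstest; the ShardingTest startup options used here (other.configOptions / other.rsOptions) are assumptions based on common test usage and are not part of this patch:

    // Sketch only: start a cluster with the DDL feature flag enabled on the config servers and
    // the shards, so that shardCollection goes through the new CreateCollectionCoordinator path.
    const st = new ShardingTest({
        shards: 2,
        other: {
            configOptions: {setParameter: {featureFlagShardingFullDDLSupport: true}},
            rsOptions: {setParameter: {featureFlagShardingFullDDLSupport: true}},
        }
    });

    assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
    assert.commandWorked(st.s.adminCommand({shardCollection: 'test.foo', key: {x: 1}}));
    st.stop();
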
diff --git a/etc/evergreen.yml b/etc/evergreen.yml
index 1669104c6be..d6e81a727c4 100644
--- a/etc/evergreen.yml
+++ b/etc/evergreen.yml
@@ -6308,7 +6308,7 @@ tasks:
vars:
depends_on: jsCore
use_large_distro: "true"
- resmoke_args: --storageEngine=wiredTiger
+ resmoke_args: "--storageEngine=wiredTiger --mongodSetParameters='{featureFlagDisableIncompleteShardingDDLSupport: true}'"
fallback_num_sub_suites: 37
- name: tenant_migration_jscore_passthrough_gen
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index b537ddc91f9..c4f4d1e0e13 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -119,6 +119,7 @@ let viewsCommandTests = {
_shardsvrDropCollection: {skip: isAnInternalCommand},
_shardsvrDropCollectionParticipant: {skip: isAnInternalCommand},
_shardsvrCreateCollection: {skip: isAnInternalCommand},
+ _shardsvrCreateCollectionParticipant: {skip: isAnInternalCommand},
_shardsvrDropDatabase: {skip: isAnInternalCommand},
_shardsvrDropDatabaseParticipant: {skip: isAnInternalCommand},
_shardsvrMovePrimary: {skip: isAnInternalCommand},
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index bfdcaa1eb47..77b461c2829 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -67,6 +67,7 @@ const allCommands = {
_shardsvrDropCollection: {skip: isPrimaryOnly},
_shardsvrCreateCollection: {skip: isPrimaryOnly},
_shardsvrDropCollectionParticipant: {skip: isPrimaryOnly},
+ _shardsvrCreateCollectionParticipant: {skip: isPrimaryOnly},
_shardsvrMovePrimary: {skip: isPrimaryOnly},
_shardsvrRenameCollection: {skip: isPrimaryOnly},
_shardsvrRenameCollectionParticipant: {skip: isAnInternalCommand},
diff --git a/jstests/replsets/tenant_migration_concurrent_writes.js b/jstests/replsets/tenant_migration_concurrent_writes.js
index 40a199a2c9d..603738080b4 100644
--- a/jstests/replsets/tenant_migration_concurrent_writes.js
+++ b/jstests/replsets/tenant_migration_concurrent_writes.js
@@ -456,6 +456,7 @@ const testCases = {
_recvChunkStatus: {skip: isNotRunOnUserDatabase},
_shardsvrCloneCatalogData: {skip: isNotRunOnUserDatabase},
_shardsvrCreateCollection: {skip: isOnlySupportedOnShardedCluster},
+ _shardsvrCreateCollectionParticipant: {skip: isOnlySupportedOnShardedCluster},
_shardsvrMovePrimary: {skip: isNotRunOnUserDatabase},
_shardsvrShardCollection: {skip: isNotRunOnUserDatabase},
_shardsvrRenameCollection: {skip: isOnlySupportedOnShardedCluster},
diff --git a/jstests/sharding/index_commands_during_initial_split.js b/jstests/sharding/index_commands_during_initial_split.js
index 39c70773ee9..350149b9d9c 100644
--- a/jstests/sharding/index_commands_during_initial_split.js
+++ b/jstests/sharding/index_commands_during_initial_split.js
@@ -59,6 +59,17 @@ function runCommandDuringShardCollection(st, ns, shardKey, zones, failpointName,
const numShards = 4;
const st = new ShardingTest({shards: numShards});
+
+const featureFlagParam = assert.commandWorked(
+ st.configRS.getPrimary().adminCommand({getParameter: 1, featureFlagShardingFullDDLSupport: 1}));
+
+if (featureFlagParam.featureFlagShardingFullDDLSupport.value) {
+ jsTest.log(
+        'Skipping test because the featureFlagShardingFullDDLSupport feature flag is enabled and this test expects the exact legacy steps, so it is incompatible with the new path.');
+ st.stop();
+ return;
+}
+
const allShards = [];
for (let i = 0; i < numShards; i++) {
allShards.push(st["shard" + i]);
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index 24d2b31bb94..5c1705ea960 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -115,6 +115,7 @@ let testCases = {
_recvChunkStatus: {skip: "internal command"},
_shardsvrCloneCatalogData: {skip: "internal command"},
_shardsvrCreateCollection: {skip: "internal command"},
+ _shardsvrCreateCollectionParticipant: {skip: "internal command"},
_shardsvrDropCollection: {skip: "internal command"},
_shardsvrDropCollectionParticipant: {skip: "internal command"},
_shardsvrDropDatabase: {skip: "internal command"},
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 593cf71a7ff..e2dd0f71059 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -130,6 +130,7 @@ env.Library(
'$BUILD_DIR/mongo/db/transaction',
'$BUILD_DIR/mongo/db/vector_clock_mongod',
'$BUILD_DIR/mongo/s/query/cluster_aggregate',
+ '$BUILD_DIR/mongo/s/sharding_api',
'$BUILD_DIR/mongo/s/sharding_initialization',
'chunk_splitter',
'sharding_api_d',
@@ -326,6 +327,7 @@ env.Library(
'config/configsvr_shard_collection_command.cpp',
'config/configsvr_split_chunk_command.cpp',
'config/configsvr_update_zone_key_range_command.cpp',
+ 'create_collection_coordinator.cpp',
'drop_collection_coordinator.cpp',
'drop_database_coordinator.cpp',
'flush_database_cache_updates_command.cpp',
@@ -344,6 +346,7 @@ env.Library(
'sharding_server_status.cpp',
'sharding_state_command.cpp',
'shardsvr_create_collection_command.cpp',
+ 'shardsvr_create_collection_participant_command.cpp',
'shardsvr_drop_collection_command.cpp',
'shardsvr_drop_collection_participant_command.cpp',
'shardsvr_drop_database_command.cpp',
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 3ec7fa7f13f..ba9fb36ede2 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -239,41 +239,41 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
std::unique_ptr<InitialSplitPolicy> InitialSplitPolicy::calculateOptimizationStrategy(
OperationContext* opCtx,
const ShardKeyPattern& shardKeyPattern,
- const ShardsvrShardCollectionRequest& request,
+ const std::int64_t numInitialChunks,
+ const bool presplitHashedZones,
+ const boost::optional<std::vector<BSONObj>>& initialSplitPoints,
const std::vector<TagsType>& tags,
size_t numShards,
bool collectionIsEmpty) {
uassert(ErrorCodes::InvalidOptions,
str::stream() << "numInitialChunks is only supported when the collection is empty "
"and has a hashed field in the shard key pattern",
- !request.getNumInitialChunks() ||
- (shardKeyPattern.isHashedPattern() && collectionIsEmpty));
+ !numInitialChunks || (shardKeyPattern.isHashedPattern() && collectionIsEmpty));
uassert(ErrorCodes::InvalidOptions,
str::stream()
<< "When the prefix of the hashed shard key is a range field, "
"'numInitialChunks' can only be used when the 'presplitHashedZones' is true",
- !request.getNumInitialChunks() || shardKeyPattern.hasHashedPrefix() ||
- request.getPresplitHashedZones());
+ !numInitialChunks || shardKeyPattern.hasHashedPrefix() || presplitHashedZones);
uassert(ErrorCodes::InvalidOptions,
str::stream() << "Cannot have both initial split points and tags set",
- !request.getInitialSplitPoints() || tags.empty());
+ !initialSplitPoints || tags.empty());
// If 'presplitHashedZones' flag is set, we always use 'PresplitHashedZonesSplitPolicy', to make
// sure we throw the correct assertion if further validation fails.
- if (request.getPresplitHashedZones()) {
+ if (presplitHashedZones) {
return std::make_unique<PresplitHashedZonesSplitPolicy>(
- opCtx, shardKeyPattern, tags, request.getNumInitialChunks(), collectionIsEmpty);
+ opCtx, shardKeyPattern, tags, numInitialChunks, collectionIsEmpty);
}
// The next preference is to use split points based strategy. This is only possible if
// 'initialSplitPoints' is set, or if the collection is empty with shard key having a hashed
// prefix.
- if (request.getInitialSplitPoints()) {
- return std::make_unique<SplitPointsBasedSplitPolicy>(*request.getInitialSplitPoints());
+ if (initialSplitPoints) {
+ return std::make_unique<SplitPointsBasedSplitPolicy>(*initialSplitPoints);
}
if (tags.empty() && shardKeyPattern.hasHashedPrefix() && collectionIsEmpty) {
return std::make_unique<SplitPointsBasedSplitPolicy>(
- shardKeyPattern, numShards, request.getNumInitialChunks());
+ shardKeyPattern, numShards, numInitialChunks);
}
if (!tags.empty()) {
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 6687c53c933..91f1c3c4054 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -36,7 +36,6 @@
#include "mongo/db/namespace_string.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_tags.h"
-#include "mongo/s/request_types/shard_collection_gen.h"
#include "mongo/s/shard_id.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/string_map.h"
@@ -61,7 +60,9 @@ public:
static std::unique_ptr<InitialSplitPolicy> calculateOptimizationStrategy(
OperationContext* opCtx,
const ShardKeyPattern& shardKeyPattern,
- const ShardsvrShardCollectionRequest& request,
+ const std::int64_t numInitialChunks,
+ const bool presplitHashedZones,
+ const boost::optional<std::vector<BSONObj>>& initialSplitPoints,
const std::vector<TagsType>& tags,
size_t numShards,
bool collectionIsEmpty);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
index 85067a04d22..c3871c2a109 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
@@ -46,6 +46,7 @@
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/shard_collection_gen.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
@@ -118,7 +119,9 @@ TEST_F(CreateFirstChunksTest, Split_Disallowed_With_Both_SplitPoints_And_Zones)
ASSERT_THROWS_CODE(
InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
kShardKeyPattern,
- request,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
tags,
2 /* numShards */,
true /* collectionIsEmpty */),
@@ -128,7 +131,9 @@ TEST_F(CreateFirstChunksTest, Split_Disallowed_With_Both_SplitPoints_And_Zones)
ASSERT_THROWS_CODE(
InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
kShardKeyPattern,
- request,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
tags,
2 /* numShards */,
false /* collectionIsEmpty */),
@@ -160,7 +165,9 @@ TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromSplitVector_Man
auto optimization =
InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
kShardKeyPattern,
- request,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
{}, /* tags */
3 /* numShards */,
false /* collectionIsEmpty */);
@@ -203,12 +210,15 @@ TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromClient_ManyChun
ShardsvrShardCollectionRequest request;
request.setInitialSplitPoints(splitPoints);
- auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
- kShardKeyPattern,
- request,
- zones,
- 3 /* numShards */,
- collectionIsEmpty);
+ auto optimization =
+ InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
+ kShardKeyPattern,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
+ zones,
+ 3 /* numShards */,
+ collectionIsEmpty);
ASSERT(optimization->isOptimized());
@@ -236,8 +246,15 @@ TEST_F(CreateFirstChunksTest, NonEmptyCollection_WithZones_OneChunkToPrimary) {
bool collectionIsEmpty = false;
ShardsvrShardCollectionRequest request;
- auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(
- operationContext(), kShardKeyPattern, request, zones, 3 /* numShards */, collectionIsEmpty);
+ auto optimization =
+ InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
+ kShardKeyPattern,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
+ zones,
+ 3 /* numShards */,
+ collectionIsEmpty);
ASSERT(optimization->isOptimized());
@@ -275,12 +292,15 @@ TEST_F(CreateFirstChunksTest, EmptyCollection_SplitPoints_FromClient_ManyChunksD
ShardsvrShardCollectionRequest request;
request.setInitialSplitPoints(splitPoints);
- auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
- kShardKeyPattern,
- request,
- zones,
- 3 /* numShards */,
- collectionIsEmpty);
+ auto optimization =
+ InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
+ kShardKeyPattern,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
+ zones,
+ 3 /* numShards */,
+ collectionIsEmpty);
ASSERT(optimization->isOptimized());
@@ -322,12 +342,15 @@ TEST_F(CreateFirstChunksTest, EmptyCollection_NoSplitPoints_OneChunkToPrimary) {
ShardsvrShardCollectionRequest request;
request.setInitialSplitPoints(splitPoints);
- auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
- kShardKeyPattern,
- request,
- zones,
- 3 /* numShards */,
- collectionIsEmpty);
+ auto optimization =
+ InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
+ kShardKeyPattern,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
+ zones,
+ 3 /* numShards */,
+ collectionIsEmpty);
ASSERT(optimization->isOptimized());
@@ -355,8 +378,15 @@ TEST_F(CreateFirstChunksTest, EmptyCollection_WithZones_ManyChunksOnFirstZoneSha
ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))};
bool collectionIsEmpty = true;
ShardsvrShardCollectionRequest request;
- auto optimization = InitialSplitPolicy::calculateOptimizationStrategy(
- operationContext(), kShardKeyPattern, request, zones, 3 /* numShards */, collectionIsEmpty);
+ auto optimization =
+ InitialSplitPolicy::calculateOptimizationStrategy(operationContext(),
+ kShardKeyPattern,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
+ zones,
+ 3 /* numShards */,
+ collectionIsEmpty);
ASSERT(optimization->isOptimized());
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
new file mode 100644
index 00000000000..12dbc593b49
--- /dev/null
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -0,0 +1,659 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/commands/feature_compatibility_version.h"
+#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/create_collection_coordinator.h"
+#include "mongo/db/s/shard_key_util.h"
+#include "mongo/db/s/sharding_ddl_util.h"
+#include "mongo/db/s/sharding_logging.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/shard_collection_gen.h"
+#include "mongo/s/sharded_collections_ddl_parameters_gen.h"
+#include "mongo/s/write_ops/cluster_write.h"
+
+namespace mongo {
+namespace {
+
+struct OptionsAndIndexes {
+ BSONObj options;
+ std::vector<BSONObj> indexSpecs;
+ BSONObj idIndexSpec;
+};
+
+OptionsAndIndexes getCollectionOptionsAndIndexes(OperationContext* opCtx,
+ const NamespaceStringOrUUID& nssOrUUID) {
+ DBDirectClient localClient(opCtx);
+ BSONObj idIndex;
+ BSONObjBuilder optionsBob;
+
+ auto all =
+ localClient.getCollectionInfos(nssOrUUID.dbname(), BSON("info.uuid" << *nssOrUUID.uuid()));
+
+ // There must be a collection at this time.
+ invariant(!all.empty());
+ auto& entry = all.front();
+ if (entry["options"].isABSONObj()) {
+ optionsBob.appendElements(entry["options"].Obj());
+ }
+ optionsBob.append(entry["info"]["uuid"]);
+ idIndex = entry["idIndex"].Obj().getOwned();
+
+ auto indexSpecsList = localClient.getIndexSpecs(nssOrUUID, false, 0);
+
+ return {optionsBob.obj(),
+ std::vector<BSONObj>(std::begin(indexSpecsList), std::end(indexSpecsList)),
+ idIndex};
+}
+
+/**
+ * Compares the proposed shard key with the shard key of the collection's existing zones
+ * to ensure they are a legal combination.
+ */
+void validateShardKeyAgainstExistingZones(OperationContext* opCtx,
+ const BSONObj& proposedKey,
+ const ShardKeyPattern& shardKeyPattern,
+ const std::vector<TagsType>& tags) {
+ for (const auto& tag : tags) {
+ BSONObjIterator tagMinFields(tag.getMinKey());
+ BSONObjIterator tagMaxFields(tag.getMaxKey());
+ BSONObjIterator proposedFields(proposedKey);
+
+ while (tagMinFields.more() && proposedFields.more()) {
+ BSONElement tagMinKeyElement = tagMinFields.next();
+ BSONElement tagMaxKeyElement = tagMaxFields.next();
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "the min and max of the existing zone " << tag.getMinKey()
+ << " -->> " << tag.getMaxKey() << " have non-matching keys",
+ tagMinKeyElement.fieldNameStringData() ==
+ tagMaxKeyElement.fieldNameStringData());
+
+ BSONElement proposedKeyElement = proposedFields.next();
+ bool match = ((tagMinKeyElement.fieldNameStringData() ==
+ proposedKeyElement.fieldNameStringData()) &&
+ ((tagMinFields.more() && proposedFields.more()) ||
+ (!tagMinFields.more() && !proposedFields.more())));
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "the proposed shard key " << proposedKey.toString()
+ << " does not match with the shard key of the existing zone "
+ << tag.getMinKey() << " -->> " << tag.getMaxKey(),
+ match);
+
+ // If the field is hashed, make sure that the min and max values are of supported type.
+ uassert(
+ ErrorCodes::InvalidOptions,
+ str::stream() << "cannot do hash sharding with the proposed key "
+ << proposedKey.toString() << " because there exists a zone "
+ << tag.getMinKey() << " -->> " << tag.getMaxKey()
+ << " whose boundaries are not of type NumberLong, MinKey or MaxKey",
+ !ShardKeyPattern::isHashedPatternEl(proposedKeyElement) ||
+ (ShardKeyPattern::isValidHashedValue(tagMinKeyElement) &&
+ ShardKeyPattern::isValidHashedValue(tagMaxKeyElement)));
+ }
+ }
+}
+
+std::vector<TagsType> getTagsAndValidate(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& proposedKey,
+ const ShardKeyPattern& shardKeyPattern) {
+ // Read zone info
+ const auto catalogClient = Grid::get(opCtx)->catalogClient();
+ auto tags = uassertStatusOK(catalogClient->getTagsForCollection(opCtx, nss));
+
+ if (!tags.empty()) {
+ validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tags);
+ }
+
+ return tags;
+}
+
+boost::optional<UUID> getUUID(OperationContext* opCtx, const NamespaceString& nss) {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
+ const auto& coll = autoColl.getCollection();
+ return coll->uuid();
+}
+
+boost::optional<UUID> getUUIDFromPrimaryShard(OperationContext* opCtx, const NamespaceString& nss) {
+ // Obtain the collection's UUID from the primary shard's listCollections response.
+ DBDirectClient localClient(opCtx);
+ BSONObj res;
+ {
+ std::list<BSONObj> all =
+ localClient.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+ if (!all.empty()) {
+ res = all.front().getOwned();
+ }
+ }
+
+ uassert(ErrorCodes::InternalError,
+ str::stream() << "expected to have an entry for " << nss.toString()
+ << " in listCollections response, but did not",
+ !res.isEmpty());
+
+ BSONObj collectionInfo;
+ if (res["info"].type() == BSONType::Object) {
+ collectionInfo = res["info"].Obj();
+ }
+
+ uassert(ErrorCodes::InternalError,
+ str::stream() << "expected to return 'info' field as part of "
+ "listCollections for "
+ << nss.ns()
+ << " because the cluster is in featureCompatibilityVersion=3.6, but got "
+ << res,
+ !collectionInfo.isEmpty());
+
+ uassert(ErrorCodes::InternalError,
+ str::stream() << "expected to return a UUID for collection " << nss.ns()
+ << " as part of 'info' field but got " << res,
+ collectionInfo.hasField("uuid"));
+
+ return uassertStatusOK(UUID::parse(collectionInfo["uuid"]));
+}
+
+bool checkIfCollectionIsEmpty(OperationContext* opCtx, const NamespaceString& nss) {
+    // Use find with a predicate instead of count in order to ensure that the count
+    // command doesn't just consult the cached metadata, which may not always be
+    // correct.
+ DBDirectClient localClient(opCtx);
+ return localClient.findOne(nss.ns(), Query()).isEmpty();
+}
+
+int getNumShards(OperationContext* opCtx) {
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ shardRegistry->reload(opCtx);
+
+ return shardRegistry->getNumShards(opCtx);
+}
+
+BSONObj getCollation(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const boost::optional<BSONObj>& collation) {
+ // Ensure the collation is valid. Currently we only allow the simple collation.
+ std::unique_ptr<CollatorInterface> requestedCollator = nullptr;
+ if (collation) {
+ requestedCollator =
+ uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(collation.value()));
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "The collation for shardCollection must be {locale: 'simple'}, "
+ << "but found: " << collation.value(),
+ !requestedCollator);
+ }
+
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
+
+ const auto actualCollator = [&]() -> const CollatorInterface* {
+ const auto& coll = autoColl.getCollection();
+ if (coll) {
+ uassert(
+ ErrorCodes::InvalidOptions, "can't shard a capped collection", !coll->isCapped());
+ return coll->getDefaultCollator();
+ }
+
+ return nullptr;
+ }();
+
+ if (!requestedCollator && !actualCollator)
+ return BSONObj();
+
+ auto actualCollatorBSON = actualCollator->getSpec().toBSON();
+
+ if (!collation) {
+ auto actualCollatorFilter =
+ uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
+ ->makeFromBSON(actualCollatorBSON));
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "If no collation was specified, the collection collation must be "
+ "{locale: 'simple'}, "
+ << "but found: " << actualCollatorBSON,
+ !actualCollatorFilter);
+ }
+
+ return actualCollatorBSON;
+}
+
+void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) {
+ BatchWriteExecStats stats;
+ BatchedCommandResponse response;
+ BatchedCommandRequest updateRequest([&]() {
+ write_ops::Update updateOp(ChunkType::ConfigNS);
+ std::vector<write_ops::UpdateOpEntry> entries;
+ entries.reserve(chunks.size());
+ for (const auto& chunk : chunks) {
+ write_ops::UpdateOpEntry entry(
+ BSON(ChunkType::collectionUUID << chunk.getCollectionUUID() << ChunkType::shard
+ << chunk.getShard() << ChunkType::min
+ << chunk.getMin()),
+ write_ops::UpdateModification::parseFromClassicUpdate(chunk.toConfigBSON()));
+ entry.setUpsert(true);
+ entry.setMulti(false);
+ entries.push_back(entry);
+ }
+ updateOp.setUpdates(entries);
+ return updateOp;
+ }());
+
+ updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+
+ ClusterWriter::write(opCtx, updateRequest, &stats, &response);
+ uassertStatusOK(response.toStatus());
+}
+
+void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, CollectionType& coll) {
+ BatchWriteExecStats stats;
+ BatchedCommandResponse response;
+ BatchedCommandRequest updateRequest([&]() {
+ write_ops::Update updateOp(CollectionType::ConfigNS);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(BSON(CollectionType::kNssFieldName << nss.ns()));
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(coll.toBSON()));
+ entry.setUpsert(true);
+ entry.setMulti(false);
+ return entry;
+ }()});
+ return updateOp;
+ }());
+
+ updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+ try {
+ ClusterWriter::write(opCtx, updateRequest, &stats, &response);
+ uassertStatusOK(response.toStatus());
+ } catch (const DBException&) {
+ // If an error happens when contacting the config server, we don't know if the update
+        // succeeded or not, which might cause the local shard version to differ from the config
+ // server, so we clear the metadata to allow another operation to refresh it.
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx);
+ throw;
+ }
+}
+
+} // namespace
+
+CreateCollectionCoordinator::CreateCollectionCoordinator(
+ OperationContext* opCtx, const ShardsvrCreateCollection& createCollParams)
+ : ShardingDDLCoordinator(opCtx, createCollParams.getNamespace()),
+ _serviceContext(opCtx->getServiceContext()),
+ _request(createCollParams),
+ _nss(_request.getNamespace()) {
+ invariant(createCollParams.getShardKey());
+ _shardKeyPattern = ShardKeyPattern(createCollParams.getShardKey()->getOwned());
+}
+
+SemiFuture<void> CreateCollectionCoordinator::runImpl(
+ std::shared_ptr<executor::TaskExecutor> executor) {
+ return ExecutorFuture<void>(executor, Status::OK())
+ .then([this, anchor = shared_from_this()]() {
+ ThreadClient tc("CreateCollectionCoordinator", _serviceContext);
+ auto opCtxHolder = tc->makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ _forwardableOpMetadata.setOn(opCtx);
+
+ _checkCommandArguments(opCtx);
+ if (_result) {
+ // Early return before holding the critical section, the collection was already
+ // created.
+ return;
+ }
+ {
+ // From this point on all writes are blocked on the collection.
+ ScopedShardVersionCriticalSection critSec(opCtx, _nss);
+
+ _createCollectionAndIndexes(opCtx);
+ if (_result) {
+ // Early return, the collection was already created.
+ return;
+ }
+
+ _createChunks(opCtx);
+ if (_splitPolicy->isOptimized()) {
+                // Block reads/writes from here on if we need to create the collection on other
+                // shards; this way we prevent reads/writes that should be redirected to another
+                // shard.
+ critSec.enterCommitPhase();
+ _createCollectionOnNonPrimaryShards(opCtx);
+
+ _commit(opCtx);
+ }
+ }
+
+ if (!_splitPolicy->isOptimized()) {
+ _commit(opCtx);
+ }
+
+ _cleanup(opCtx);
+ })
+ .onError([this, anchor = shared_from_this()](const Status& status) {
+ LOGV2_ERROR(5277908,
+ "Error running create collection",
+ "namespace"_attr = _nss,
+ "error"_attr = redact(status));
+ return status;
+ })
+ .semi();
+}
+
+
+void CreateCollectionCoordinator::_checkCommandArguments(OperationContext* opCtx) {
+ LOGV2_DEBUG(5277902, 2, "Create collection _checkCommandArguments", "namespace"_attr = _nss);
+
+ const auto dbInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabaseWithRefresh(opCtx, _nss.db()));
+
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "sharding not enabled for db " << _nss.db(),
+ dbInfo.shardingEnabled());
+
+ if (_nss.db() == NamespaceString::kConfigDb) {
+ // Only whitelisted collections in config may be sharded (unless we are in test mode)
+ uassert(ErrorCodes::IllegalOperation,
+ "only special collections in the config db may be sharded",
+ _nss == NamespaceString::kLogicalSessionsNamespace);
+ }
+
+ // Ensure that hashed and unique are not both set.
+ uassert(ErrorCodes::InvalidOptions,
+ "Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on "
+ "the hashed field by declaring an additional (non-hashed) unique index on the field.",
+ !_shardKeyPattern.value().isHashedPattern() ||
+ !(_request.getUnique() && _request.getUnique().value()));
+
+ // Ensure the namespace is valid.
+ uassert(ErrorCodes::IllegalOperation,
+ "can't shard system namespaces",
+ !_nss.isSystem() || _nss == NamespaceString::kLogicalSessionsNamespace ||
+ _nss.isTemporaryReshardingCollection());
+
+ if (_request.getNumInitialChunks()) {
+ // Ensure numInitialChunks is within valid bounds.
+ // Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000
+ // chunks in total to limit the amount of memory this command consumes so there is less
+ // danger of an OOM error.
+
+ const int maxNumInitialChunksForShards =
+ Grid::get(opCtx)->shardRegistry()->getNumShardsNoReload() * 8192;
+ const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption
+ int numChunks = _request.getNumInitialChunks().value();
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "numInitialChunks cannot be more than either: "
+ << maxNumInitialChunksForShards << ", 8192 * number of shards; or "
+ << maxNumInitialChunksTotal,
+ numChunks >= 0 && numChunks <= maxNumInitialChunksForShards &&
+ numChunks <= maxNumInitialChunksTotal);
+ }
+
+ if (_nss.db() == NamespaceString::kConfigDb) {
+ auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+ auto findReponse = uassertStatusOK(
+ configShard->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ _nss,
+ BSONObj(),
+ BSONObj(),
+ 1));
+
+ auto numDocs = findReponse.docs.size();
+
+ // If this is a collection on the config db, it must be empty to be sharded.
+ uassert(ErrorCodes::IllegalOperation,
+ "collections in the config db must be empty to be sharded",
+ numDocs == 0);
+ }
+
+ auto unique = _request.getUnique() ? *_request.getUnique() : false;
+ if (auto createCollectionResponseOpt =
+ mongo::sharding_ddl_util::checkIfCollectionAlreadySharded(
+ opCtx,
+ _nss,
+ _shardKeyPattern->getKeyPattern().toBSON(),
+ getCollation(opCtx, _nss, _request.getCollation()),
+ unique)) {
+ _result = createCollectionResponseOpt;
+ }
+}
+
+void CreateCollectionCoordinator::_createCollectionAndIndexes(OperationContext* opCtx) {
+ LOGV2_DEBUG(
+ 5277903, 2, "Create collection _createCollectionAndIndexes", "namespace"_attr = _nss);
+
+ auto unique = _request.getUnique() ? *_request.getUnique() : false;
+ _collation = getCollation(opCtx, _nss, _request.getCollation());
+
+ if (auto createCollectionResponseOpt =
+ mongo::sharding_ddl_util::checkIfCollectionAlreadySharded(
+ opCtx, _nss, _shardKeyPattern->getKeyPattern().toBSON(), *_collation, unique)) {
+ _result = createCollectionResponseOpt;
+ return;
+ }
+
+ // Internally creates the collection if it doesn't exist.
+ shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
+ opCtx,
+ _nss,
+ _shardKeyPattern->toBSON(),
+ *_shardKeyPattern,
+ _collation,
+ _request.getUnique() ? *_request.getUnique() : false,
+ shardkeyutil::ValidationBehaviorsShardCollection(opCtx));
+
+ _collectionUUID = *getUUIDFromPrimaryShard(opCtx, _nss);
+}
+
+void CreateCollectionCoordinator::_createChunks(OperationContext* opCtx) {
+ LOGV2_DEBUG(5277904, 2, "Create collection _createChunks", "namespace"_attr = _nss);
+
+ _splitPolicy = InitialSplitPolicy::calculateOptimizationStrategy(
+ opCtx,
+ *_shardKeyPattern,
+ _request.getNumInitialChunks() ? *_request.getNumInitialChunks() : 0,
+ _request.getPresplitHashedZones() ? *_request.getPresplitHashedZones() : false,
+ _request.getInitialSplitPoints(),
+ getTagsAndValidate(opCtx, _nss, _shardKeyPattern->toBSON(), *_shardKeyPattern),
+ getNumShards(opCtx),
+ checkIfCollectionIsEmpty(opCtx, _nss));
+
+ _initialChunks = _splitPolicy->createFirstChunks(
+ opCtx, *_shardKeyPattern, {_nss, *_collectionUUID, ShardingState::get(opCtx)->shardId()});
+
+ // There must be at least one chunk.
+ invariant(!_initialChunks.chunks.empty());
+}
+
+void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationContext* opCtx) {
+ LOGV2_DEBUG(5277905,
+ 2,
+ "Create collection _createCollectionOnNonPrimaryShards",
+ "namespace"_attr = _nss);
+
+ std::vector<AsyncRequestsSender::Request> requests;
+ std::set<ShardId> initializedShards;
+ auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId();
+
+ NamespaceStringOrUUID nssOrUUID{_nss.db().toString(), *_collectionUUID};
+ auto [collOptions, indexes, idIndex] = getCollectionOptionsAndIndexes(opCtx, nssOrUUID);
+
+ for (const auto& chunk : _initialChunks.chunks) {
+ const auto& chunkShardId = chunk.getShard();
+ if (chunkShardId == dbPrimaryShardId ||
+ initializedShards.find(chunkShardId) != initializedShards.end()) {
+ continue;
+ }
+
+ ShardsvrCreateCollectionParticipant createCollectionParticipantRequest(_nss);
+ createCollectionParticipantRequest.setCollectionUUID(*_collectionUUID);
+
+ createCollectionParticipantRequest.setOptions(collOptions);
+ createCollectionParticipantRequest.setIdIndex(idIndex);
+ createCollectionParticipantRequest.setIndexes(indexes);
+
+ requests.emplace_back(
+ chunkShardId,
+ createCollectionParticipantRequest.toBSON(
+ BSON("writeConcern" << ShardingCatalogClient::kMajorityWriteConcern.toBSON())));
+
+ initializedShards.emplace(chunkShardId);
+ }
+
+ if (!requests.empty()) {
+ auto responses = gatherResponses(opCtx,
+ _nss.db(),
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent,
+ requests);
+
+        // If any shard fails to create the collection, fail the entire shardCollection command
+        // (potentially leaving an incompletely created sharded collection).
+ for (const auto& response : responses) {
+ auto shardResponse = uassertStatusOKWithContext(
+ std::move(response.swResponse),
+ str::stream() << "Unable to create collection " << _nss.ns() << " on "
+ << response.shardId);
+ auto status = getStatusFromCommandResult(shardResponse.data);
+ uassertStatusOK(status.withContext(str::stream()
+ << "Unable to create collection " << _nss.ns()
+ << " on " << response.shardId));
+
+ auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
+ uassertStatusOK(wcStatus.withContext(str::stream()
+ << "Unable to create collection " << _nss.ns()
+ << " on " << response.shardId));
+ }
+ }
+}
+
+void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
+ LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = _nss);
+
+ // Upsert Chunks.
+ upsertChunks(opCtx, _initialChunks.chunks);
+
+ CollectionType coll(_nss,
+ _initialChunks.collVersion().epoch(),
+ _initialChunks.creationTime,
+ Date_t::now(),
+ *_collectionUUID);
+
+ coll.setKeyPattern(_shardKeyPattern->toBSON());
+
+ if (_collation) {
+ coll.setDefaultCollation(_collation.value());
+ }
+
+ if (_request.getUnique()) {
+ coll.setUnique(*_request.getUnique());
+ }
+
+ updateCatalogEntry(opCtx, _nss, coll);
+}
+
+void CreateCollectionCoordinator::_cleanup(OperationContext* opCtx) {
+ LOGV2_DEBUG(5277907, 2, "Create collection _cleanup", "namespace"_attr = _nss);
+
+ try {
+ forceShardFilteringMetadataRefresh(opCtx, _nss);
+ } catch (const DBException&) {
+        // If the refresh fails, then set the shard version to UNKNOWN and let a future operation
+        // refresh the metadata.
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+ AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
+ CollectionShardingRuntime::get(opCtx, _nss)->clearFilteringMetadata(opCtx);
+ }
+
+    // Is it really necessary to refresh all shards, or can we assume that the shard version will
+    // be unknown and refreshed eventually?
+ auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto dbPrimaryShardId = ShardingState::get(opCtx)->shardId();
+
+ std::set<ShardId> shardsRefreshed;
+ for (const auto& chunk : _initialChunks.chunks) {
+ const auto& chunkShardId = chunk.getShard();
+ if (chunkShardId == dbPrimaryShardId ||
+ shardsRefreshed.find(chunkShardId) != shardsRefreshed.end()) {
+ continue;
+ }
+
+ auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, chunkShardId));
+ try {
+ auto refreshCmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ BSON("_flushRoutingTableCacheUpdates" << _nss.ns()),
+ Seconds{30},
+ Shard::RetryPolicy::kIdempotent));
+
+ uassertStatusOK(refreshCmdResponse.commandStatus);
+ } catch (const DBException& ex) {
+ LOGV2_WARNING(5277909,
+ "Could not refresh shard",
+ "shardId"_attr = shard->getId(),
+ "error"_attr = redact(ex.reason()));
+ }
+ shardsRefreshed.emplace(chunkShardId);
+ }
+
+ LOGV2(5277901,
+ "Created initial chunk(s)",
+ "namespace"_attr = _nss,
+ "numInitialChunks"_attr = _initialChunks.chunks.size(),
+ "initialCollectionVersion"_attr = _initialChunks.collVersion());
+
+
+ ShardingLogging::get(opCtx)->logChange(
+ opCtx,
+ "shardCollection.end",
+ _nss.ns(),
+ BSON("version" << _initialChunks.collVersion().toString() << "numChunks"
+ << static_cast<int>(_initialChunks.chunks.size())),
+ ShardingCatalogClient::kMajorityWriteConcern);
+
+ auto result = CreateCollectionResponse(
+ _initialChunks.chunks[_initialChunks.chunks.size() - 1].getVersion());
+ result.setCollectionUUID(_collectionUUID);
+ _result = result;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
new file mode 100644
index 00000000000..60f107487e2
--- /dev/null
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/s/config/initial_split_policy.h"
+#include "mongo/db/s/shard_filtering_metadata_refresh.h"
+#include "mongo/db/s/sharding_ddl_coordinator.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+
+class CreateCollectionCoordinator final
+ : public ShardingDDLCoordinator,
+ public std::enable_shared_from_this<CreateCollectionCoordinator> {
+public:
+ CreateCollectionCoordinator(OperationContext* opCtx, const ShardsvrCreateCollection& request);
+
+ /**
+     * Returns the information of the newly created collection, or of the already existing one. It
+     * must be called after a successful execution of run().
+ */
+ const CreateCollectionResponse& getResultOnSuccess() {
+ return *_result;
+ }
+
+private:
+ SemiFuture<void> runImpl(std::shared_ptr<executor::TaskExecutor> executor) override;
+
+ /**
+ * Performs all required checks before holding the critical sections.
+ */
+ void _checkCommandArguments(OperationContext* opCtx);
+
+ /**
+     * Ensures the collection is created locally and has the appropriate shard key index.
+ */
+ void _createCollectionAndIndexes(OperationContext* opCtx);
+
+ /**
+     * Given the appropriate split policy, creates the initial chunks.
+ */
+ void _createChunks(OperationContext* opCtx);
+
+ /**
+ * If the optimized path can be taken, ensure the collection is already created in all the
+ * participant shards.
+ */
+ void _createCollectionOnNonPrimaryShards(OperationContext* opCtx);
+
+ /**
+ * Does the following writes:
+ * 1. Updates the config.collections entry for the new sharded collection
+ * 2. Updates config.chunks entries for the new sharded collection
+ */
+ void _commit(OperationContext* opCtx);
+
+ /**
+ * Refresh all participant shards and log creation.
+ */
+ void _cleanup(OperationContext* opCtx);
+
+ ServiceContext* _serviceContext;
+ const ShardsvrCreateCollection _request;
+ const NamespaceString& _nss;
+
+ boost::optional<ShardKeyPattern> _shardKeyPattern;
+ boost::optional<BSONObj> _collation;
+ boost::optional<UUID> _collectionUUID;
+ std::unique_ptr<InitialSplitPolicy> _splitPolicy;
+ InitialSplitPolicy::ShardCollectionConfig _initialChunks;
+ boost::optional<CreateCollectionResponse> _result;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/shard_collection_legacy.cpp b/src/mongo/db/s/shard_collection_legacy.cpp
index 47e82cd0cc0..766f1bb39ef 100644
--- a/src/mongo/db/s/shard_collection_legacy.cpp
+++ b/src/mongo/db/s/shard_collection_legacy.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/s/shard_collection_legacy.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/shard_key_util.h"
+#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_logging.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/logv2/log.h"
@@ -101,38 +102,11 @@ void uassertStatusOKWithWarning(const Status& status) {
}
}
-/**
- * Throws an exception if the collection is already sharded with different options.
- *
- * If the collection is already sharded with the same options, returns the existing collection's
- * full spec, else returns boost::none.
- */
boost::optional<CreateCollectionResponse> checkIfCollectionAlreadyShardedWithSameOptions(
OperationContext* opCtx, const ShardsvrShardCollectionRequest& request) {
const auto& nss = *request.get_shardsvrShardCollection();
- auto cm = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss));
-
- if (!cm.isSharded()) {
- return boost::none;
- }
-
- auto defaultCollator =
- cm.getDefaultCollator() ? cm.getDefaultCollator()->getSpec().toBSON() : BSONObj();
-
- // If the collection is already sharded, fail if the deduced options in this request do not
- // match the options the collection was originally sharded with.
- uassert(ErrorCodes::AlreadyInitialized,
- str::stream() << "sharding already enabled for collection " << nss,
- SimpleBSONObjComparator::kInstance.evaluate(cm.getShardKeyPattern().toBSON() ==
- request.getKey()) &&
- SimpleBSONObjComparator::kInstance.evaluate(defaultCollator ==
- *request.getCollation()) &&
- cm.isUnique() == request.getUnique());
-
- CreateCollectionResponse response(cm.getVersion());
- response.setCollectionUUID(cm.getUUID());
- return response;
+ return mongo::sharding_ddl_util::checkIfCollectionAlreadySharded(
+ opCtx, nss, request.getKey(), *request.getCollation(), request.getUnique());
}
void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) {
@@ -584,7 +558,7 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx,
// path.
boost::optional<DistLockManager::ScopedDistLock> dbDistLock;
boost::optional<DistLockManager::ScopedDistLock> collDistLock;
- if (!mustTakeDistLock) {
+ if (mustTakeDistLock) {
dbDistLock.emplace(uassertStatusOK(DistLockManager::get(opCtx)->lock(
opCtx, nss.db(), "shardCollection", DistLockManager::kDefaultLockTimeout)));
collDistLock.emplace(uassertStatusOK(DistLockManager::get(opCtx)->lock(
@@ -604,14 +578,20 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx,
// If DistLock must not be taken, then the request came from the config server, there is no
// need to check this here.
- if (!mustTakeDistLock) {
+ if (mustTakeDistLock) {
if (nss.db() == NamespaceString::kConfigDb) {
- BSONObj countResult;
- DBDirectClient client(opCtx);
- if (!client.runCommand(
- nss.db().toString(), BSON("count" << nss.coll()), countResult))
- uassertStatusOK(getStatusFromCommandResult(countResult));
- auto numDocs = countResult["n"].Int();
+ auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
+ auto findReponse = uassertStatusOK(configShard->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ nss,
+ BSONObj(),
+ BSONObj(),
+ 1));
+
+ auto numDocs = findReponse.docs.size();
// If this is a collection on the config db, it must be empty to be sharded.
uassert(ErrorCodes::IllegalOperation,
@@ -642,11 +622,12 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx,
splitPolicy =
InitialSplitPolicy::calculateOptimizationStrategy(opCtx,
targetState->shardKeyPattern,
- request,
+ request.getNumInitialChunks(),
+ request.getPresplitHashedZones(),
+ request.getInitialSplitPoints(),
targetState->tags,
getNumShards(opCtx),
targetState->collectionIsEmpty);
-
boost::optional<CollectionUUID> optCollectionUUID;
if (shouldUseUUIDForChunkIndexing) {
optCollectionUUID = targetState->uuid;
@@ -709,7 +690,7 @@ CreateCollectionResponse shardCollection(OperationContext* opCtx,
CreateCollectionResponse shardCollectionLegacy(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& cmdObj,
- bool legacyPath) {
+ bool requestFromCSRS) {
auto request = ShardsvrShardCollectionRequest::parse(
IDLParserErrorContext("_shardsvrShardCollection"), cmdObj);
if (!request.getCollation())
@@ -732,8 +713,12 @@ CreateCollectionResponse shardCollectionLegacy(OperationContext* opCtx,
response = scopedShardCollection.getResponse().get();
} else {
try {
- response = shardCollection(
- opCtx, nss, cmdObj, request, ShardingState::get(opCtx)->shardId(), legacyPath);
+ response = shardCollection(opCtx,
+ nss,
+ cmdObj,
+ request,
+ ShardingState::get(opCtx)->shardId(),
+ !requestFromCSRS);
} catch (const DBException& e) {
scopedShardCollection.emplaceResponse(e.toStatus());
throw;
diff --git a/src/mongo/db/s/shard_collection_legacy.h b/src/mongo/db/s/shard_collection_legacy.h
index 2cf109feabf..f245c9265f1 100644
--- a/src/mongo/db/s/shard_collection_legacy.h
+++ b/src/mongo/db/s/shard_collection_legacy.h
@@ -34,9 +34,14 @@
namespace mongo {
+/**
+ * Executes the legacy path of shard collection. If requestFromCSRS is false, then the checks and
+ * operations performed by the config server (like taking the distributed lock on the database and
+ * the collection) must be done on the shard instead.
+ */
CreateCollectionResponse shardCollectionLegacy(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& cmdObj,
- bool mustTakeDistLock);
+ bool requestFromCSRS);
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_ddl_coordinator.cpp b/src/mongo/db/s/sharding_ddl_coordinator.cpp
index ca117a3657b..5a28e117aea 100644
--- a/src/mongo/db/s/sharding_ddl_coordinator.cpp
+++ b/src/mongo/db/s/sharding_ddl_coordinator.cpp
@@ -44,28 +44,29 @@ ShardingDDLCoordinator::ShardingDDLCoordinator(OperationContext* opCtx, const Na
: _nss(ns), _forwardableOpMetadata(opCtx){};
SemiFuture<void> ShardingDDLCoordinator::run(OperationContext* opCtx) {
- // Check that the operation context has a database version for this namespace
- const auto clientDbVersion = OperationShardingState::get(opCtx).getDbVersion(_nss.db());
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Request sent without attaching database version",
- clientDbVersion);
+ if (!_nss.isConfigDB()) {
+ // Check that the operation context has a database version for this namespace
+ const auto clientDbVersion = OperationShardingState::get(opCtx).getDbVersion(_nss.db());
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Request sent without attaching database version",
+ clientDbVersion);
- // Checks that this is the primary shard for the namespace's db
- const auto dbPrimaryShardId = [&]() {
- Lock::DBLock dbWriteLock(opCtx, _nss.db(), MODE_IS);
- auto dss = DatabaseShardingState::get(opCtx, _nss.db());
- auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss);
- // The following call will also ensure that the database version matches
- return dss->getDatabaseInfo(opCtx, dssLock).getPrimary();
- }();
+ // Checks that this is the primary shard for the namespace's db
+ const auto dbPrimaryShardId = [&]() {
+ Lock::DBLock dbWriteLock(opCtx, _nss.db(), MODE_IS);
+ auto dss = DatabaseShardingState::get(opCtx, _nss.db());
+ auto dssLock = DatabaseShardingState::DSSLock::lockShared(opCtx, dss);
+ // The following call will also ensure that the database version matches
+ return dss->getDatabaseInfo(opCtx, dssLock).getPrimary();
+ }();
- const auto thisShardId = ShardingState::get(opCtx)->shardId();
-
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "This is not the primary shard for db " << _nss.db()
- << " expected: " << dbPrimaryShardId << " shardId: " << thisShardId,
- dbPrimaryShardId == thisShardId);
+ const auto thisShardId = ShardingState::get(opCtx)->shardId();
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "This is not the primary shard for db " << _nss.db()
+ << " expected: " << dbPrimaryShardId << " shardId: " << thisShardId,
+ dbPrimaryShardId == thisShardId);
+ }
return runImpl(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
}
diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index 3b29130e2ff..9e0891dfb13 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -206,5 +206,34 @@ void checkShardedRenamePreconditions(OperationContext* opCtx,
tags.empty());
}
+boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& key,
+ const BSONObj& collation,
+ bool unique) {
+ auto cm = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, nss));
+
+ if (!cm.isSharded()) {
+ return boost::none;
+ }
+
+ auto defaultCollator =
+ cm.getDefaultCollator() ? cm.getDefaultCollator()->getSpec().toBSON() : BSONObj();
+
+ // If the collection is already sharded, fail if the deduced options in this request do not
+ // match the options the collection was originally sharded with.
+ uassert(ErrorCodes::AlreadyInitialized,
+ str::stream() << "sharding already enabled for collection " << nss,
+ SimpleBSONObjComparator::kInstance.evaluate(cm.getShardKeyPattern().toBSON() == key) &&
+ SimpleBSONObjComparator::kInstance.evaluate(defaultCollator == collation) &&
+ cm.isUnique() == unique);
+
+ CreateCollectionResponse response(cm.getVersion());
+ response.setCollectionUUID(cm.getUUID());
+ return response;
+}
+
} // namespace sharding_ddl_util
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_ddl_util.h b/src/mongo/db/s/sharding_ddl_util.h
index c98c7a50a64..74b4fce3c57 100644
--- a/src/mongo/db/s/sharding_ddl_util.h
+++ b/src/mongo/db/s/sharding_ddl_util.h
@@ -29,6 +29,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
namespace mongo {
namespace sharding_ddl_util {
@@ -61,5 +62,18 @@ void checkShardedRenamePreconditions(OperationContext* opCtx,
const NamespaceString& toNss,
const bool dropTarget);
+/**
+ * Throws an exception if the collection is already sharded with different options.
+ *
+ * If the collection is already sharded with the same options, returns the existing collection's
+ * full spec, else returns boost::none.
+ */
+boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& key,
+ const BSONObj& collation,
+ bool unique);
+
} // namespace sharding_ddl_util
} // namespace mongo
diff --git a/src/mongo/db/s/shardsvr_create_collection_command.cpp b/src/mongo/db/s/shardsvr_create_collection_command.cpp
index 4aa3d632b3c..23efc3558d3 100644
--- a/src/mongo/db/s/shardsvr_create_collection_command.cpp
+++ b/src/mongo/db/s/shardsvr_create_collection_command.cpp
@@ -36,11 +36,15 @@
#include "mongo/db/commands.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
+#include "mongo/db/s/create_collection_coordinator.h"
+#include "mongo/db/s/dist_lock_manager.h"
#include "mongo/db/s/shard_collection_legacy.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/logv2/log.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/shard_collection_gen.h"
#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+#include "mongo/s/sharded_collections_ddl_parameters_gen.h"
namespace mongo {
namespace {
@@ -158,6 +162,23 @@ CreateCollectionResponse createCollectionLegacy(OperationContext* opCtx,
return shardCollectionLegacy(opCtx, nss, shardsvrShardCollectionRequest.toBSON(), false);
}
+CreateCollectionResponse createCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardsvrCreateCollection& request) {
+ uassert(
+ ErrorCodes::NotImplemented, "create collection not implemented yet", request.getShardKey());
+
+ DistLockManager::ScopedDistLock dbDistLock(uassertStatusOK(DistLockManager::get(opCtx)->lock(
+ opCtx, nss.db(), "shardCollection", DistLockManager::kDefaultLockTimeout)));
+ DistLockManager::ScopedDistLock collDistLock(uassertStatusOK(DistLockManager::get(opCtx)->lock(
+ opCtx, nss.ns(), "shardCollection", DistLockManager::kDefaultLockTimeout)));
+
+ auto createCollectionCoordinator =
+ std::make_shared<CreateCollectionCoordinator>(opCtx, request);
+ createCollectionCoordinator->run(opCtx).get(opCtx);
+ return createCollectionCoordinator->getResultOnSuccess();
+}
+
class ShardsvrCreateCollectionCommand final : public TypedCommand<ShardsvrCreateCollectionCommand> {
public:
using Request = ShardsvrCreateCollection;
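
In the new path above, the database distributed lock is taken before the collection one, and both ScopedDistLock guards stay alive until the coordinator's future resolves, so the locks are released only after the DDL completes and in reverse order. Below is a standalone sketch of that RAII ordering, with a hypothetical ScopedDistLock and std::async standing in for DistLockManager and CreateCollectionCoordinator::run().

    #include <future>
    #include <iostream>
    #include <string>

    // Hypothetical stand-in for DistLockManager::ScopedDistLock: release on destruction.
    class ScopedDistLock {
    public:
        explicit ScopedDistLock(std::string resource) : _resource(std::move(resource)) {
            std::cout << "acquired dist lock on " << _resource << "\n";
        }
        ~ScopedDistLock() {
            std::cout << "released dist lock on " << _resource << "\n";
        }

    private:
        std::string _resource;
    };

    int main() {
        // Same ordering as the command: database first, then the full namespace.
        ScopedDistLock dbLock("test");
        ScopedDistLock collLock("test.coll");

        // Stand-in for running the coordinator: the caller blocks on the returned
        // future before the scoped locks go out of scope.
        auto done = std::async(std::launch::async, [] { /* create + shard the collection */ });
        done.get();

        // collLock and dbLock are destroyed here, in reverse order of acquisition.
    }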
@@ -194,7 +215,18 @@ public:
"Create Collection path has not been implemented",
request().getShardKey());
- return createCollectionLegacy(opCtx, ns(), request());
+ if (feature_flags::gShardingFullDDLSupport.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ LOGV2_DEBUG(
+ 5277910, 1, "Running new create collection procedure", "namespace"_attr = ns());
+ return createCollection(opCtx, ns(), request());
+ } else {
+ LOGV2_DEBUG(5277911,
+ 1,
+ "Running legacy create collection procedure",
+ "namespace"_attr = ns());
+ return createCollectionLegacy(opCtx, ns(), request());
+ }
}
private:
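
The change to the command body above is a plain feature-flag dispatch: when gShardingFullDDLSupport is enabled for the current FCV the coordinator-based procedure runs, otherwise the legacy procedure is kept. A minimal standalone sketch of the same gate, with free functions standing in for the two code paths:

    #include <iostream>
    #include <string>

    // Illustrative stand-ins for the two procedures selected above.
    std::string createCollectionNew(const std::string& nss) {
        return "new coordinator path for " + nss;
    }
    std::string createCollectionLegacy(const std::string& nss) {
        return "legacy path for " + nss;
    }

    // Mirrors the branch in the command: flag on -> new procedure, flag off -> legacy.
    std::string runCreateCollection(const std::string& nss, bool shardingFullDDLSupportEnabled) {
        return shardingFullDDLSupportEnabled ? createCollectionNew(nss)
                                             : createCollectionLegacy(nss);
    }

    int main() {
        std::cout << runCreateCollection("test.coll", true) << "\n";
        std::cout << runCreateCollection("test.coll", false) << "\n";
    }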
diff --git a/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
new file mode 100644
index 00000000000..243d0d84737
--- /dev/null
+++ b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/s/migration_destination_manager.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/request_types/sharded_ddl_commands_gen.h"
+
+namespace mongo {
+namespace {
+
+class ShardsvrCreateCollectionParticipantCommand final
+ : public TypedCommand<ShardsvrCreateCollectionParticipantCommand> {
+public:
+ using Request = ShardsvrCreateCollectionParticipant;
+
+ bool adminOnly() const override {
+ return false;
+ }
+
+ std::string help() const override {
+ return "Internal command. Do not call directly. Creates a collection.";
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ auto const shardingState = ShardingState::get(opCtx);
+ uassertStatusOK(shardingState->canAcceptShardedCommands());
+
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "_shardsvrCreateCollectionParticipant must be called with "
+ "majority writeConcern, got "
+ << request().toBSON(BSONObj()),
+ opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
+
+
+ MigrationDestinationManager::cloneCollectionIndexesAndOptions(
+ opCtx,
+ ns(),
+ {*request().getCollectionUUID(),
+ request().getIndexes(),
+ request().getIdIndex(),
+ request().getOptions()});
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return request().getNamespace();
+ }
+
+ bool supportsWriteConcern() const override {
+ return true;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+
+} shardsvrCreateCollectionParticipantCommand;
+
+} // namespace
+} // namespace mongo
\ No newline at end of file
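
The participant command first insists on majority write concern, like the other _shardsvr* DDL participants, and then reuses the migration machinery to clone the collection's UUID, options, indexes, and _id index locally. Below is a standalone sketch of that precondition, with a plain struct as a hypothetical stand-in for the parsed request and write concern; it is illustrative only.

    #include <stdexcept>
    #include <string>

    // Stand-in for the write concern parsed off the operation.
    struct WriteConcern {
        std::string wMode;  // "majority", "1", ...
    };

    struct ParticipantRequest {
        std::string nss;
        WriteConcern writeConcern;
        // indexes, idIndex, options and collectionUUID would follow in the real command
    };

    void runCreateCollectionParticipant(const ParticipantRequest& request) {
        // Mirrors the uassert above: only majority writeConcern is accepted.
        if (request.writeConcern.wMode != "majority") {
            throw std::invalid_argument(
                "_shardsvrCreateCollectionParticipant must be called with majority writeConcern");
        }
        // ... clone the collection's UUID, options, indexes and _id index locally,
        //     as MigrationDestinationManager::cloneCollectionIndexesAndOptions does ...
    }

    int main() {
        runCreateCollectionParticipant({"test.coll", {"majority"}});
    }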
diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
index fb57d689248..41dfe3cdf33 100644
--- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
+++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp
@@ -53,6 +53,45 @@
namespace mongo {
namespace {
+std::vector<AsyncRequestsSender::Request> buildUnshardedRequestsForAllShards(
+ OperationContext* opCtx, std::vector<ShardId> shardIds, const BSONObj& cmdObj) {
+ auto cmdToSend = cmdObj;
+ appendShardVersion(cmdToSend, ChunkVersion::UNSHARDED());
+
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (auto&& shardId : shardIds)
+ requests.emplace_back(std::move(shardId), cmdToSend);
+
+ return requests;
+}
+
+AsyncRequestsSender::Response executeCommandAgainstDatabasePrimaryOrFirstShard(
+ OperationContext* opCtx,
+ StringData dbName,
+ const CachedDatabaseInfo& dbInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ Shard::RetryPolicy retryPolicy) {
+ ShardId shardId;
+ if (dbName == NamespaceString::kConfigDb) {
+ auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
+ uassert(ErrorCodes::IllegalOperation, "there are no shards to target", !shardIds.empty());
+ std::sort(shardIds.begin(), shardIds.end());
+ shardId = shardIds[0];
+ } else {
+ shardId = dbInfo.primaryId();
+ }
+
+ auto responses =
+ gatherResponses(opCtx,
+ dbName,
+ readPref,
+ retryPolicy,
+ buildUnshardedRequestsForAllShards(
+ opCtx, {shardId}, appendDbVersionIfPresent(cmdObj, dbInfo)));
+ return std::move(responses.front());
+}
+
class ShardCollectionCmd : public BasicCommand {
public:
ShardCollectionCmd() : BasicCommand("shardCollection", "shardcollection") {}
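
executeCommandAgainstDatabasePrimaryOrFirstShard keeps the targeting rule that was previously inlined in run(): commands against the config database go to the shard with the lexicographically smallest id (many tests assume that shard acts as config's primary), while every other database is routed to its primary shard. A standalone sketch of just the shard selection, with strings standing in for ShardId and for the registry/database lookups:

    #include <algorithm>
    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Picks the shard to contact for a database-level DDL command. `allShardIds`
    // stands in for ShardRegistry::getAllShardIds(), `dbPrimary` for
    // CachedDatabaseInfo::primaryId().
    std::string selectTargetShard(const std::string& dbName,
                                  std::vector<std::string> allShardIds,
                                  const std::string& dbPrimary) {
        if (dbName == "config") {
            if (allShardIds.empty()) {
                throw std::runtime_error("there are no shards to target");
            }
            // Lexicographically smallest shard id, matching the sort in the command.
            return *std::min_element(allShardIds.begin(), allShardIds.end());
        }
        return dbPrimary;
    }

    int main() {
        std::vector<std::string> shards{"shard0001", "shard0000", "shard0002"};
        std::cout << selectTargetShard("config", shards, "shard0002") << "\n";  // shard0000
        std::cout << selectTargetShard("test", shards, "shard0002") << "\n";    // shard0002
    }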
@@ -110,40 +149,31 @@ public:
auto catalogCache = Grid::get(opCtx)->catalogCache();
const auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, nss.db()));
- ShardId shardId;
- if (nss.db() == NamespaceString::kConfigDb) {
- auto shardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
- uassert(
- ErrorCodes::IllegalOperation, "there are no shards to target", !shardIds.empty());
- // Many tests assume the primary shard for configDb will be the shard
- // with the first ID in ascending lexical order
- std::sort(shardIds.begin(), shardIds.end());
- shardId = shardIds[0];
- } else {
- shardId = dbInfo.primaryId();
- }
-
- auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
-
- auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
+ auto cmdResponse = executeCommandAgainstDatabasePrimaryOrFirstShard(
opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- nss.db().toString(),
+ nss.db(),
+ dbInfo,
CommandHelpers::appendMajorityWriteConcern(
CommandHelpers::appendGenericCommandArgs(cmdObj, shardsvrCollRequest.toBSON({})),
opCtx->getWriteConcern()),
- Shard::RetryPolicy::kIdempotent));
- uassertStatusOK(cmdResponse.commandStatus);
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
- CommandHelpers::filterCommandReplyForPassthrough(cmdResponse.response, &result);
- result.append("collectionsharded", nss.toString());
+ const auto remoteResponse = uassertStatusOK(cmdResponse.swResponse);
+ uassertStatusOK(getStatusFromCommandResult(remoteResponse.data));
auto createCollResp = CreateCollectionResponse::parse(
- IDLParserErrorContext("createCollection"), cmdResponse.response);
+ IDLParserErrorContext("createCollection"), remoteResponse.data);
catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
nss, createCollResp.getCollectionVersion(), dbInfo.primaryId());
+ // Append only 'collectionsharded' as an extra response field and strip the collection
+ // version so the reply keeps the same format as before.
+ result.append("collectionsharded", nss.toString());
+ auto resultObj =
+ remoteResponse.data.removeField(CreateCollectionResponse::kCollectionVersionFieldName);
+ CommandHelpers::filterCommandReplyForPassthrough(resultObj, &result);
return true;
}
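
On the mongos side the reply is reshaped so existing clients keep seeing the pre-existing format: the collectionVersion field carried by the new shard response is stripped, collectionsharded is appended, and the remaining fields are passed through. A standalone sketch with a std::map standing in for the BSON reply document (illustrative only):

    #include <map>
    #include <string>

    using ReplyDoc = std::map<std::string, std::string>;

    // Stand-in for the filtering done in the command: drop the new internal field,
    // report the namespace under the legacy "collectionsharded" key, keep the rest.
    ReplyDoc buildLegacyReply(ReplyDoc remoteResponse, const std::string& nss) {
        remoteResponse.erase("collectionVersion");  // removeField(kCollectionVersionFieldName)
        ReplyDoc reply;
        reply["collectionsharded"] = nss;           // result.append("collectionsharded", ...)
        reply.insert(remoteResponse.begin(), remoteResponse.end());  // passthrough of the rest
        return reply;
    }

    int main() {
        const auto reply = buildLegacyReply({{"ok", "1"}, {"collectionVersion", "1|0"}}, "test.coll");
        return reply.count("collectionVersion");  // 0: the internal field is gone
    }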
diff --git a/src/mongo/s/request_types/sharded_ddl_commands.idl b/src/mongo/s/request_types/sharded_ddl_commands.idl
index 61f190a70f2..005c713107d 100644
--- a/src/mongo/s/request_types/sharded_ddl_commands.idl
+++ b/src/mongo/s/request_types/sharded_ddl_commands.idl
@@ -32,10 +32,10 @@ global:
imports:
- "mongo/db/drop_database.idl"
- "mongo/db/commands/rename_collection.idl"
+ - "mongo/db/keypattern.idl"
- "mongo/idl/basic_types.idl"
- - "mongo/s/database_version.idl"
- "mongo/s/chunk_version.idl"
- - "mongo/db/keypattern.idl"
+ - "mongo/s/database_version.idl"
structs:
RenameCollectionResponse:
@@ -93,6 +93,29 @@ commands:
type: object
description: "The collation to use for the shard key index."
optional: true
+
+ _shardsvrCreateCollectionParticipant:
+ command_name: _shardsvrCreateCollectionParticipant
+ cpp_name: ShardsvrCreateCollectionParticipant
+ description: "Command to create a collection on participant shards. When called, it assumes the primary shard is holding the critical section for that namespace."
+ strict: false
+ namespace: concatenate_with_db
+ api_version: ""
+ fields:
+ indexes:
+ type: array<object>
+ description: "Collection indexes."
+ options:
+ type: object
+ description: "Collection options."
+ collectionUUID:
+ type: uuid
+ description: "Collection uuid."
+ optional: true
+ idIndex:
+ type: object
+ description: "Id index."
+
_shardsvrDropDatabase:
description: "Internal command sent to the primary shard of a database to drop it."
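
Putting the IDL definition together, an invocation of the participant command sent by the primary shard would look roughly like the document below. This is an illustrative sketch only: the field values are made up, and the real request is built from the IDL-generated ShardsvrCreateCollectionParticipant type rather than by hand.

    #include <string>

    // Approximate wire shape of the participant command (concatenate_with_db:
    // the command value is the collection name, "$db" carries the database).
    const std::string kExampleInvocation = R"({
        "_shardsvrCreateCollectionParticipant": "coll",
        "indexes": [ { "v": 2, "key": { "_id": 1 }, "name": "_id_" },
                     { "v": 2, "key": { "x": 1 }, "name": "x_1" } ],
        "idIndex": { "v": 2, "key": { "_id": 1 }, "name": "_id_" },
        "options": { },
        "collectionUUID": { "$uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" },
        "writeConcern": { "w": "majority" },
        "$db": "test"
    })";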