author    Cheahuychou Mao <cheahuychou.mao@mongodb.com>    2018-07-27 09:56:07 -0400
committer Cheahuychou Mao <cheahuychou.mao@mongodb.com>    2018-07-31 17:50:44 -0400
commit    d83b73ea2db96ccbcf5f2a0710f360f88896ab9c (patch)
tree      452fd6c857dd2e9398dcfc66fac3dc55d4ff90fb /src
parent    78dec3622268ad27bb855eda4c6a4ed345412fd9 (diff)
SERVER-14394 Create initial hashed shard key chunks directly on shards
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp        |  20
-rw-r--r--  src/mongo/db/s/config/configsvr_shard_collection_command.cpp              |  58
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.cpp                             | 105
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.h                               |  28
-rw-r--r--  src/mongo/db/s/config/initial_split_policy_test.cpp                        |  60
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp  |  98
-rw-r--r--  src/mongo/db/s/shardsvr_shard_collection.cpp                               | 209
-rw-r--r--  src/mongo/s/commands/cluster_commands_helpers.cpp                          |   7
-rw-r--r--  src/mongo/s/commands/cluster_commands_helpers.h                            |  14
-rw-r--r--  src/mongo/s/request_types/shard_collection.idl                             |   8
10 files changed, 313 insertions(+), 294 deletions(-)
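
The core of the change, visible in initial_split_policy.cpp and its header below, is that initial chunks for a hashed shard key are now written directly to the config server and assigned to shards round-robin in contiguous groups of numContiguousChunksPerShard. A minimal standalone sketch of that assignment rule follows (plain standard C++ with placeholder shard names, not MongoDB types):

// Standalone sketch of the chunk-to-shard assignment this patch introduces in
// generateShardCollectionInitialChunks(). Plain std types; shard names are placeholders.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

int main() {
    const std::vector<std::string> shardIds = {"testShard0", "testShard1"};
    const std::size_t numChunks = 4;                     // splitPoints.size() + 1
    const std::size_t numContiguousChunksPerShard = 2;   // the "k" described in initial_split_policy.h

    for (std::size_t i = 0; i < numChunks; ++i) {
        // Mirrors: shardIds[(i / numContiguousChunksPerShard) % shardIds.size()]
        const auto& owner = shardIds[(i / numContiguousChunksPerShard) % shardIds.size()];
        std::cout << "chunk " << i << " -> " << owner << "\n";
    }
    // Prints: chunks 0 and 1 -> testShard0, chunks 2 and 3 -> testShard1, matching the
    // SplitPointsNumContiguousChunksPerShardsGreaterThanOne test added below.
    return 0;
}
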
diff --git a/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp b/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp
index e49b7ab3ae9..b51549e5283 100644
--- a/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp
+++ b/src/mongo/db/s/clone_collection_options_from_primary_shard_cmd.cpp
@@ -30,20 +30,10 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/audit.h"
-#include "mongo/db/auth/action_set.h"
-#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/auth/privilege.h"
-#include "mongo/db/catalog/index_create.h"
-#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/operation_context.h"
#include "mongo/db/s/migration_destination_manager.h"
-#include "mongo/db/service_context.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
+#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/s/request_types/clone_collection_options_from_primary_shard_gen.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/log.h"
@@ -63,11 +53,15 @@ public:
auto primaryShardId = ShardId(request().getPrimaryShard().toString());
MigrationDestinationManager::cloneCollectionIndexesAndOptions(
opCtx, ns(), primaryShardId);
+
+ // At the time this command is invoked, the config server primary has already written
+        // the collection's routing metadata, so sync from the config server.
+ forceShardFilteringMetadataRefresh(opCtx, ns());
}
private:
bool supportsWriteConcern() const override {
- return false;
+ return true;
}
NamespaceString ns() const override {
@@ -93,7 +87,7 @@ public:
}
bool adminOnly() const override {
- return true;
+ return false;
}
} CloneCollectionOptionsFromPrimaryShardCmd;
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index 458495012b3..581c2459aeb 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -440,8 +440,8 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx,
/**
* Migrates the initial "big chunks" from the primary shard to spread them evenly across the shards.
*
- * If 'allSplits' is not empty, additionally splits each "big chunk" into smaller chunks using the
- * points in 'allSplits.'
+ * If 'finalSplitPoints' is not empty, additionally splits each "big chunk" into smaller chunks
+ * using the points in 'finalSplitPoints.'
*/
void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
const NamespaceString& nss,
@@ -449,7 +449,7 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
const std::vector<ShardId>& shardIds,
bool isEmpty,
const ShardKeyPattern& shardKeyPattern,
- const std::vector<BSONObj>& allSplits) {
+ const std::vector<BSONObj>& finalSplitPoints) {
auto catalogCache = Grid::get(opCtx)->catalogCache();
if (!shardKeyPattern.isHashedPattern()) {
@@ -505,7 +505,7 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
}
}
- if (allSplits.empty()) {
+ if (finalSplitPoints.empty()) {
return;
}
@@ -517,14 +517,14 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
routingInfo.cm());
chunkManager = routingInfo.cm();
- // Subdivide the big chunks by splitting at each of the points in "allSplits"
+ // Subdivide the big chunks by splitting at each of the points in "finalSplitPoints"
// that we haven't already split by.
boost::optional<Chunk> currentChunk(
- chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[0]));
+ chunkManager->findIntersectingChunkWithSimpleCollation(finalSplitPoints[0]));
std::vector<BSONObj> subSplits;
- for (unsigned i = 0; i <= allSplits.size(); i++) {
- if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) {
+ for (unsigned i = 0; i <= finalSplitPoints.size(); i++) {
+ if (i == finalSplitPoints.size() || !currentChunk->containsKey(finalSplitPoints[i])) {
if (!subSplits.empty()) {
auto splitStatus = shardutil::splitChunkAtMultiplePoints(
opCtx,
@@ -543,12 +543,12 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
subSplits.clear();
}
- if (i < allSplits.size()) {
+ if (i < finalSplitPoints.size()) {
currentChunk.emplace(
- chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[i]));
+ chunkManager->findIntersectingChunkWithSimpleCollation(finalSplitPoints[i]));
}
} else {
- BSONObj splitPoint(allSplits[i]);
+ BSONObj splitPoint(finalSplitPoints[i]);
// Do not split on the boundaries
if (currentChunk->getMin().woCompare(splitPoint) == 0) {
@@ -791,22 +791,12 @@ public:
result << "collectionUUID" << *uuid;
}
- // Make sure the cached metadata for the collection knows that we are now sharded
- catalogCache->invalidateShardedCollection(nss);
-
- // Free the distlocks to allow the splits and migrations below to proceed.
- collDistLock.reset();
- dbDistLock.reset();
- lk.unlock();
-
- // Step 7. Migrate initial chunks to distribute them across shards.
- migrateAndFurtherSplitInitialChunks(opCtx,
- nss,
- numShards,
- shardIds,
- isEmpty,
- shardKeyPattern,
- std::move(shardCollResponse.getAllSplits()));
+ auto routingInfo =
+ uassertStatusOK(catalogCache->getCollectionRoutingInfoWithRefresh(opCtx, nss));
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Collection was successfully written as sharded but got dropped before it "
+ "could be evenly distributed",
+ routingInfo.cm());
return true;
}
@@ -823,18 +813,18 @@ public:
}
// Step 5.
- std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these
- std::vector<BSONObj> allSplits; // all of the initial desired split points
+ std::vector<BSONObj> initialSplitPoints; // there will be at most numShards-1 of these
+ std::vector<BSONObj> finalSplitPoints; // all of the desired split points
if (request.getInitialSplitPoints()) {
- initSplits = std::move(*request.getInitialSplitPoints());
+ initialSplitPoints = std::move(*request.getInitialSplitPoints());
} else {
InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
shardKeyPattern,
isEmpty,
numShards,
request.getNumInitialChunks(),
- &initSplits,
- &allSplits);
+ &initialSplitPoints,
+ &finalSplitPoints);
}
LOG(0) << "CMD: shardcollection: " << cmdObj;
@@ -854,7 +844,7 @@ public:
shardKeyPattern,
*request.getCollation(),
request.getUnique(),
- initSplits,
+ initialSplitPoints,
distributeInitialChunks,
primaryShardId);
result << "collectionsharded" << nss.ns();
@@ -871,7 +861,7 @@ public:
// Step 7. Migrate initial chunks to distribute them across shards.
migrateAndFurtherSplitInitialChunks(
- opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, allSplits);
+ opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, finalSplitPoints);
return true;
}
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 879a7b90339..ca16683c2ad 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -32,6 +32,13 @@
#include "mongo/db/s/config/initial_split_policy.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/client/read_preference.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/s/balancer_configuration.h"
+#include "mongo/s/catalog/sharding_catalog_client_impl.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/shard_util.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -105,7 +112,8 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
const ShardId& databasePrimaryShardId,
const Timestamp& validAfter,
const std::vector<BSONObj>& splitPoints,
- const std::vector<ShardId>& shardIds) {
+ const std::vector<ShardId>& shardIds,
+ const int numContiguousChunksPerShard) {
invariant(!shardIds.empty());
ChunkVersion version(1, 0, OID::gen());
@@ -125,7 +133,11 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
// It's possible there are no split points or fewer split points than total number of
// shards, and we need to be sure that at least one chunk is placed on the primary shard
- const ShardId shardId = (i == 0) ? databasePrimaryShardId : shardIds[i % shardIds.size()];
+ const ShardId shardId = (i == 0 && splitPoints.size() + 1 < shardIds.size())
+ ? databasePrimaryShardId
+ : shardIds[(i / numContiguousChunksPerShard) % shardIds.size()];
+
chunks.emplace_back(nss, ChunkRange(min, max), version, shardId);
auto& chunk = chunks.back();
@@ -137,4 +149,93 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardColle
return {std::move(chunks)};
}
+InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::writeFirstChunksToConfig(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKeyPattern,
+ const ShardId& primaryShardId,
+ const std::vector<BSONObj>& splitPoints,
+ const bool distributeInitialChunks,
+ const int numContiguousChunksPerShard) {
+ const auto& keyPattern = shardKeyPattern.getKeyPattern();
+
+ std::vector<BSONObj> finalSplitPoints;
+ std::vector<ShardId> shardIds;
+
+ if (splitPoints.empty()) {
+ // If no split points were specified use the shard's data distribution to determine them
+ auto primaryShard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
+
+ auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryPreferred},
+ nss.db().toString(),
+ BSON("count" << nss.coll()),
+ Shard::RetryPolicy::kIdempotent));
+
+ long long numObjects = 0;
+ uassertStatusOK(result.commandStatus);
+ uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects));
+
+ // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
+ // the splitVector command and affects the number of chunks returned, has been loaded.
+ uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx));
+
+ if (numObjects > 0) {
+ finalSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
+ opCtx,
+ primaryShardId,
+ nss,
+ shardKeyPattern,
+ ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
+ Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
+ 0));
+ }
+
+ // If docs already exist for the collection, must use primary shard,
+ // otherwise defer to passed-in distribution option.
+ if (numObjects == 0 && distributeInitialChunks) {
+ Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
+ } else {
+ shardIds.push_back(primaryShardId);
+ }
+ } else {
+ // Make sure points are unique and ordered
+ auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
+
+ for (const auto& splitPoint : splitPoints) {
+ orderedPts.insert(splitPoint);
+ }
+
+ for (const auto& splitPoint : orderedPts) {
+ finalSplitPoints.push_back(splitPoint);
+ }
+
+ if (distributeInitialChunks) {
+ Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
+ } else {
+ shardIds.push_back(primaryShardId);
+ }
+ }
+
+ auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks(
+ nss,
+ shardKeyPattern,
+ primaryShardId,
+ LogicalClock::get(opCtx)->getClusterTime().asTimestamp(),
+ finalSplitPoints,
+ shardIds,
+ numContiguousChunksPerShard);
+ for (const auto& chunk : initialChunks.chunks) {
+ uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
+ opCtx,
+ ChunkType::ConfigNS,
+ chunk.toConfigBSON(),
+ ShardingCatalogClient::kMajorityWriteConcern));
+ }
+
+ return initialChunks;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 3379554e870..521dbf123d6 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -68,9 +68,19 @@ public:
};
/**
- * Produces the the initial chunks that need to be written for a collection which is being
+ * Produces the initial chunks that need to be written for a collection which is being
* newly-sharded. The function performs some basic validation of the input parameters, but there
* is no checking whether the collection contains any data or not.
+ *
+ * Chunks are assigned to a shard in a round-robin fashion, numContiguousChunksPerShard (k)
+ * chunks at a time. For example, the first k chunks are assigned to the first available shard,
+ * and the next k chunks are assigned to the second available shard and so on.
+ * numContiguousChunksPerShard should only be greater than 1 when we do not pre-split the
+ * range into larger chunks and then further split the resulting chunks on the destination
+ * shards, as configSvrShardCollection does; it should be equal to the number of final split
+ * points + 1 divided by the number of initial split points + 1. It serves to preserve the
+ * ordering/contiguousness of chunks when they are split by shardSvrShardCollection, so that it
+ * yields the exact same shard assignments as configSvrShardCollection.
*/
static ShardCollectionConfig generateShardCollectionInitialChunks(
const NamespaceString& nss,
@@ -78,7 +88,21 @@ public:
const ShardId& databasePrimaryShardId,
const Timestamp& validAfter,
const std::vector<BSONObj>& splitPoints,
- const std::vector<ShardId>& shardIds);
+ const std::vector<ShardId>& shardIds,
+ const int numContiguousChunksPerShard = 1);
+
+ /**
+ * Creates and writes to the config server the first chunks for a newly sharded collection.
+ * Returns the created chunks.
+ */
+ static ShardCollectionConfig writeFirstChunksToConfig(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKeyPattern,
+ const ShardId& primaryShardId,
+ const std::vector<BSONObj>& splitPoints,
+ const bool distributeInitialChunks,
+ const int numContiguousChunksPerShard = 1);
};
} // namespace mongo
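
To make the ratio in the header comment above concrete, here is a small worked example under assumed numbers (two shards yielding one initial split point, four desired chunks yielding three final split points); the arithmetic matches the computation added to shardsvr_shard_collection.cpp further down:

// Worked example (assumed numbers) of the numContiguousChunksPerShard ratio described
// in initial_split_policy.h and computed in shardsvr_shard_collection.cpp.
#include <cassert>
#include <cstddef>

int main() {
    const std::size_t initialSplitPoints = 1;  // at most numShards - 1, e.g. 2 shards
    const std::size_t finalSplitPoints = 3;    // e.g. 4 desired initial chunks -> 3 boundaries

    // (final split points + 1) / (initial split points + 1)
    const std::size_t numContiguousChunksPerShard =
        (finalSplitPoints + 1) / (initialSplitPoints + 1);

    // 4 final chunks over 2 "big chunks" means each shard keeps 2 contiguous final chunks,
    // the same layout configSvrShardCollection would reach by splitting the big chunks
    // after distributing them.
    assert(numContiguousChunksPerShard == 2);
    return 0;
}
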
diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp
index 7ab068feeac..0c0370a37e5 100644
--- a/src/mongo/db/s/config/initial_split_policy_test.cpp
+++ b/src/mongo/db/s/config/initial_split_policy_test.cpp
@@ -79,8 +79,8 @@ void checkCalculatedHashedSplitPoints(bool isHashed,
numInitialChunks,
&initialSplitPoints,
&finalSplitPoints);
- assertBSONObjVectorsAreEqual(initialSplitPoints, *expectedInitialSplitPoints);
- assertBSONObjVectorsAreEqual(finalSplitPoints, *expectedFinalSplitPoints);
+ assertBSONObjVectorsAreEqual(*expectedInitialSplitPoints, initialSplitPoints);
+ assertBSONObjVectorsAreEqual(*expectedFinalSplitPoints, finalSplitPoints);
}
TEST(CalculateHashedSplitPointsTest, EmptyCollectionMoreChunksThanShards) {
@@ -128,6 +128,14 @@ TEST(CalculateHashedSplitPointsTest, NotHashedWithInitialSplitsFails) {
class GenerateInitialSplitChunksTest : public unittest::Test {
public:
+ const std::vector<BSONObj>& hashedChunkBounds() {
+ return _hashedChunkBounds;
+ }
+
+ const std::vector<BSONObj>& hashedSplitPoints() {
+ return _splitPoints;
+ }
+
ChunkType makeChunk(const BSONObj min, const BSONObj max, const ShardId shardId) {
ChunkVersion version(1, 0, OID::gen());
ChunkType chunk(_nss, ChunkRange(min, max), version, shardId);
@@ -155,6 +163,14 @@ private:
const ShardKeyPattern _shardKeyPattern = makeShardKeyPattern(true);
const std::vector<ShardId> _shardIds = {ShardId("testShard0"), ShardId("testShard1")};
const Timestamp _timeStamp{Date_t::now()};
+ const KeyPattern& keyPattern = shardKeyPattern().getKeyPattern();
+ const std::vector<BSONObj> _hashedChunkBounds = {keyPattern.globalMin(),
+ BSON("x" << -4611686018427387902LL),
+ BSON("x" << 0),
+ BSON("x" << 4611686018427387902LL),
+ keyPattern.globalMax()};
+ const std::vector<BSONObj> _splitPoints{_hashedChunkBounds.begin() + 1,
+ _hashedChunkBounds.end() - 1};
};
TEST_F(GenerateInitialSplitChunksTest, NoSplitPoints) {
@@ -170,22 +186,34 @@ TEST_F(GenerateInitialSplitChunksTest, NoSplitPoints) {
}
TEST_F(GenerateInitialSplitChunksTest, SplitPointsMoreThanAvailableShards) {
- const auto& keyPattern = shardKeyPattern().getKeyPattern();
- const std::vector<BSONObj> expectedChunkBounds = {keyPattern.globalMin(),
- BSON("x" << -4611686018427387902LL),
- BSON("x" << 0),
- BSON("x" << 4611686018427387902LL),
- keyPattern.globalMax()};
- const std::vector<BSONObj> splitPoints(expectedChunkBounds.begin() + 1,
- expectedChunkBounds.end() - 1);
const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks(
- nss(), shardKeyPattern(), shardIds()[0], timeStamp(), splitPoints, shardIds());
+ nss(), shardKeyPattern(), shardIds()[0], timeStamp(), hashedSplitPoints(), shardIds());
+
+ ASSERT_EQ(hashedSplitPoints().size() + 1, shardCollectionConfig.chunks.size());
+
+ // chunks should be distributed in a round-robin manner
+ const std::vector<ShardId> expectedShardIds = {
+ ShardId("testShard0"), ShardId("testShard1"), ShardId("testShard0"), ShardId("testShard1")};
+ for (unsigned long i = 0; i < hashedChunkBounds().size() - 1; ++i) {
+ const auto expectedChunk =
+ makeChunk(hashedChunkBounds()[i], hashedChunkBounds()[i + 1], expectedShardIds[i]);
+ ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON().removeField("lastmod"),
+ shardCollectionConfig.chunks[i].toShardBSON().removeField("lastmod"));
+ }
+}
+
+TEST_F(GenerateInitialSplitChunksTest, SplitPointsNumContiguousChunksPerShardsGreaterThanOne) {
+ const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks(
+ nss(), shardKeyPattern(), shardIds()[0], timeStamp(), hashedSplitPoints(), shardIds(), 2);
+
+ ASSERT_EQ(hashedSplitPoints().size() + 1, shardCollectionConfig.chunks.size());
- ASSERT_EQ(splitPoints.size() + 1, shardCollectionConfig.chunks.size());
- for (unsigned long i = 0; i < expectedChunkBounds.size() - 1; ++i) {
- // chunks should be distributed in a round-robin manner
- const auto expectedChunk = makeChunk(
- expectedChunkBounds[i], expectedChunkBounds[i + 1], shardIds()[i % shardIds().size()]);
+    // chunks should be distributed in a round-robin manner, two chunks at a time
+ const std::vector<ShardId> expectedShardIds = {
+ ShardId("testShard0"), ShardId("testShard0"), ShardId("testShard1"), ShardId("testShard1")};
+ for (unsigned long i = 0; i < hashedChunkBounds().size() - 1; ++i) {
+ const auto expectedChunk =
+ makeChunk(hashedChunkBounds()[i], hashedChunkBounds()[i + 1], expectedShardIds[i]);
ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON().removeField("lastmod"),
shardCollectionConfig.chunks[i].toShardBSON().removeField("lastmod"));
}
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index e01d06d11a9..dd1ffdad078 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -130,96 +130,6 @@ boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx,
} // namespace
-/**
- * Creates and writes to the config server the first chunks for a newly sharded collection. Returns
- * the version generated for the collection.
- */
-InitialSplitPolicy::ShardCollectionConfig createFirstChunks(OperationContext* opCtx,
- const NamespaceString& nss,
- const ShardKeyPattern& shardKeyPattern,
- const ShardId& primaryShardId,
- const std::vector<BSONObj>& initPoints,
- const bool distributeInitialChunks) {
- const auto& keyPattern = shardKeyPattern.getKeyPattern();
-
- std::vector<BSONObj> splitPoints;
- std::vector<ShardId> shardIds;
-
- if (initPoints.empty()) {
- // If no split points were specified use the shard's data distribution to determine them
- auto primaryShard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
-
- auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryPreferred},
- nss.db().toString(),
- BSON("count" << nss.coll()),
- Shard::RetryPolicy::kIdempotent));
-
- long long numObjects = 0;
- uassertStatusOK(result.commandStatus);
- uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects));
-
- // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
- // the splitVector command and affects the number of chunks returned, has been loaded.
- uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx));
-
- if (numObjects > 0) {
- splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
- opCtx,
- primaryShardId,
- nss,
- shardKeyPattern,
- ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
- Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- 0));
- }
-
- // If docs already exist for the collection, must use primary shard,
- // otherwise defer to passed-in distribution option.
- if (numObjects == 0 && distributeInitialChunks) {
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
- } else {
- shardIds.push_back(primaryShardId);
- }
- } else {
- // Make sure points are unique and ordered
- auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
-
- for (const auto& initPoint : initPoints) {
- orderedPts.insert(initPoint);
- }
-
- for (const auto& initPoint : orderedPts) {
- splitPoints.push_back(initPoint);
- }
-
- if (distributeInitialChunks) {
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
- } else {
- shardIds.push_back(primaryShardId);
- }
- }
-
- auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks(
- nss,
- shardKeyPattern,
- primaryShardId,
- LogicalClock::get(opCtx)->getClusterTime().asTimestamp(),
- splitPoints,
- shardIds);
- for (const auto& chunk : initialChunks.chunks) {
- uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
- opCtx,
- ChunkType::ConfigNS,
- chunk.toConfigBSON(),
- ShardingCatalogClient::kMajorityWriteConcern));
- }
-
- return initialChunks;
-}
-
void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) {
BSONObjBuilder countBuilder;
countBuilder.append("count", ChunkType::ConfigNS.coll());
@@ -455,7 +365,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
const ShardKeyPattern& fieldsAndOrder,
const BSONObj& defaultCollation,
bool unique,
- const vector<BSONObj>& initPoints,
+ const vector<BSONObj>& splitPoints,
const bool distributeInitialChunks,
const ShardId& dbPrimaryShardId) {
const auto catalogClient = Grid::get(opCtx)->catalogClient();
@@ -475,7 +385,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
uuid->appendToBuilder(&collectionDetail, "uuid");
}
collectionDetail.append("primary", primaryShard->toString());
- collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1));
+ collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
uassertStatusOK(catalogClient->logChange(opCtx,
"shardCollection.start",
nss.ns(),
@@ -490,8 +400,8 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
->makeFromBSON(defaultCollation));
}
- const auto initialChunks = createFirstChunks(
- opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, distributeInitialChunks);
+ const auto initialChunks = InitialSplitPolicy::writeFirstChunksToConfig(
+ opCtx, nss, fieldsAndOrder, dbPrimaryShardId, splitPoints, distributeInitialChunks);
{
CollectionType coll;
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index 10a9b5e1d8e..ec2528b6835 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -32,7 +32,6 @@
#include "mongo/bson/simple_bsonelement_comparator.h"
#include "mongo/bson/util/bson_extract.h"
-#include "mongo/client/connpool.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
@@ -56,7 +55,7 @@
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
-#include "mongo/s/request_types/set_shard_version_request.h"
+#include "mongo/s/request_types/clone_collection_options_from_primary_shard_gen.h"
#include "mongo/s/request_types/shard_collection_gen.h"
#include "mongo/s/shard_util.h"
#include "mongo/util/log.h"
@@ -139,11 +138,11 @@ BSONObj makeCreateIndexesCmd(const NamespaceString& nss,
*
* If the collection is empty and no index on the shard key exists, creates the required index.
*/
-void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& proposedKey,
- const ShardKeyPattern& shardKeyPattern,
- const ShardsvrShardCollection& request) {
+void createCollectionOrValidateExisting(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& proposedKey,
+ const ShardKeyPattern& shardKeyPattern,
+ const ShardsvrShardCollection& request) {
// The proposed shard key must be validated against the set of existing indexes.
// In particular, we must ensure the following constraints
//
@@ -173,7 +172,7 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx,
DBDirectClient localClient(opCtx);
std::list<BSONObj> indexes = localClient.getIndexSpecs(nss.ns());
- // 1. Verify consistency with existing unique indexes
+ // 1. Verify consistency with existing unique indexes
for (const auto& idx : indexes) {
BSONObj currentKey = idx["key"].embeddedObject();
bool isUnique = idx["unique"].trueValue();
@@ -370,96 +369,6 @@ boost::optional<UUID> getUUIDFromPrimaryShard(OperationContext* opCtx, const Nam
return uassertStatusOK(UUID::parse(collectionInfo["uuid"]));
}
-/**
- * Creates and writes to the config server the first chunks for a newly sharded collection. Returns
- * the version generated for the collection.
- */
-InitialSplitPolicy::ShardCollectionConfig createFirstChunks(OperationContext* opCtx,
- const NamespaceString& nss,
- const ShardKeyPattern& shardKeyPattern,
- const ShardId& primaryShardId,
- const std::vector<BSONObj>& initPoints,
- const bool distributeInitialChunks) {
- const auto& keyPattern = shardKeyPattern.getKeyPattern();
-
- std::vector<BSONObj> splitPoints;
- std::vector<ShardId> shardIds;
-
- if (initPoints.empty()) {
- // If no split points were specified use the shard's data distribution to determine them
- auto primaryShard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
-
- auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryPreferred},
- nss.db().toString(),
- BSON("count" << nss.coll()),
- Shard::RetryPolicy::kIdempotent));
-
- long long numObjects = 0;
- uassertStatusOK(result.commandStatus);
- uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects));
-
- // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
- // the splitVector command and affects the number of chunks returned, has been loaded.
- uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx));
-
- if (numObjects > 0) {
- splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
- opCtx,
- primaryShardId,
- nss,
- shardKeyPattern,
- ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
- Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- 0));
- }
-
- // If docs already exist for the collection, must use primary shard,
- // otherwise defer to passed-in distribution option.
- if (numObjects == 0 && distributeInitialChunks) {
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
- } else {
- shardIds.push_back(primaryShardId);
- }
- } else {
- // Make sure points are unique and ordered
- auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
-
- for (const auto& initPoint : initPoints) {
- orderedPts.insert(initPoint);
- }
-
- for (const auto& initPoint : orderedPts) {
- splitPoints.push_back(initPoint);
- }
-
- if (distributeInitialChunks) {
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
- } else {
- shardIds.push_back(primaryShardId);
- }
- }
-
- auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks(
- nss,
- shardKeyPattern,
- primaryShardId,
- LogicalClock::get(opCtx)->getClusterTime().asTimestamp(),
- splitPoints,
- shardIds);
- for (const auto& chunk : initialChunks.chunks) {
- uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
- opCtx,
- ChunkType::ConfigNS,
- chunk.toConfigBSON(),
- ShardingCatalogClient::kMajorityWriteConcern));
- }
-
- return initialChunks;
-}
-
void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) {
BSONObjBuilder countBuilder;
countBuilder.append("count", ChunkType::ConfigNS.coll());
@@ -501,13 +410,15 @@ void shardCollection(OperationContext* opCtx,
const ShardKeyPattern& fieldsAndOrder,
const BSONObj& defaultCollation,
bool unique,
- const std::vector<BSONObj>& initPoints,
- const bool distributeInitialChunks,
- const ShardId& dbPrimaryShardId) {
+ const std::vector<BSONObj>& splitPoints,
+ const bool fromMapReduce,
+ const ShardId& dbPrimaryShardId,
+ const int numContiguousChunksPerShard) {
const auto catalogClient = Grid::get(opCtx)->catalogClient();
const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId));
+ const bool distributeChunks = fromMapReduce || fieldsAndOrder.isHashedPattern();
// Fail if there are partially written chunks from a previous failed shardCollection.
checkForExistingChunks(opCtx, nss);
@@ -521,7 +432,7 @@ void shardCollection(OperationContext* opCtx,
uuid->appendToBuilder(&collectionDetail, "uuid");
}
collectionDetail.append("primary", primaryShard->toString());
- collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1));
+ collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
uassertStatusOK(catalogClient->logChange(opCtx,
"shardCollection.start",
nss.ns(),
@@ -536,8 +447,14 @@ void shardCollection(OperationContext* opCtx,
->makeFromBSON(defaultCollation));
}
- const auto initialChunks = createFirstChunks(
- opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, distributeInitialChunks);
+ const auto initialChunks =
+ InitialSplitPolicy::writeFirstChunksToConfig(opCtx,
+ nss,
+ fieldsAndOrder,
+ dbPrimaryShardId,
+ splitPoints,
+ distributeChunks,
+ numContiguousChunksPerShard);
{
CollectionType coll;
@@ -556,6 +473,51 @@ void shardCollection(OperationContext* opCtx,
forceShardFilteringMetadataRefresh(opCtx, nss);
+ // Create collections on all shards that will receive chunks. We need to do this after we mark
+ // the collection as sharded so that the shards will update their metadata correctly. We do not
+ // want to do this for mapReduce.
+ if (!fromMapReduce) {
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (const auto& chunk : initialChunks.chunks) {
+ if (chunk.getShard() == dbPrimaryShardId)
+ continue;
+
+ CloneCollectionOptionsFromPrimaryShard cloneCollectionOptionsFromPrimaryShardRequest(
+ nss);
+ cloneCollectionOptionsFromPrimaryShardRequest.setPrimaryShard(
+ dbPrimaryShardId.toString());
+ cloneCollectionOptionsFromPrimaryShardRequest.setDbName(nss.db());
+
+ requests.emplace_back(
+ chunk.getShard(),
+ cloneCollectionOptionsFromPrimaryShardRequest.toBSON(
+ BSON("writeConcern" << ShardingCatalogClient::kMajorityWriteConcern.toBSON())));
+ }
+
+ if (!requests.empty()) {
+ auto responses = gatherResponses(opCtx,
+ nss.db(),
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ requests);
+
+ // If any shards fail to create the collection, fail the entire shardCollection command
+            // (potentially leaving an incompletely created sharded collection)
+ for (const auto& response : responses) {
+ auto shardResponse = uassertStatusOKWithContext(
+ std::move(response.swResponse),
+ str::stream() << "Unable to create collection on " << response.shardId);
+ auto status = getStatusFromCommandResult(shardResponse.data);
+ uassertStatusOK(status.withContext(
+ str::stream() << "Unable to create collection on " << response.shardId));
+
+ auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
+ uassertStatusOK(wcStatus.withContext(
+ str::stream() << "Unable to create collection on " << response.shardId));
+ }
+ }
+ }
+
catalogClient
->logChange(opCtx,
"shardCollection.end",
@@ -619,9 +581,9 @@ public:
auto proposedKey(request.getKey().getOwned());
ShardKeyPattern shardKeyPattern(proposedKey);
- validateShardKeyAgainstExistingIndexes(opCtx, nss, proposedKey, shardKeyPattern, request);
+ createCollectionOrValidateExisting(opCtx, nss, proposedKey, shardKeyPattern, request);
- // read zone info
+ // Read zone info
auto configServer = Grid::get(opCtx)->shardRegistry()->getConfigShard();
auto tagStatus =
configServer->exhaustiveFindOnConfig(opCtx,
@@ -634,7 +596,7 @@ public:
uassertStatusOK(tagStatus);
const auto& tagDocList = tagStatus.getValue().docs;
- if (tagDocList.size() > 0) {
+ if (!tagDocList.empty()) {
validateShardKeyAgainstExistingZones(opCtx, proposedKey, shardKeyPattern, tagDocList);
}
@@ -647,6 +609,7 @@ public:
Grid::get(opCtx)->shardRegistry()->reload(opCtx);
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ shardRegistry->reload(opCtx);
DBDirectClient localClient(opCtx);
bool isEmpty = (localClient.count(nss.ns()) == 0);
@@ -657,18 +620,19 @@ public:
// SERVER-35794 TODO: Use zone info to determine which shards should have chunks placed on
// them.
- std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these
- std::vector<BSONObj> allSplits; // all of the initial desired split points
+
+ std::vector<BSONObj> initialSplitPoints;
+ std::vector<BSONObj> finalSplitPoints;
if (request.getInitialSplitPoints()) {
- initSplits = std::move(*request.getInitialSplitPoints());
+ finalSplitPoints = std::move(*request.getInitialSplitPoints());
} else {
InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
shardKeyPattern,
isEmpty,
numShards,
request.getNumInitialChunks(),
- &initSplits,
- &allSplits);
+ &initialSplitPoints,
+ &finalSplitPoints);
}
result << "collectionsharded" << nss.ns();
@@ -676,19 +640,19 @@ public:
result << "collectionUUID" << *uuid;
}
- result << "allSplits" << allSplits;
-
critSec.enterCommitPhase();
LOG(0) << "CMD: shardcollection: " << cmdObj;
audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
- // The initial chunks are distributed evenly across shards only if the initial split
- // points were specified in the request, i.e., by mapReduce. Otherwise, all the initial
- // chunks are placed on the primary shard, and may be distributed across shards through
- // migrations (below) if using a hashed shard key.
- const bool distributeInitialChunks = bool(request.getInitialSplitPoints());
+ // The initial chunks are distributed evenly across shards if the initial split points were
+ // specified in the request by mapReduce or if we are using a hashed shard key. Otherwise,
+ // all the initial chunks are placed on the primary shard.
+ const bool fromMapReduce = bool(request.getInitialSplitPoints());
+ const int numContiguousChunksPerShard = initialSplitPoints.empty()
+ ? 1
+ : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1);
// Step 6. Actually shard the collection.
shardCollection(opCtx,
@@ -697,9 +661,10 @@ public:
shardKeyPattern,
*request.getCollation(),
request.getUnique(),
- initSplits,
- distributeInitialChunks,
- ShardingState::get(opCtx)->getShardName());
+ finalSplitPoints,
+ fromMapReduce,
+ ShardingState::get(opCtx)->getShardName(),
+ numContiguousChunksPerShard);
return true;
}
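
Because the new fan-out step above is spread across several hunks, a compressed standalone sketch of its control flow may help; the types below are plain stand-ins rather than MongoDB classes, and one request is built per non-primary chunk, exactly as in the loop added above:

// Standalone sketch of the new step in shardCollection(): every shard that owns an initial
// chunk (other than the primary, which already has the collection) is asked to clone the
// collection options and indexes from the primary shard.
#include <iostream>
#include <string>
#include <vector>

struct Chunk { std::string ownerShard; };
struct Request { std::string targetShard; std::string cmd; };

int main() {
    const std::string primaryShard = "shard0";
    const std::vector<Chunk> initialChunks = {{"shard0"}, {"shard0"}, {"shard1"}, {"shard2"}};

    std::vector<Request> requests;
    for (const auto& chunk : initialChunks) {
        if (chunk.ownerShard == primaryShard)
            continue;  // the primary already has the collection
        requests.push_back({chunk.ownerShard, "CloneCollectionOptionsFromPrimaryShard"});
    }

    // The real code dispatches these in parallel via gatherResponses() and fails the whole
    // shardCollection command if any target reports an error or a write-concern error.
    for (const auto& request : requests) {
        std::cout << "dispatch " << request.cmd << " to " << request.targetShard << "\n";
    }
    return 0;
}
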
diff --git a/src/mongo/s/commands/cluster_commands_helpers.cpp b/src/mongo/s/commands/cluster_commands_helpers.cpp
index 2820cc62d91..86597aea280 100644
--- a/src/mongo/s/commands/cluster_commands_helpers.cpp
+++ b/src/mongo/s/commands/cluster_commands_helpers.cpp
@@ -151,9 +151,8 @@ std::vector<AsyncRequestsSender::Request> buildVersionedRequestsForTargetedShard
return requests;
}
-/**
- * Throws StaleConfigException if any remote returns a stale shardVersion error.
- */
+} // namespace
+
std::vector<AsyncRequestsSender::Response> gatherResponses(
OperationContext* opCtx,
StringData dbName,
@@ -232,8 +231,6 @@ std::vector<AsyncRequestsSender::Response> gatherResponses(
return responses;
}
-} // namespace
-
BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version) {
BSONObjBuilder cmdWithVersionBob(std::move(cmdObj));
version.appendToCommand(&cmdWithVersionBob);
diff --git a/src/mongo/s/commands/cluster_commands_helpers.h b/src/mongo/s/commands/cluster_commands_helpers.h
index 4517f951699..bc848435357 100644
--- a/src/mongo/s/commands/cluster_commands_helpers.h
+++ b/src/mongo/s/commands/cluster_commands_helpers.h
@@ -48,6 +48,20 @@ namespace mongo {
void appendWriteConcernErrorToCmdResponse(const ShardId& shardID,
const BSONElement& wcErrorElem,
BSONObjBuilder& responseBuilder);
+
+/**
+ * Dispatches all the specified requests in parallel and waits until all complete, returning a
+ * vector of the same size and positions as that of 'requests'.
+ *
+ * Throws StaleConfigException if any remote returns a stale shardVersion error.
+ */
+std::vector<AsyncRequestsSender::Response> gatherResponses(
+ OperationContext* opCtx,
+ StringData dbName,
+ const ReadPreferenceSetting& readPref,
+ Shard::RetryPolicy retryPolicy,
+ const std::vector<AsyncRequestsSender::Request>& requests);
+
/**
* Returns a copy of 'cmdObj' with 'version' appended.
*/
diff --git a/src/mongo/s/request_types/shard_collection.idl b/src/mongo/s/request_types/shard_collection.idl
index 97e97aa5ba5..1adcd923527 100644
--- a/src/mongo/s/request_types/shard_collection.idl
+++ b/src/mongo/s/request_types/shard_collection.idl
@@ -104,7 +104,7 @@ structs:
type: uuid
description: "The UUID of the collection that just got sharded."
optional: true
-
+
ShardsvrShardCollection:
description: "The internal shardCollection command on a primary shard"
strict: false
@@ -148,11 +148,7 @@ structs:
type: uuid
description: "The UUID of the collection that just got sharded."
optional: true
- allSplits:
- type: array<object>
- description: "All split points."
- optional: false
-
+
ConfigsvrCommitShardCollection:
description: "The internal commitShardCollection command on the config server"
strict: false