Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/s/SConscript                                                  |  12
-rw-r--r--  src/mongo/db/s/config/configsvr_shard_collection_command.cpp              | 100
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.cpp                             | 140
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.h                               |  84
-rw-r--r--  src/mongo/db/s/config/initial_split_policy_test.cpp                        | 195
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp  |  98
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp                           |   2
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h                             |   2
-rw-r--r--  src/mongo/db/s/shardsvr_shard_collection.cpp                               | 222
9 files changed, 533 insertions(+), 322 deletions(-)
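For reference while reading the diffs below, here is a minimal standalone sketch (plain C++, not MongoDB source; it uses raw long long values in place of BSON split-point documents, and the variable names are illustrative) of the hashed split-point arithmetic that the new InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection centralizes. With numShards = 2 and the default numInitialChunks = 2 * numShards, it reproduces the constants asserted in the new initial_split_policy_test.cpp. The early return for numInitialChunks == 1 in the real function is omitted here.

#include <algorithm>
#include <iostream>
#include <limits>
#include <vector>

int main() {
    const int numShards = 2;
    int numInitialChunks = 0;              // 0 means "not specified"
    if (numInitialChunks <= 0)
        numInitialChunks = 2 * numShards;  // default: two chunks per shard

    // Hashed shard key values are signed 64-bit integers, so carve that range into
    // numInitialChunks intervals whose boundaries are symmetric around 0.
    const long long intervalSize =
        (std::numeric_limits<long long>::max() / numInitialChunks) * 2;
    long long current = 0;

    std::vector<long long> finalSplitPoints;
    if (numInitialChunks % 2 == 0) {
        finalSplitPoints.push_back(current);
        current += intervalSize;
    } else {
        current += intervalSize / 2;
    }
    for (int i = 0; i < (numInitialChunks - 1) / 2; i++) {
        finalSplitPoints.push_back(current);
        finalSplitPoints.push_back(-current);
        current += intervalSize;
    }
    std::sort(finalSplitPoints.begin(), finalSplitPoints.end());

    // The "big chunk" boundaries for the primary shard: at most numShards - 1 of the
    // final split points, chosen so the big chunks carry roughly equal chunk counts.
    std::vector<long long> initialSplitPoints;
    int lastIndex = -1;
    for (int i = 1; i < numShards; i++) {
        if (lastIndex < (i * numInitialChunks) / numShards - 1) {
            lastIndex = (i * numInitialChunks) / numShards - 1;
            initialSplitPoints.push_back(finalSplitPoints.at(lastIndex));
        }
    }

    for (long long p : finalSplitPoints)
        std::cout << "final:   " << p << "\n";  // -4611686018427387902, 0, 4611686018427387902
    for (long long p : initialSplitPoints)
        std::cout << "initial: " << p << "\n";  // 0
    return 0;
}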
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index dc9b1c7458b..59c60799573 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -181,12 +181,12 @@ env.Library( env.Library( target='balancer', source=[ - 'balancer/balancer.cpp', - 'balancer/balancer_chunk_selection_policy.cpp', 'balancer/balancer_chunk_selection_policy_impl.cpp', + 'balancer/balancer_chunk_selection_policy.cpp', 'balancer/balancer_policy.cpp', - 'balancer/cluster_statistics.cpp', + 'balancer/balancer.cpp', 'balancer/cluster_statistics_impl.cpp', + 'balancer/cluster_statistics.cpp', 'balancer/migration_manager.cpp', 'balancer/scoped_migration_request.cpp', 'balancer/type_migration.cpp', @@ -223,13 +223,14 @@ env.Library( target='sharding_catalog_manager', source=[ 'add_shard_util.cpp', + 'config/initial_split_policy.cpp', + 'config/namespace_serializer.cpp', 'config/sharding_catalog_manager_chunk_operations.cpp', 'config/sharding_catalog_manager_collection_operations.cpp', - 'config/sharding_catalog_manager.cpp', 'config/sharding_catalog_manager_database_operations.cpp', 'config/sharding_catalog_manager_shard_operations.cpp', 'config/sharding_catalog_manager_zone_operations.cpp', - 'config/namespace_serializer.cpp', + 'config/sharding_catalog_manager.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/audit', @@ -419,6 +420,7 @@ env.CppUnitTest( env.CppUnitTest( target='sharding_catalog_manager_test', source=[ + 'config/initial_split_policy_test.cpp', 'config/sharding_catalog_manager_add_shard_test.cpp', 'config/sharding_catalog_manager_add_shard_to_zone_test.cpp', 'config/sharding_catalog_manager_assign_key_range_to_zone_test.cpp', diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp index b00b42cc37c..458495012b3 100644 --- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp @@ -47,6 +47,7 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/s/config/initial_split_policy.h" #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/s/balancer_configuration.h" #include "mongo/s/catalog/type_database.h" @@ -56,7 +57,6 @@ #include "mongo/s/commands/cluster_commands_helpers.h" #include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" -#include "mongo/s/request_types/migration_secondary_throttle_options.h" #include "mongo/s/request_types/shard_collection_gen.h" #include "mongo/s/shard_util.h" #include "mongo/util/log.h" @@ -438,82 +438,6 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx, } /** - * For new collections which use hashed shard keys, we can can pre-split the range of possible - * hashes into a large number of chunks, and distribute them evenly at creation time. Until we - * design a better initialization scheme, the safest way to pre-split is to make one big chunk for - * each shard and migrate them one at a time. - * - * Populates 'initSplits' with the split points to use on the primary shard to produce the initial - * "big chunks." - * Also populates 'allSplits' with the additional split points to use on the "big chunks" after the - * "big chunks" have been spread evenly across shards through migrations. 
- */ -void determinePresplittingPoints(OperationContext* opCtx, - int numShards, - bool isEmpty, - const BSONObj& proposedKey, - const ShardKeyPattern& shardKeyPattern, - const ConfigsvrShardCollectionRequest& request, - std::vector<BSONObj>* initSplits, - std::vector<BSONObj>* allSplits) { - auto numChunks = request.getNumInitialChunks(); - - if (request.getInitialSplitPoints()) { - *initSplits = std::move(*request.getInitialSplitPoints()); - return; - } - - if (shardKeyPattern.isHashedPattern() && isEmpty) { - // If initial split points are not specified, only pre-split when using a hashed shard - // key and the collection is empty - if (numChunks <= 0) { - // default number of initial chunks - numChunks = 2 * numShards; - } - - // hashes are signed, 64-bit ints. So we divide the range (-MIN long, +MAX long) - // into intervals of size (2^64/numChunks) and create split points at the - // boundaries. The logic below ensures that initial chunks are all - // symmetric around 0. - long long intervalSize = (std::numeric_limits<long long>::max() / numChunks) * 2; - long long current = 0; - - if (numChunks % 2 == 0) { - allSplits->push_back(BSON(proposedKey.firstElementFieldName() << current)); - current += intervalSize; - } else { - current += intervalSize / 2; - } - - for (int i = 0; i < (numChunks - 1) / 2; i++) { - allSplits->push_back(BSON(proposedKey.firstElementFieldName() << current)); - allSplits->push_back(BSON(proposedKey.firstElementFieldName() << -current)); - current += intervalSize; - } - - sort(allSplits->begin(), - allSplits->end(), - SimpleBSONObjComparator::kInstance.makeLessThan()); - - // The initial splits define the "big chunks" that we will subdivide later. - int lastIndex = -1; - for (int i = 1; i < numShards; i++) { - if (lastIndex < (i * numChunks) / numShards - 1) { - lastIndex = (i * numChunks) / numShards - 1; - initSplits->push_back(allSplits->at(lastIndex)); - } - } - } else if (numChunks > 0) { - uasserted(ErrorCodes::InvalidOptions, - str::stream() << (!shardKeyPattern.isHashedPattern() - ? "numInitialChunks is not supported " - "when the shard key is not hashed." - : "numInitialChunks is not supported " - "when the collection is not empty.")); - } -} - -/** * Migrates the initial "big chunks" from the primary shard to spread them evenly across the shards. * * If 'allSplits' is not empty, additionally splits each "big chunk" into smaller chunks using the @@ -901,15 +825,17 @@ public: // Step 5. std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these std::vector<BSONObj> allSplits; // all of the initial desired split points - determinePresplittingPoints(opCtx, - numShards, - isEmpty, - proposedKey, - shardKeyPattern, - request, - &initSplits, - &allSplits); - + if (request.getInitialSplitPoints()) { + initSplits = std::move(*request.getInitialSplitPoints()); + } else { + InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( + shardKeyPattern, + isEmpty, + numShards, + request.getNumInitialChunks(), + &initSplits, + &allSplits); + } LOG(0) << "CMD: shardcollection: " << cmdObj; @@ -919,7 +845,7 @@ public: // were specified in the request, i.e., by mapReduce. Otherwise, all the initial chunks are // placed on the primary shard, and may be distributed across shards through migrations // (below) if using a hashed shard key. - const bool distributeInitialChunks = request.getInitialSplitPoints().is_initialized(); + const bool distributeInitialChunks = bool(request.getInitialSplitPoints()); // Step 6. Actually shard the collection. 
catalogManager->shardCollection(opCtx, diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp new file mode 100644 index 00000000000..879a7b90339 --- /dev/null +++ b/src/mongo/db/s/config/initial_split_policy.cpp @@ -0,0 +1,140 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/config/initial_split_policy.h" + +#include "mongo/util/log.h" + +namespace mongo { + +void InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( + const ShardKeyPattern& shardKeyPattern, + bool isEmpty, + int numShards, + int numInitialChunks, + std::vector<BSONObj>* initialSplitPoints, + std::vector<BSONObj>* finalSplitPoints) { + if (!shardKeyPattern.isHashedPattern() || !isEmpty) { + uassert(ErrorCodes::InvalidOptions, + str::stream() << "numInitialChunks is not supported when the collection is not " + << (!shardKeyPattern.isHashedPattern() ? "hashed" : "empty"), + !numInitialChunks); + return; + } + + // no split points are needed + if (numInitialChunks == 1) { + return; + } + + // If initial split points are not specified, only pre-split when using a hashed shard key and + // the collection is empty + if (numInitialChunks <= 0) { + // Default the number of initial chunks it they are not specified + numInitialChunks = 2 * numShards; + } + + // Hashes are signed, 64-bit integers. So we divide the range (-MIN long, +MAX long) into + // intervals of size (2^64/numInitialChunks) and create split points at the boundaries. + // + // The logic below ensures that initial chunks are all symmetric around 0. 
+ const long long intervalSize = (std::numeric_limits<long long>::max() / numInitialChunks) * 2; + long long current = 0; + + const auto proposedKey(shardKeyPattern.getKeyPattern().toBSON()); + + if (numInitialChunks % 2 == 0) { + finalSplitPoints->push_back(BSON(proposedKey.firstElementFieldName() << current)); + current += intervalSize; + } else { + current += intervalSize / 2; + } + + for (int i = 0; i < (numInitialChunks - 1) / 2; i++) { + finalSplitPoints->push_back(BSON(proposedKey.firstElementFieldName() << current)); + finalSplitPoints->push_back(BSON(proposedKey.firstElementFieldName() << -current)); + current += intervalSize; + } + + sort(finalSplitPoints->begin(), + finalSplitPoints->end(), + SimpleBSONObjComparator::kInstance.makeLessThan()); + + // The initial splits define the "big chunks" that we will subdivide later. + int lastIndex = -1; + for (int i = 1; i < numShards; i++) { + if (lastIndex < (i * numInitialChunks) / numShards - 1) { + lastIndex = (i * numInitialChunks) / numShards - 1; + initialSplitPoints->push_back(finalSplitPoints->at(lastIndex)); + } + } +} + +InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::generateShardCollectionInitialChunks( + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const ShardId& databasePrimaryShardId, + const Timestamp& validAfter, + const std::vector<BSONObj>& splitPoints, + const std::vector<ShardId>& shardIds) { + invariant(!shardIds.empty()); + + ChunkVersion version(1, 0, OID::gen()); + + const size_t numChunksToCreate = splitPoints.size() + 1; + + log() << "Going to create " << numChunksToCreate << " chunk(s) for: " << nss + << " using new epoch " << version.epoch(); + + const auto& keyPattern(shardKeyPattern.getKeyPattern()); + + std::vector<ChunkType> chunks; + + for (size_t i = 0; i <= splitPoints.size(); i++) { + const BSONObj min = (i == 0) ? keyPattern.globalMin() : splitPoints[i - 1]; + const BSONObj max = (i < splitPoints.size()) ? splitPoints[i] : keyPattern.globalMax(); + + // It's possible there are no split points or fewer split points than total number of + // shards, and we need to be sure that at least one chunk is placed on the primary shard + const ShardId shardId = (i == 0) ? databasePrimaryShardId : shardIds[i % shardIds.size()]; + + chunks.emplace_back(nss, ChunkRange(min, max), version, shardId); + auto& chunk = chunks.back(); + chunk.setHistory({ChunkHistory(validAfter, shardId)}); + + version.incMinor(); + } + + return {std::move(chunks)}; +} + +} // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h new file mode 100644 index 00000000000..3379554e870 --- /dev/null +++ b/src/mongo/db/s/config/initial_split_policy.h @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/shard_id.h" +#include "mongo/s/shard_key_pattern.h" + +namespace mongo { + +class InitialSplitPolicy { +public: + /** + * For new collections which use hashed shard keys, we can can pre-split the range of possible + * hashes into a large number of chunks, and distribute them evenly at creation time. + * + * Until we design a better initialization scheme, the most performant way to pre-split is to + * make one big chunk for each shard and migrate them one at a time. Because of this: + * - 'initialSplitPoints' is populated with the split points to use on the primary shard to + * produce the initial "big chunks." + * - 'finalSplitPoints' is populated with the additional split points to use on the "big chunks" + * after the "big chunks" have been spread evenly across shards through migrations. + */ + static void calculateHashedSplitPointsForEmptyCollection( + const ShardKeyPattern& shardKeyPattern, + bool isEmpty, + int numShards, + int numInitialChunks, + std::vector<BSONObj>* initialSplitPoints, + std::vector<BSONObj>* finalSplitPoints); + + struct ShardCollectionConfig { + std::vector<ChunkType> chunks; + + const auto& collVersion() const { + return chunks.back().getVersion(); + } + }; + + /** + * Produces the the initial chunks that need to be written for a collection which is being + * newly-sharded. The function performs some basic validation of the input parameters, but there + * is no checking whether the collection contains any data or not. + */ + static ShardCollectionConfig generateShardCollectionInitialChunks( + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const ShardId& databasePrimaryShardId, + const Timestamp& validAfter, + const std::vector<BSONObj>& splitPoints, + const std::vector<ShardId>& shardIds); +}; + +} // namespace mongo diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp new file mode 100644 index 00000000000..2ac29a84e17 --- /dev/null +++ b/src/mongo/db/s/config/initial_split_policy_test.cpp @@ -0,0 +1,195 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/config/initial_split_policy.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +/** + * Asserts that the given vectors of BSON objects are equal + */ +void assertBSONObjVectorsAreEqual(const std::vector<BSONObj>& expected, + const std::vector<BSONObj>& actual) { + ASSERT_EQ(expected.size(), actual.size()); + for (auto expectedIt = expected.begin(), actualIt = actual.begin(); + expectedIt != expected.end() && actualIt != actual.end(); + ++expectedIt, ++actualIt) { + ASSERT_BSONOBJ_EQ(*expectedIt, *actualIt); + } +} + +/** + * Returns a test hashed shard key pattern if isHashed is true. + * Otherwise, returns a regular shard key pattern. + */ +ShardKeyPattern makeShardKeyPattern(bool isHashed) { + if (isHashed) + return ShardKeyPattern(BSON("x" + << "hashed")); + return ShardKeyPattern(BSON("x" << 1)); +} + +/** + * Calls calculateHashedSplitPointsForEmptyCollection according to the given arguments + * and asserts that calculated split points match with the expected split points. 
+ */ +void checkCalculatedHashedSplitPoints(bool isHashed, + bool isEmpty, + int numShards, + int numInitialChunks, + const std::vector<BSONObj>* expectedInitialSplitPoints, + const std::vector<BSONObj>* expectedFinalSplitPoints) { + std::vector<BSONObj> initialSplitPoints; + std::vector<BSONObj> finalSplitPoints; + InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(makeShardKeyPattern(isHashed), + isEmpty, + numShards, + numInitialChunks, + &initialSplitPoints, + &finalSplitPoints); + assertBSONObjVectorsAreEqual(initialSplitPoints, *expectedInitialSplitPoints); + assertBSONObjVectorsAreEqual(finalSplitPoints, *expectedFinalSplitPoints); +} + +TEST(CalculateHashedSplitPointsTest, EmptyCollectionMoreChunksThanShards) { + const std::vector<BSONObj> expectedInitialSplitPoints = {BSON("x" << 0)}; + const std::vector<BSONObj> expectedFinalSplitPoints = { + BSON("x" << -4611686018427387902), BSON("x" << 0), BSON("x" << 4611686018427387902)}; + checkCalculatedHashedSplitPoints( + true, true, 2, 4, &expectedInitialSplitPoints, &expectedFinalSplitPoints); +} + +TEST(CalculateHashedSplitPointsTest, EmptyCollectionChunksEqualToShards) { + const std::vector<BSONObj> expectedSplitPoints = {BSON("x" << -3074457345618258602), + BSON("x" << 3074457345618258602)}; + checkCalculatedHashedSplitPoints(true, true, 3, 3, &expectedSplitPoints, &expectedSplitPoints); +} + +TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithInitialSplitsReturnsEmptySplits) { + const std::vector<BSONObj> expectedSplitPoints; + checkCalculatedHashedSplitPoints(true, true, 2, 1, &expectedSplitPoints, &expectedSplitPoints); +} + +TEST(CalculateHashedSplitPointsTest, EmptyCollectionNumInitialChunksZero) { + const std::vector<BSONObj> expectedInitialSplitPoints = {BSON("x" << 0)}; + const std::vector<BSONObj> expectedFinalSplitPoints = { + BSON("x" << -4611686018427387902), BSON("x" << 0), BSON("x" << 4611686018427387902)}; + checkCalculatedHashedSplitPoints( + true, true, 2, 0, &expectedInitialSplitPoints, &expectedFinalSplitPoints); +} + +TEST(CalculateHashedSplitPointsTest, NonEmptyCollectionHashedWithInitialSplitsFails) { + std::vector<BSONObj> expectedSplitPoints; + ASSERT_THROWS_CODE(checkCalculatedHashedSplitPoints( + true, false, 2, 3, &expectedSplitPoints, &expectedSplitPoints), + AssertionException, + ErrorCodes::InvalidOptions); +} + +TEST(CalculateHashedSplitPointsTest, NotHashedWithInitialSplitsFails) { + std::vector<BSONObj> expectedSplitPoints; + ASSERT_THROWS_CODE(checkCalculatedHashedSplitPoints( + false, true, 2, 3, &expectedSplitPoints, &expectedSplitPoints), + AssertionException, + ErrorCodes::InvalidOptions); +} + +class GenerateInitialSplitChunksTest : public unittest::Test { +public: + ChunkType makeChunk(const BSONObj min, const BSONObj max, const ShardId shardId) { + ChunkVersion version(1, 0, OID::gen()); + ChunkType chunk(_nss, ChunkRange(min, max), version, shardId); + chunk.setHistory({ChunkHistory(_timeStamp, shardId)}); + return chunk; + } + const NamespaceString nss() { + return _nss; + } + + const ShardKeyPattern& shardKeyPattern() { + return _shardKeyPattern; + } + + const std::vector<ShardId> shardIds() { + return _shardIds; + } + + const Timestamp timeStamp() { + return _timeStamp; + } + +private: + const NamespaceString _nss{"test.foo"}; + const ShardKeyPattern _shardKeyPattern = makeShardKeyPattern(true); + const std::vector<ShardId> _shardIds = {ShardId("testShard0"), ShardId("testShard1")}; + const Timestamp _timeStamp{Date_t::now()}; +}; + 
+TEST_F(GenerateInitialSplitChunksTest, NoSplitPoints) { + const std::vector<BSONObj> splitPoints; + const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks( + nss(), shardKeyPattern(), shardIds()[0], timeStamp(), splitPoints, shardIds()); + + const auto& keyPattern = shardKeyPattern().getKeyPattern(); + const auto expectedChunk = + makeChunk(keyPattern.globalMin(), keyPattern.globalMax(), shardIds()[0]); + ASSERT_EQ(1U, shardCollectionConfig.chunks.size()); + ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON(), shardCollectionConfig.chunks[0].toShardBSON()); +} + +TEST_F(GenerateInitialSplitChunksTest, SplitPointsMoreThanAvailableShards) { + const auto& keyPattern = shardKeyPattern().getKeyPattern(); + const std::vector<BSONObj> expectedChunkBounds = {keyPattern.globalMin(), + BSON("x" << -4611686018427387902), + BSON("x" << 0), + BSON("x" << 4611686018427387902), + keyPattern.globalMax()}; + const std::vector<BSONObj> splitPoints(expectedChunkBounds.begin() + 1, + expectedChunkBounds.end() - 1); + const auto shardCollectionConfig = InitialSplitPolicy::generateShardCollectionInitialChunks( + nss(), shardKeyPattern(), shardIds()[0], timeStamp(), splitPoints, shardIds()); + + ASSERT_EQ(splitPoints.size() + 1, shardCollectionConfig.chunks.size()); + for (unsigned long i = 0; i < expectedChunkBounds.size() - 1; ++i) { + // chunks should be distributed in a round-robin manner + const auto expectedChunk = makeChunk( + expectedChunkBounds[i], expectedChunkBounds[i + 1], shardIds()[i % shardIds().size()]); + ASSERT_BSONOBJ_EQ(expectedChunk.toShardBSON().removeField("lastmod"), + shardCollectionConfig.chunks[i].toShardBSON().removeField("lastmod")); + } +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index 6e3a310084d..e01d06d11a9 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -49,11 +49,11 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/config/initial_split_policy.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/sharding_catalog_client_impl.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_database.h" @@ -134,16 +134,16 @@ boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx, * Creates and writes to the config server the first chunks for a newly sharded collection. Returns * the version generated for the collection. 
*/ -ChunkVersion createFirstChunks(OperationContext* opCtx, - const NamespaceString& nss, - const ShardKeyPattern& shardKeyPattern, - const ShardId& primaryShardId, - const std::vector<BSONObj>& initPoints, - const bool distributeInitialChunks) { - const KeyPattern keyPattern = shardKeyPattern.getKeyPattern(); +InitialSplitPolicy::ShardCollectionConfig createFirstChunks(OperationContext* opCtx, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const ShardId& primaryShardId, + const std::vector<BSONObj>& initPoints, + const bool distributeInitialChunks) { + const auto& keyPattern = shardKeyPattern.getKeyPattern(); - vector<BSONObj> splitPoints; - vector<ShardId> shardIds; + std::vector<BSONObj> splitPoints; + std::vector<ShardId> shardIds; if (initPoints.empty()) { // If no split points were specified use the shard's data distribution to determine them @@ -202,40 +202,14 @@ ChunkVersion createFirstChunks(OperationContext* opCtx, } } - // This is the first chunk; start the versioning from scratch - const OID epoch = OID::gen(); - ChunkVersion version(1, 0, epoch); - - log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << nss - << " using new epoch " << version.epoch(); - - const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp(); - - for (unsigned i = 0; i <= splitPoints.size(); i++) { - const BSONObj min = (i == 0) ? keyPattern.globalMin() : splitPoints[i - 1]; - const BSONObj max = (i < splitPoints.size()) ? splitPoints[i] : keyPattern.globalMax(); - - // The correct version must be returned as part of this call so only increment for versions, - // which get written - if (i > 0) { - version.incMinor(); - } - - ChunkType chunk; - chunk.setNS(nss); - chunk.setMin(min); - chunk.setMax(max); - chunk.setVersion(version); - - // It's possible there are no split points or fewer split points than total number of - // shards, and we need to be sure that at least one chunk is placed on the primary shard. - auto shardId = (i == 0) ? primaryShardId : shardIds[i % shardIds.size()]; - chunk.setShard(shardId); - - std::vector<ChunkHistory> initialHistory; - initialHistory.emplace_back(ChunkHistory(validAfter, shardId)); - chunk.setHistory(std::move(initialHistory)); - + auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks( + nss, + shardKeyPattern, + primaryShardId, + LogicalClock::get(opCtx)->getClusterTime().asTimestamp(), + splitPoints, + shardIds); + for (const auto& chunk : initialChunks.chunks) { uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( opCtx, ChunkType::ConfigNS, @@ -243,7 +217,7 @@ ChunkVersion createFirstChunks(OperationContext* opCtx, ShardingCatalogClient::kMajorityWriteConcern)); } - return version; + return initialChunks; } void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) { @@ -502,17 +476,13 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, } collectionDetail.append("primary", primaryShard->toString()); collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1)); - catalogClient - ->logChange(opCtx, - "shardCollection.start", - nss.ns(), - collectionDetail.obj(), - ShardingCatalogClient::kMajorityWriteConcern) - .transitional_ignore(); + uassertStatusOK(catalogClient->logChange(opCtx, + "shardCollection.start", + nss.ns(), + collectionDetail.obj(), + ShardingCatalogClient::kMajorityWriteConcern)); } - // const NamespaceString nss(ns); - // Construct the collection default collator. 
std::unique_ptr<CollatorInterface> defaultCollator; if (!defaultCollation.isEmpty()) { @@ -520,20 +490,16 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, ->makeFromBSON(defaultCollation)); } - const auto& collVersion = createFirstChunks( + const auto initialChunks = createFirstChunks( opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, distributeInitialChunks); { CollectionType coll; coll.setNs(nss); - if (uuid) { + if (uuid) coll.setUUID(*uuid); - } - coll.setEpoch(collVersion.epoch()); - - // TODO(schwerin): The following isn't really a date, but is stored as one in-memory and in - // config.collections, as a historical oddity. - coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(collVersion.toLong())); + coll.setEpoch(initialChunks.collVersion().epoch()); + coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(initialChunks.collVersion().toLong())); coll.setKeyPattern(fieldsAndOrder.toBSON()); coll.setDefaultCollation(defaultCollator ? defaultCollator->getSpec().toBSON() : BSONObj()); coll.setUnique(unique); @@ -546,14 +512,12 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, invariant(!shard->isConfig()); // Tell the primary mongod to refresh its data - // TODO: Think the real fix here is for mongos to just - // assume that all collections are sharded, when we get there SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( shardRegistry->getConfigServerConnectionString(), dbPrimaryShardId, primaryShard->getConnString(), nss, - collVersion, + initialChunks.collVersion(), true /* isAuthoritative */, true /* forceRefresh */); @@ -574,9 +538,9 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx, ->logChange(opCtx, "shardCollection.end", nss.ns(), - BSON("version" << collVersion.toString()), + BSON("version" << initialChunks.collVersion().toString()), ShardingCatalogClient::kMajorityWriteConcern) - .transitional_ignore(); + .ignore(); } void ShardingCatalogManager::generateUUIDsForExistingShardedCollections(OperationContext* opCtx) { diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 03e99c4f1a3..1949095bd44 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -496,7 +496,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio void MigrationDestinationManager::cloneCollectionIndexesAndOptions(OperationContext* opCtx, const NamespaceString& nss, - ShardId fromShardId) { + const ShardId& fromShardId) { auto fromShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, fromShardId)); diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 228dc1d104e..405ffd7b277 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -134,7 +134,7 @@ public: */ static void cloneCollectionIndexesAndOptions(OperationContext* opCtx, const NamespaceString& nss, - ShardId fromShardId); + const ShardId& fromShardId); private: /** diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp index 4b8801fd6be..5b849715d9a 100644 --- a/src/mongo/db/s/shardsvr_shard_collection.cpp +++ b/src/mongo/db/s/shardsvr_shard_collection.cpp @@ -44,6 +44,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include 
"mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/config/initial_split_policy.h" #include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_state.h" @@ -330,82 +331,6 @@ void validateShardKeyAgainstExistingZones(OperationContext* opCtx, } } -/** - * For new collections which use hashed shard keys, we can can pre-split the range of possible - * hashes into a large number of chunks, and distribute them evenly at creation time. Until we - * design a better initialization scheme, the safest way to pre-split is to make one big chunk for - * each shard and migrate them one at a time. - * - * Populates 'initSplits' with the split points to use on the primary shard to produce the initial - * "big chunks." - * Also populates 'allSplits' with the additional split points to use on the "big chunks" after the - * "big chunks" have been spread evenly across shards through migrations. - */ -void determinePresplittingPoints(OperationContext* opCtx, - int numShards, - bool isEmpty, - const BSONObj& proposedKey, - const ShardKeyPattern& shardKeyPattern, - const ShardsvrShardCollection& request, - std::vector<BSONObj>* initSplits, - std::vector<BSONObj>* allSplits) { - auto numChunks = request.getNumInitialChunks(); - - if (request.getInitialSplitPoints()) { - *initSplits = std::move(*request.getInitialSplitPoints()); - return; - } - - if (shardKeyPattern.isHashedPattern() && isEmpty) { - // If initial split points are not specified, only pre-split when using a hashed shard - // key and the collection is empty - if (numChunks <= 0) { - // default number of initial chunks - numChunks = 2 * numShards; - } - - // hashes are signed, 64-bit ints. So we divide the range (-MIN long, +MAX long) - // into intervals of size (2^64/numChunks) and create split points at the - // boundaries. The logic below ensures that initial chunks are all - // symmetric around 0. - long long intervalSize = (std::numeric_limits<long long>::max() / numChunks) * 2; - long long current = 0; - - if (numChunks % 2 == 0) { - allSplits->push_back(BSON(proposedKey.firstElementFieldName() << current)); - current += intervalSize; - } else { - current += intervalSize / 2; - } - - for (int i = 0; i < (numChunks - 1) / 2; i++) { - allSplits->push_back(BSON(proposedKey.firstElementFieldName() << current)); - allSplits->push_back(BSON(proposedKey.firstElementFieldName() << -current)); - current += intervalSize; - } - - sort(allSplits->begin(), - allSplits->end(), - SimpleBSONObjComparator::kInstance.makeLessThan()); - - // The initial splits define the "big chunks" that we will subdivide later. - int lastIndex = -1; - for (int i = 1; i < numShards; i++) { - if (lastIndex < (i * numChunks) / numShards - 1) { - lastIndex = (i * numChunks) / numShards - 1; - initSplits->push_back(allSplits->at(lastIndex)); - } - } - } else if (numChunks > 0) { - uasserted(ErrorCodes::InvalidOptions, - str::stream() << (!shardKeyPattern.isHashedPattern() - ? "numInitialChunks is not supported " - "when the shard key is not hashed." - : "numInitialChunks is not supported " - "when the collection is not empty.")); - } -} - boost::optional<UUID> getUUIDFromPrimaryShard(OperationContext* opCtx, const NamespaceString& nss) { // Obtain the collection's UUID from the primary shard's listCollections response. 
DBDirectClient localClient(opCtx); @@ -449,21 +374,32 @@ boost::optional<UUID> getUUIDFromPrimaryShard(OperationContext* opCtx, const Nam * Creates and writes to the config server the first chunks for a newly sharded collection. Returns * the version generated for the collection. */ -ChunkVersion createFirstChunks(OperationContext* opCtx, - const NamespaceString& nss, - const ShardKeyPattern& shardKeyPattern, - const ShardId& primaryShardId, - const std::vector<BSONObj>& initPoints, - const bool distributeInitialChunks) { - const KeyPattern keyPattern = shardKeyPattern.getKeyPattern(); - DBDirectClient localClient(opCtx); +InitialSplitPolicy::ShardCollectionConfig createFirstChunks(OperationContext* opCtx, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const ShardId& primaryShardId, + const std::vector<BSONObj>& initPoints, + const bool distributeInitialChunks) { + const auto& keyPattern = shardKeyPattern.getKeyPattern(); std::vector<BSONObj> splitPoints; std::vector<ShardId> shardIds; if (initPoints.empty()) { // If no split points were specified use the shard's data distribution to determine them - long long numObjects = localClient.count(nss.ns()); + auto primaryShard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId)); + + auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, + nss.db().toString(), + BSON("count" << nss.coll()), + Shard::RetryPolicy::kIdempotent)); + + long long numObjects = 0; + uassertStatusOK(result.commandStatus); + uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects)); // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of // the splitVector command and affects the number of chunks returned, has been loaded. @@ -506,40 +442,14 @@ ChunkVersion createFirstChunks(OperationContext* opCtx, } } - // This is the first chunk; start the versioning from scratch - const OID epoch = OID::gen(); - ChunkVersion version(1, 0, epoch); - - log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << nss - << " using new epoch " << version.epoch(); - - const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp(); - - for (unsigned i = 0; i <= splitPoints.size(); i++) { - const BSONObj min = (i == 0) ? keyPattern.globalMin() : splitPoints[i - 1]; - const BSONObj max = (i < splitPoints.size()) ? splitPoints[i] : keyPattern.globalMax(); - - // The correct version must be returned as part of this call so only increment for versions, - // which get written - if (i > 0) { - version.incMinor(); - } - - ChunkType chunk; - chunk.setNS(nss); - chunk.setMin(min); - chunk.setMax(max); - chunk.setVersion(version); - - // It's possible there are no split points or fewer split points than total number of - // shards, and we need to be sure that at least one chunk is placed on the primary shard. - auto shardId = (i == 0) ? 
primaryShardId : shardIds[i % shardIds.size()]; - chunk.setShard(shardId); - - std::vector<ChunkHistory> initialHistory; - initialHistory.emplace_back(ChunkHistory(validAfter, shardId)); - chunk.setHistory(std::move(initialHistory)); - + auto initialChunks = InitialSplitPolicy::generateShardCollectionInitialChunks( + nss, + shardKeyPattern, + primaryShardId, + LogicalClock::get(opCtx)->getClusterTime().asTimestamp(), + splitPoints, + shardIds); + for (const auto& chunk : initialChunks.chunks) { uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument( opCtx, ChunkType::ConfigNS, @@ -547,7 +457,7 @@ ChunkVersion createFirstChunks(OperationContext* opCtx, ShardingCatalogClient::kMajorityWriteConcern)); } - return version; + return initialChunks; } void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) { @@ -612,17 +522,13 @@ void shardCollection(OperationContext* opCtx, } collectionDetail.append("primary", primaryShard->toString()); collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1)); - catalogClient - ->logChange(opCtx, - "shardCollection.start", - nss.ns(), - collectionDetail.obj(), - ShardingCatalogClient::kMajorityWriteConcern) - .transitional_ignore(); + uassertStatusOK(catalogClient->logChange(opCtx, + "shardCollection.start", + nss.ns(), + collectionDetail.obj(), + ShardingCatalogClient::kMajorityWriteConcern)); } - // const NamespaceString nss(ns); - // Construct the collection default collator. std::unique_ptr<CollatorInterface> defaultCollator; if (!defaultCollation.isEmpty()) { @@ -630,20 +536,16 @@ void shardCollection(OperationContext* opCtx, ->makeFromBSON(defaultCollation)); } - const auto& collVersion = createFirstChunks( + const auto initialChunks = createFirstChunks( opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, distributeInitialChunks); { CollectionType coll; coll.setNs(nss); - if (uuid) { + if (uuid) coll.setUUID(*uuid); - } - coll.setEpoch(collVersion.epoch()); - - // TODO(schwerin): The following isn't really a date, but is stored as one in-memory and in - // config.collections, as a historical oddity. - coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(collVersion.toLong())); + coll.setEpoch(initialChunks.collVersion().epoch()); + coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(initialChunks.collVersion().toLong())); coll.setKeyPattern(fieldsAndOrder.toBSON()); coll.setDefaultCollation(defaultCollator ? defaultCollator->getSpec().toBSON() : BSONObj()); coll.setUnique(unique); @@ -658,9 +560,9 @@ void shardCollection(OperationContext* opCtx, ->logChange(opCtx, "shardCollection.end", nss.ns(), - BSON("version" << collVersion.toString()), + BSON("version" << initialChunks.collVersion().toString()), ShardingCatalogClient::kMajorityWriteConcern) - .transitional_ignore(); + .ignore(); } /** @@ -704,22 +606,20 @@ public: const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder& result) override { - auto const shardingState = ShardingState::get(opCtx); uassertStatusOK(shardingState->canAcceptShardedCommands()); - const auto shardsvrShardCollectionRequest = ShardsvrShardCollection::parse( + const auto request = ShardsvrShardCollection::parse( IDLParserErrorContext("_shardsvrShardCollection"), cmdObj); const NamespaceString nss(parseNs(dbname, cmdObj)); // Take the collection critical section so that no writes can happen. 
CollectionCriticalSection critSec(opCtx, nss); - auto proposedKey(shardsvrShardCollectionRequest.getKey().getOwned()); + auto proposedKey(request.getKey().getOwned()); ShardKeyPattern shardKeyPattern(proposedKey); - validateShardKeyAgainstExistingIndexes( - opCtx, nss, proposedKey, shardKeyPattern, shardsvrShardCollectionRequest); + validateShardKeyAgainstExistingIndexes(opCtx, nss, proposedKey, shardKeyPattern, request); // read zone info auto configServer = Grid::get(opCtx)->shardRegistry()->getConfigShard(); @@ -739,7 +639,7 @@ public: } boost::optional<UUID> uuid; - if (shardsvrShardCollectionRequest.getGetUUIDfromPrimaryShard()) { + if (request.getGetUUIDfromPrimaryShard()) { uuid = getUUIDFromPrimaryShard(opCtx, nss); } else { uuid = UUID::gen(); @@ -759,14 +659,18 @@ public: // them. std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these std::vector<BSONObj> allSplits; // all of the initial desired split points - determinePresplittingPoints(opCtx, - numShards, - isEmpty, - proposedKey, - shardKeyPattern, - shardsvrShardCollectionRequest, - &initSplits, - &allSplits); + if (request.getInitialSplitPoints()) { + initSplits = std::move(*request.getInitialSplitPoints()); + } else { + InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection( + shardKeyPattern, + isEmpty, + numShards, + request.getNumInitialChunks(), + &initSplits, + &allSplits); + } + result << "collectionsharded" << nss.ns(); if (uuid) { result << "collectionUUID" << *uuid; @@ -778,25 +682,21 @@ public: LOG(0) << "CMD: shardcollection: " << cmdObj; - audit::logShardCollection(Client::getCurrent(), - nss.ns(), - proposedKey, - shardsvrShardCollectionRequest.getUnique()); + audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, request.getUnique()); // The initial chunks are distributed evenly across shards only if the initial split // points were specified in the request, i.e., by mapReduce. Otherwise, all the initial // chunks are placed on the primary shard, and may be distributed across shards through // migrations (below) if using a hashed shard key. - const bool distributeInitialChunks = - shardsvrShardCollectionRequest.getInitialSplitPoints().is_initialized(); + const bool distributeInitialChunks = bool(request.getInitialSplitPoints()); // Step 6. Actually shard the collection. shardCollection(opCtx, nss, uuid, shardKeyPattern, - *shardsvrShardCollectionRequest.getCollation(), - shardsvrShardCollectionRequest.getUnique(), + *request.getCollation(), + request.getUnique(), initSplits, distributeInitialChunks, ShardingState::get(opCtx)->getShardName()); |