author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2019-01-28 20:55:04 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2019-01-29 22:15:56 -0500
commit     1c61dfa3307c2009dd29c893b8d2b08af6bcf7d6 (patch)
tree       0c1158d649100c015c8e30d142a4b04a62213e90 /src/mongo/db/s
parent     09abfff1c4ad2f98a9b83093b7e8b6454bc7c393 (diff)
download   mongo-1c61dfa3307c2009dd29c893b8d2b08af6bcf7d6.tar.gz
SERVER-39234 Ensure `shardCollection` initial split works the same between config server and shard primary
Diffstat (limited to 'src/mongo/db/s')
8 files changed, 557 insertions, 239 deletions
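For reference, the round-robin chunk placement that this commit introduces in `migrateAndFurtherSplitInitialChunks` can be sketched in isolation as follows. This is a minimal standalone illustration, not part of the patch; `std::string` stands in for `ShardId`:

```cpp
#include <iostream>
#include <string>
#include <vector>

int main() {
    // Stand-in for the cluster's shard ids (the real code uses std::vector<ShardId>).
    const std::vector<std::string> shardIds{"shard0", "shard1", "shard2"};

    // Same shape as in the patch: a mutable lambda with an init-captured index that
    // cycles through the available shards, replacing the old `i++ % numShards` logic.
    auto nextShardId = [&, indx = 0]() mutable {
        return shardIds[indx++ % shardIds.size()];
    };

    for (int chunk = 0; chunk < 5; ++chunk) {
        std::cout << "chunk " << chunk << " -> " << nextShardId() << '\n';
    }
    // Prints: shard0, shard1, shard2, shard0, shard1
    return 0;
}
```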
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index 433c655cb46..5c7c354b614 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -396,22 +396,9 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx,
  */
 void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
                                          const NamespaceString& nss,
-                                         int numShards,
                                          const std::vector<ShardId>& shardIds,
-                                         bool isEmpty,
-                                         const ShardKeyPattern& shardKeyPattern,
                                          const std::vector<BSONObj>& finalSplitPoints) {
-    auto catalogCache = Grid::get(opCtx)->catalogCache();
-
-    if (!shardKeyPattern.isHashedPattern()) {
-        // Only initially move chunks when using a hashed shard key.
-        return;
-    }
-
-    if (!isEmpty) {
-        // If the collection is not empty, rely on the balancer to migrate the chunks.
-        return;
-    }
+    const auto catalogCache = Grid::get(opCtx)->catalogCache();

     auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
     uassert(ErrorCodes::ConflictingOperationInProgress,
@@ -422,13 +409,18 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
     auto chunkManager = routingInfo.cm();

     // Move and commit each "big chunk" to a different shard.
-    int i = 0;
+    auto nextShardId = [&, indx = 0 ]() mutable {
+        return shardIds[indx++ % shardIds.size()];
+    };
+
     for (auto chunk : chunkManager->chunks()) {
-        const ShardId& shardId = shardIds[i++ % numShards];
+        const auto shardId = nextShardId();
+
         const auto toStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
         if (!toStatus.isOK()) {
             continue;
         }
+
         const auto to = toStatus.getValue();

         // Can't move chunk to shard it's already on
@@ -647,11 +639,9 @@ public:
         std::vector<ShardId> shardIds;
         shardRegistry->getAllShardIds(opCtx, &shardIds);
-        const int numShards = shardIds.size();
-
         uassert(ErrorCodes::IllegalOperation,
                 "cannot shard collections before there are shards",
-                numShards > 0);
+                !shardIds.empty());

         // Handle collections in the config db separately.
         if (nss.db() == NamespaceString::kConfigDb) {
@@ -686,7 +676,8 @@ public:
         ON_BLOCK_EXIT([&conn] { conn.done(); });

         // Step 1.
-        validateAndDeduceFullRequestOptions(opCtx, nss, shardKeyPattern, numShards, conn, &request);
+        validateAndDeduceFullRequestOptions(
+            opCtx, nss, shardKeyPattern, shardIds.size(), conn, &request);

         // The collation option should have been set to the collection default collation after being
         // validated.
@@ -753,7 +744,23 @@ public:
             return true;
         }

-        bool isEmpty = (conn->count(nss.ns()) == 0);
+        // This check for empty collection is racy, because it is not guaranteed that documents
+        // will not show up in the collection right after the count below has executed. It is
+        // left here for backwards compatiblity with pre-4.0.4 clusters, which do not support
+        // sharding being performed by the primary shard.
+        const bool isEmpty = (conn->count(nss.ns()) == 0);
+
+        // Map/reduce with output to an empty collection assumes it has full control of the
+        // output collection and it would be an unsupported operation if the collection is being
+        // concurrently written
+        const bool fromMapReduce = bool(request.getInitialSplitPoints());
+        if (fromMapReduce) {
+            uassert(ErrorCodes::ConflictingOperationInProgress,
+                    str::stream() << "Map reduce with sharded output to a new collection found "
+                                  << nss.ns()
+                                  << " to be non-empty which is not supported.",
+                    isEmpty);
+        }

         // Step 3.
        validateShardKeyAgainstExistingIndexes(
@@ -770,12 +777,12 @@ public:
         std::vector<BSONObj> initialSplitPoints;  // there will be at most numShards-1 of these
         std::vector<BSONObj> finalSplitPoints;    // all of the desired split points
         if (request.getInitialSplitPoints()) {
-            initialSplitPoints = std::move(*request.getInitialSplitPoints());
+            initialSplitPoints = *request.getInitialSplitPoints();
         } else {
             InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
                 shardKeyPattern,
                 isEmpty,
-                numShards,
+                shardIds.size(),
                 request.getNumInitialChunks(),
                 &initialSplitPoints,
                 &finalSplitPoints);
@@ -784,15 +791,7 @@ public:
         LOG(0) << "CMD: shardcollection: " << cmdObj;

         audit::logShardCollection(
-            Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
-
-        // The initial chunks are distributed evenly across shards only if the initial split
-        // points
-        // were specified in the request, i.e., by mapReduce. Otherwise, all the initial chunks
-        // are
-        // placed on the primary shard, and may be distributed across shards through migrations
-        // (below) if using a hashed shard key.
-        const bool distributeInitialChunks = bool(request.getInitialSplitPoints());
+            opCtx->getClient(), nss.ns(), proposedKey, request.getUnique());

         // Step 6. Actually shard the collection.
         catalogManager->shardCollection(opCtx,
@@ -802,7 +801,7 @@ public:
                                         *request.getCollation(),
                                         request.getUnique(),
                                         initialSplitPoints,
-                                        distributeInitialChunks,
+                                        fromMapReduce,
                                         primaryShardId);
         result << "collectionsharded" << nss.ns();
         if (uuid) {
@@ -816,9 +815,12 @@ public:
         collDistLock.reset();
         dbDistLock.reset();

-        // Step 7. Migrate initial chunks to distribute them across shards.
-        migrateAndFurtherSplitInitialChunks(
-            opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, finalSplitPoints);
+        // Step 7. If the collection is empty and using hashed sharding, migrate initial chunks
+        // to spread them evenly across shards from the beginning. Otherwise rely on the
+        // balancer to do it.
+        if (isEmpty && shardKeyPattern.isHashedPattern()) {
+            migrateAndFurtherSplitInitialChunks(opCtx, nss, shardIds, finalSplitPoints);
+        }

         return true;
     }
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 4fb66af6867..e8217a4540a 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -274,34 +274,32 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
     const ShardId& primaryShardId,
     const std::vector<BSONObj>& splitPoints,
     const std::vector<TagsType>& tags,
-    const bool distributeInitialChunks,
-    const bool isEmpty,
-    const int numContiguousChunksPerShard) {
-    const auto& keyPattern = shardKeyPattern.getKeyPattern();
+    bool isEmpty,
+    int numContiguousChunksPerShard) {
+    uassert(ErrorCodes::InvalidOptions,
+            "Cannot generate initial chunks based on both split points and zones",
+            tags.empty() || splitPoints.empty());

-    std::vector<BSONObj> finalSplitPoints;
+    const auto shardRegistry = Grid::get(opCtx)->shardRegistry();

-    if (splitPoints.empty() && tags.empty()) {
-        // If neither split points nor tags were specified use the shard's data distribution to
-        // determine them
-        auto primaryShard =
-            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
+    const auto& keyPattern = shardKeyPattern.getKeyPattern();

-        // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
-        // the splitVector command and affects the number of chunks returned, has been loaded.
-        uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx));
-
-        if (!isEmpty) {
-            finalSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
-                opCtx,
-                primaryShardId,
-                nss,
-                shardKeyPattern,
-                ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
-                Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
-                0));
-        }
+    const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
+
+    // On which shards are the generated chunks allowed to be placed
+    std::vector<ShardId> shardIds;
+    if (isEmpty) {
+        shardRegistry->getAllShardIdsNoReload(&shardIds);
     } else {
+        shardIds.push_back(primaryShardId);
+    }
+
+    ShardCollectionConfig initialChunks;
+
+    // If split points are requested, they take precedence over zones
+    if (!splitPoints.empty()) {
+        std::vector<BSONObj> finalSplitPoints;
+
         // Make sure points are unique and ordered
         auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
@@ -312,37 +310,63 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
         for (const auto& splitPoint : orderedPts) {
             finalSplitPoints.push_back(splitPoint);
         }
-    }
-
-    uassert(ErrorCodes::InvalidOptions,
-            str::stream() << "cannot generate initial chunks based on both split points and tags",
-            tags.empty() || finalSplitPoints.empty());
-
-    const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
-
-    // If docs already exist for the collection, must use primary shard, otherwise defer to
-    // passed-in distribution option.
-    std::vector<ShardId> shardIds;
-    if (isEmpty && distributeInitialChunks) {
-        Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
-    } else {
-        shardIds.push_back(primaryShardId);
+        initialChunks = generateShardCollectionInitialChunks(nss,
+                                                             shardKeyPattern,
+                                                             primaryShardId,
+                                                             validAfter,
+                                                             finalSplitPoints,
+                                                             shardIds,
+                                                             numContiguousChunksPerShard);
     }
+    // If zones are defined, use the zones
+    else if (!tags.empty()) {
+        if (isEmpty) {
+            initialChunks = generateShardCollectionInitialZonedChunks(
+                nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds);
+        } else {
+            // For a non-empty collection, create one chunk on the primary shard and leave it to the
+            // balancer to do the zone splitting and placement
+            ChunkVersion version(1, 0, OID::gen());
+            appendChunk(nss,
+                        keyPattern.globalMin(),
+                        keyPattern.globalMax(),
+                        &version,
+                        validAfter,
+                        primaryShardId,
+                        &initialChunks.chunks);
+        }
+    }
+    // If neither split points nor zones are available and the collection is not empty, ask the
+    // shard to select split points based on the data distribution
+    else if (!isEmpty) {
+        auto primaryShard =
+            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));

-    ShardCollectionConfig initialChunks;
+        // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
+        // the splitVector command and affects the number of chunks returned, has been loaded.
+        const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+        uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
+
+        const auto shardSelectedSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
+            opCtx,
+            primaryShardId,
+            nss,
+            shardKeyPattern,
+            ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
+            balancerConfig->getMaxChunkSizeBytes(),
+            0));

-    if (tags.empty()) {
         initialChunks = generateShardCollectionInitialChunks(nss,
                                                              shardKeyPattern,
                                                              primaryShardId,
                                                              validAfter,
-                                                             finalSplitPoints,
+                                                             shardSelectedSplitPoints,
                                                              shardIds,
                                                              numContiguousChunksPerShard);
-    } else if (!isEmpty) {
-        // For a non-empty collection, create one chunk on the primary shard and leave it to the
-        // balancer to do the zone split and rebalancing
+    }
+    // For empty collection, just create a single chunk
+    else {
         ChunkVersion version(1, 0, OID::gen());
         appendChunk(nss,
                     keyPattern.globalMin(),
@@ -351,9 +375,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
                     validAfter,
                     primaryShardId,
                     &initialChunks.chunks);
-    } else {
-        initialChunks = generateShardCollectionInitialZonedChunks(
-            nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds);
     }

     LOG(0) << "Created " << initialChunks.chunks.size() << " chunk(s) for: " << nss
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 6e43265b3ae..b7bf33c797a 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -120,8 +120,12 @@ public:
         const std::vector<ShardId>& shardIdsForGaps);

     /**
-     * Creates the first chunks for a newly sharded collection.
-     * Returns the created chunks.
+     * Generates a list with what are the most optimal first chunks and placement for a newly
+     * sharded collection.
+     *
+     * If the collection 'isEmpty', chunks will be spread across all available (appropriate based on
+     * zoning rules) shards. Otherwise, they will all end up on the primary shard after which the
+     * balancer will take care of properly distributing them around.
      */
     static ShardCollectionConfig createFirstChunks(OperationContext* opCtx,
                                                    const NamespaceString& nss,
@@ -129,9 +133,8 @@ public:
                                                    const ShardId& primaryShardId,
                                                    const std::vector<BSONObj>& splitPoints,
                                                    const std::vector<TagsType>& tags,
-                                                   const bool distributeInitialChunks,
-                                                   const bool isEmpty,
-                                                   const int numContiguousChunksPerShard = 1);
+                                                   bool isEmpty,
+                                                   int numContiguousChunksPerShard = 1);

     /**
      * Writes to the config server the first chunks for a newly sharded collection.
diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp
index 7ff8e44fef2..daa8d0c29f3 100644
--- a/src/mongo/db/s/config/initial_split_policy_test.cpp
+++ b/src/mongo/db/s/config/initial_split_policy_test.cpp
@@ -118,7 +118,7 @@ TEST(CalculateHashedSplitPointsTest, EmptyCollectionChunksEqualToShards) {
     checkCalculatedHashedSplitPoints(true, true, 3, 3, &expectedSplitPoints, &expectedSplitPoints);
 }

-TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithInitialSplitsReturnsEmptySplits) {
+TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithNoInitialSplitsReturnsEmptySplits) {
     const std::vector<BSONObj> expectedSplitPoints;
     checkCalculatedHashedSplitPoints(true, true, 2, 1, &expectedSplitPoints, &expectedSplitPoints);
 }
@@ -147,7 +147,7 @@ TEST(CalculateHashedSplitPointsTest, NotHashedWithInitialSplitsFails) {
                        ErrorCodes::InvalidOptions);
 }

-class GenerateInitialSplitChunksTest : public unittest::Test {
+class GenerateInitialSplitChunksTestBase : public unittest::Test {
 public:
     /**
      * Returns a vector of ChunkType objects for the given chunk ranges.
@@ -207,7 +207,7 @@ private:
     const Timestamp _timeStamp{Date_t::now()};
 };

-class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTest {
+class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTestBase {
 public:
     const std::vector<BSONObj>& hashedSplitPoints() {
         return _splitPoints;
@@ -263,7 +263,7 @@ TEST_F(GenerateInitialHashedSplitChunksTest,
     assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks);
 }

-class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTest {
+class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTestBase {
 public:
     /**
      * Calls generateShardCollectionInitialZonedChunks according to the given arguments
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 309f5eaa386..071085395cc 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -256,19 +256,18 @@ public:

     /**
-     * Shards a collection. Assumes that the database is enabled for sharding.
+     * Shards collection with namespace 'nss' and implicitly assumes that the database is enabled
+     * for sharding (i.e., doesn't check whether enableSharding has been called previously).
      *
-     * @param ns: namespace of collection to shard
-     * @param uuid: the collection's UUID. Optional because new in 3.6.
-     * @param fieldsAndOrder: shardKey pattern
-     * @param defaultCollation: the default collation for the collection, to be written to
-     *     config.collections. If empty, the collection default collation is simple binary
-     *     comparison. Note the the shard key collation will always be simple binary comparison,
-     *     even if the collection default collation is non-simple.
-     * @param unique: if true, ensure underlying index enforces a unique constraint.
-     * @param initPoints: create chunks based on a set of specified split points.
-     * @param initShardIds: If non-empty, specifies the set of shards to assign chunks between.
-     *     Otherwise all chunks will be assigned to the primary shard for the database.
+     * uuid - the collection's UUID. Optional because new in 3.6.
+     * fieldsAndOrder - shard key pattern to use.
+     * defaultCollation - the default collation for the collection, excluding the shard key. If
+     *     empty, defaults to simple binary comparison. Note that the shard key collation will always
+     *     be simple binary comparison, even if the collection default collation is non-simple.
+     * unique - if true, ensure underlying index enforces a unique constraint.
+     * initPoints - create chunks based on a set of specified split points.
+     * isFromMapReduce - whether this request comes from map/reduce, in which case the generated
+     *     chunks can be spread across shards. Otherwise they will stay on the primary shard.
      */
     void shardCollection(OperationContext* opCtx,
                          const NamespaceString& nss,
@@ -277,7 +276,7 @@ public:
                          const BSONObj& defaultCollation,
                          bool unique,
                          const std::vector<BSONObj>& initPoints,
-                         const bool distributeInitialChunks,
+                         bool isFromMapReduce,
                          const ShardId& dbPrimaryShardId);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 8909c2575cd..26a9c8bae8d 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -364,7 +364,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
                                              const BSONObj& defaultCollation,
                                              bool unique,
                                              const vector<BSONObj>& splitPoints,
-                                             const bool distributeInitialChunks,
+                                             bool isFromMapReduce,
                                              const ShardId& dbPrimaryShardId) {
     const auto shardRegistry = Grid::get(opCtx)->shardRegistry();

@@ -373,14 +373,23 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
     // Fail if there are partially written chunks from a previous failed shardCollection.
     checkForExistingChunks(opCtx, nss);

+    // Prior to 4.0.5, zones cannot be taken into account at collection sharding time, so ignore
+    // them and let the balancer apply them later
+    const std::vector<TagsType> treatAsNoZonesDefined;
+
+    // Map/reduce with output to sharded collection ignores consistency checks and requires the
+    // initial chunks to be spread across shards unconditionally
+    const bool treatAsEmpty = isFromMapReduce;
+
     // Record start in changelog
     {
         BSONObjBuilder collectionDetail;
         collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
         collectionDetail.append("collection", nss.ns());
-        if (uuid) {
+        if (uuid)
             uuid->appendToBuilder(&collectionDetail, "uuid");
-        }
+        collectionDetail.append("empty", treatAsEmpty);
+        collectionDetail.append("fromMapReduce", isFromMapReduce);
         collectionDetail.append("primary", primaryShard->toString());
         collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
         uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked(
@@ -398,19 +407,13 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
                             ->makeFromBSON(defaultCollation));
     }

-    std::vector<TagsType> tags;
-    // Since this code runs on the config server, we cannot guarantee that the collection is still
-    // empty by the time the metadata is written so always assume we are sharding a non-empty
-    // collection.
-    bool isEmpty = false;
     const auto initialChunks = InitialSplitPolicy::createFirstChunks(opCtx,
                                                                      nss,
                                                                      fieldsAndOrder,
                                                                      dbPrimaryShardId,
                                                                      splitPoints,
-                                                                     tags,
-                                                                     distributeInitialChunks,
-                                                                     isEmpty);
+                                                                     treatAsNoZonesDefined,
+                                                                     treatAsEmpty);

     InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
index c2fd0035b34..1f16b0c5009 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
@@ -39,25 +39,15 @@
 #include "mongo/client/read_preference.h"
 #include "mongo/client/remote_command_targeter_factory_mock.h"
 #include "mongo/client/remote_command_targeter_mock.h"
-#include "mongo/db/client.h"
-#include "mongo/db/commands.h"
 #include "mongo/db/s/config/initial_split_policy.h"
 #include "mongo/db/s/config/sharding_catalog_manager.h"
-#include "mongo/executor/network_interface_mock.h"
-#include "mongo/executor/task_executor.h"
 #include "mongo/rpc/metadata/tracking_metadata.h"
-#include "mongo/s/catalog/type_changelog.h"
 #include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_database.h"
-#include "mongo/s/catalog/type_locks.h"
 #include "mongo/s/catalog/type_shard.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/config_server_test_fixture.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/shard_key_pattern.h"
-#include "mongo/stdx/future.h"
-#include "mongo/transport/mock_session.h"
 #include "mongo/util/log.h"
 #include "mongo/util/scopeguard.h"
 #include "mongo/util/time_support.h"
@@ -74,12 +64,8 @@ using std::string;
 using std::vector;
 using unittest::assertGet;

-const ShardId testPrimaryShard = ShardId("shard0");
-
-const NamespaceString kNamespace("db1.foo");
-
-class ShardCollectionTest : public ConfigServerTestFixture {
-public:
+class ShardCollectionTestBase : public ConfigServerTestFixture {
+protected:
     void expectSplitVector(const HostAndPort& shardHost,
                            const ShardKeyPattern& keyPattern,
                            const BSONObj& splitPoints) {
@@ -106,13 +92,50 @@ public:
         });
     }

+    const ShardId testPrimaryShard{"shard0"};
+    const NamespaceString kNamespace{"db1.foo"};
+
 private:
     const HostAndPort configHost{"configHost1"};
     const ConnectionString configCS{ConnectionString::forReplicaSet("configReplSet", {configHost})};
     const HostAndPort clientHost{"clientHost1"};
 };

-TEST_F(ShardCollectionTest, anotherMongosSharding) {
+
+// Tests which exercise the ShardingCatalogManager::shardCollection logic, which is what the config
+// server uses to shard collections, when the '_shardsvrShardCollection' command is not available
+// (fast initial split optimization)
+class ConfigServerShardCollectionTest : public ShardCollectionTestBase {
+protected:
+    void checkWrittenChunks(const std::vector<ChunkType>& expectedChunks) {
+        const auto grid = Grid::get(operationContext());
+        const auto catalogClient = grid->catalogClient();
+        repl::OpTime unusedOpTime;
+        const auto writtenChunks =
+            assertGet(catalogClient->getChunks(operationContext(),
+                                               BSON("ns" << kNamespace.ns()),
+                                               BSON("min" << 1),
+                                               boost::none,
+                                               &unusedOpTime,
+                                               repl::ReadConcernLevel::kLocalReadConcern));
+        ASSERT_EQ(expectedChunks.size(), writtenChunks.size());
+
+        auto itE = expectedChunks.begin();
+        auto itW = writtenChunks.begin();
+        for (; itE != expectedChunks.end(); itE++, itW++) {
+            const auto& expected = *itE;
+            const auto& written = *itW;
+            ASSERT_BSONOBJ_EQ(expected.getMin(), expected.getMin());
+            ASSERT_BSONOBJ_EQ(expected.getMax(), expected.getMax());
+            ASSERT_EQ(expected.getShard(), written.getShard());
+        }
+    }
+
+    const ShardKeyPattern keyPattern{BSON("_id" << 1)};
+    const BSONObj defaultCollation;
+};
+
+TEST_F(ConfigServerShardCollectionTest, Partially_Written_Chunks_Present) {
     ShardType shard;
     shard.setName("shard0");
     shard.setHost("shardHost");
@@ -130,25 +153,21 @@ TEST_F(ShardCollectionTest, anotherMongosSharding) {
     chunk.setMax(BSON("_id" << 5));
     ASSERT_OK(setupChunks({chunk}));

-    ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
-    BSONObj defaultCollation;
-
     ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext())
                            ->shardCollection(operationContext(),
                                              kNamespace,
                                              boost::none,  // UUID
-                                             shardKeyPattern,
+                                             keyPattern,
                                              defaultCollation,
                                              false,
-                                             vector<BSONObj>{},
-                                             false,
+                                             {},
+                                             false,  // isFromMapReduce
                                              testPrimaryShard),
                        AssertionException,
                        ErrorCodes::ManualInterventionRequired);
 }

-TEST_F(ShardCollectionTest, noInitialChunksOrData) {
-    // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_ForMapReduce_NoInitialSplitPoints) {
     const HostAndPort shardHost{"shardHost"};
     ShardType shard;
     shard.setName("shard0");
@@ -164,28 +183,22 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) {

     setupDatabase(kNamespace.db().toString(), shard.getName(), true);

-    ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
-    BSONObj defaultCollation;
-
     // Now start actually sharding the collection.
     auto future = launchAsync([&] {
-        ThreadClient tc("Test", getGlobalServiceContext());
+        ThreadClient tc("Test", getServiceContext());
         auto opCtx = cc().makeOperationContext();
         ShardingCatalogManager::get(operationContext())
            ->shardCollection(opCtx.get(),
                              kNamespace,
                              boost::none,  // UUID
-                             shardKeyPattern,
+                             keyPattern,
                              defaultCollation,
                              false,
-                             vector<BSONObj>{},
-                             false,
+                             {},    // No split points
+                             true,  // isFromMapReduce
                              testPrimaryShard);
     });

-    // Respond to the splitVector command sent to the shard to figure out initial split points.
-    expectSplitVector(shardHost, shardKeyPattern, BSONObj());
-
     // Expect the set shard version for that namespace.
     // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
     // generated by shardCollection for the first chunk.
@@ -193,10 +206,15 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) {
     expectSetShardVersion(shardHost, shard, kNamespace, boost::none /* expected ChunkVersion */);

     future.timed_get(kFutureTimeout);
+
+    checkWrittenChunks(
+        {ChunkType(kNamespace,
+                   {keyPattern.getKeyPattern().globalMin(), keyPattern.getKeyPattern().globalMax()},
+                   ChunkVersion::IGNORED(),
+                   testPrimaryShard)});
 }

-TEST_F(ShardCollectionTest, withInitialChunks) {
-    // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_ForMapReduce_WithInitialSplitPoints) {
     const HostAndPort shard0Host{"shardHost0"};
     const HostAndPort shard1Host{"shardHost1"};
     const HostAndPort shard2Host{"shardHost2"};
@@ -233,65 +251,17 @@ TEST_F(ShardCollectionTest, withInitialChunks) {

     setupDatabase(kNamespace.db().toString(), shard0.getName(), true);

-    ShardKeyPattern keyPattern(BSON("_id" << 1));
-
     BSONObj splitPoint0 = BSON("_id" << 1);
     BSONObj splitPoint1 = BSON("_id" << 100);
     BSONObj splitPoint2 = BSON("_id" << 200);
     BSONObj splitPoint3 = BSON("_id" << 300);

-    ChunkVersion expectedVersion(1, 0, OID::gen());
-
-    ChunkType expectedChunk0;
-    expectedChunk0.setNS(kNamespace);
-    expectedChunk0.setShard(shard0.getName());
-    expectedChunk0.setMin(keyPattern.getKeyPattern().globalMin());
-    expectedChunk0.setMax(splitPoint0);
-    expectedChunk0.setVersion(expectedVersion);
-    expectedVersion.incMinor();
-
-    ChunkType expectedChunk1;
-    expectedChunk1.setNS(kNamespace);
-    expectedChunk1.setShard(shard1.getName());
-    expectedChunk1.setMin(splitPoint0);
-    expectedChunk1.setMax(splitPoint1);
-    expectedChunk1.setVersion(expectedVersion);
-    expectedVersion.incMinor();
-
-    ChunkType expectedChunk2;
-    expectedChunk2.setNS(kNamespace);
-    expectedChunk2.setShard(shard2.getName());
-    expectedChunk2.setMin(splitPoint1);
-    expectedChunk2.setMax(splitPoint2);
-    expectedChunk2.setVersion(expectedVersion);
-    expectedVersion.incMinor();
-
-    ChunkType expectedChunk3;
-    expectedChunk3.setNS(kNamespace);
-    expectedChunk3.setShard(shard0.getName());
-    expectedChunk3.setMin(splitPoint2);
-    expectedChunk3.setMax(splitPoint3);
-    expectedChunk3.setVersion(expectedVersion);
-    expectedVersion.incMinor();
-
-    ChunkType expectedChunk4;
-    expectedChunk4.setNS(kNamespace);
-    expectedChunk4.setShard(shard1.getName());
-    expectedChunk4.setMin(splitPoint3);
-    expectedChunk4.setMax(keyPattern.getKeyPattern().globalMax());
-    expectedChunk4.setVersion(expectedVersion);
-
-    vector<ChunkType> expectedChunks{
-        expectedChunk0, expectedChunk1, expectedChunk2, expectedChunk3, expectedChunk4};
-
-    BSONObj defaultCollation;
-
     // Now start actually sharding the collection.
     auto future = launchAsync([&] {
         // TODO: can we mock the ShardRegistry to return these?
        set<ShardId> shards{shard0.getName(), shard1.getName(), shard2.getName()};

-        ThreadClient tc("Test", getGlobalServiceContext());
+        ThreadClient tc("Test", getServiceContext());
         auto opCtx = cc().makeOperationContext();
         ShardingCatalogManager::get(operationContext())
             ->shardCollection(opCtx.get(),
@@ -300,8 +270,8 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
                               keyPattern,
                               defaultCollation,
                               true,
-                              vector<BSONObj>{splitPoint0, splitPoint1, splitPoint2, splitPoint3},
-                              true,
+                              {splitPoint0, splitPoint1, splitPoint2, splitPoint3},
+                              true,  // isFromMapReduce
                               testPrimaryShard);
     });

@@ -312,10 +282,30 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
     expectSetShardVersion(shard0Host, shard0, kNamespace, boost::none /* expected ChunkVersion */);

     future.timed_get(kFutureTimeout);
+
+    checkWrittenChunks({ChunkType(kNamespace,
+                                  ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+                                  ChunkVersion::IGNORED(),
+                                  shard0.getName()),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint0, splitPoint1},
+                                  ChunkVersion::IGNORED(),
+                                  shard1.getName()),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint1, splitPoint2},
+                                  ChunkVersion::IGNORED(),
+                                  shard2.getName()),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint2, splitPoint3},
+                                  ChunkVersion::IGNORED(),
+                                  shard0.getName()),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+                                  ChunkVersion::IGNORED(),
+                                  shard1.getName())});
 }

-TEST_F(ShardCollectionTest, withInitialData) {
-    // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_NoInitialSplitPoints_NoSplitVectorPoints) {
     const HostAndPort shardHost{"shardHost"};
     ShardType shard;
     shard.setName("shard0");
     shard.setHost(shardHost.toString());
@@ -331,18 +321,59 @@ TEST_F(ShardCollectionTest, withInitialData) {

     setupDatabase(kNamespace.db().toString(), shard.getName(), true);

-    ShardKeyPattern keyPattern(BSON("_id" << 1));
+    // Now start actually sharding the collection.
+    auto future = launchAsync([&] {
+        ThreadClient tc("Test", getServiceContext());
+        auto opCtx = cc().makeOperationContext();
+        ShardingCatalogManager::get(operationContext())
+            ->shardCollection(opCtx.get(),
+                              kNamespace,
+                              boost::none,  // UUID
+                              keyPattern,
+                              defaultCollation,
+                              false,
+                              {},     // No split points
+                              false,  // isFromMapReduce
+                              testPrimaryShard);
+    });
+
+    // Respond to the splitVector command sent to the shard to figure out initial split points.
+    expectSplitVector(shardHost, keyPattern, BSONObj());

-    BSONObj splitPoint0 = BSON("_id" << 1);
-    BSONObj splitPoint1 = BSON("_id" << 100);
-    BSONObj splitPoint2 = BSON("_id" << 200);
-    BSONObj splitPoint3 = BSON("_id" << 300);
+    // Expect the set shard version for that namespace.
+    // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
+    // generated by shardCollection for the first chunk.
+    // TODO SERVER-29451: add hooks to the mock storage engine to expect reads and writes.
+    expectSetShardVersion(shardHost, shard, kNamespace, boost::none /* expected ChunkVersion */);
+
+    future.timed_get(kFutureTimeout);
+
+    checkWrittenChunks(
+        {ChunkType(kNamespace,
+                   {keyPattern.getKeyPattern().globalMin(), keyPattern.getKeyPattern().globalMax()},
+                   ChunkVersion::IGNORED(),
+                   testPrimaryShard)});
+}
+
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_NoInitialSplitPoints_WithSplitVectorPoints) {
+    const HostAndPort shardHost{"shardHost"};
+    ShardType shard;
+    shard.setName("shard0");
+    shard.setHost(shardHost.toString());
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    targeter->setConnectionStringReturnValue(ConnectionString(shardHost));
+    targeter->setFindHostReturnValue(shardHost);
+    targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter));
+
+    ASSERT_OK(setupShards(vector<ShardType>{shard}));

-    BSONObj defaultCollation;
+    setupDatabase(kNamespace.db().toString(), shard.getName(), true);

     // Now start actually sharding the collection.
     auto future = launchAsync([&] {
-        ThreadClient tc("Test", getGlobalServiceContext());
+        ThreadClient tc("Test", getServiceContext());
         auto opCtx = cc().makeOperationContext();
         ShardingCatalogManager::get(operationContext())
             ->shardCollection(opCtx.get(),
@@ -351,11 +382,16 @@ TEST_F(ShardCollectionTest, withInitialData) {
                               keyPattern,
                               defaultCollation,
                               false,
-                              vector<BSONObj>{},
-                              false,
+                              {},     // No split points
+                              false,  // isFromMapReduce
                               testPrimaryShard);
     });

+    BSONObj splitPoint0 = BSON("_id" << 1);
+    BSONObj splitPoint1 = BSON("_id" << 100);
+    BSONObj splitPoint2 = BSON("_id" << 200);
+    BSONObj splitPoint3 = BSON("_id" << 300);
+
     // Respond to the splitVector command sent to the shard to figure out initial split points.
    expectSplitVector(shardHost,
                      keyPattern,
@@ -368,12 +404,135 @@ TEST_F(ShardCollectionTest, withInitialData) {
     expectSetShardVersion(shardHost, shard, kNamespace, boost::none);

     future.timed_get(kFutureTimeout);
+
+    checkWrittenChunks({ChunkType(kNamespace,
+                                  ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint0, splitPoint1},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint1, splitPoint2},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint2, splitPoint3},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard)});
+}
+
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_WithInitialSplitPoints_NoSplitVectorPoints) {
+    const HostAndPort shardHost{"shardHost"};
+    ShardType shard;
+    shard.setName("shard0");
+    shard.setHost(shardHost.toString());
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    targeter->setConnectionStringReturnValue(ConnectionString(shardHost));
+    targeter->setFindHostReturnValue(shardHost);
+    targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter));
+
+    ASSERT_OK(setupShards(vector<ShardType>{shard}));
+
+    setupDatabase(kNamespace.db().toString(), shard.getName(), true);
+
+    BSONObj splitPoint0 = BSON("_id" << 1);
+    BSONObj splitPoint1 = BSON("_id" << 100);
+    BSONObj splitPoint2 = BSON("_id" << 200);
+    BSONObj splitPoint3 = BSON("_id" << 300);
+
+    // Now start actually sharding the collection.
+    auto future = launchAsync([&] {
+        ThreadClient tc("Test", getServiceContext());
+        auto opCtx = cc().makeOperationContext();
+        ShardingCatalogManager::get(operationContext())
+            ->shardCollection(opCtx.get(),
+                              kNamespace,
+                              boost::none,  // UUID
+                              keyPattern,
+                              defaultCollation,
+                              false,
+                              {splitPoint0, splitPoint1, splitPoint2, splitPoint3},
+                              false,  // isFromMapReduce
+                              testPrimaryShard);
+    });
+
+    // Expect the set shard version for that namespace
+    // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
+    // generated by shardCollection for the first chunk.
+    // TODO SERVER-29451: add hooks to the mock storage engine to expect reads and writes.
+    expectSetShardVersion(shardHost, shard, kNamespace, boost::none);
+
+    future.timed_get(kFutureTimeout);
+
+    checkWrittenChunks({ChunkType(kNamespace,
+                                  ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint0, splitPoint1},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint1, splitPoint2},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint2, splitPoint3},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard),
+                        ChunkType(kNamespace,
+                                  ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+                                  ChunkVersion::IGNORED(),
+                                  testPrimaryShard)});
 }

-using CreateFirstChunksTest = ShardCollectionTest;
-
-TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmptyCollection) {
+// Direct tests for InitialSplitPolicy::createFirstChunks which is the base call for both the config
+// server and shard server's shard collection logic
+class CreateFirstChunksTest : public ShardCollectionTestBase {
+protected:
     const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)};
+};
+
+TEST_F(CreateFirstChunksTest, Split_Disallowed_With_Both_SplitPoints_And_Zones) {
+    ASSERT_THROWS_CODE(
+        InitialSplitPolicy::createFirstChunks(
+            operationContext(),
+            kNamespace,
+            kShardKeyPattern,
+            ShardId("shard1"),
+            {BSON("x" << 0)},
+            {TagsType(kNamespace,
+                      "TestZone",
+                      ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+            true /* isEmpty */),
+        AssertionException,
+        ErrorCodes::InvalidOptions);
+
+    ASSERT_THROWS_CODE(
+        InitialSplitPolicy::createFirstChunks(
+            operationContext(),
+            kNamespace,
+            kShardKeyPattern,
+            ShardId("shard1"),
+            {BSON("x" << 0)}, /* No split points */
+            {TagsType(kNamespace,
+                      "TestZone",
+                      ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+            false /* isEmpty */),
+        AssertionException,
+        ErrorCodes::InvalidOptions);
+}
+
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromSplitVector_ManyChunksToPrimary) {
     const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
                                          ShardType("shard1", "rs1/shard1:123"),
                                          ShardType("shard2", "rs2/shard2:123")};
@@ -396,9 +555,8 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmp
                                           kNamespace,
                                           kShardKeyPattern,
                                           ShardId("shard1"),
-                                          {},
-                                          {},
-                                          true,
+                                          {}, /* No split points */
+                                          {}, /* No zones */
                                           false /* isEmpty */);
     });

@@ -410,8 +568,41 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmp
     ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
 }

-TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyCollection) {
-    const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)};
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromClient_ManyChunksToPrimary) {
+    const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+                                         ShardType("shard1", "rs1/shard1:123"),
+                                         ShardType("shard2", "rs2/shard2:123")};
+
+    const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    targeter->setConnectionStringReturnValue(connStr);
+    targeter->setFindHostReturnValue(connStr.getServers()[0]);
+    targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+    ASSERT_OK(setupShards(kShards));
+    shardRegistry()->reload(operationContext());
+
+    auto future = launchAsync([&] {
+        ThreadClient tc("Test", getServiceContext());
+        auto opCtx = cc().makeOperationContext();
+        return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+                                                     kNamespace,
+                                                     kShardKeyPattern,
+                                                     ShardId("shard1"),
+                                                     {BSON("x" << 0)},
+                                                     {}, /* No zones */
+                                                     false /* isEmpty */);
+    });
+
+    const auto& firstChunks = future.timed_get(kFutureTimeout);
+    ASSERT_EQ(2U, firstChunks.chunks.size());
+    ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
+    ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_WithZones_OneChunkToPrimary) {
     const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}),
                                          ShardType("shard1", "rs1/shard1:123", {"TestZone"}),
                                          ShardType("shard2", "rs2/shard2:123")};
@@ -423,14 +614,106 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyC
                                       kNamespace,
                                       kShardKeyPattern,
                                       ShardId("shard1"),
-                                      {},
-                                      {TagsType(kNamespace, "TestZone", ChunkRange(BSON("x" << MinKey), BSON("x" << 0)))},
-                                      true,
+                                      {}, /* No split points */
+                                      {TagsType(kNamespace,
+                                                "TestZone",
+                                                ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
                                       false /* isEmpty */);

     ASSERT_EQ(1U, firstChunks.chunks.size());
     ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
 }

+TEST_F(CreateFirstChunksTest, EmptyCollection_SplitPoints_FromClient_ManyChunksDistributed) {
+    const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+                                         ShardType("shard1", "rs1/shard1:123"),
+                                         ShardType("shard2", "rs2/shard2:123")};
+
+    const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    targeter->setConnectionStringReturnValue(connStr);
+    targeter->setFindHostReturnValue(connStr.getServers()[0]);
+    targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+    ASSERT_OK(setupShards(kShards));
+    shardRegistry()->reload(operationContext());
+
+    auto future = launchAsync([&] {
+        ThreadClient tc("Test", getServiceContext());
+        auto opCtx = cc().makeOperationContext();
+        return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+                                                     kNamespace,
+                                                     kShardKeyPattern,
+                                                     ShardId("shard1"),
+                                                     {BSON("x" << 0), BSON("x" << 100)},
+                                                     {}, /* No zones */
+                                                     true /* isEmpty */);
+    });
+
+    const auto& firstChunks = future.timed_get(kFutureTimeout);
+    ASSERT_EQ(3U, firstChunks.chunks.size());
+    ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[0].getShard());
+    ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
+    ASSERT_EQ(kShards[2].getName(), firstChunks.chunks[2].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, EmptyCollection_NoSplitPoints_OneChunkToPrimary) {
+    const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+                                         ShardType("shard1", "rs1/shard1:123"),
+                                         ShardType("shard2", "rs2/shard2:123")};
+
+    const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+    std::unique_ptr<RemoteCommandTargeterMock> targeter(
+        stdx::make_unique<RemoteCommandTargeterMock>());
+    targeter->setConnectionStringReturnValue(connStr);
+    targeter->setFindHostReturnValue(connStr.getServers()[0]);
+    targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+    ASSERT_OK(setupShards(kShards));
+    shardRegistry()->reload(operationContext());
+
+    auto future = launchAsync([&] {
+        ThreadClient tc("Test", getServiceContext());
+        auto opCtx = cc().makeOperationContext();
+        return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+                                                     kNamespace,
+                                                     kShardKeyPattern,
+                                                     ShardId("shard1"),
+                                                     {}, /* No split points */
+                                                     {}, /* No zones */
+                                                     true /* isEmpty */);
+    });
+
+    const auto& firstChunks = future.timed_get(kFutureTimeout);
+    ASSERT_EQ(1U, firstChunks.chunks.size());
+    ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, EmptyCollection_WithZones_ManyChunksOnFirstZoneShard) {
+    const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}),
+                                         ShardType("shard1", "rs1/shard1:123", {"TestZone"}),
+                                         ShardType("shard2", "rs2/shard2:123")};
+    ASSERT_OK(setupShards(kShards));
+    shardRegistry()->reload(operationContext());
+
+    const auto firstChunks = InitialSplitPolicy::createFirstChunks(
+        operationContext(),
+        kNamespace,
+        kShardKeyPattern,
+        ShardId("shard1"),
+        {}, /* No split points */
+        {TagsType(kNamespace,
+                  "TestZone",
+                  ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+        true /* isEmpty */);
+
+    ASSERT_EQ(2U, firstChunks.chunks.size());
+    ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[0].getShard());
+    ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[1].getShard());
+}
+
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index 45c942eb7bb..c10ed60aed3 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -423,15 +423,13 @@ void shardCollection(OperationContext* opCtx,
                      bool unique,
                      const std::vector<BSONObj>& splitPoints,
                      const std::vector<TagsType>& tags,
-                     const bool fromMapReduce,
+                     bool fromMapReduce,
                      const ShardId& dbPrimaryShardId,
-                     const int numContiguousChunksPerShard,
-                     const bool isEmpty) {
+                     int numContiguousChunksPerShard,
+                     bool isEmpty) {
     const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
     const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId));

-    const bool distributeChunks =
-        fromMapReduce || fieldsAndOrder.isHashedPattern() || !tags.empty();
-
     // Fail if there are partially written chunks from a previous failed shardCollection.
     checkForExistingChunks(opCtx, nss);

@@ -441,9 +439,10 @@ void shardCollection(OperationContext* opCtx,
         BSONObjBuilder collectionDetail;
         collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
         collectionDetail.append("collection", nss.ns());
-        if (uuid) {
+        if (uuid)
             uuid->appendToBuilder(&collectionDetail, "uuid");
-        }
+        collectionDetail.append("empty", isEmpty);
+        collectionDetail.append("fromMapReduce", fromMapReduce);
         collectionDetail.append("primary", primaryShard->toString());
         collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
         uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked(
@@ -467,7 +466,6 @@ void shardCollection(OperationContext* opCtx,
                                                                      dbPrimaryShardId,
                                                                      splitPoints,
                                                                      tags,
-                                                                     distributeChunks,
                                                                      isEmpty,
                                                                      numContiguousChunksPerShard);

@@ -681,7 +679,7 @@ public:
             std::vector<BSONObj> finalSplitPoints;
             if (request.getInitialSplitPoints()) {
-                finalSplitPoints = std::move(*request.getInitialSplitPoints());
+                finalSplitPoints = *request.getInitialSplitPoints();
             } else if (tags.empty()) {
                 InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
                     shardKeyPattern,
@@ -702,12 +700,21 @@ public:
             LOG(0) << "CMD: shardcollection: " << cmdObj;

             audit::logShardCollection(
-                Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
+                opCtx->getClient(), nss.ns(), proposedKey, request.getUnique());

-            // The initial chunks are distributed evenly across shards if the initial split
-            // points were specified in the request by mapReduce or if we are using a hashed
-            // shard key. Otherwise, all the initial chunks are placed on the primary shard.
+            // Map/reduce with output to an empty collection assumes it has full control of the
+            // output collection and it would be an unsupported operation if the collection is
+            // being concurrently written
             const bool fromMapReduce = bool(request.getInitialSplitPoints());
+            if (fromMapReduce) {
+                uassert(ErrorCodes::ConflictingOperationInProgress,
+                        str::stream()
+                            << "Map reduce with sharded output to a new collection found "
+                            << nss.ns()
+                            << " to be non-empty which is not supported.",
+                        isEmpty);
+            }
+
             const int numContiguousChunksPerShard = initialSplitPoints.empty()
                 ? 1
                 : (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1);
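As a worked illustration of the `numContiguousChunksPerShard` expression that closes the hunk above: assuming, say, 3 shards and 12 requested initial chunks for a hashed shard key, and assuming the hashed split-point calculation produces numShards - 1 = 2 initial split points and numInitialChunks - 1 = 11 final split points, each "big" initial chunk would carry (11 + 1) / (2 + 1) = 4 contiguous chunks. These figures are example assumptions, not values taken from the patch:

```cpp
#include <cstddef>
#include <iostream>

int main() {
    // Assumed example values (3 shards, 12 requested initial chunks, hashed shard key).
    const std::size_t numInitialSplitPoints = 2;  // numShards - 1 (assumed)
    const std::size_t numFinalSplitPoints = 11;   // numInitialChunks - 1 (assumed)

    // Mirrors the expression at the end of the diff above.
    const std::size_t numContiguousChunksPerShard = numInitialSplitPoints == 0
        ? 1
        : (numFinalSplitPoints + 1) / (numInitialSplitPoints + 1);

    std::cout << numContiguousChunksPerShard << '\n';  // prints 4
    return 0;
}
```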