author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-01-28 20:55:04 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>  2019-01-29 22:15:56 -0500
commit     1c61dfa3307c2009dd29c893b8d2b08af6bcf7d6 (patch)
tree       0c1158d649100c015c8e30d142a4b04a62213e90 /src/mongo/db/s
parent     09abfff1c4ad2f98a9b83093b7e8b6454bc7c393 (diff)
download   mongo-1c61dfa3307c2009dd29c893b8d2b08af6bcf7d6.tar.gz
SERVER-39234 Ensure `shardCollection` initial split works the same between config server and shard primary
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/config/configsvr_shard_collection_command.cpp | 74
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.cpp | 117
-rw-r--r--  src/mongo/db/s/config/initial_split_policy.h | 13
-rw-r--r--  src/mongo/db/s/config/initial_split_policy_test.cpp | 8
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.h | 25
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp | 25
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp | 501
-rw-r--r--  src/mongo/db/s/shardsvr_shard_collection.cpp | 33
8 files changed, 557 insertions, 239 deletions
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index 433c655cb46..5c7c354b614 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -396,22 +396,9 @@ void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx,
*/
void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
const NamespaceString& nss,
- int numShards,
const std::vector<ShardId>& shardIds,
- bool isEmpty,
- const ShardKeyPattern& shardKeyPattern,
const std::vector<BSONObj>& finalSplitPoints) {
- auto catalogCache = Grid::get(opCtx)->catalogCache();
-
- if (!shardKeyPattern.isHashedPattern()) {
- // Only initially move chunks when using a hashed shard key.
- return;
- }
-
- if (!isEmpty) {
- // If the collection is not empty, rely on the balancer to migrate the chunks.
- return;
- }
+ const auto catalogCache = Grid::get(opCtx)->catalogCache();
auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
uassert(ErrorCodes::ConflictingOperationInProgress,
@@ -422,13 +409,18 @@ void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
auto chunkManager = routingInfo.cm();
// Move and commit each "big chunk" to a different shard.
- int i = 0;
+ auto nextShardId = [&, indx = 0 ]() mutable {
+ return shardIds[indx++ % shardIds.size()];
+ };
+
for (auto chunk : chunkManager->chunks()) {
- const ShardId& shardId = shardIds[i++ % numShards];
+ const auto shardId = nextShardId();
+
const auto toStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
if (!toStatus.isOK()) {
continue;
}
+
const auto to = toStatus.getValue();
// Can't move chunk to shard it's already on
@@ -647,11 +639,9 @@ public:
std::vector<ShardId> shardIds;
shardRegistry->getAllShardIds(opCtx, &shardIds);
- const int numShards = shardIds.size();
-
uassert(ErrorCodes::IllegalOperation,
"cannot shard collections before there are shards",
- numShards > 0);
+ !shardIds.empty());
// Handle collections in the config db separately.
if (nss.db() == NamespaceString::kConfigDb) {
@@ -686,7 +676,8 @@ public:
ON_BLOCK_EXIT([&conn] { conn.done(); });
// Step 1.
- validateAndDeduceFullRequestOptions(opCtx, nss, shardKeyPattern, numShards, conn, &request);
+ validateAndDeduceFullRequestOptions(
+ opCtx, nss, shardKeyPattern, shardIds.size(), conn, &request);
// The collation option should have been set to the collection default collation after being
// validated.
@@ -753,7 +744,23 @@ public:
return true;
}
- bool isEmpty = (conn->count(nss.ns()) == 0);
+ // This check for empty collection is racy, because it is not guaranteed that documents
+ // will not show up in the collection right after the count below has executed. It is
+    // left here for backwards compatibility with pre-4.0.4 clusters, which do not support
+ // sharding being performed by the primary shard.
+ const bool isEmpty = (conn->count(nss.ns()) == 0);
+
+ // Map/reduce with output to an empty collection assumes it has full control of the
+    // output collection, so the operation is unsupported if the collection is being
+    // written to concurrently
+ const bool fromMapReduce = bool(request.getInitialSplitPoints());
+ if (fromMapReduce) {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Map reduce with sharded output to a new collection found "
+ << nss.ns()
+ << " to be non-empty which is not supported.",
+ isEmpty);
+ }
// Step 3.
validateShardKeyAgainstExistingIndexes(
@@ -770,12 +777,12 @@ public:
std::vector<BSONObj> initialSplitPoints; // there will be at most numShards-1 of these
std::vector<BSONObj> finalSplitPoints; // all of the desired split points
if (request.getInitialSplitPoints()) {
- initialSplitPoints = std::move(*request.getInitialSplitPoints());
+ initialSplitPoints = *request.getInitialSplitPoints();
} else {
InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
shardKeyPattern,
isEmpty,
- numShards,
+ shardIds.size(),
request.getNumInitialChunks(),
&initialSplitPoints,
&finalSplitPoints);
@@ -784,15 +791,7 @@ public:
LOG(0) << "CMD: shardcollection: " << cmdObj;
audit::logShardCollection(
- Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
-
- // The initial chunks are distributed evenly across shards only if the initial split
- // points
- // were specified in the request, i.e., by mapReduce. Otherwise, all the initial chunks
- // are
- // placed on the primary shard, and may be distributed across shards through migrations
- // (below) if using a hashed shard key.
- const bool distributeInitialChunks = bool(request.getInitialSplitPoints());
+ opCtx->getClient(), nss.ns(), proposedKey, request.getUnique());
// Step 6. Actually shard the collection.
catalogManager->shardCollection(opCtx,
@@ -802,7 +801,7 @@ public:
*request.getCollation(),
request.getUnique(),
initialSplitPoints,
- distributeInitialChunks,
+ fromMapReduce,
primaryShardId);
result << "collectionsharded" << nss.ns();
if (uuid) {
@@ -816,9 +815,12 @@ public:
collDistLock.reset();
dbDistLock.reset();
- // Step 7. Migrate initial chunks to distribute them across shards.
- migrateAndFurtherSplitInitialChunks(
- opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, finalSplitPoints);
+ // Step 7. If the collection is empty and using hashed sharding, migrate initial chunks
+ // to spread them evenly across shards from the beginning. Otherwise rely on the
+ // balancer to do it.
+ if (isEmpty && shardKeyPattern.isHashedPattern()) {
+ migrateAndFurtherSplitInitialChunks(opCtx, nss, shardIds, finalSplitPoints);
+ }
return true;
}
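
The refactored loop above replaces explicit index/modulo bookkeeping with a small stateful `nextShardId` closure. Below is a minimal, standalone sketch of the same round-robin pattern; the `ShardId` alias and the shard names are stand-ins chosen for illustration, not the real MongoDB types.

#include <cassert>
#include <string>
#include <vector>

int main() {
    using ShardId = std::string;  // stand-in for the real mongo::ShardId type
    const std::vector<ShardId> shardIds{"shard0", "shard1", "shard2"};

    // Stateful closure cycling through the shard ids, mirroring the
    // `nextShardId` lambda in migrateAndFurtherSplitInitialChunks().
    auto nextShardId = [&, indx = 0]() mutable {
        return shardIds[indx++ % shardIds.size()];
    };

    assert(nextShardId() == "shard0");
    assert(nextShardId() == "shard1");
    assert(nextShardId() == "shard2");
    assert(nextShardId() == "shard0");  // wraps around to the first shard
    return 0;
}
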
diff --git a/src/mongo/db/s/config/initial_split_policy.cpp b/src/mongo/db/s/config/initial_split_policy.cpp
index 4fb66af6867..e8217a4540a 100644
--- a/src/mongo/db/s/config/initial_split_policy.cpp
+++ b/src/mongo/db/s/config/initial_split_policy.cpp
@@ -274,34 +274,32 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
const ShardId& primaryShardId,
const std::vector<BSONObj>& splitPoints,
const std::vector<TagsType>& tags,
- const bool distributeInitialChunks,
- const bool isEmpty,
- const int numContiguousChunksPerShard) {
- const auto& keyPattern = shardKeyPattern.getKeyPattern();
+ bool isEmpty,
+ int numContiguousChunksPerShard) {
+ uassert(ErrorCodes::InvalidOptions,
+ "Cannot generate initial chunks based on both split points and zones",
+ tags.empty() || splitPoints.empty());
- std::vector<BSONObj> finalSplitPoints;
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (splitPoints.empty() && tags.empty()) {
- // If neither split points nor tags were specified use the shard's data distribution to
- // determine them
- auto primaryShard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
+ const auto& keyPattern = shardKeyPattern.getKeyPattern();
- // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
- // the splitVector command and affects the number of chunks returned, has been loaded.
- uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx));
-
- if (!isEmpty) {
- finalSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
- opCtx,
- primaryShardId,
- nss,
- shardKeyPattern,
- ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
- Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- 0));
- }
+ const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
+
+ // On which shards are the generated chunks allowed to be placed
+ std::vector<ShardId> shardIds;
+ if (isEmpty) {
+ shardRegistry->getAllShardIdsNoReload(&shardIds);
} else {
+ shardIds.push_back(primaryShardId);
+ }
+
+ ShardCollectionConfig initialChunks;
+
+ // If split points are requested, they take precedence over zones
+ if (!splitPoints.empty()) {
+ std::vector<BSONObj> finalSplitPoints;
+
// Make sure points are unique and ordered
auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();
@@ -312,37 +310,63 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
for (const auto& splitPoint : orderedPts) {
finalSplitPoints.push_back(splitPoint);
}
- }
-
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "cannot generate initial chunks based on both split points and tags",
- tags.empty() || finalSplitPoints.empty());
-
- const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
-
- // If docs already exist for the collection, must use primary shard, otherwise defer to
- // passed-in distribution option.
- std::vector<ShardId> shardIds;
- if (isEmpty && distributeInitialChunks) {
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
- } else {
- shardIds.push_back(primaryShardId);
+ initialChunks = generateShardCollectionInitialChunks(nss,
+ shardKeyPattern,
+ primaryShardId,
+ validAfter,
+ finalSplitPoints,
+ shardIds,
+ numContiguousChunksPerShard);
}
+ // If zones are defined, use the zones
+ else if (!tags.empty()) {
+ if (isEmpty) {
+ initialChunks = generateShardCollectionInitialZonedChunks(
+ nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds);
+ } else {
+ // For a non-empty collection, create one chunk on the primary shard and leave it to the
+ // balancer to do the zone splitting and placement
+ ChunkVersion version(1, 0, OID::gen());
+ appendChunk(nss,
+ keyPattern.globalMin(),
+ keyPattern.globalMax(),
+ &version,
+ validAfter,
+ primaryShardId,
+ &initialChunks.chunks);
+ }
+ }
+ // If neither split points nor zones are available and the collection is not empty, ask the
+ // shard to select split points based on the data distribution
+ else if (!isEmpty) {
+ auto primaryShard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));
- ShardCollectionConfig initialChunks;
+ // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
+ // the splitVector command and affects the number of chunks returned, has been loaded.
+ const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+ uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
+
+ const auto shardSelectedSplitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
+ opCtx,
+ primaryShardId,
+ nss,
+ shardKeyPattern,
+ ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
+ balancerConfig->getMaxChunkSizeBytes(),
+ 0));
- if (tags.empty()) {
initialChunks = generateShardCollectionInitialChunks(nss,
shardKeyPattern,
primaryShardId,
validAfter,
- finalSplitPoints,
+ shardSelectedSplitPoints,
shardIds,
numContiguousChunksPerShard);
- } else if (!isEmpty) {
- // For a non-empty collection, create one chunk on the primary shard and leave it to the
- // balancer to do the zone split and rebalancing
+ }
+    // For an empty collection, just create a single chunk
+ else {
ChunkVersion version(1, 0, OID::gen());
appendChunk(nss,
keyPattern.globalMin(),
@@ -351,9 +375,6 @@ InitialSplitPolicy::ShardCollectionConfig InitialSplitPolicy::createFirstChunks(
validAfter,
primaryShardId,
&initialChunks.chunks);
- } else {
- initialChunks = generateShardCollectionInitialZonedChunks(
- nss, shardKeyPattern, validAfter, tags, getTagToShardIds(opCtx, tags), shardIds);
}
LOG(0) << "Created " << initialChunks.chunks.size() << " chunk(s) for: " << nss
diff --git a/src/mongo/db/s/config/initial_split_policy.h b/src/mongo/db/s/config/initial_split_policy.h
index 6e43265b3ae..b7bf33c797a 100644
--- a/src/mongo/db/s/config/initial_split_policy.h
+++ b/src/mongo/db/s/config/initial_split_policy.h
@@ -120,8 +120,12 @@ public:
const std::vector<ShardId>& shardIdsForGaps);
/**
- * Creates the first chunks for a newly sharded collection.
- * Returns the created chunks.
+     * Generates the list of first chunks and their placement for a newly sharded
+     * collection.
+     *
+     * If the collection 'isEmpty', the chunks will be spread across all shards that are
+     * appropriate according to the zoning rules. Otherwise, they will all be placed on the
+     * primary shard, and the balancer will later take care of distributing them properly.
*/
static ShardCollectionConfig createFirstChunks(OperationContext* opCtx,
const NamespaceString& nss,
@@ -129,9 +133,8 @@ public:
const ShardId& primaryShardId,
const std::vector<BSONObj>& splitPoints,
const std::vector<TagsType>& tags,
- const bool distributeInitialChunks,
- const bool isEmpty,
- const int numContiguousChunksPerShard = 1);
+ bool isEmpty,
+ int numContiguousChunksPerShard = 1);
/**
* Writes to the config server the first chunks for a newly sharded collection.
diff --git a/src/mongo/db/s/config/initial_split_policy_test.cpp b/src/mongo/db/s/config/initial_split_policy_test.cpp
index 7ff8e44fef2..daa8d0c29f3 100644
--- a/src/mongo/db/s/config/initial_split_policy_test.cpp
+++ b/src/mongo/db/s/config/initial_split_policy_test.cpp
@@ -118,7 +118,7 @@ TEST(CalculateHashedSplitPointsTest, EmptyCollectionChunksEqualToShards) {
checkCalculatedHashedSplitPoints(true, true, 3, 3, &expectedSplitPoints, &expectedSplitPoints);
}
-TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithInitialSplitsReturnsEmptySplits) {
+TEST(CalculateHashedSplitPointsTest, EmptyCollectionHashedWithNoInitialSplitsReturnsEmptySplits) {
const std::vector<BSONObj> expectedSplitPoints;
checkCalculatedHashedSplitPoints(true, true, 2, 1, &expectedSplitPoints, &expectedSplitPoints);
}
@@ -147,7 +147,7 @@ TEST(CalculateHashedSplitPointsTest, NotHashedWithInitialSplitsFails) {
ErrorCodes::InvalidOptions);
}
-class GenerateInitialSplitChunksTest : public unittest::Test {
+class GenerateInitialSplitChunksTestBase : public unittest::Test {
public:
/**
* Returns a vector of ChunkType objects for the given chunk ranges.
@@ -207,7 +207,7 @@ private:
const Timestamp _timeStamp{Date_t::now()};
};
-class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTest {
+class GenerateInitialHashedSplitChunksTest : public GenerateInitialSplitChunksTestBase {
public:
const std::vector<BSONObj>& hashedSplitPoints() {
return _splitPoints;
@@ -263,7 +263,7 @@ TEST_F(GenerateInitialHashedSplitChunksTest,
assertChunkVectorsAreEqual(expectedChunks, shardCollectionConfig.chunks);
}
-class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTest {
+class GenerateShardCollectionInitialZonedChunksTest : public GenerateInitialSplitChunksTestBase {
public:
/**
* Calls generateShardCollectionInitialZonedChunks according to the given arguments
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 309f5eaa386..071085395cc 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -256,19 +256,18 @@ public:
/**
- * Shards a collection. Assumes that the database is enabled for sharding.
+     * Shards the collection with namespace 'nss', implicitly assuming that the database is
+     * enabled for sharding (i.e., it does not check whether enableSharding was called previously).
*
- * @param ns: namespace of collection to shard
- * @param uuid: the collection's UUID. Optional because new in 3.6.
- * @param fieldsAndOrder: shardKey pattern
- * @param defaultCollation: the default collation for the collection, to be written to
- * config.collections. If empty, the collection default collation is simple binary
- * comparison. Note the the shard key collation will always be simple binary comparison,
- * even if the collection default collation is non-simple.
- * @param unique: if true, ensure underlying index enforces a unique constraint.
- * @param initPoints: create chunks based on a set of specified split points.
- * @param initShardIds: If non-empty, specifies the set of shards to assign chunks between.
- * Otherwise all chunks will be assigned to the primary shard for the database.
+ * uuid - the collection's UUID. Optional because new in 3.6.
+ * fieldsAndOrder - shard key pattern to use.
+ * defaultCollation - the default collation for the collection, excluding the shard key. If
+ * empty, defaults to simple binary comparison. Note that the shard key collation will always
+ * be simple binary comparison, even if the collection default collation is non-simple.
+ * unique - if true, ensure underlying index enforces a unique constraint.
+ * initPoints - create chunks based on a set of specified split points.
+ * isFromMapReduce - whether this request comes from map/reduce, in which case the generated
+ * chunks can be spread across shards. Otherwise they will stay on the primary shard.
*/
void shardCollection(OperationContext* opCtx,
const NamespaceString& nss,
@@ -277,7 +276,7 @@ public:
const BSONObj& defaultCollation,
bool unique,
const std::vector<BSONObj>& initPoints,
- const bool distributeInitialChunks,
+ bool isFromMapReduce,
const ShardId& dbPrimaryShardId);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 8909c2575cd..26a9c8bae8d 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -364,7 +364,7 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
const BSONObj& defaultCollation,
bool unique,
const vector<BSONObj>& splitPoints,
- const bool distributeInitialChunks,
+ bool isFromMapReduce,
const ShardId& dbPrimaryShardId) {
const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
@@ -373,14 +373,23 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
// Fail if there are partially written chunks from a previous failed shardCollection.
checkForExistingChunks(opCtx, nss);
+ // Prior to 4.0.5, zones cannot be taken into account at collection sharding time, so ignore
+ // them and let the balancer apply them later
+ const std::vector<TagsType> treatAsNoZonesDefined;
+
+    // Map/reduce with output to a sharded collection ignores consistency checks and requires the
+ // initial chunks to be spread across shards unconditionally
+ const bool treatAsEmpty = isFromMapReduce;
+
// Record start in changelog
{
BSONObjBuilder collectionDetail;
collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
collectionDetail.append("collection", nss.ns());
- if (uuid) {
+ if (uuid)
uuid->appendToBuilder(&collectionDetail, "uuid");
- }
+ collectionDetail.append("empty", treatAsEmpty);
+ collectionDetail.append("fromMapReduce", isFromMapReduce);
collectionDetail.append("primary", primaryShard->toString());
collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked(
@@ -398,19 +407,13 @@ void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
->makeFromBSON(defaultCollation));
}
- std::vector<TagsType> tags;
- // Since this code runs on the config server, we cannot guarantee that the collection is still
- // empty by the time the metadata is written so always assume we are sharding a non-empty
- // collection.
- bool isEmpty = false;
const auto initialChunks = InitialSplitPolicy::createFirstChunks(opCtx,
nss,
fieldsAndOrder,
dbPrimaryShardId,
splitPoints,
- tags,
- distributeInitialChunks,
- isEmpty);
+ treatAsNoZonesDefined,
+ treatAsEmpty);
InitialSplitPolicy::writeFirstChunksToConfig(opCtx, initialChunks);
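
On this config-server path the two new locals simply translate the single `isFromMapReduce` flag into the arguments that createFirstChunks now expects, since prior to 4.0.5 the config server can neither verify that the collection is empty nor honor zones at sharding time. A tiny hypothetical sketch of that mapping, with plain structs standing in for the real types:

#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the real MongoDB types; this only
// models the argument translation performed on the config-server path.
struct Zone {
    std::string name;
};

struct CreateFirstChunksArgs {
    std::vector<Zone> zones;  // always empty here: pre-4.0.5 ignores zones
    bool isEmpty;             // map/reduce output is treated as empty
};

CreateFirstChunksArgs translateConfigSvrRequest(bool isFromMapReduce) {
    // Zones cannot be honoured at shardCollection time on this path, so the
    // config server passes an empty zone list and lets the balancer apply
    // zones later. Map/reduce requires the initial chunks to be spread across
    // shards, so only in that case is the collection treated as empty.
    return CreateFirstChunksArgs{{}, /* treatAsEmpty */ isFromMapReduce};
}

int main() {
    assert(translateConfigSvrRequest(true).isEmpty);
    assert(!translateConfigSvrRequest(false).isEmpty);
    assert(translateConfigSvrRequest(true).zones.empty());
    return 0;
}
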
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
index c2fd0035b34..1f16b0c5009 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_collection_test.cpp
@@ -39,25 +39,15 @@
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
-#include "mongo/db/client.h"
-#include "mongo/db/commands.h"
#include "mongo/db/s/config/initial_split_policy.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
-#include "mongo/executor/network_interface_mock.h"
-#include "mongo/executor/task_executor.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
-#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog/type_database.h"
-#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config_server_test_fixture.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
-#include "mongo/stdx/future.h"
-#include "mongo/transport/mock_session.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
@@ -74,12 +64,8 @@ using std::string;
using std::vector;
using unittest::assertGet;
-const ShardId testPrimaryShard = ShardId("shard0");
-
-const NamespaceString kNamespace("db1.foo");
-
-class ShardCollectionTest : public ConfigServerTestFixture {
-public:
+class ShardCollectionTestBase : public ConfigServerTestFixture {
+protected:
void expectSplitVector(const HostAndPort& shardHost,
const ShardKeyPattern& keyPattern,
const BSONObj& splitPoints) {
@@ -106,13 +92,50 @@ public:
});
}
+ const ShardId testPrimaryShard{"shard0"};
+ const NamespaceString kNamespace{"db1.foo"};
+
private:
const HostAndPort configHost{"configHost1"};
const ConnectionString configCS{ConnectionString::forReplicaSet("configReplSet", {configHost})};
const HostAndPort clientHost{"clientHost1"};
};
-TEST_F(ShardCollectionTest, anotherMongosSharding) {
+
+// Tests which exercise the ShardingCatalogManager::shardCollection logic, which is what the config
+// server uses to shard collections, when the '_shardsvrShardCollection' command is not available
+// (fast initial split optimization)
+class ConfigServerShardCollectionTest : public ShardCollectionTestBase {
+protected:
+ void checkWrittenChunks(const std::vector<ChunkType>& expectedChunks) {
+ const auto grid = Grid::get(operationContext());
+ const auto catalogClient = grid->catalogClient();
+ repl::OpTime unusedOpTime;
+ const auto writtenChunks =
+ assertGet(catalogClient->getChunks(operationContext(),
+ BSON("ns" << kNamespace.ns()),
+ BSON("min" << 1),
+ boost::none,
+ &unusedOpTime,
+ repl::ReadConcernLevel::kLocalReadConcern));
+ ASSERT_EQ(expectedChunks.size(), writtenChunks.size());
+
+ auto itE = expectedChunks.begin();
+ auto itW = writtenChunks.begin();
+ for (; itE != expectedChunks.end(); itE++, itW++) {
+ const auto& expected = *itE;
+ const auto& written = *itW;
+            ASSERT_BSONOBJ_EQ(expected.getMin(), written.getMin());
+            ASSERT_BSONOBJ_EQ(expected.getMax(), written.getMax());
+ ASSERT_EQ(expected.getShard(), written.getShard());
+ }
+ }
+
+ const ShardKeyPattern keyPattern{BSON("_id" << 1)};
+ const BSONObj defaultCollation;
+};
+
+TEST_F(ConfigServerShardCollectionTest, Partially_Written_Chunks_Present) {
ShardType shard;
shard.setName("shard0");
shard.setHost("shardHost");
@@ -130,25 +153,21 @@ TEST_F(ShardCollectionTest, anotherMongosSharding) {
chunk.setMax(BSON("_id" << 5));
ASSERT_OK(setupChunks({chunk}));
- ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
- BSONObj defaultCollation;
-
ASSERT_THROWS_CODE(ShardingCatalogManager::get(operationContext())
->shardCollection(operationContext(),
kNamespace,
boost::none, // UUID
- shardKeyPattern,
+ keyPattern,
defaultCollation,
false,
- vector<BSONObj>{},
- false,
+ {},
+ false, // isFromMapReduce
testPrimaryShard),
AssertionException,
ErrorCodes::ManualInterventionRequired);
}
-TEST_F(ShardCollectionTest, noInitialChunksOrData) {
- // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_ForMapReduce_NoInitialSplitPoints) {
const HostAndPort shardHost{"shardHost"};
ShardType shard;
shard.setName("shard0");
@@ -164,28 +183,22 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) {
setupDatabase(kNamespace.db().toString(), shard.getName(), true);
- ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
- BSONObj defaultCollation;
-
// Now start actually sharding the collection.
auto future = launchAsync([&] {
- ThreadClient tc("Test", getGlobalServiceContext());
+ ThreadClient tc("Test", getServiceContext());
auto opCtx = cc().makeOperationContext();
ShardingCatalogManager::get(operationContext())
->shardCollection(opCtx.get(),
kNamespace,
boost::none, // UUID
- shardKeyPattern,
+ keyPattern,
defaultCollation,
false,
- vector<BSONObj>{},
- false,
+ {}, // No split points
+ true, // isFromMapReduce
testPrimaryShard);
});
- // Respond to the splitVector command sent to the shard to figure out initial split points.
- expectSplitVector(shardHost, shardKeyPattern, BSONObj());
-
// Expect the set shard version for that namespace.
// We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
// generated by shardCollection for the first chunk.
@@ -193,10 +206,15 @@ TEST_F(ShardCollectionTest, noInitialChunksOrData) {
expectSetShardVersion(shardHost, shard, kNamespace, boost::none /* expected ChunkVersion */);
future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks(
+ {ChunkType(kNamespace,
+ {keyPattern.getKeyPattern().globalMin(), keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard)});
}
-TEST_F(ShardCollectionTest, withInitialChunks) {
- // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_ForMapReduce_WithInitialSplitPoints) {
const HostAndPort shard0Host{"shardHost0"};
const HostAndPort shard1Host{"shardHost1"};
const HostAndPort shard2Host{"shardHost2"};
@@ -233,65 +251,17 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
setupDatabase(kNamespace.db().toString(), shard0.getName(), true);
- ShardKeyPattern keyPattern(BSON("_id" << 1));
-
BSONObj splitPoint0 = BSON("_id" << 1);
BSONObj splitPoint1 = BSON("_id" << 100);
BSONObj splitPoint2 = BSON("_id" << 200);
BSONObj splitPoint3 = BSON("_id" << 300);
- ChunkVersion expectedVersion(1, 0, OID::gen());
-
- ChunkType expectedChunk0;
- expectedChunk0.setNS(kNamespace);
- expectedChunk0.setShard(shard0.getName());
- expectedChunk0.setMin(keyPattern.getKeyPattern().globalMin());
- expectedChunk0.setMax(splitPoint0);
- expectedChunk0.setVersion(expectedVersion);
- expectedVersion.incMinor();
-
- ChunkType expectedChunk1;
- expectedChunk1.setNS(kNamespace);
- expectedChunk1.setShard(shard1.getName());
- expectedChunk1.setMin(splitPoint0);
- expectedChunk1.setMax(splitPoint1);
- expectedChunk1.setVersion(expectedVersion);
- expectedVersion.incMinor();
-
- ChunkType expectedChunk2;
- expectedChunk2.setNS(kNamespace);
- expectedChunk2.setShard(shard2.getName());
- expectedChunk2.setMin(splitPoint1);
- expectedChunk2.setMax(splitPoint2);
- expectedChunk2.setVersion(expectedVersion);
- expectedVersion.incMinor();
-
- ChunkType expectedChunk3;
- expectedChunk3.setNS(kNamespace);
- expectedChunk3.setShard(shard0.getName());
- expectedChunk3.setMin(splitPoint2);
- expectedChunk3.setMax(splitPoint3);
- expectedChunk3.setVersion(expectedVersion);
- expectedVersion.incMinor();
-
- ChunkType expectedChunk4;
- expectedChunk4.setNS(kNamespace);
- expectedChunk4.setShard(shard1.getName());
- expectedChunk4.setMin(splitPoint3);
- expectedChunk4.setMax(keyPattern.getKeyPattern().globalMax());
- expectedChunk4.setVersion(expectedVersion);
-
- vector<ChunkType> expectedChunks{
- expectedChunk0, expectedChunk1, expectedChunk2, expectedChunk3, expectedChunk4};
-
- BSONObj defaultCollation;
-
// Now start actually sharding the collection.
auto future = launchAsync([&] {
// TODO: can we mock the ShardRegistry to return these?
set<ShardId> shards{shard0.getName(), shard1.getName(), shard2.getName()};
- ThreadClient tc("Test", getGlobalServiceContext());
+ ThreadClient tc("Test", getServiceContext());
auto opCtx = cc().makeOperationContext();
ShardingCatalogManager::get(operationContext())
->shardCollection(opCtx.get(),
@@ -300,8 +270,8 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
keyPattern,
defaultCollation,
true,
- vector<BSONObj>{splitPoint0, splitPoint1, splitPoint2, splitPoint3},
- true,
+ {splitPoint0, splitPoint1, splitPoint2, splitPoint3},
+ true, // isFromMapReduce
testPrimaryShard);
});
@@ -312,10 +282,30 @@ TEST_F(ShardCollectionTest, withInitialChunks) {
expectSetShardVersion(shard0Host, shard0, kNamespace, boost::none /* expected ChunkVersion */);
future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks({ChunkType(kNamespace,
+ ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+ ChunkVersion::IGNORED(),
+ shard0.getName()),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint0, splitPoint1},
+ ChunkVersion::IGNORED(),
+ shard1.getName()),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint1, splitPoint2},
+ ChunkVersion::IGNORED(),
+ shard2.getName()),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint2, splitPoint3},
+ ChunkVersion::IGNORED(),
+ shard0.getName()),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ shard1.getName())});
}
-TEST_F(ShardCollectionTest, withInitialData) {
- // Initial setup
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_NoInitialSplitPoints_NoSplitVectorPoints) {
const HostAndPort shardHost{"shardHost"};
ShardType shard;
shard.setName("shard0");
@@ -331,18 +321,59 @@ TEST_F(ShardCollectionTest, withInitialData) {
setupDatabase(kNamespace.db().toString(), shard.getName(), true);
- ShardKeyPattern keyPattern(BSON("_id" << 1));
+ // Now start actually sharding the collection.
+ auto future = launchAsync([&] {
+ ThreadClient tc("Test", getServiceContext());
+ auto opCtx = cc().makeOperationContext();
+ ShardingCatalogManager::get(operationContext())
+ ->shardCollection(opCtx.get(),
+ kNamespace,
+ boost::none, // UUID
+ keyPattern,
+ defaultCollation,
+ false,
+ {}, // No split points
+ false, // isFromMapReduce
+ testPrimaryShard);
+ });
+
+ // Respond to the splitVector command sent to the shard to figure out initial split points.
+ expectSplitVector(shardHost, keyPattern, BSONObj());
- BSONObj splitPoint0 = BSON("_id" << 1);
- BSONObj splitPoint1 = BSON("_id" << 100);
- BSONObj splitPoint2 = BSON("_id" << 200);
- BSONObj splitPoint3 = BSON("_id" << 300);
+ // Expect the set shard version for that namespace.
+ // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
+ // generated by shardCollection for the first chunk.
+ // TODO SERVER-29451: add hooks to the mock storage engine to expect reads and writes.
+ expectSetShardVersion(shardHost, shard, kNamespace, boost::none /* expected ChunkVersion */);
+
+ future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks(
+ {ChunkType(kNamespace,
+ {keyPattern.getKeyPattern().globalMin(), keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard)});
+}
+
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_NoInitialSplitPoints_WithSplitVectorPoints) {
+ const HostAndPort shardHost{"shardHost"};
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost(shardHost.toString());
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(shardHost));
+ targeter->setFindHostReturnValue(shardHost);
+ targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter));
+
+ ASSERT_OK(setupShards(vector<ShardType>{shard}));
- BSONObj defaultCollation;
+ setupDatabase(kNamespace.db().toString(), shard.getName(), true);
// Now start actually sharding the collection.
auto future = launchAsync([&] {
- ThreadClient tc("Test", getGlobalServiceContext());
+ ThreadClient tc("Test", getServiceContext());
auto opCtx = cc().makeOperationContext();
ShardingCatalogManager::get(operationContext())
->shardCollection(opCtx.get(),
@@ -351,11 +382,16 @@ TEST_F(ShardCollectionTest, withInitialData) {
keyPattern,
defaultCollation,
false,
- vector<BSONObj>{},
- false,
+ {}, // No split points
+ false, // isFromMapReduce
testPrimaryShard);
});
+ BSONObj splitPoint0 = BSON("_id" << 1);
+ BSONObj splitPoint1 = BSON("_id" << 100);
+ BSONObj splitPoint2 = BSON("_id" << 200);
+ BSONObj splitPoint3 = BSON("_id" << 300);
+
// Respond to the splitVector command sent to the shard to figure out initial split points.
expectSplitVector(shardHost,
keyPattern,
@@ -368,12 +404,135 @@ TEST_F(ShardCollectionTest, withInitialData) {
expectSetShardVersion(shardHost, shard, kNamespace, boost::none);
future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks({ChunkType(kNamespace,
+ ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint0, splitPoint1},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint1, splitPoint2},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint2, splitPoint3},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard)});
+}
+
+TEST_F(ConfigServerShardCollectionTest, RangeSharding_WithInitialSplitPoints_NoSplitVectorPoints) {
+ const HostAndPort shardHost{"shardHost"};
+ ShardType shard;
+ shard.setName("shard0");
+ shard.setHost(shardHost.toString());
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(shardHost));
+ targeter->setFindHostReturnValue(shardHost);
+ targeterFactory()->addTargeterToReturn(ConnectionString(shardHost), std::move(targeter));
+
+ ASSERT_OK(setupShards(vector<ShardType>{shard}));
+
+ setupDatabase(kNamespace.db().toString(), shard.getName(), true);
+
+ BSONObj splitPoint0 = BSON("_id" << 1);
+ BSONObj splitPoint1 = BSON("_id" << 100);
+ BSONObj splitPoint2 = BSON("_id" << 200);
+ BSONObj splitPoint3 = BSON("_id" << 300);
+
+ // Now start actually sharding the collection.
+ auto future = launchAsync([&] {
+ ThreadClient tc("Test", getServiceContext());
+ auto opCtx = cc().makeOperationContext();
+ ShardingCatalogManager::get(operationContext())
+ ->shardCollection(opCtx.get(),
+ kNamespace,
+ boost::none, // UUID
+ keyPattern,
+ defaultCollation,
+ false,
+ {splitPoint0, splitPoint1, splitPoint2, splitPoint3},
+ false, // isFromMapReduce
+ testPrimaryShard);
+ });
+
+ // Expect the set shard version for that namespace
+ // We do not check for a specific ChunkVersion, because we cannot easily know the OID that was
+ // generated by shardCollection for the first chunk.
+ // TODO SERVER-29451: add hooks to the mock storage engine to expect reads and writes.
+ expectSetShardVersion(shardHost, shard, kNamespace, boost::none);
+
+ future.timed_get(kFutureTimeout);
+
+ checkWrittenChunks({ChunkType(kNamespace,
+ ChunkRange{keyPattern.getKeyPattern().globalMin(), splitPoint0},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint0, splitPoint1},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint1, splitPoint2},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint2, splitPoint3},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard),
+ ChunkType(kNamespace,
+ ChunkRange{splitPoint3, keyPattern.getKeyPattern().globalMax()},
+ ChunkVersion::IGNORED(),
+ testPrimaryShard)});
}
-using CreateFirstChunksTest = ShardCollectionTest;
-TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmptyCollection) {
+// Direct tests for InitialSplitPolicy::createFirstChunks which is the base call for both the config
+// server and shard server's shard collection logic
+class CreateFirstChunksTest : public ShardCollectionTestBase {
+protected:
const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)};
+};
+
+TEST_F(CreateFirstChunksTest, Split_Disallowed_With_Both_SplitPoints_And_Zones) {
+ ASSERT_THROWS_CODE(
+ InitialSplitPolicy::createFirstChunks(
+ operationContext(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {BSON("x" << 0)},
+ {TagsType(kNamespace,
+ "TestZone",
+ ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+ true /* isEmpty */),
+ AssertionException,
+ ErrorCodes::InvalidOptions);
+
+ ASSERT_THROWS_CODE(
+ InitialSplitPolicy::createFirstChunks(
+ operationContext(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+            {BSON("x" << 0)}, /* Split points */
+ {TagsType(kNamespace,
+ "TestZone",
+ ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+ false /* isEmpty */),
+ AssertionException,
+ ErrorCodes::InvalidOptions);
+}
+
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromSplitVector_ManyChunksToPrimary) {
const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
ShardType("shard1", "rs1/shard1:123"),
ShardType("shard2", "rs2/shard2:123")};
@@ -396,9 +555,8 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmp
kNamespace,
kShardKeyPattern,
ShardId("shard1"),
- {},
- {},
- true,
+ {}, /* No split points */
+ {}, /* No zones */
false /* isEmpty */);
});
@@ -410,8 +568,41 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithoutTagsIgnoredForNonEmp
ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
}
-TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyCollection) {
- const ShardKeyPattern kShardKeyPattern{BSON("x" << 1)};
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_SplitPoints_FromClient_ManyChunksToPrimary) {
+ const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+ ShardType("shard1", "rs1/shard1:123"),
+ ShardType("shard2", "rs2/shard2:123")};
+
+ const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(connStr);
+ targeter->setFindHostReturnValue(connStr.getServers()[0]);
+ targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+ ASSERT_OK(setupShards(kShards));
+ shardRegistry()->reload(operationContext());
+
+ auto future = launchAsync([&] {
+ ThreadClient tc("Test", getServiceContext());
+ auto opCtx = cc().makeOperationContext();
+ return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {BSON("x" << 0)},
+ {}, /* No zones */
+ false /* isEmpty */);
+ });
+
+ const auto& firstChunks = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(2U, firstChunks.chunks.size());
+ ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
+ ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, NonEmptyCollection_WithZones_OneChunkToPrimary) {
const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}),
ShardType("shard1", "rs1/shard1:123", {"TestZone"}),
ShardType("shard2", "rs2/shard2:123")};
@@ -423,14 +614,106 @@ TEST_F(CreateFirstChunksTest, DistributeInitialChunksWithTagsIgnoredForNonEmptyC
kNamespace,
kShardKeyPattern,
ShardId("shard1"),
- {},
- {TagsType(kNamespace, "TestZone", ChunkRange(BSON("x" << MinKey), BSON("x" << 0)))},
- true,
+ {}, /* No split points */
+ {TagsType(kNamespace,
+ "TestZone",
+ ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
false /* isEmpty */);
ASSERT_EQ(1U, firstChunks.chunks.size());
ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
}
+TEST_F(CreateFirstChunksTest, EmptyCollection_SplitPoints_FromClient_ManyChunksDistributed) {
+ const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+ ShardType("shard1", "rs1/shard1:123"),
+ ShardType("shard2", "rs2/shard2:123")};
+
+ const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(connStr);
+ targeter->setFindHostReturnValue(connStr.getServers()[0]);
+ targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+ ASSERT_OK(setupShards(kShards));
+ shardRegistry()->reload(operationContext());
+
+ auto future = launchAsync([&] {
+ ThreadClient tc("Test", getServiceContext());
+ auto opCtx = cc().makeOperationContext();
+ return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {BSON("x" << 0), BSON("x" << 100)},
+ {}, /* No zones */
+ true /* isEmpty */);
+ });
+
+ const auto& firstChunks = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(3U, firstChunks.chunks.size());
+ ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[0].getShard());
+ ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[1].getShard());
+ ASSERT_EQ(kShards[2].getName(), firstChunks.chunks[2].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, EmptyCollection_NoSplitPoints_OneChunkToPrimary) {
+ const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123"),
+ ShardType("shard1", "rs1/shard1:123"),
+ ShardType("shard2", "rs2/shard2:123")};
+
+ const auto connStr = assertGet(ConnectionString::parse(kShards[1].getHost()));
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(connStr);
+ targeter->setFindHostReturnValue(connStr.getServers()[0]);
+ targeterFactory()->addTargeterToReturn(connStr, std::move(targeter));
+
+ ASSERT_OK(setupShards(kShards));
+ shardRegistry()->reload(operationContext());
+
+ auto future = launchAsync([&] {
+ ThreadClient tc("Test", getServiceContext());
+ auto opCtx = cc().makeOperationContext();
+ return InitialSplitPolicy::createFirstChunks(opCtx.get(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {}, /* No split points */
+ {}, /* No zones */
+ true /* isEmpty */);
+ });
+
+ const auto& firstChunks = future.timed_get(kFutureTimeout);
+ ASSERT_EQ(1U, firstChunks.chunks.size());
+ ASSERT_EQ(kShards[1].getName(), firstChunks.chunks[0].getShard());
+}
+
+TEST_F(CreateFirstChunksTest, EmptyCollection_WithZones_ManyChunksOnFirstZoneShard) {
+ const std::vector<ShardType> kShards{ShardType("shard0", "rs0/shard0:123", {"TestZone"}),
+ ShardType("shard1", "rs1/shard1:123", {"TestZone"}),
+ ShardType("shard2", "rs2/shard2:123")};
+ ASSERT_OK(setupShards(kShards));
+ shardRegistry()->reload(operationContext());
+
+ const auto firstChunks = InitialSplitPolicy::createFirstChunks(
+ operationContext(),
+ kNamespace,
+ kShardKeyPattern,
+ ShardId("shard1"),
+ {}, /* No split points */
+ {TagsType(kNamespace,
+ "TestZone",
+ ChunkRange(kShardKeyPattern.getKeyPattern().globalMin(), BSON("x" << 0)))},
+ true /* isEmpty */);
+
+ ASSERT_EQ(2U, firstChunks.chunks.size());
+ ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[0].getShard());
+ ASSERT_EQ(kShards[0].getName(), firstChunks.chunks[1].getShard());
+}
+
} // namespace
} // namespace mongo
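
The new checkWrittenChunks helper walks the expected and written chunk lists in lockstep. Here is a minimal standalone version of the same comparison pattern, using a simplified `Chunk` struct invented for this sketch; note that each expected field is compared against the corresponding written one, which is what the helper intends to verify.

#include <cassert>
#include <string>
#include <vector>

// Simplified stand-in for ChunkType, invented for this sketch only.
struct Chunk {
    int min;
    int max;
    std::string shard;
};

// Same lockstep comparison as checkWrittenChunks(): sizes must match and each
// expected chunk is compared field-by-field against the chunk actually written.
void checkChunks(const std::vector<Chunk>& expected, const std::vector<Chunk>& written) {
    assert(expected.size() == written.size());
    auto itE = expected.begin();
    auto itW = written.begin();
    for (; itE != expected.end(); ++itE, ++itW) {
        assert(itE->min == itW->min);
        assert(itE->max == itW->max);
        assert(itE->shard == itW->shard);
    }
}

int main() {
    checkChunks({{0, 10, "shard0"}, {10, 20, "shard1"}},
                {{0, 10, "shard0"}, {10, 20, "shard1"}});
    return 0;
}
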
diff --git a/src/mongo/db/s/shardsvr_shard_collection.cpp b/src/mongo/db/s/shardsvr_shard_collection.cpp
index 45c942eb7bb..c10ed60aed3 100644
--- a/src/mongo/db/s/shardsvr_shard_collection.cpp
+++ b/src/mongo/db/s/shardsvr_shard_collection.cpp
@@ -423,15 +423,13 @@ void shardCollection(OperationContext* opCtx,
bool unique,
const std::vector<BSONObj>& splitPoints,
const std::vector<TagsType>& tags,
- const bool fromMapReduce,
+ bool fromMapReduce,
const ShardId& dbPrimaryShardId,
- const int numContiguousChunksPerShard,
- const bool isEmpty) {
+ int numContiguousChunksPerShard,
+ bool isEmpty) {
const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId));
- const bool distributeChunks =
- fromMapReduce || fieldsAndOrder.isHashedPattern() || !tags.empty();
// Fail if there are partially written chunks from a previous failed shardCollection.
checkForExistingChunks(opCtx, nss);
@@ -441,9 +439,10 @@ void shardCollection(OperationContext* opCtx,
BSONObjBuilder collectionDetail;
collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
collectionDetail.append("collection", nss.ns());
- if (uuid) {
+ if (uuid)
uuid->appendToBuilder(&collectionDetail, "uuid");
- }
+ collectionDetail.append("empty", isEmpty);
+ collectionDetail.append("fromMapReduce", fromMapReduce);
collectionDetail.append("primary", primaryShard->toString());
collectionDetail.append("numChunks", static_cast<int>(splitPoints.size() + 1));
uassertStatusOK(ShardingLogging::get(opCtx)->logChangeChecked(
@@ -467,7 +466,6 @@ void shardCollection(OperationContext* opCtx,
dbPrimaryShardId,
splitPoints,
tags,
- distributeChunks,
isEmpty,
numContiguousChunksPerShard);
@@ -681,7 +679,7 @@ public:
std::vector<BSONObj> finalSplitPoints;
if (request.getInitialSplitPoints()) {
- finalSplitPoints = std::move(*request.getInitialSplitPoints());
+ finalSplitPoints = *request.getInitialSplitPoints();
} else if (tags.empty()) {
InitialSplitPolicy::calculateHashedSplitPointsForEmptyCollection(
shardKeyPattern,
@@ -702,12 +700,21 @@ public:
LOG(0) << "CMD: shardcollection: " << cmdObj;
audit::logShardCollection(
- Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
+ opCtx->getClient(), nss.ns(), proposedKey, request.getUnique());
- // The initial chunks are distributed evenly across shards if the initial split
- // points were specified in the request by mapReduce or if we are using a hashed
- // shard key. Otherwise, all the initial chunks are placed on the primary shard.
+ // Map/reduce with output to an empty collection assumes it has full control of the
+            // output collection, so the operation is unsupported if the collection is
+            // being written to concurrently
const bool fromMapReduce = bool(request.getInitialSplitPoints());
+ if (fromMapReduce) {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream()
+ << "Map reduce with sharded output to a new collection found "
+ << nss.ns()
+ << " to be non-empty which is not supported.",
+ isEmpty);
+ }
+
const int numContiguousChunksPerShard = initialSplitPoints.empty()
? 1
: (finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1);
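
The `numContiguousChunksPerShard` arithmetic in the last hunk decides how many adjacent final chunks each shard receives when hashed pre-splitting is in play. Below is a small standalone check of that formula under one assumed configuration (3 shards, 9 requested initial chunks); the concrete numbers are chosen purely for illustration.

#include <cstddef>
#include <iostream>
#include <vector>

int main() {
    const std::size_t numShards = 3;
    const std::size_t numInitialChunks = 9;

    // Assume numShards - 1 initial split points (the configsvr command above
    // notes there are at most that many) and numInitialChunks - 1 final split
    // points; only the vector sizes matter for the formula.
    std::vector<int> initialSplitPoints(numShards - 1);
    std::vector<int> finalSplitPoints(numInitialChunks - 1);

    // Mirrors the expression at the end of the hunk above.
    const int numContiguousChunksPerShard = initialSplitPoints.empty()
        ? 1
        : static_cast<int>((finalSplitPoints.size() + 1) / (initialSplitPoints.size() + 1));

    std::cout << numContiguousChunksPerShard << std::endl;  // prints 3: (8 + 1) / (2 + 1)
    return 0;
}
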