author     Martin Neupauer <martin.neupauer@mongodb.com>   2018-09-27 12:58:41 -0400
committer  Martin Neupauer <martin.neupauer@mongodb.com>   2018-10-01 14:24:42 -0400
commit     f9ae642a5cab2be1c4592d67f722b849b5151739 (patch)
tree       5cb18213f01177ed1d03c10940f1bc98bd2311cf /src/mongo/s
parent     5b13c41ef9773fa875b278cbc7ce418f444e5698 (diff)
download   mongo-f9ae642a5cab2be1c4592d67f722b849b5151739.tar.gz
SERVER-36113 Add ability to perform $exchange when shard key is hashed
Diffstat (limited to 'src/mongo/s')
-rw-r--r--  src/mongo/s/catalog_cache_test_fixture.cpp              16
-rw-r--r--  src/mongo/s/catalog_cache_test_fixture.h                 11
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp         3
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner_test.cpp   61
4 files changed, 81 insertions, 10 deletions
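
Editorial note (not part of the commit): before this patch the planner chose between a kHash and a kRange exchange policy; with it, there is a single kKeyRange policy, and whether the key is hashed is carried by the key pattern itself (for example {_id: "hashed"}) together with boundaries expressed in the corresponding value space. As a rough stand-alone sketch of what key-range routing means, with std::hash standing in for MongoDB's hashed-index hash function and every name here purely illustrative:

// Stand-alone illustration (not MongoDB source) of key-range exchange routing:
// a document's shard-key value (hashed first when the key pattern is hashed)
// is compared against ascending boundaries to pick a consumer.
#include <cstdint>
#include <functional>
#include <iostream>
#include <vector>

int consumerForKey(int64_t keyValue,
                   bool hashedKey,
                   const std::vector<int64_t>& boundaries,   // ascending, size = ranges + 1
                   const std::vector<int>& consumerIds) {    // one consumer per range
    // std::hash is only a stand-in; the shift keeps the cast to int64_t in range.
    const int64_t routingValue = hashedKey
        ? static_cast<int64_t>(std::hash<int64_t>{}(keyValue) >> 1)
        : keyValue;
    for (std::size_t i = 0; i + 1 < boundaries.size(); ++i) {
        if (routingValue >= boundaries[i] && routingValue < boundaries[i + 1]) {
            return consumerIds[i];
        }
    }
    return consumerIds.back();  // falls in the last (upper-unbounded) range
}

int main() {
    // Two consumers split at 0, mirroring the two-chunk fixtures used by the
    // tests below: [MinKey, 0) -> consumer 0, [0, MaxKey) -> consumer 1.
    const std::vector<int64_t> boundaries{INT64_MIN, 0, INT64_MAX};
    const std::vector<int> consumerIds{0, 1};

    std::cout << consumerForKey(-5, false, boundaries, consumerIds) << "\n";  // 0
    std::cout << consumerForKey(7, false, boundaries, consumerIds) << "\n";   // 1
    std::cout << consumerForKey(7, true, boundaries, consumerIds) << "\n";    // 1 here: this stand-in hash is non-negative
    return 0;
}
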
diff --git a/src/mongo/s/catalog_cache_test_fixture.cpp b/src/mongo/s/catalog_cache_test_fixture.cpp
index c7df14e96b1..cc53b998540 100644
--- a/src/mongo/s/catalog_cache_test_fixture.cpp
+++ b/src/mongo/s/catalog_cache_test_fixture.cpp
@@ -183,8 +183,22 @@ void CatalogCacheTestFixture::expectGetCollection(NamespaceString nss,
CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShards(
NamespaceString nss) {
+
+ return loadRoutingTableWithTwoChunksAndTwoShardsImpl(nss, BSON("_id" << 1));
+}
+
+CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsHash(
+ NamespaceString nss) {
+
+ return loadRoutingTableWithTwoChunksAndTwoShardsImpl(nss,
+ BSON("_id"
+ << "hashed"));
+}
+
+CachedCollectionRoutingInfo CatalogCacheTestFixture::loadRoutingTableWithTwoChunksAndTwoShardsImpl(
+ NamespaceString nss, const BSONObj& shardKey) {
const OID epoch = OID::gen();
- const ShardKeyPattern shardKeyPattern(BSON("_id" << 1));
+ const ShardKeyPattern shardKeyPattern(shardKey);
auto future = scheduleRoutingInfoRefresh(nss);
diff --git a/src/mongo/s/catalog_cache_test_fixture.h b/src/mongo/s/catalog_cache_test_fixture.h
index c7cacad7eec..61edc84b8f9 100644
--- a/src/mongo/s/catalog_cache_test_fixture.h
+++ b/src/mongo/s/catalog_cache_test_fixture.h
@@ -82,6 +82,17 @@ protected:
CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShards(NamespaceString nss);
/**
+ * Same as the above method, but the shard key is hashed.
+ */
+ CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShardsHash(NamespaceString nss);
+
+ /**
+ * The common implementation for any shard key.
+ */
+ CachedCollectionRoutingInfo loadRoutingTableWithTwoChunksAndTwoShardsImpl(
+ NamespaceString nss, const BSONObj& shardKey);
+
+ /**
* Mocks network responses for loading a sharded database and collection from the config server.
*/
void expectGetDatabase(NamespaceString nss, std::string primaryShard = "0");
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 451333236e6..a19b832cb30 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -338,8 +338,7 @@ boost::optional<ShardedExchangePolicy> walkPipelineBackwardsTrackingShardKey(
}
consumerIds.emplace_back(shardToConsumer[chunk.getShardId()]);
}
- exchangeSpec.setPolicy(newShardKey.isHashedPattern() ? ExchangePolicyEnum::kHash
- : ExchangePolicyEnum::kRange);
+ exchangeSpec.setPolicy(ExchangePolicyEnum::kKeyRange);
exchangeSpec.setKey(newShardKey.toBSON());
exchangeSpec.setBoundaries(std::move(boundaries));
exchangeSpec.setConsumers(shardToConsumer.size());
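
The rest of walkPipelineBackwardsTrackingShardKey (only partially visible in this hunk) walks the chunks in shard-key order, numbers each distinct shard as a consumer, and records one consumer id per range; those are the getConsumers()/getConsumerIds() values the tests below assert on. A minimal stand-alone sketch of that bookkeeping, with made-up shard names and no MongoDB types:

// Stand-alone sketch mirroring the consumerIds.emplace_back(shardToConsumer[...])
// line above: each distinct shard gets the next consumer number, assigned in
// chunk order, and each chunk records which consumer owns its range.
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
    // Chunks listed in shard-key order, identified here only by their owning shard.
    const std::vector<std::string> chunkShards{"shard0", "shard1", "shard0"};

    std::map<std::string, int> shardToConsumer;  // shard id -> consumer number
    std::vector<int> consumerIds;                // one entry per range/chunk

    for (const auto& shardId : chunkShards) {
        auto it = shardToConsumer.find(shardId);
        if (it == shardToConsumer.end()) {
            // First time we see this shard: give it the next consumer number.
            const int next = static_cast<int>(shardToConsumer.size());
            it = shardToConsumer.emplace(shardId, next).first;
        }
        consumerIds.push_back(it->second);
    }

    // Prints "consumers=2 ids=0 1 0": two consumers (one per shard), three ranges.
    std::cout << "consumers=" << shardToConsumer.size() << " ids=";
    for (int id : consumerIds) {
        std::cout << id << ' ';
    }
    std::cout << '\n';
    return 0;
}
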
diff --git a/src/mongo/s/query/cluster_aggregation_planner_test.cpp b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
index 1819e5eaa29..d9ddb7fdad6 100644
--- a/src/mongo/s/query/cluster_aggregation_planner_test.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
@@ -221,7 +221,8 @@ TEST_F(ClusterExchangeTest, GroupFollowedByOutIsEligbleForExchange) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kKeyRange);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("_id" << 1));
ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
ASSERT_EQ(boundaries.size(), 3UL);
@@ -250,7 +251,8 @@ TEST_F(ClusterExchangeTest, RenamesAreEligibleForExchange) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kKeyRange);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("_id" << 1));
ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
@@ -282,7 +284,8 @@ TEST_F(ClusterExchangeTest, MatchesAreEligibleForExchange) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kKeyRange);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("_id" << 1));
ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
@@ -319,7 +322,48 @@ TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchange) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kKeyRange);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("x" << 1));
+ ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
+ const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
+ const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
+ ASSERT_EQ(boundaries.size(), 3UL);
+
+ ASSERT_BSONOBJ_EQ(boundaries[0], BSON("x" << MINKEY));
+ ASSERT_BSONOBJ_EQ(boundaries[1], BSON("x" << 0));
+ ASSERT_BSONOBJ_EQ(boundaries[2], BSON("x" << MAXKEY));
+
+ ASSERT_EQ(consumerIds[0], 0);
+ ASSERT_EQ(consumerIds[1], 1);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchangeHash) {
+ // Sharded by {_id: "hashed"}, [MinKey, 0) on shard "0", [0, MaxKey) on shard "1".
+ setupNShards(2);
+ loadRoutingTableWithTwoChunksAndTwoShardsHash(kTestOutNss);
+
+ // This would be the merging half of the pipeline if the original pipeline was
+ // [{$sort: {x: 1}},
+ // {$group: {_id: "$x"}},
+ // {$out: {to: "sharded_by_id", mode: "replaceDocuments"}}].
+ // No $sort stage appears in the merging half since we'd expect that to be absorbed by the
+ // $mergeCursors and AsyncResultsMerger.
+ auto mergePipe = unittest::assertGet(Pipeline::create(
+ {parse("{$group: {_id: '$x'}}"),
+ DocumentSourceOut::create(kTestOutNss, expCtx(), WriteModeEnum::kModeInsertDocuments)},
+ expCtx()));
+
+ auto future = launchAsync([&] {
+ auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
+ operationContext(), mergePipe.get());
+ ASSERT_TRUE(exchangeSpec);
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kKeyRange);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(),
+ BSON("x"
+ << "hashed"));
ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
@@ -385,7 +429,8 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExample) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kKeyRange);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("_id" << 1));
ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
@@ -442,7 +487,8 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExampleShardedByWord) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kKeyRange);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("_id" << 1));
ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
@@ -514,7 +560,8 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kKeyRange);
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("_id" << 1 << "_id" << 1));
ASSERT_EQ(exchangeSpec->consumerShards.size(), 3UL); // One for each shard.
const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
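
The CompoundShardKeyThreeShards test above exercises the same kKeyRange policy with three consumers, where boundary comparisons happen field by field. A rough stand-alone sketch of lexicographic key-range routing, with illustrative split points rather than the values used in the test:

// Stand-alone sketch: picking a consumer for a compound shard key by comparing
// it lexicographically (field by field) against ascending boundaries.
// Values and consumer layout are illustrative, not taken from the test above.
#include <array>
#include <iostream>
#include <limits>
#include <vector>

using Key = std::array<long long, 2>;  // e.g. a two-field shard key {a, b}

int consumerForKey(const Key& key,
                   const std::vector<Key>& boundaries,   // ascending, lexicographic
                   const std::vector<int>& consumerIds) {
    for (std::size_t i = 0; i + 1 < boundaries.size(); ++i) {
        if (!(key < boundaries[i]) && key < boundaries[i + 1]) {
            return consumerIds[i];
        }
    }
    return consumerIds.back();
}

int main() {
    constexpr long long kMin = std::numeric_limits<long long>::min();
    constexpr long long kMax = std::numeric_limits<long long>::max();

    // Three ranges, one per consumer/shard: split points at {0, 0} and {10, 0}.
    const std::vector<Key> boundaries{{kMin, kMin}, {0, 0}, {10, 0}, {kMax, kMax}};
    const std::vector<int> consumerIds{0, 1, 2};

    std::cout << consumerForKey({-3, 5}, boundaries, consumerIds) << "\n";  // 0
    std::cout << consumerForKey({0, 7}, boundaries, consumerIds) << "\n";   // 1
    std::cout << consumerForKey({42, 1}, boundaries, consumerIds) << "\n";  // 2
    return 0;
}
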