summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_aggregation_planner_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner_test.cpp')
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner_test.cpp211
1 files changed, 91 insertions, 120 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner_test.cpp b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
index 5ff85690c51..1819e5eaa29 100644
--- a/src/mongo/s/query/cluster_aggregation_planner_test.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner_test.cpp
@@ -221,16 +221,14 @@ TEST_F(ClusterExchangeTest, GroupFollowedByOutIsEligbleForExchange) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
- ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
- const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
- ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
-
- auto shard0Ranges = partitions.find("0");
- ASSERT(shard0Ranges != partitions.end());
- ASSERT_EQ(shard0Ranges->second.size(), 1UL);
- auto shard0Range = shard0Ranges->second[0];
- ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0)));
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
+ const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
+ ASSERT_EQ(boundaries.size(), 3UL);
+
+ ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY));
+ ASSERT_BSONOBJ_EQ(boundaries[1], BSON("_id" << 0));
+ ASSERT_BSONOBJ_EQ(boundaries[2], BSON("_id" << MAXKEY));
});
future.timed_get(kFutureTimeout);
@@ -252,22 +250,18 @@ TEST_F(ClusterExchangeTest, RenamesAreEligibleForExchange) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
- ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
- const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
- ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
-
- auto shard0Ranges = partitions.find("0");
- ASSERT(shard0Ranges != partitions.end());
- ASSERT_EQ(shard0Ranges->second.size(), 1UL);
- auto shard0Range = shard0Ranges->second[0];
- ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0)));
-
- auto shard1Ranges = partitions.find("1");
- ASSERT(shard1Ranges != partitions.end());
- ASSERT_EQ(shard1Ranges->second.size(), 1UL);
- auto shard1Range = shard1Ranges->second[0];
- ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY)));
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
+ const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
+ const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
+ ASSERT_EQ(boundaries.size(), 3UL);
+
+ ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY));
+ ASSERT_BSONOBJ_EQ(boundaries[1], BSON("_id" << 0));
+ ASSERT_BSONOBJ_EQ(boundaries[2], BSON("_id" << MAXKEY));
+
+ ASSERT_EQ(consumerIds[0], 0);
+ ASSERT_EQ(consumerIds[1], 1);
});
future.timed_get(kFutureTimeout);
@@ -288,22 +282,18 @@ TEST_F(ClusterExchangeTest, MatchesAreEligibleForExchange) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
- ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
- const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
- ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
-
- auto shard0Ranges = partitions.find("0");
- ASSERT(shard0Ranges != partitions.end());
- ASSERT_EQ(shard0Ranges->second.size(), 1UL);
- auto shard0Range = shard0Ranges->second[0];
- ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0)));
-
- auto shard1Ranges = partitions.find("1");
- ASSERT(shard1Ranges != partitions.end());
- ASSERT_EQ(shard1Ranges->second.size(), 1UL);
- auto shard1Range = shard1Ranges->second[0];
- ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY)));
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
+ const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
+ const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
+ ASSERT_EQ(boundaries.size(), 3UL);
+
+ ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY));
+ ASSERT_BSONOBJ_EQ(boundaries[1], BSON("_id" << 0));
+ ASSERT_BSONOBJ_EQ(boundaries[2], BSON("_id" << MAXKEY));
+
+ ASSERT_EQ(consumerIds[0], 0);
+ ASSERT_EQ(consumerIds[1], 1);
});
future.timed_get(kFutureTimeout);
@@ -329,24 +319,18 @@ TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchange) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
- ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
- ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(),
- BSON("x" << 1));
- const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
- ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
-
- auto shard0Ranges = partitions.find("0");
- ASSERT(shard0Ranges != partitions.end());
- ASSERT_EQ(shard0Ranges->second.size(), 1UL);
- auto shard0Range = shard0Ranges->second[0];
- ASSERT(shard0Range == ChunkRange(BSON("x" << MINKEY), BSON("x" << 0)));
-
- auto shard1Ranges = partitions.find("1");
- ASSERT(shard1Ranges != partitions.end());
- ASSERT_EQ(shard1Ranges->second.size(), 1UL);
- auto shard1Range = shard1Ranges->second[0];
- ASSERT(shard1Range == ChunkRange(BSON("x" << 0), BSON("x" << MAXKEY)));
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
+ const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
+ const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
+ ASSERT_EQ(boundaries.size(), 3UL);
+
+ ASSERT_BSONOBJ_EQ(boundaries[0], BSON("x" << MINKEY));
+ ASSERT_BSONOBJ_EQ(boundaries[1], BSON("x" << 0));
+ ASSERT_BSONOBJ_EQ(boundaries[2], BSON("x" << MAXKEY));
+
+ ASSERT_EQ(consumerIds[0], 0);
+ ASSERT_EQ(consumerIds[1], 1);
});
future.timed_get(kFutureTimeout);
@@ -401,22 +385,18 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExample) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
- ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
- const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
- ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
-
- auto shard0Ranges = partitions.find("0");
- ASSERT(shard0Ranges != partitions.end());
- ASSERT_EQ(shard0Ranges->second.size(), 1UL);
- auto shard0Range = shard0Ranges->second[0];
- ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0)));
-
- auto shard1Ranges = partitions.find("1");
- ASSERT(shard1Ranges != partitions.end());
- ASSERT_EQ(shard1Ranges->second.size(), 1UL);
- auto shard1Range = shard1Ranges->second[0];
- ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY)));
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
+ const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
+ const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
+ ASSERT_EQ(boundaries.size(), 3UL);
+
+ ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY));
+ ASSERT_BSONOBJ_EQ(boundaries[1], BSON("_id" << 0));
+ ASSERT_BSONOBJ_EQ(boundaries[2], BSON("_id" << MAXKEY));
+
+ ASSERT_EQ(consumerIds[0], 0);
+ ASSERT_EQ(consumerIds[1], 1);
});
future.timed_get(kFutureTimeout);
@@ -462,33 +442,25 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExampleShardedByWord) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
- ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
- ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(),
- BSON("_id" << 1));
- const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
- ASSERT_EQ(partitions.size(), 2UL); // One for each shard.
-
- auto shard0Ranges = partitions.find("0");
- ASSERT(shard0Ranges != partitions.end());
- ASSERT_EQ(shard0Ranges->second.size(), 1UL);
- auto firstRangeOnShard0 = shard0Ranges->second[0];
- ASSERT(firstRangeOnShard0 == ChunkRange(BSON("_id" << MINKEY),
- BSON("_id"
- << "hello")));
-
- auto shard1Ranges = partitions.find("1");
- ASSERT(shard1Ranges != partitions.end());
- ASSERT_EQ(shard1Ranges->second.size(), 2UL);
- auto firstRangeOnShard1 = shard1Ranges->second[0];
- ASSERT(firstRangeOnShard1 == ChunkRange(BSON("_id"
- << "hello"),
- BSON("_id"
- << "world")));
- auto secondRangeOnShard1 = shard1Ranges->second[1];
- ASSERT(secondRangeOnShard1 == ChunkRange(BSON("_id"
- << "world"),
- BSON("_id" << MAXKEY)));
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard.
+ const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
+ const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
+ ASSERT_EQ(boundaries.size(), 4UL);
+ ASSERT_EQ(consumerIds.size(), 3UL);
+
+ ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY));
+ ASSERT_BSONOBJ_EQ(boundaries[1],
+ BSON("_id"
+ << "hello"));
+ ASSERT_BSONOBJ_EQ(boundaries[2],
+ BSON("_id"
+ << "world"));
+ ASSERT_BSONOBJ_EQ(boundaries[3], BSON("_id" << MAXKEY));
+
+ ASSERT_EQ(consumerIds[0], 0);
+ ASSERT_EQ(consumerIds[1], 1);
+ ASSERT_EQ(consumerIds[2], 1);
});
future.timed_get(kFutureTimeout);
@@ -517,13 +489,13 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) {
ChunkRange{BSON("x" << xBoundaries[i] << "y" << MINKEY),
BSON("x" << xBoundaries[i + 1] << "y" << MINKEY)},
version,
- ShardId(str::stream() << i % 3));
+ ShardId(str::stream() << (i + 1) % 3));
}
chunks.emplace_back(kTestOutNss,
ChunkRange{BSON("x" << xBoundaries.back() << "y" << MINKEY),
BSON("x" << MAXKEY << "y" << MAXKEY)},
version,
- ShardId(str::stream() << "1"));
+ ShardId(str::stream() << xBoundaries.size() % 3));
return chunks;
}();
@@ -542,36 +514,35 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) {
auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange(
operationContext(), mergePipe.get());
ASSERT_TRUE(exchangeSpec);
- ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange);
- ASSERT_TRUE(exchangeSpec->shardDistributionInfo);
- ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(),
- BSON("_id" << 1 << "_id" << 1));
- const auto& partitions = exchangeSpec->shardDistributionInfo->partitions;
- ASSERT_EQ(partitions.size(), 3UL); // One for each shard.
+ ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange);
+ ASSERT_EQ(exchangeSpec->consumerShards.size(), 3UL); // One for each shard.
+ const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get();
+ const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get();
+ ASSERT_EQ(boundaries.size(), chunks.size() + 1);
+ ASSERT_EQ(consumerIds.size(), chunks.size());
+
+ ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("_id" << 1 << "_id" << 1));
// Make sure each shard has the same chunks that it started with, just with the names of the
// boundary fields translated. For each chunk that we created to begin with, make sure its
// corresponding/translated chunk is present on the same shard in the same order.
- StringMap<std::size_t> numChunksExaminedOnShard = {{"0", 0}, {"1", 0}, {"2", 0}};
+ int counter = 0;
for (auto&& chunk : chunks) {
- auto shardId = chunk.getShard().toString();
- auto shardRanges = partitions.find(shardId);
- ASSERT(shardRanges != partitions.end());
- auto nextChunkOnShard = numChunksExaminedOnShard[shardId]++;
- ASSERT_LTE(nextChunkOnShard, shardRanges->second.size());
- auto outputChunk = shardRanges->second[nextChunkOnShard];
+ ASSERT_EQ(consumerIds[counter], (counter % 3));
auto expectedChunkMin = [&]() {
ASSERT_EQ(chunk.getMin().nFields(), 2);
return BSON("_id" << chunk.getMin()["x"] << "_id" << chunk.getMin()["y"]);
}();
- ASSERT_BSONOBJ_EQ(outputChunk.getMin(), expectedChunkMin);
+ ASSERT_BSONOBJ_EQ(boundaries[counter], expectedChunkMin);
auto expectedChunkMax = [&]() {
ASSERT_EQ(chunk.getMax().nFields(), 2);
return BSON("_id" << chunk.getMax()["x"] << "_id" << chunk.getMax()["y"]);
}();
- ASSERT_BSONOBJ_EQ(outputChunk.getMax(), expectedChunkMax);
+ ASSERT_BSONOBJ_EQ(boundaries[counter + 1], expectedChunkMax);
+
+ ++counter;
}
});