diff options
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner_test.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner_test.cpp | 211 |
1 files changed, 91 insertions, 120 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner_test.cpp b/src/mongo/s/query/cluster_aggregation_planner_test.cpp index 5ff85690c51..1819e5eaa29 100644 --- a/src/mongo/s/query/cluster_aggregation_planner_test.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner_test.cpp @@ -221,16 +221,14 @@ TEST_F(ClusterExchangeTest, GroupFollowedByOutIsEligbleForExchange) { auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( operationContext(), mergePipe.get()); ASSERT_TRUE(exchangeSpec); - ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); - ASSERT_TRUE(exchangeSpec->shardDistributionInfo); - const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; - ASSERT_EQ(partitions.size(), 2UL); // One for each shard. - - auto shard0Ranges = partitions.find("0"); - ASSERT(shard0Ranges != partitions.end()); - ASSERT_EQ(shard0Ranges->second.size(), 1UL); - auto shard0Range = shard0Ranges->second[0]; - ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0))); + ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange); + ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard. + const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get(); + ASSERT_EQ(boundaries.size(), 3UL); + + ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY)); + ASSERT_BSONOBJ_EQ(boundaries[1], BSON("_id" << 0)); + ASSERT_BSONOBJ_EQ(boundaries[2], BSON("_id" << MAXKEY)); }); future.timed_get(kFutureTimeout); @@ -252,22 +250,18 @@ TEST_F(ClusterExchangeTest, RenamesAreEligibleForExchange) { auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( operationContext(), mergePipe.get()); ASSERT_TRUE(exchangeSpec); - ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); - ASSERT_TRUE(exchangeSpec->shardDistributionInfo); - const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; - ASSERT_EQ(partitions.size(), 2UL); // One for each shard. - - auto shard0Ranges = partitions.find("0"); - ASSERT(shard0Ranges != partitions.end()); - ASSERT_EQ(shard0Ranges->second.size(), 1UL); - auto shard0Range = shard0Ranges->second[0]; - ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0))); - - auto shard1Ranges = partitions.find("1"); - ASSERT(shard1Ranges != partitions.end()); - ASSERT_EQ(shard1Ranges->second.size(), 1UL); - auto shard1Range = shard1Ranges->second[0]; - ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY))); + ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange); + ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard. + const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get(); + const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get(); + ASSERT_EQ(boundaries.size(), 3UL); + + ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY)); + ASSERT_BSONOBJ_EQ(boundaries[1], BSON("_id" << 0)); + ASSERT_BSONOBJ_EQ(boundaries[2], BSON("_id" << MAXKEY)); + + ASSERT_EQ(consumerIds[0], 0); + ASSERT_EQ(consumerIds[1], 1); }); future.timed_get(kFutureTimeout); @@ -288,22 +282,18 @@ TEST_F(ClusterExchangeTest, MatchesAreEligibleForExchange) { auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( operationContext(), mergePipe.get()); ASSERT_TRUE(exchangeSpec); - ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); - ASSERT_TRUE(exchangeSpec->shardDistributionInfo); - const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; - ASSERT_EQ(partitions.size(), 2UL); // One for each shard. - - auto shard0Ranges = partitions.find("0"); - ASSERT(shard0Ranges != partitions.end()); - ASSERT_EQ(shard0Ranges->second.size(), 1UL); - auto shard0Range = shard0Ranges->second[0]; - ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0))); - - auto shard1Ranges = partitions.find("1"); - ASSERT(shard1Ranges != partitions.end()); - ASSERT_EQ(shard1Ranges->second.size(), 1UL); - auto shard1Range = shard1Ranges->second[0]; - ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY))); + ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange); + ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard. + const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get(); + const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get(); + ASSERT_EQ(boundaries.size(), 3UL); + + ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY)); + ASSERT_BSONOBJ_EQ(boundaries[1], BSON("_id" << 0)); + ASSERT_BSONOBJ_EQ(boundaries[2], BSON("_id" << MAXKEY)); + + ASSERT_EQ(consumerIds[0], 0); + ASSERT_EQ(consumerIds[1], 1); }); future.timed_get(kFutureTimeout); @@ -329,24 +319,18 @@ TEST_F(ClusterExchangeTest, SortThenGroupIsEligibleForExchange) { auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( operationContext(), mergePipe.get()); ASSERT_TRUE(exchangeSpec); - ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); - ASSERT_TRUE(exchangeSpec->shardDistributionInfo); - ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(), - BSON("x" << 1)); - const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; - ASSERT_EQ(partitions.size(), 2UL); // One for each shard. - - auto shard0Ranges = partitions.find("0"); - ASSERT(shard0Ranges != partitions.end()); - ASSERT_EQ(shard0Ranges->second.size(), 1UL); - auto shard0Range = shard0Ranges->second[0]; - ASSERT(shard0Range == ChunkRange(BSON("x" << MINKEY), BSON("x" << 0))); - - auto shard1Ranges = partitions.find("1"); - ASSERT(shard1Ranges != partitions.end()); - ASSERT_EQ(shard1Ranges->second.size(), 1UL); - auto shard1Range = shard1Ranges->second[0]; - ASSERT(shard1Range == ChunkRange(BSON("x" << 0), BSON("x" << MAXKEY))); + ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange); + ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard. + const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get(); + const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get(); + ASSERT_EQ(boundaries.size(), 3UL); + + ASSERT_BSONOBJ_EQ(boundaries[0], BSON("x" << MINKEY)); + ASSERT_BSONOBJ_EQ(boundaries[1], BSON("x" << 0)); + ASSERT_BSONOBJ_EQ(boundaries[2], BSON("x" << MAXKEY)); + + ASSERT_EQ(consumerIds[0], 0); + ASSERT_EQ(consumerIds[1], 1); }); future.timed_get(kFutureTimeout); @@ -401,22 +385,18 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExample) { auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( operationContext(), mergePipe.get()); ASSERT_TRUE(exchangeSpec); - ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); - ASSERT_TRUE(exchangeSpec->shardDistributionInfo); - const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; - ASSERT_EQ(partitions.size(), 2UL); // One for each shard. - - auto shard0Ranges = partitions.find("0"); - ASSERT(shard0Ranges != partitions.end()); - ASSERT_EQ(shard0Ranges->second.size(), 1UL); - auto shard0Range = shard0Ranges->second[0]; - ASSERT(shard0Range == ChunkRange(BSON("_id" << MINKEY), BSON("_id" << 0))); - - auto shard1Ranges = partitions.find("1"); - ASSERT(shard1Ranges != partitions.end()); - ASSERT_EQ(shard1Ranges->second.size(), 1UL); - auto shard1Range = shard1Ranges->second[0]; - ASSERT(shard1Range == ChunkRange(BSON("_id" << 0), BSON("_id" << MAXKEY))); + ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange); + ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard. + const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get(); + const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get(); + ASSERT_EQ(boundaries.size(), 3UL); + + ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY)); + ASSERT_BSONOBJ_EQ(boundaries[1], BSON("_id" << 0)); + ASSERT_BSONOBJ_EQ(boundaries[2], BSON("_id" << MAXKEY)); + + ASSERT_EQ(consumerIds[0], 0); + ASSERT_EQ(consumerIds[1], 1); }); future.timed_get(kFutureTimeout); @@ -462,33 +442,25 @@ TEST_F(ClusterExchangeTest, WordCountUseCaseExampleShardedByWord) { auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( operationContext(), mergePipe.get()); ASSERT_TRUE(exchangeSpec); - ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); - ASSERT_TRUE(exchangeSpec->shardDistributionInfo); - ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(), - BSON("_id" << 1)); - const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; - ASSERT_EQ(partitions.size(), 2UL); // One for each shard. - - auto shard0Ranges = partitions.find("0"); - ASSERT(shard0Ranges != partitions.end()); - ASSERT_EQ(shard0Ranges->second.size(), 1UL); - auto firstRangeOnShard0 = shard0Ranges->second[0]; - ASSERT(firstRangeOnShard0 == ChunkRange(BSON("_id" << MINKEY), - BSON("_id" - << "hello"))); - - auto shard1Ranges = partitions.find("1"); - ASSERT(shard1Ranges != partitions.end()); - ASSERT_EQ(shard1Ranges->second.size(), 2UL); - auto firstRangeOnShard1 = shard1Ranges->second[0]; - ASSERT(firstRangeOnShard1 == ChunkRange(BSON("_id" - << "hello"), - BSON("_id" - << "world"))); - auto secondRangeOnShard1 = shard1Ranges->second[1]; - ASSERT(secondRangeOnShard1 == ChunkRange(BSON("_id" - << "world"), - BSON("_id" << MAXKEY))); + ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange); + ASSERT_EQ(exchangeSpec->consumerShards.size(), 2UL); // One for each shard. + const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get(); + const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get(); + ASSERT_EQ(boundaries.size(), 4UL); + ASSERT_EQ(consumerIds.size(), 3UL); + + ASSERT_BSONOBJ_EQ(boundaries[0], BSON("_id" << MINKEY)); + ASSERT_BSONOBJ_EQ(boundaries[1], + BSON("_id" + << "hello")); + ASSERT_BSONOBJ_EQ(boundaries[2], + BSON("_id" + << "world")); + ASSERT_BSONOBJ_EQ(boundaries[3], BSON("_id" << MAXKEY)); + + ASSERT_EQ(consumerIds[0], 0); + ASSERT_EQ(consumerIds[1], 1); + ASSERT_EQ(consumerIds[2], 1); }); future.timed_get(kFutureTimeout); @@ -517,13 +489,13 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) { ChunkRange{BSON("x" << xBoundaries[i] << "y" << MINKEY), BSON("x" << xBoundaries[i + 1] << "y" << MINKEY)}, version, - ShardId(str::stream() << i % 3)); + ShardId(str::stream() << (i + 1) % 3)); } chunks.emplace_back(kTestOutNss, ChunkRange{BSON("x" << xBoundaries.back() << "y" << MINKEY), BSON("x" << MAXKEY << "y" << MAXKEY)}, version, - ShardId(str::stream() << "1")); + ShardId(str::stream() << xBoundaries.size() % 3)); return chunks; }(); @@ -542,36 +514,35 @@ TEST_F(ClusterExchangeTest, CompoundShardKeyThreeShards) { auto exchangeSpec = cluster_aggregation_planner::checkIfEligibleForExchange( operationContext(), mergePipe.get()); ASSERT_TRUE(exchangeSpec); - ASSERT(exchangeSpec->policy == ExchangePolicyEnum::kRange); - ASSERT_TRUE(exchangeSpec->shardDistributionInfo); - ASSERT_BSONOBJ_EQ(exchangeSpec->shardDistributionInfo->logicalShardKeyAtSplitPoint.toBSON(), - BSON("_id" << 1 << "_id" << 1)); - const auto& partitions = exchangeSpec->shardDistributionInfo->partitions; - ASSERT_EQ(partitions.size(), 3UL); // One for each shard. + ASSERT(exchangeSpec->exchangeSpec.getPolicy() == ExchangePolicyEnum::kRange); + ASSERT_EQ(exchangeSpec->consumerShards.size(), 3UL); // One for each shard. + const auto& boundaries = exchangeSpec->exchangeSpec.getBoundaries().get(); + const auto& consumerIds = exchangeSpec->exchangeSpec.getConsumerIds().get(); + ASSERT_EQ(boundaries.size(), chunks.size() + 1); + ASSERT_EQ(consumerIds.size(), chunks.size()); + + ASSERT_BSONOBJ_EQ(exchangeSpec->exchangeSpec.getKey(), BSON("_id" << 1 << "_id" << 1)); // Make sure each shard has the same chunks that it started with, just with the names of the // boundary fields translated. For each chunk that we created to begin with, make sure its // corresponding/translated chunk is present on the same shard in the same order. - StringMap<std::size_t> numChunksExaminedOnShard = {{"0", 0}, {"1", 0}, {"2", 0}}; + int counter = 0; for (auto&& chunk : chunks) { - auto shardId = chunk.getShard().toString(); - auto shardRanges = partitions.find(shardId); - ASSERT(shardRanges != partitions.end()); - auto nextChunkOnShard = numChunksExaminedOnShard[shardId]++; - ASSERT_LTE(nextChunkOnShard, shardRanges->second.size()); - auto outputChunk = shardRanges->second[nextChunkOnShard]; + ASSERT_EQ(consumerIds[counter], (counter % 3)); auto expectedChunkMin = [&]() { ASSERT_EQ(chunk.getMin().nFields(), 2); return BSON("_id" << chunk.getMin()["x"] << "_id" << chunk.getMin()["y"]); }(); - ASSERT_BSONOBJ_EQ(outputChunk.getMin(), expectedChunkMin); + ASSERT_BSONOBJ_EQ(boundaries[counter], expectedChunkMin); auto expectedChunkMax = [&]() { ASSERT_EQ(chunk.getMax().nFields(), 2); return BSON("_id" << chunk.getMax()["x"] << "_id" << chunk.getMax()["y"]); }(); - ASSERT_BSONOBJ_EQ(outputChunk.getMax(), expectedChunkMax); + ASSERT_BSONOBJ_EQ(boundaries[counter + 1], expectedChunkMax); + + ++counter; } }); |