diff options
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_test.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 134 |
1 files changed, 134 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 75807b0232f..9b807f7fd23 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -3286,6 +3286,140 @@ TEST_F(PipelineMustRunOnMongoSTest, SplittablePipelineRunsUnsplitOnMongoSIfSplit } } // namespace mustRunOnMongoS + +namespace DeferredSort { + +// Like a DocumentSourceMock, but has a deferrable merge sort. +class DocumentSourceDeferredMergeSort : public DocumentSourceMock { +public: + DocumentSourceDeferredMergeSort(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMock({}, expCtx) {} + + static boost::intrusive_ptr<DocumentSourceDeferredMergeSort> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceDeferredMergeSort(expCtx); + } + + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + DistributedPlanLogic logic; + + logic.mergingStage = nullptr; + logic.shardsStage = nullptr; + logic.mergeSortPattern = BSON("a" << 1); + logic.needsSplit = false; + + return logic; + } +}; + +class DocumentSourceCanSwapWithSort : public DocumentSourceMock { +public: + DocumentSourceCanSwapWithSort(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceMock({}, expCtx) {} + + static boost::intrusive_ptr<DocumentSourceCanSwapWithSort> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceCanSwapWithSort(expCtx); + } + + boost::optional<DistributedPlanLogic> distributedPlanLogic() override { + return boost::none; + } + + // This is just to test splitting logic, doGetNext should not be called. + GetNextResult doGetNext() override { + MONGO_UNREACHABLE; + } + + StageConstraints constraints(Pipeline::SplitState pipeState) const override { + StageConstraints constraints(StreamType::kStreaming, + PositionRequirement::kNone, + HostTypeRequirement::kNone, + DiskUseRequirement::kNoDiskUse, + FacetRequirement::kNotAllowed, + TransactionRequirement::kNotAllowed, + LookupRequirement::kAllowed, + UnionRequirement::kAllowed, + ChangeStreamRequirement::kDenylist); + constraints.preservesOrderAndMetadata = true; + + return constraints; + } +}; + +using PipelineDeferredMergeSortTest = AggregationContextFixture; +using HostTypeRequirement = StageConstraints::HostTypeRequirement; + +TEST_F(PipelineDeferredMergeSortTest, StageWithDeferredSortDoesNotSplit) { + auto expCtx = getExpCtx(); + + expCtx->inMongos = true; + + auto mock = DocumentSourceDeferredMergeSort::create(expCtx); + auto swappable = DocumentSourceCanSwapWithSort::create(expCtx); + auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); + auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx); + + auto pipeline = Pipeline::create({mock, swappable, split, runOnMongoS}, expCtx); + + auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline)); + + // Verify that we've split the pipeline at the SplitPipeline stage, not on the deferred sort. + ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 2); + ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 2); + + // Verify the sort is correct. + ASSERT(splitPipeline.shardCursorsSortSpec); + ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1)); +} + +TEST_F(PipelineDeferredMergeSortTest, EarliestSortIsSelectedIfDeferred) { + auto expCtx = getExpCtx(); + + expCtx->inMongos = true; + + auto mock = DocumentSourceDeferredMergeSort::create(expCtx); + auto swappable = DocumentSourceCanSwapWithSort::create(expCtx); + auto sort = DocumentSourceSort::create(expCtx, fromjson("{NO: 1}")); + auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); + auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx); + + auto pipeline = Pipeline::create({mock, swappable, sort, split, runOnMongoS}, expCtx); + + auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline)); + + // Verify that we've split the pipeline at the non-deferred sort. + ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 2); + ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 3); + + // Verify the sort is correct. + ASSERT(splitPipeline.shardCursorsSortSpec); + ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1)); +} + +TEST_F(PipelineDeferredMergeSortTest, StageThatCantSwapGoesToMergingHalf) { + auto expCtx = getExpCtx(); + + expCtx->inMongos = true; + + auto mock = DocumentSourceDeferredMergeSort::create(expCtx); + auto match = DocumentSourceMatch::create(fromjson("{a: 5}"), expCtx); + auto split = DocumentSourceInternalSplitPipeline::create(expCtx, HostTypeRequirement::kNone); + auto runOnMongoS = DocumentSourceMatch::create(fromjson("{b: 5}"), expCtx); + + auto pipeline = Pipeline::create({mock, match, split, runOnMongoS}, expCtx); + + auto splitPipeline = sharded_agg_helpers::splitPipeline(std::move(pipeline)); + + // Verify that we've split the pipeline at the stage that can't be swapped. + ASSERT_EQ(splitPipeline.shardsPipeline->getSources().size(), 1); + ASSERT_EQ(splitPipeline.mergePipeline->getSources().size(), 3); + + // Verify the sort is correct. + ASSERT(splitPipeline.shardCursorsSortSpec); + ASSERT_BSONOBJ_EQ(splitPipeline.shardCursorsSortSpec.get(), BSON("a" << 1)); +} +} // namespace DeferredSort } // namespace Sharded } // namespace Optimizations |